分布式框架Celery

分布式框架Celerycelery 是开源的 有很多的使用者 celery 完全基于 Python 语言编写所以 celery 本质上是一个分布式的异步任务调度框架 类似于 Apache 的 airflowceler 只是用来调度任务

大家好,欢迎来到IT知识分享网。

一、Celery介绍

  1. Celery 是什么?

    1. celery 是一个灵活且可靠的,处理大量消息的分布式系统,可以在多个节点之间处理某个任务
    2. celery 是一个专注于实时处理的任务队列,支持任务调度
    3. celery 完全基于 Python 语言编写
    4. 所以 celery 本质上是一个分布式的异步任务调度框架,类似于 Apache 的 airflow
    5. celery 只是用来调度任务的,但它本身并不具备存储任务的功能,而调度任务的时候肯定是要把任务存起来的。因此要使用 celery 的话,还需要搭配一些具备存储、访问功能的工具,比如:消息队列、Redis缓存、数据库等等。官方推荐的是消息队列 RabbitMQ,我们使用 Redis 

    celery 是开源的,有很多的使用者

  2. celery 使用场景

    1. 异步任务
      1. 一些耗时的操作可以交给celery异步执行,而不用等着程序处理完才知道结果。
      2. 视频转码、邮件发送、消息推送等等
    2. 定时任务
      1. 定时推送消息、定时爬取数据、定时统计数据等
    3. 延迟任务
      1. 提交任务后,等待一段时间再执行某个任务
  3. Celery官网

    1. 官网:https://docs.celeryq.dev/en/stable/
    2. 最新版本Celery (5.3)
    3. python支持 
      1. Celery version 5.3 runs on Python ❨3.8, 3.9, 3.10, 3.11❩
    4. Django支持
      1. Celery 5.3.x supports Django 2.2 LTS or newer versions. Please use Celery 5.2.x for versions older than Django 2.2 or Celery 4.4.x if your Django version is older than 1.11

    开源地址(源码)https://github.com/celery/celery

  4. Celery架构,Celery 架构,它采用典型的生产者-消费者模式,主要由以下部分组成:

    1. Celery Beat:任务调度器,Beat 进程会读取配置文件的内容,周期性地将配置中到期需要执行的任务发送给任务队列。
    2. Producer:需要在队列中进行的任务,一般由用户、触发器或其他操作将任务入队,然后交由 workers 进行处理。调用了 Celery 提供的 API、函数或者装饰器而产生任务并交给任务队列处理的都是任务生产者。
    3. Broker,即消息中间件,在这指任务队列本身,Celery 扮演生产者和消费者的角色,brokers 就是生产者和消费者存放/获取产品的地方(队列)。
    4. Celery Worker,执行任务的消费者,从队列中取出任务并执行。通常会在多台服务器运行多个消费者来提高执行效率。
    5. Result Backend:任务处理完后保存状态信息和结果,以供查询。Celery 默认已支持 Redis、RabbitMQ、MongoDB、Django ORM、SQLAlchemy 等方式。
    6. 实际应用中,用户从 Web 前端发起一个请求,我们只需要将请求所要处理的任务丢入任务队列 broker 中,由空闲的 worker 去处理任务即可,处理的结果会暂存在后台数据库 backend 中。我们可以在一台机器或多台机器上同时起多个 worker 进程来实现分布式地并行处理任务。
  5. Celery 快速使用

    1. 安装

      1. 创建Python项目
      2. 创建虚拟环境
      3. 安装eventlet(win 平台)pip install eventlet

      安装redis(消息队列和结果存储使用redis) pip install redis

    2. 快速使用

      1. celery_demo.py–主文件
        from celery import Celery import time broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery('celery_test', broker=broker, backend=backend) @app.task def add(n, m): time.sleep(2) print('n+m的结果:%s' % (n + m)) return n + m @app.task def send_email(mail='@.com'): print('模拟发送延迟--开始') time.sleep(2) print('模拟发送延迟--结束') return '邮件发送成功:%s' % mail 

        通过resp查看任务 分布式框架Celery

        from celery_demo import add,send_email 1 同步调用 res=send_email() print(res) # 2 异步调用 res = add.delay(10, 20) print(res.id)
      2. 开启worker  celery -A celery_demo worker -l info -P eventlet分布式框架Celery
      3. get_result.py-查看结果
        from celery_demo import app from celery.result import AsyncResult id = 'd0ae78c8-9a8e-4f93-9d32-b17d4e295fe9' if __name__ == '__main__': result = AsyncResult(id=id, app=app) if result.successful(): result = result.get() print(result) elif result.failed(): print('任务失败') elif result.status == 'PENDING': print('任务等待中被执行') elif result.status == 'RETRY': print('任务异常后正在重试') elif result.status == 'STARTED': print('任务已经开始被执行')

      add_task.py–提交异步任务

    安装celery pip install celery

  6. 包结构

  7. 目录结构

    celery.py

    项目名 ├── celery_task # celery包 │ ├── __init__.py # 包文件 │ ├── celery.py # celery连接和配置相关文件,且名字必须叫celery.py │ └── tasks.py # 所有任务函数 ├── add_task.py # 添加任务 └── get_result.py # 获取结果
    from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])

    tasks.py

    from .celery import app import time @app.task def add(n, m): time.sleep(2) print('n+m的结果:%s' % (n + m)) return n + m @app.task def send_email(mail='@.com'): print('模拟发送延迟--开始') time.sleep(2) print('模拟发送延迟--结束') return '邮件发送成功:%s' % mail

  8. 执行异步–延迟–定时任务

    1. 异步任务 res = add.delay(10, 20)

    2. 延迟任务

      ​​​​​

      rom datetime import datetime, timedelta eta=datetime.utcnow() + timedelta(seconds=10) tasks.add.apply_async(args=(200, 50), eta=eta)
    3. 定时任务

      #1 celery.py中加入 # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False # 任务的定时配置 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'low-task': { 'task': 'celery_task.tasks.add', 'schedule': timedelta(seconds=3), # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': (300, 150), } } # 2 启动worker celery -A celery_task worker -l debug -P eventlet # 3 启动beat celery -A celery_task beat -l debug 

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/158370.html

(0)
上一篇 2025-01-24 17:15
下一篇 2025-01-24 17:25

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信