大家好,欢迎来到IT知识分享网。
前言基础:
同步:
提交一个任务,必须等待任务执行结果才能接着提交任务。
异步:
调教任务后,将任务交由后台的其他执行任务的进程或者线程来处理,用户可以继续提交任务,并且任务处理完调用回调函数讲任务提交结果返回。
分布式:
一个任务拆开分为多部,交到不同的计算机执行,极大的提高了执行效率,因为多台计算机的处理性能较高
celery简介
Celery是一个强大的分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以分配到其他主机上去(分布式:一个任务分几段,分配到不同主机)运行,我们通常使用异步(async task)和定时任务(crontab).
celery的架构由三部分组成,消息中间件(message broker),任务执行单元(worker),任务执行单元(worker),任务执行结果存储(task result store)组成。
消息中间件与结果存储都可以用redis数据库。
从上图可以看到,Celery主要有几个模块:
任务模块Task
包含异步任务与定时任务。异步任务一般在执行耗时的IO操作任务(首先返回一个调提交的任务号给用户,等处理结束发送处理结束的信号给用户,用户凭借任务号取数据),定时任务Celery Beat进程周期性的讲任务发往任务队列,如数据库每天凌晨更新同步数据。
消息中间件Broker
Broker,即为任务调度队列,接受任务生产者发送过来的消息(操作任务),将用户提交的任务或者操作入队列,Celery本身不提供队列的服务,因此需要第三方的,如redis,以及RabbitMq等个人推荐redis,速度较快
任务执行单元Worker
worker是执行任务的处理单元,它实时监控消息队列,从队列中调度获取任务,并执行。
任务执行结果存储Backend
Backend用于存储任务执行结果,以提供查询使用,它也可以使用RabbitMQ,redis
小结:celery处理大量消息的分布式系统,能处理异步定时任务,使用一般用于处理耗时操作的多任务或者定时性的任务;
celery安装与使用:
pycharm安装celery
pip3 install celery
from celery import Celery import time # broker = broker = "redis://127.0.0.1:6379/1" # # broker = "redis://:密码@127.0.0.1:6379/1" broker = "redis://127.0.0.1:6379/1" backend = "redis://127.0.0.1:6379/1" # 实例化一个celery对象,传入消息中间件,消息存储队列,然后传入数据处理结果数据库 #celery1为实例化的celery对象的名字,因为可以有多个celery。 app = Celery("celery1",broker=broker,backend=backend) #通过该装饰器,使得该任务与celery建立绑定关系 @app.task def multi(x,y): time.sleep(1) return x*y
2.将装饰的任务函数提交到消息队列中,此时提交的任务函数并没有执行,只是放在消息队列中,等待worker的执行,一旦提交到worker(delay等函数),它将返回一个标识任务的字符串,你可以通过该字符串取出任务执行后的结果:
from testCelery import app,multi # 调用 任务类对象的return self.apply_async(args, kwargs) #提交任务delay中为传入的参数,如 res = multi.delay(8,8) print(res) #结果标识打印码: # 74c0b735-afb0-4ceb-a5cb-2d7f9f0d0172
3.使用命令启动worker去执行刚刚调教的任务::
linux下:celery worker -A testCelery -l info windows下:celery worker -A testCelery -l info -P eventlet
4.查询结果:
from celery.result import AsyncResult from testCelery import app ret = AsyncResult(id="74c0b735-afb0-4ceb-a5cb-2d7f9f0d0172",app=app) if ret.successful(): res = ret.get() print(res) print(ret.result)
Celery的简单使用流程:
安装: pip3 install celery 写一个py文件:testCelery 1.指定broker(消息中间件),指定backend(结果存储) 2.实例化一个celery对象传参:name,broker,backend ;app = Celery("celery1",broker=broker,backend=backend) 3.加装饰器绑定任务,在函数(add)上加装饰器app.task 4. 其他程序提交任务,先导入add,add.delay(参,参数),会将该函数提交到消息中间件,但是并不会执行,有个返回值,直接print会打印出任务的id,以后用id去查询任务是否执行完成 5 启动worker去执行任务: linux: celery worker -A celery_task_s1 -l info windows下:celery worker -A celery_task_s1 -l info -P eventlet 6 查看结果:根据id去查询 async = AsyncResult(id="74c0b735-afb0-4ceb-a5cb-2d7f9f0d0172", app=app) if async.successful(): #取出它return的值 result = async.get() print(result)
celery的多任务
celery的多任务结构
注意celery.py这个文件的文件名是固定的,不能改,task_1和task_2可以自己定义,他俩代表自定义的任务分类,还可以再创建task_3。。。等其它名字的任务文件,send_task.py是提交任务到worker,check_result.py是查看结果的,注实例化时用到关键字include
#celery.py from celery import Celery broker = "redis://127.0.0.1:6379/1" backend = "redis://127.0.0.1:6379/1" app = Celery("c2",broker=broker,backend=backend,include=[ "celery_task.task_1", "celery_task.task_2" ]) # celery提供一些配置,具体可查看官方文档 # app.conf.timezone = 'Asia/Shanghai'
task_1.py,task_2.py
task_1.py from celery_task import celery @celery.app.task def add1_task(x,y): import time time.sleep(1) return x+y task_2.py from celery_task import celery @celery.app.task def add2_task(x,y): import time time.sleep(1) return x+y
from celery_task.task_1 import add1_task from celery_task.task_2 import add2_task # 启动worker,celery_task是包的名字 # celery worker -A celery_task -l info -P eventlet #注celery_task为包名 from datetime import datetime 设置任务执行时间2019年7月14日12点59分58秒 v1 = datetime(2019,7,14,12,59,58) print(v1) #将v1时间转换为utc时间 v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) result1 = add1_task.apply_async(args=[6,6],eta=v2) result2 = add2_task.apply_async(args=[6,6],eta=v2) print(result1.id) print(result2.id)
from celery_task.task_1 import add1_task from celery_task.task_2 import add2_task # res1 = add1_task.delay(8,8) # res2 = add2_task.delay(8,8) # print(res1) # print(res2) # 启动worker,celery_task是包的名字 # celery worker -A celery_task -l info -P eventlet #注celery_task为包名 from datetime import datetime v1 = datetime(2019,7,14,12,59,58) print(v1) v2 = datetime.utcfromtimestamp(v1.timestamp()) print(v2) from datetime import timedelta # 使用timedelta模块,拿到10秒后的时间对象,这里参数可以传秒、毫秒、微秒、分、小时、周、天 time_delay = timedelta(seconds=10) # 得到任务运行时间: task_time = v2 + time_delay result1 = add1_task.apply_async(args=[6,6],eta=task_time) result2 = add2_task.apply_async(args=[6,6],eta=task_time) print(result1.id) print(result2.id)
celery计划任务
计划任务需要在celery.py中添加代码,然后需要beat一下,才能将计划开启
from celery import Celery broker = "redis://127.0.0.1:6379/1" backend = "redis://127.0.0.1:6379/1" app = Celery("c2",broker=broker,backend=backend,include=[ "celery_task.task_1", "celery_task.task_2" ]) # celery提供一些配置,具体可查看官方文档 # app.conf.timezone = 'Asia/Shanghai' from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'submit_every_4_seconds':{ "task":"celery_task.task_1.add1_task", "schedule":timedelta(seconds=2), "args":[2,8] }, 'submit_every_8_seconds': { "task": "celery_task.task_1.add2_task", "schedule": crontab(minute=55,hour=8,month_of_year=8,day_of_month=2,), "args": [2, 8] } } # 上面写完后,需要起一个进程,启动计划任务 # celery beat -A celery_task -l info # 启动worker: # celery worker -A celery_task -l info -P eventlet
Django中使用celery
django-celery:由于djang-celery模块对版本的要求过于严格,而且容易出现很多bug,所以不建议使用
在Python脚本中调用Django环境
import os # 加载Django环境,bbs是所在的Django项目名称 os.environ.setdefault("DJANGO_SETTINGS_MODULE", 'bbs.settings') # 引入Django模块 import django # 初始化Django环境 django.setup() # 从app当中导入models from app01 import models # 调用操作,拿到数据库中的所有Book数据对象 books = models.Books.objects.all()
免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/104943.html

