前言
关于Celery的资料能从百度里面找到的不多,最多的就是被复制来复制去的“任务调度利器”,但是更多的详细资料则极少,当我尝试去寻找关于Celery的更层次定义以及内部工作机制时根本找不到相关的中文资料,于是发现在更深层次的技术搜索领域Google和stackoverflow才是真理。
说说我眼中的Celery:Celery可以作为一个“钩子”附着在函数上,当函数被调用时这个“钩子”会将这个调用的请求存储在指定的broker中,然后Celery进程从broker中取出调用请求去执行,从而能够实现异步和分布式。
Celery
一、起手式
(1)初始化
celery = Celery(__name__, broker='redis://localhost:6379/0')
注意:Celery的broker支持多种类型,为实现更快的调用以及避免死锁,不要使用普通的数据库(譬如mysql)作为broker,个人建议用redis。
(2)配置
# 这样做的好处是可以把Celery的配置写成一个类直接调用 # 不需要一个个定义,而只需定义在一个配置文件里 celery.conf.update(object)
二、定义
# tasks.py @celery.task(name='mail', # 自定义的任务名称 bind=True, default_retry_delay=10, # 重试间隔时间 max_retries=5 # 最大重试次数 ) def mail(self, sender, content): """ do something """ """ 失败时重新尝试 """ try: self.retry() except MaxRetriesExceededError: """ 重试次数超过后依然失败的处理 """ pass
注意:当一个文件中定义多个task时需为每个task加上唯一的“name”,否则启动时会报错。
三、启动Celery进程
# 在当前tasks目录下终端输入shell指令 # 以worker的方式启动Celery # -A 后是需要Celery调用的任务,如以“二、定义”中的整个tasks为例, # 这种方式会为tasks文件中的所有task都加上钩子 # -Q 后是使用哪个队列接收task # -l 后是以INFO级别作为日志显示级别 celery worker -A tasks -Q default -l INFO
四、调用
# 普通的异步调用方式 mail.delay(sender, content) # 使用特定队列的异步调用方式 mail.apply_async(args=[sender, content], queue=current_queue)
进阶:当一个task具有优先级或者执行时间长短之分时,可以为Celery创建多个不同队列,不同等级的task分别调用对应的处理队列,并为每个队列开启一个或多个的Celery处理进程,这样可以避免出现较长时间的task堵塞。
五、实践
基于Celery实现异步发送邮件服务(其中包含了结合Flask实现异步的初始化,但考虑到不是本次分享的范围因此暂时隐藏具体实现的源码,而只实现类的调用方式)。
github:https://github.com/ZoaChou/Python-learn/blob/master/application/controller/tasks.py
Please,喜欢使用爬虫直接抓文章的站长以及简单复制粘贴又不注明出处的copyer请尊重文章作者的著作权,如需转载请邮件征得作者(Zoa Chou)同意,谢谢。