前言

关于Celery的资料能从百度里面找到的不多,最多的就是被复制来复制去的“任务调度利器”,但是更多的详细资料则极少,当我尝试去寻找关于Celery的更层次定义以及内部工作机制时根本找不到相关的中文资料,于是发现在更深层次的技术搜索领域Googlestackoverflow才是真理。

说说我眼中的Celery:Celery可以作为一个“钩子”附着在函数上,当函数被调用时这个“钩子”会将这个调用的请求存储在指定的broker中,然后Celery进程从broker中取出调用请求去执行,从而能够实现异步和分布式。

Celery

一、起手式

(1)初始化

celery = Celery(__name__, broker='redis://localhost:6379/0')

注意:Celery的broker支持多种类型,为实现更快的调用以及避免死锁,不要使用普通的数据库(譬如mysql)作为broker,个人建议用redis。

(2)配置

# 这样做的好处是可以把Celery的配置写成一个类直接调用
# 不需要一个个定义,而只需定义在一个配置文件里
celery.conf.update(object)

二、定义

# tasks.py

@celery.task(name='mail',  # 自定义的任务名称
             bind=True,
             default_retry_delay=10,  # 重试间隔时间
             max_retries=5  # 最大重试次数
             )
def mail(self, sender, content):
    """ do something """
    
    """ 失败时重新尝试 """
    try:
        self.retry()
    except MaxRetriesExceededError:
        """ 重试次数超过后依然失败的处理 """
        pass

注意:当一个文件中定义多个task时需为每个task加上唯一的“name”,否则启动时会报错。

三、启动Celery进程

# 在当前tasks目录下终端输入shell指令
# 以worker的方式启动Celery
# -A 后是需要Celery调用的任务,如以“二、定义”中的整个tasks为例,
# 这种方式会为tasks文件中的所有task都加上钩子
# -Q 后是使用哪个队列接收task
# -l 后是以INFO级别作为日志显示级别
celery worker -A tasks -Q default -l INFO

四、调用

# 普通的异步调用方式
mail.delay(sender, content)
# 使用特定队列的异步调用方式
mail.apply_async(args=[sender, content], queue=current_queue)

进阶:当一个task具有优先级或者执行时间长短之分时,可以为Celery创建多个不同队列,不同等级的task分别调用对应的处理队列,并为每个队列开启一个或多个的Celery处理进程,这样可以避免出现较长时间的task堵塞。

五、实践

基于Celery实现异步发送邮件服务(其中包含了结合Flask实现异步的初始化,但考虑到不是本次分享的范围因此暂时隐藏具体实现的源码,而只实现类的调用方式)。

github:https://github.com/ZoaChou/Python-learn/blob/master/application/controller/tasks.py

Please,喜欢使用爬虫直接抓文章的站长以及简单复制粘贴又不注明出处的copyer请尊重文章作者的著作权,如需转载请邮件征得作者(Zoa Chou)同意,谢谢。