目录

Python学习笔记-Celery

Python邮件发送与Celery异步任务实现

前言

很久没有写技术博客了,最近因为工作需要,我从PHP转投Python阵营。虽然一开始有很多不习惯的地方(比如Python的语法结构、数据类型处理等),但不得不说,Python确实是一门值得学习的语言,正如那句名言所说:“人生苦短,我用Python”。

本文将分享基于SMTP协议的邮件发送实现,以及如何使用Celery实现异步任务处理。

邮件发送逻辑流程

一、格式化邮件数据

  1. 编码格式选择:建议使用UTF-8编码(Unicode的编码方式之一),以确保邮件能被全球用户正确显示,避免使用GBK等区域性编码。

  2. 邮件类型选择

    • 纯文本邮件
    • HTML格式邮件
    • 带附件的邮件
  3. 邮件地址格式化:安全处理收件人和发件人地址,避免特殊字符导致的安全问题。多个收件人时,格式应为"姓名 “。

  4. 附件处理:将附件转换为Base64编码,便于邮件传输。

重要提示:邮件服务提供商通常会屏蔽HTML中直接引用的图片(如<img src="http://www.mudoom.com/xxx.jpg" />)。若需在邮件中显示图片,可将图片作为附件添加,并在HTML中使用<img src="cid:xxx.jpg" />引用,这样图片才能正常显示。

二、建立SMTP连接

  1. 客户端请求连接:建议使用SSL/TLS加密连接(即使某些邮件服务商允许非加密连接,也应使用加密以保障安全)。

  2. 服务器响应:邮件服务器接受连接请求,建立通信通道。

三、登录邮件账户

  1. 发送账号密码:使用邮箱账户和密码进行身份验证。

注意:如果是自建邮件服务器且允许匿名发送,则可跳过此步骤。

四、发送邮件数据

  1. 使用TCP传输:将邮件数据通过TCP协议发送到邮件服务器。

扩展:Google最近推广的QUIC协议(结合了TCP的稳定性和UDP的高速性)可能是未来网络协议的发展方向。

五、关闭SMTP连接

  1. 主动关闭连接:邮件发送完成后,客户端应主动发送关闭请求(而非等待服务器超时关闭),以更合理地利用邮件服务器资源。

Python邮件发送实现

基于smtplib库实现,针对网络异常情况(如socket.gaierror)进行了异常处理,确保程序不会因异常而终止。

import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header

class EmailSender:
    def __init__(self, smtp_server, port, username, password):
        self.smtp_server = smtp_server
        self.port = port
        self.username = username
        self.password = password
        
    def send(self, sender, recipients, subject, content, is_html=False, attachments=None):
        try:
            # 创建邮件对象
            msg = MIMEMultipart()
            msg['From'] = Header(sender, 'utf-8')
            msg['To'] = Header(','.join(recipients), 'utf-8')
            msg['Subject'] = Header(subject, 'utf-8')
            
            # 添加邮件内容
            if is_html:
                msg.attach(MIMEText(content, 'html', 'utf-8'))
            else:
                msg.attach(MIMEText(content, 'plain', 'utf-8'))
            
            # 添加附件
            if attachments:
                for attachment in attachments:
                    with open(attachment, 'rb') as f:
                        mime = MIMEBase('application', 'octet-stream')
                        mime.set_payload(f.read())
                        mime.add_header('Content-Disposition', 'attachment', filename=attachment)
                        mime.add_header('Content-ID', f'<{attachment}>')
                        msg.attach(mime)
            
            # 连接SMTP服务器并发送邮件
            with smtplib.SMTP_SSL(self.smtp_server, self.port) as server:
                server.login(self.username, self.password)
                server.sendmail(sender, recipients, msg.as_string())
            return True
        except Exception as e:
            print(f"邮件发送失败: {str(e)}")
            return False

代码说明:GitHub上提供了完整的实现:https://github.com/ZoaChou/Python-learn/blob/master/application/controller/mailer.py

Celery异步任务框架

什么是Celery

Celery可以被视为一个"钩子”,它将函数调用请求存储在指定的broker(消息中间件)中,然后由Celery工作进程从broker中取出请求并执行,从而实现任务的异步和分布式处理。

Celery使用指南

1. 初始化

from celery import Celery

# 初始化Celery,使用Redis作为broker
celery = Celery(__name__, broker='redis://localhost:6379/0')

建议:避免使用普通数据库(如MySQL)作为broker,推荐使用Redis,以实现更快的调用和避免死锁。

2. 配置

# 将Celery配置集中管理
celery.conf.update({
    'CELERY_TASK_RESULT_BACKEND': 'redis://localhost:6379/0',
    'CELERY_ACCEPT_CONTENT': ['json'],
    'CELERY_TASK_SERIALIZER': 'json',
    'CELERY_RESULT_SERIALIZER': 'json'
})

3. 定义任务

# tasks.py

@celery.task(name='send_email',  # 任务名称
             bind=True,           # 绑定任务实例
             default_retry_delay=10,  # 重试间隔
             max_retries=5)         # 最大重试次数
def send_email(self, sender, recipients, subject, content, is_html=False):
    """发送邮件任务"""
    try:
        # 实际发送邮件的逻辑
        # ...
        return True
    except Exception as e:
        # 失败时重试
        try:
            self.retry(exc=e, countdown=10)
        except MaxRetriesExceededError:
            # 重试次数超过后的处理
            return False

注意:当文件中定义多个任务时,必须为每个任务指定唯一的name,否则启动时会报错。

4. 启动Celery工作进程

# 在tasks目录下执行
celery worker -A tasks -Q default -l INFO
  • -A tasks:指定任务模块
  • -Q default:指定队列
  • -l INFO:设置日志级别

5. 调用任务

# 异步调用
send_email.delay(sender, recipients, subject, content, is_html)

# 使用特定队列的异步调用
send_email.apply_async(
    args=[sender, recipients, subject, content, is_html], 
    queue='high_priority'
)

6. 高级用法

当任务有优先级或执行时间长短之分时,可以创建多个队列:

  • 为不同优先级的任务分配不同的队列
  • 为每个队列启动一个或多个Celery工作进程
  • 避免长时间任务阻塞其他任务

实践案例

基于Celery实现异步邮件发送服务(结合Flask框架,此处省略Flask相关实现,仅展示Celery调用方式)。

GitHub代码:https://github.com/ZoaChou/Python-learn/blob/master/application/controller/tasks.py


版权声明:本文为原创技术文章,如需转载,请邮件联系作者(Zoa Chou)获得授权。请尊重原创作者的劳动成果,不要简单复制粘贴而不注明出处。

End:近期目标:完善并公开本站所有源码(基于ThinkPHP的PHP项目),方便开发者交流学习。