Python学习笔记-Celery
Python邮件发送与Celery异步任务实现
前言
很久没有写技术博客了,最近因为工作需要,我从PHP转投Python阵营。虽然一开始有很多不习惯的地方(比如Python的语法结构、数据类型处理等),但不得不说,Python确实是一门值得学习的语言,正如那句名言所说:“人生苦短,我用Python”。
本文将分享基于SMTP协议的邮件发送实现,以及如何使用Celery实现异步任务处理。
邮件发送逻辑流程
一、格式化邮件数据
-
编码格式选择:建议使用UTF-8编码(Unicode的编码方式之一),以确保邮件能被全球用户正确显示,避免使用GBK等区域性编码。
-
邮件类型选择:
- 纯文本邮件
- HTML格式邮件
- 带附件的邮件
-
邮件地址格式化:安全处理收件人和发件人地址,避免特殊字符导致的安全问题。多个收件人时,格式应为"姓名 “。
-
附件处理:将附件转换为Base64编码,便于邮件传输。
重要提示:邮件服务提供商通常会屏蔽HTML中直接引用的图片(如
<img src="http://www.mudoom.com/xxx.jpg" />)。若需在邮件中显示图片,可将图片作为附件添加,并在HTML中使用<img src="cid:xxx.jpg" />引用,这样图片才能正常显示。
二、建立SMTP连接
-
客户端请求连接:建议使用SSL/TLS加密连接(即使某些邮件服务商允许非加密连接,也应使用加密以保障安全)。
-
服务器响应:邮件服务器接受连接请求,建立通信通道。
三、登录邮件账户
- 发送账号密码:使用邮箱账户和密码进行身份验证。
注意:如果是自建邮件服务器且允许匿名发送,则可跳过此步骤。
四、发送邮件数据
- 使用TCP传输:将邮件数据通过TCP协议发送到邮件服务器。
扩展:Google最近推广的QUIC协议(结合了TCP的稳定性和UDP的高速性)可能是未来网络协议的发展方向。
五、关闭SMTP连接
- 主动关闭连接:邮件发送完成后,客户端应主动发送关闭请求(而非等待服务器超时关闭),以更合理地利用邮件服务器资源。
Python邮件发送实现
基于smtplib库实现,针对网络异常情况(如socket.gaierror)进行了异常处理,确保程序不会因异常而终止。
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from email.header import Header
class EmailSender:
def __init__(self, smtp_server, port, username, password):
self.smtp_server = smtp_server
self.port = port
self.username = username
self.password = password
def send(self, sender, recipients, subject, content, is_html=False, attachments=None):
try:
# 创建邮件对象
msg = MIMEMultipart()
msg['From'] = Header(sender, 'utf-8')
msg['To'] = Header(','.join(recipients), 'utf-8')
msg['Subject'] = Header(subject, 'utf-8')
# 添加邮件内容
if is_html:
msg.attach(MIMEText(content, 'html', 'utf-8'))
else:
msg.attach(MIMEText(content, 'plain', 'utf-8'))
# 添加附件
if attachments:
for attachment in attachments:
with open(attachment, 'rb') as f:
mime = MIMEBase('application', 'octet-stream')
mime.set_payload(f.read())
mime.add_header('Content-Disposition', 'attachment', filename=attachment)
mime.add_header('Content-ID', f'<{attachment}>')
msg.attach(mime)
# 连接SMTP服务器并发送邮件
with smtplib.SMTP_SSL(self.smtp_server, self.port) as server:
server.login(self.username, self.password)
server.sendmail(sender, recipients, msg.as_string())
return True
except Exception as e:
print(f"邮件发送失败: {str(e)}")
return False
代码说明:GitHub上提供了完整的实现:https://github.com/ZoaChou/Python-learn/blob/master/application/controller/mailer.py
Celery异步任务框架
什么是Celery
Celery可以被视为一个"钩子”,它将函数调用请求存储在指定的broker(消息中间件)中,然后由Celery工作进程从broker中取出请求并执行,从而实现任务的异步和分布式处理。
Celery使用指南
1. 初始化
from celery import Celery
# 初始化Celery,使用Redis作为broker
celery = Celery(__name__, broker='redis://localhost:6379/0')
建议:避免使用普通数据库(如MySQL)作为broker,推荐使用Redis,以实现更快的调用和避免死锁。
2. 配置
# 将Celery配置集中管理
celery.conf.update({
'CELERY_TASK_RESULT_BACKEND': 'redis://localhost:6379/0',
'CELERY_ACCEPT_CONTENT': ['json'],
'CELERY_TASK_SERIALIZER': 'json',
'CELERY_RESULT_SERIALIZER': 'json'
})
3. 定义任务
# tasks.py
@celery.task(name='send_email', # 任务名称
bind=True, # 绑定任务实例
default_retry_delay=10, # 重试间隔
max_retries=5) # 最大重试次数
def send_email(self, sender, recipients, subject, content, is_html=False):
"""发送邮件任务"""
try:
# 实际发送邮件的逻辑
# ...
return True
except Exception as e:
# 失败时重试
try:
self.retry(exc=e, countdown=10)
except MaxRetriesExceededError:
# 重试次数超过后的处理
return False
注意:当文件中定义多个任务时,必须为每个任务指定唯一的
name,否则启动时会报错。
4. 启动Celery工作进程
# 在tasks目录下执行
celery worker -A tasks -Q default -l INFO
-A tasks:指定任务模块-Q default:指定队列-l INFO:设置日志级别
5. 调用任务
# 异步调用
send_email.delay(sender, recipients, subject, content, is_html)
# 使用特定队列的异步调用
send_email.apply_async(
args=[sender, recipients, subject, content, is_html],
queue='high_priority'
)
6. 高级用法
当任务有优先级或执行时间长短之分时,可以创建多个队列:
- 为不同优先级的任务分配不同的队列
- 为每个队列启动一个或多个Celery工作进程
- 避免长时间任务阻塞其他任务
实践案例
基于Celery实现异步邮件发送服务(结合Flask框架,此处省略Flask相关实现,仅展示Celery调用方式)。
GitHub代码:https://github.com/ZoaChou/Python-learn/blob/master/application/controller/tasks.py
版权声明:本文为原创技术文章,如需转载,请邮件联系作者(Zoa Chou)获得授权。请尊重原创作者的劳动成果,不要简单复制粘贴而不注明出处。
End:近期目标:完善并公开本站所有源码(基于ThinkPHP的PHP项目),方便开发者交流学习。