前言
因为项目原因选择了gearman作为任务委派的中间件,但原生的python拓展包只支持单进程,期间为了将gearman改造成自适应多进程的方式在实现方式上走进了些误区,故在此记录这些误区的坑以及目前的最优解决方案。
实现思路
实现方式
- 主进程接收任务,子进程处理任务。以一个主进程作为任务委派的接收进程,接收到任务后将任务分派给子进程进行处理,处理完成后由该子进程直接返回任务结果给gearman。
- 多进程接收并处理任务。批量fork多个子进程注册任务,子进程间互不影响,各自完成接收、处理任务的过程。
先说说第一种实现方式的优缺点
优点:
- 由于worker较多的时间是消耗在等待接收请求上,因此主进程只单一的进行轮训任务接收可以提高单条gearman请求通道的利用率。
- 由子进程直接返回任务结果可分离主进程与子进程的作业,主进程无需关心任务的结果而只专注于接收任务。
缺点:
- 主进程接收到任务请求后将请求转发给子进程处理任务,由于子进程处理任务完成后需要将任务结果返回给gearman,因此子进程需要将该任务请求对应的gearman socket传递给子进程,而该过程实现起来过于复杂。(通常具有socket的实例无法通过pickle传递给子进程,虽然Unix的sendmsg可以用来传递socket,但将传递的socket构造成一个GearmanWorker又是另外一件痛苦的事情)
- 子进程通过传递的socket构造出GearmanWorker后,由于原socket的句柄仍被父进程持有,所以在等待结果的任务请求方无法收到子进程所返回的处理结果。
再来说说第二种实现方式的优缺点
优点:
- 等价于fork多个原进程,逻辑、作业方式均无改变。
- 可在fork子进程之前完成公有资源的加载而无需每个GearmanWorker匀加载一次。
缺点:
- 子进程异常退出后主进程无法正确感知,虽然主进程会维持相同的子进程数,但是异常退出所重启的子进程没有正确注册到gearman接收任务。
- 主进程异常退出后子进程无法感知,将导致出现僵尸进程。
解决方案
- 利用PID文件记录每个子进程的pid,确保主进程退出后仍能通过PID文件退出子进程。
- 利用Redis的发布订阅模式实现GearmanWorker的正常退出。
Show me the code
# -*- coding: utf-8 -*- import os import signal import threading import multiprocessing import redis from gearman.worker import GearmanWorker, POLL_TIMEOUT_IN_SECONDS WORKER_PROCESS_PID = '/tmp/multi_gearman_worker.pid' class MultiGearmanWorker(GearmanWorker): """ 多进程gearman worker""" def __init__(self, host_list=None, redis_host=None, redis_port=None, pid=WORKER_PROCESS_PID): super(MultiGearmanWorker, self).__init__(host_list=host_list) self.redis_host = redis_host self.redis_port = redis_port self.pid = pid def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS, process=multiprocessing.cpu_count()): """ 开始作业,进程阻塞 :param poll_timeout: int gearman的连接时间,时间越短子进程worker召回越快但请求越频繁 :param process: int 工作进程数,默认为CPU个数 :return: """ print('Clear last process.') self.gearman_worker_exit() print('Ready to start %d process for work.' % process) gm_poll = multiprocessing.Pool(process) for x in range(0, process): gm_poll.apply_async(gearman_work, (self, poll_timeout, self.pid)) gm_poll.close() gm_poll.join() # 正常退出则删除子进程PID文件 if os.path.isfile(self.pid): os.remove(self.pid) print('Multi gearman worker exit.') def gearman_worker_exit(self): """ 结束子进程 """ if not os.path.isfile(self.pid): return True with open(self.pid, 'r+') as f: for pid in f.readlines(): pid = int(pid) try: os.kill(pid, signal.SIGKILL) print('Kill process %d.' % pid) except OSError: print('Process %d not exists' % pid) continue os.remove(self.pid) print('Remove process pid file.') return True # 子进程使用的gearman工作开关标识 GEARMAN_CONTINUE_WORK = True def gearman_work(gm_worker, poll_timeout=POLL_TIMEOUT_IN_SECONDS, pid=WORKER_PROCESS_PID): """ 以多进程的方式开启gearman的worker """ try: # 记录子进程pid以便主进程被supervisor重启后清除上次未退出的子进程 with open(pid, 'a+') as f: f.write("%d%s" % (os.getpid(), os.linesep)) print('Chile process start for work.') continue_working = True worker_connections = [] d = threading.Thread(name='monitor', target=gearman_monitor, args=(gm_worker.redis_host, gm_worker.redis_port)) d.start() def continue_while_connections_alive(any_activity): return gm_worker.after_poll(any_activity) # Shuffle our connections after the poll timeout while continue_working and GEARMAN_CONTINUE_WORK: worker_connections = gm_worker.establish_worker_connections() continue_working = gm_worker.poll_connections_until_stopped( worker_connections, continue_while_connections_alive, timeout=poll_timeout) # If we were kicked out of the worker loop, we should shutdown all our connections for current_connection in worker_connections: current_connection.close() print('Gearman worker closed') return None except Exception as e: print(e) def gearman_monitor(redis_host, redis_port): """ 监听动态更新指令 """ global GEARMAN_CONTINUE_WORK print('Start gearman monitor.') while GEARMAN_CONTINUE_WORK: # 防止运行异常导致线程挂死后无法监听redis响应,异常处理放在此处,发生异常后重新监听 try: sub = redis.StrictRedis(redis_host, redis_port).pubsub() sub.subscribe('hot') for i in sub.listen(): if isinstance(i.get('data'), str): if i.get('data') == 'exit': # worker退出的过程中将无法响应其他数据修改请求 print('Gearman monitor receive restart signal.') GEARMAN_CONTINUE_WORK = False sub.unsubscribe('hot') break # 因线程间变量共享,故此处可用于多进程gearman worker运行中数据的更改 except Exception as e: print(e) try: sub.unsubscribe('hot') except Exception: pass print('Gearman monitor closed') if __name__ == '__main__': def test_multi_gearman_worker(worker, job): print(worker) print(job) gearman_worker = MultiGearmanWorker(('127.0.0.1:4730', ), '127.0.0.1', 6379) gearman_worker.register_task('test_multi_gearman_worker', test_multi_gearman_worker) gearman_worker.work()
附录