A Multi-Process Implementation for Gearman

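Preface

Our project chose Gearman as the middleware for dispatching tasks, but the stock Python package only supports a single-process worker. While trying to turn the Gearman worker into a multi-process one that sizes itself to the machine, I took a few wrong turns. This post records those pitfalls along with the best solution I have found so far.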

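Approach

Implementation options

  1. The master receives tasks and the children process them: the master process acts as the task receiver and hands each received task to a child process, which returns the result directly to Gearman.
  2. Multiple processes each receive and process tasks: fork a batch of child processes that each register the tasks; the children do not affect one another, and each one receives and processes tasks on its own.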
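
As a rough illustration of the second option, here is a minimal sketch that uses os.fork directly. It assumes a POSIX system. The names fork_workers, fake_worker, and shared_table are hypothetical, and fake_worker only stands in for a real child that would register tasks and poll Gearman.

```python
import os
import tempfile


def fork_workers(count, target):
    """Fork `count` children that each run target(index), then wait for all of them."""
    pids = []
    for index in range(count):
        pid = os.fork()
        if pid == 0:
            # Child: run the job, then leave without returning into the parent's code.
            try:
                target(index)
                status = 0
            except BaseException:
                status = 1
            os._exit(status)
        pids.append(pid)
    # Parent: block until every child has exited and collect the raw wait statuses.
    return [os.waitpid(pid, 0)[1] for pid in pids]


# Loaded once in the parent; every child inherits it through fork.
shared_table = {'greeting': 'hello'}
out_dir = tempfile.mkdtemp()


def fake_worker(index):
    # Stand-in for "register tasks and poll Gearman" inside one child.
    with open(os.path.join(out_dir, 'worker-%d.txt' % index), 'w') as f:
        f.write('%s from worker %d' % (shared_table['greeting'], index))


statuses = fork_workers(3, fake_worker)
results = sorted(os.listdir(out_dir))
print(statuses, results)
```

Each child sees shared_table without loading it again, which is the reason for preparing shared resources before forking.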

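Pros and cons of the first option

Pros

  • The master process only polls for incoming tasks, which raises the utilization of a single Gearman request channel (a worker spends most of its time waiting for requests).
  • Child processes return results directly, so the master does not need to care about results and can focus on receiving tasks.

Cons

  • The Gearman socket has to be handed to the child process, which is complex to implement (a socket instance cannot be passed via pickle; Unix sendmsg can pass a socket, but constructing a GearmanWorker around it is cumbersome).
  • The parent process still holds the original socket handle, so the task requester never receives the result returned by the child process.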
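
The pickle limitation mentioned above is easy to reproduce with the standard library alone. The following is a small sketch, not part of the worker itself; can_pickle is a hypothetical helper name.

```python
import pickle
import socket


def can_pickle(obj):
    """Return True if the standard pickle module can serialize obj."""
    try:
        pickle.dumps(obj)
        return True
    except (TypeError, pickle.PicklingError):
        return False


# Creating the socket does not open any connection.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    socket_picklable = can_pickle(sock)
finally:
    sock.close()

print(socket_picklable)
```

Because the socket cannot be serialized this way, handing an accepted Gearman connection to a pool worker as an ordinary argument does not work.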

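Pros and cons of the second option

Pros

  • It is equivalent to forking the original process several times, so the logic and the way jobs are handled do not change.
  • Shared resources can be loaded before the child processes are forked, which avoids loading them repeatedly.

Cons

  • When a child process exits abnormally, the master process is not aware of it, and the restarted child process is not correctly registered with Gearman.
  • When the master process exits abnormally, the child processes are not aware of it and keep running as orphaned processes.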

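Solution

  1. Record child PIDs in a PID file: this ensures the child processes can still be terminated through the PID file after the master process has exited.
  2. Use Redis publish/subscribe: this lets the GearmanWorker processes exit normally.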
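
The PID-file idea in the first item can be tried in isolation. This is a minimal sketch that assumes a POSIX system; record_pid and kill_recorded are hypothetical helper names, and a sleeping Python subprocess stands in for a leftover worker child.

```python
import os
import signal
import subprocess
import sys
import tempfile


def record_pid(pid_file, pid):
    """Append one PID per line to pid_file."""
    with open(pid_file, 'a') as f:
        f.write('%d\n' % pid)


def kill_recorded(pid_file):
    """Kill every PID listed in pid_file, remove the file, return the PIDs signalled."""
    killed = []
    if not os.path.isfile(pid_file):
        return killed
    with open(pid_file) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            pid = int(line)
            try:
                os.kill(pid, signal.SIGKILL)
                killed.append(pid)
            except OSError:
                # The process is already gone.
                pass
    os.remove(pid_file)
    return killed


pid_file = os.path.join(tempfile.mkdtemp(), 'workers.pid')
# A sleeping subprocess plays the role of a child left behind by a dead master.
child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(60)'])
record_pid(pid_file, child.pid)
killed = kill_recorded(pid_file)
child.wait()
print(killed, child.returncode)
```

One caveat of this approach: operating systems reuse PIDs, so a stale PID file can in principle point at an unrelated process.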

Code

# -*- coding: utf-8 -*-

import os
import signal
import threading
import multiprocessing

import redis
from gearman.worker import GearmanWorker, POLL_TIMEOUT_IN_SECONDS

WORKER_PROCESS_PID = '/tmp/multi_gearman_worker.pid'


class MultiGearmanWorker(GearmanWorker):
    """Multi-process Gearman worker."""
    def __init__(self, host_list=None, redis_host=None, redis_port=None, pid=WORKER_PROCESS_PID):
        super(MultiGearmanWorker, self).__init__(host_list=host_list)
        self.redis_host = redis_host
        self.redis_port = redis_port
        self.pid = pid

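    def work(self, poll_timeout=POLL_TIMEOUT_IN_SECONDS, process=multiprocessing.cpu_count()):
        """Start working; blocks the calling process.
        :param poll_timeout: poll timeout for the Gearman connections; a smaller value lets the workers react to a stop request sooner but polls more often
        :param process: number of worker processes, defaults to the number of CPU cores
        :return:
        """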
        print('Clear last process.')
        self.gearman_worker_exit()
        print('Ready to start %d process for work.' % process)
        gm_pool = multiprocessing.Pool(process)
        for _ in range(process):
            gm_pool.apply_async(gearman_work, (self, poll_timeout, self.pid))
        gm_pool.close()
        gm_pool.join()
        # On a normal exit, delete the child PID file
        if os.path.isfile(self.pid):
            os.remove(self.pid)

        print('Multi gearman worker exit.')

    def gearman_worker_exit(self):
        """Kill the child processes recorded in the PID file."""
        if not os.path.isfile(self.pid):
            return True

        with open(self.pid, 'r+') as f:
            for pid in f.readlines():
                pid = int(pid)
                try:
                    os.kill(pid, signal.SIGKILL)
                    print('Kill process %d.' % pid)
                except OSError:
                    print('Process %d not exists' % pid)
                    continue
        os.remove(self.pid)
        print('Remove process pid file.')
        return True

# Flag the child processes check to decide whether to keep working
GEARMAN_CONTINUE_WORK = True


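def gearman_work(gm_worker, poll_timeout=POLL_TIMEOUT_IN_SECONDS, pid=WORKER_PROCESS_PID):
    """Run a Gearman worker inside a child process."""
    try:
        # Record the child PID so a restarted master can kill children that did not exit
        with open(pid, 'a+') as f:
            # Text mode already translates '\n', so os.linesep is not needed here
            f.write("%d\n" % os.getpid())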

        print('Child process start for work.')
        continue_working = True
        worker_connections = []
        d = threading.Thread(name='monitor', target=gearman_monitor,
                             args=(gm_worker.redis_host, gm_worker.redis_port))
        d.start()

        def continue_while_connections_alive(any_activity):
            return gm_worker.after_poll(any_activity)

        # Poll the connections and wait for tasks
        while continue_working and GEARMAN_CONTINUE_WORK:
            worker_connections = gm_worker.establish_worker_connections()
            continue_working = gm_worker.poll_connections_until_stopped(
                worker_connections, continue_while_connections_alive, timeout=poll_timeout)

        # Close all connections
        for current_connection in worker_connections:
            current_connection.close()

        print('Gearman worker closed')
        return None
    except Exception as e:
        print(e)


def gearman_monitor(redis_host, redis_port):
    """Listen for the exit message on the Redis pub/sub channel."""
    global GEARMAN_CONTINUE_WORK
    print('Start gearman monitor.')
    while GEARMAN_CONTINUE_WORK:
        try:
            sub = redis.StrictRedis(redis_host, redis_port).pubsub()
            sub.subscribe('hot')
            for i in sub.listen():
                data = i.get('data')
                # redis-py returns bytes on Python 3 unless decode_responses=True
                if isinstance(data, bytes):
                    data = data.decode('utf-8')
                if data == 'exit':
                    print('Gearman monitor received exit signal.')
                    GEARMAN_CONTINUE_WORK = False
                    sub.unsubscribe('hot')
                    break
        except Exception as e:
            print(e)
            try:
                sub.unsubscribe('hot')
            except Exception:
                pass

    print('Gearman monitor closed')


if __name__ == '__main__':
    def test_multi_gearman_worker(worker, job):
        print(worker)
        print(job)

    # Create the multi-process Gearman worker
    gearman_worker = MultiGearmanWorker(
        host_list=('127.0.0.1:4730',),
        redis_host='127.0.0.1',
        redis_port=6379
    )
    # Register the task
    gearman_worker.register_task('test_multi_gearman_worker', test_multi_gearman_worker)
    # Start working
    gearman_worker.work()
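
To stop the workers started by the listing above, something has to publish the message exit on the hot channel that every child's monitor thread subscribes to. The following is a minimal sketch of that sender. request_worker_exit and FakeClient are hypothetical names; FakeClient only mimics the publish(channel, message) method of a redis-py client so the sketch runs without a Redis server.

```python
def request_worker_exit(client, channel='hot', message='exit'):
    """Publish the exit message; a real Redis client returns the receiver count."""
    return client.publish(channel, message)


class FakeClient(object):
    """Stand-in for a redis-py client; it only records what was published."""

    def __init__(self):
        self.sent = []

    def publish(self, channel, message):
        self.sent.append((channel, message))
        return 1


client = FakeClient()
receivers = request_worker_exit(client)
print(client.sent)

# Against a real server (assuming the host and port used in the listing above):
#   import redis
#   request_worker_exit(redis.StrictRedis('127.0.0.1', 6379))
```

Since every child process runs its own subscriber on the same channel, a single publish reaches all of them.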

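Appendix

Key points

  1. A PID file manages the lifecycle of the child processes.
  2. Redis Pub/Sub provides a graceful exit mechanism for the processes.
  3. A multiprocessing pool runs several workers in parallel to process tasks.
  4. If the master process exits abnormally, the child processes it left behind are killed through the PID file the next time the worker starts.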
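
The second key point boils down to a monitor thread that flips a stop flag while the main loop keeps polling. Here is a sketch of that pattern using only the standard library. It swaps the module-level boolean from the listing for threading.Event, and uses queue.Queue as a stand-in for the Redis subscription; monitor, work_loop, and messages are hypothetical names.

```python
import queue
import threading

stop_event = threading.Event()


def monitor(messages):
    """Block on incoming messages and set the stop flag when 'exit' arrives."""
    while not stop_event.is_set():
        data = messages.get()
        if isinstance(data, bytes):
            data = data.decode('utf-8')
        if data == 'exit':
            stop_event.set()


def work_loop(poll_timeout=0.05):
    """Poll until the stop flag is set; return how many polls were made."""
    polls = 0
    while not stop_event.is_set():
        # Stands in for one poll of the Gearman connections.
        stop_event.wait(poll_timeout)
        polls += 1
    return polls


messages = queue.Queue()
thread = threading.Thread(name='monitor', target=monitor, args=(messages,))
thread.daemon = True
thread.start()

messages.put(b'ping')  # ignored: not an exit request
messages.put(b'exit')  # sets the stop flag
polls = work_loop()
thread.join(1.0)
print(stop_event.is_set())
```

As in the listing, the work loop only notices the flag between polls, so the poll timeout bounds how long shutdown takes.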