Process daemon异步消息处理

来自技术开发小组内部wiki
跳转至: 导航搜索

开发说明

考虑到异步消息处理在业务中使用的场景越来越多,包括各种埋点触发,业务解耦降低复杂度,扩大并行处理能力,需要有一套稳定的异步消息处理机制来保证,
目前业务中有两套在用的消息机制,一套是基于redis的pub/sub机制,一套是基于殿林写的fmb.daemon,现行两套机制在稳定性上面存在未知的问题,消息处理不能完全保证
现通过swoole框架,利用其扩展构建一套稳定可靠的异步消息处理机制

代码库

git@code.fumubang.net:root/fmb-daemon.git   具体的文件process_daemon.php

逻辑说明

该消息处理机制只负责消息的接受与调度分配,针对消息的处理需要由fmb.dianping代码库中的代码逻辑来完善并实际处理

消息格式

目前消息格式没有特别的约定,简单的以约定的特定的字符串方式进行传输,例如  xxxx-xxxx-xxxx-xxxx,
具体说明第一段代表在fmb.dianping的application/controllers/cmdrun/中的控制器名(control),第二段代表方法名(method),
剩下的段代表对应的方法接受的参数列表(params),数量不定
实际运行的例如:test-testexec-2-3

特别约定

对应的处理控制器必须在fmb.dianping的application/controllers/cmdrun/下面
发送消息的redis队列名称为:msg_queue_list
redis使用的数据库编号为:1
消息处理的结果(对应的方法执行),正确执行后,需要提供该输出(后面的\n不可以省略):echo "ok"."\n";
消息的处理不适合大批量循环的操作

运行说明

useage:php process_daemon.php --run=test|product|abtest --num=xxx --help
参数说明:--run 设定 运行的环境  --num 设定进程数目  --help 显示使用说明

如何启动

进入cd /home/www/fmb.daemon/
运行之前需要查看在当前目录下是否有daemonpid.log文件,如果有需要删除再启动
运行nohup php process_daemon.php --run=product --num=10 >> ./daemon.log &

如何停止

程序在启动后会在当前目录下会写入一个主进程PID内容的文件 daemonpid.log,运行  cat daemonpid.log | xargs kill  就可以,
然后执行ps aux | grep process_daemon | grep -v grep 查看 进程是否存在

运行日志

针对消息的接受运行处理情况,会在日志文件 daemon.log 中进行相应的记录,可以根据消息内容进行grep查找

手动测试

进入redis的命令行:redis-cli,然后执行select 1,最后向对应的队列中推送数据:lpush msg_queue_list test-testexec

后续完善

消息执行不成功的重试策略;消息处理过程平滑重启;