Workerman.php 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. <?php
  2. namespace app\command\workerman;
  3. use app\command\WorkerCommand;
  4. use app\dict\schedule\ScheduleDict;
  5. use app\model\sys\SysSchedule;
  6. use app\service\core\addon\CoreAddonService;
  7. use app\service\core\schedule\CoreScheduleService;
  8. use think\console\Command;
  9. use think\console\Input;
  10. use think\console\input\Argument;
  11. use think\console\input\Option;
  12. use think\console\Output;
  13. use think\facade\Log;
  14. use Workerman\Crontab\Crontab;
  15. use Workerman\RedisQueue\Client;
  16. use Workerman\Timer;
  17. use Workerman\Worker;
  18. class Workerman extends Command
  19. {
  20. use WorkerCommand;
  /**
   * Declare the console command: its name, the control-action argument,
   * the mode option and the description.
   *
   * @return void
   */
  public function configure()
  {
      // Command definition, built one step at a time on the fluent return value.
      $command = $this->setName('workerman');

      // Optional positional argument; falls back to 'start' when omitted.
      $command = $command->addArgument(
          'action',
          Argument::OPTIONAL,
          "start|stop|restart|reload|status|connections",
          'start'
      );

      // Optional --mode / -m option.
      $command = $command->addOption(
          'mode',
          'm',
          Option::VALUE_OPTIONAL,
          'Run the workerman server in daemon mode.'
      );

      $command->setDescription('Workerman,高性能PHP应用容器');
  }
  /**
   * Run the command: register the schedule worker and the queue worker, then
   * hand control over to Workerman.
   *
   * Both workers are registered before a single Worker::runAll() call, and
   * Worker::$pidFile / Worker::$logFile are static, so one pid file and one
   * log file are shared by both workers.
   *
   * @param Input  $input  Console input.
   * @param Output $output Console output used for progress messages.
   * @return void
   */
  protected function execute(Input $input, Output $output)
  {
      // resetCli() is not defined in this file (presumably provided by the
      // WorkerCommand trait) -- see there for how the CLI arguments are handled.
      $this->resetCli($input, $output);

      // Worker::$pidFile is a static property, so only one value can be in
      // effect. The original code assigned 'workerman_schedule.pid' first and
      // then overwrote it with the value below before runAll(); the effective
      // file name is kept so stop/restart/status keep addressing the same file.
      Worker::$pidFile = runtime_path() . 'workerman_queue.pid';
      Worker::$logFile = runtime_path() . 'workerman.log';

      // Fix the timezone so cron rules and timestamps behave as expected.
      date_default_timezone_set('PRC');

      $this->registerScheduleWorker($output);
      $this->registerQueueWorker($output);

      Worker::runAll();
  }

  /**
   * Register the worker that drives the cron-style scheduled tasks.
   *
   * @param Output $output Console output used for progress messages.
   * @return void
   */
  private function registerScheduleWorker(Output $output)
  {
      $worker = new Worker();
      $worker->name = 'schedule_work';
      // A single process, so every cron rule is registered exactly once.
      $worker->count = 1;

      $worker->onWorkerStart = function () use ($output) {
          $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Schedule Starting...");

          // Heartbeat: every 10 seconds write the current timestamp to
          // runtime/.schedule so other code can tell the scheduler is alive.
          new Crontab('*/10 * * * * *', function () {
              $file = root_path('runtime') . '.schedule';
              file_put_contents($file, time());
          });

          $core_schedule_service = new CoreScheduleService();
          // All enabled scheduled tasks. The list is read once at worker start.
          $task_list = $core_schedule_service->getList([ 'status' => ScheduleDict::ON ]);

          foreach ($task_list as $item) {
              // A task without a class can never run; do not register a timer for it.
              if (empty($item[ 'class' ])) {
                  continue;
              }

              new Crontab($this->getCrontab($item[ 'time' ]), function () use ($core_schedule_service, $item, $output) {
                  try {
                      $core_schedule_service->execute($item, $output);
                  } catch (\Throwable $e) {
                      // Same policy as the queue handler: log and carry on, so one
                      // failing task cannot take the whole schedule worker down.
                      Log::write(date('Y-m-d H:i:s') . ',schedule task error:' . $item[ 'class' ] . '_' . $e->getMessage() . '_' . $e->getFile() . '_' . $e->getLine());
                  }
              });
          }

          $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Schedule Started.");
      };
  }

  /**
   * Register the worker that consumes the Redis-backed job queues.
   *
   * @param Output $output Console output used for progress messages.
   * @return void
   */
  private function registerQueueWorker(Output $output)
  {
      $worker = new Worker();
      $worker->name = 'queue_work';
      // Process count is intentionally left at the Worker default.

      $worker->onWorkerStart = function () use ($output) {
          $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Queue Starting...");

          // Every 30 seconds run a throw-away query; the result is discarded.
          // NOTE(review): presumably a keep-alive for the database connection -- confirm.
          Timer::add(30, function () {
              ( new SysSchedule() )->select();
          });

          $redis_option = [
              'connect_timeout' => 10,
              'max_attempts' => 3,
              'retry_seconds' => 5,
              'prefix' => md5(root_path()), // cache key prefix, unique per installation path
          ];
          $redis_password = env('redis.redis_password');
          if (!empty($redis_password)) {
              $redis_option[ 'auth' ] = $redis_password;
          }
          $redis_option[ 'db' ] = env('redis.select');

          $client = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);

          foreach ($this->getAllQueue() as $queue_class_name) {
              // Defensive: the queue name must not carry a file extension.
              $queue_class_name = str_replace('.php', '', $queue_class_name);

              // Subscribe: the queue name doubles as the job class name.
              $client->subscribe($queue_class_name, function ($data) use ($queue_class_name, $output) {
                  $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . " Processing:" . $queue_class_name);
                  try {
                      $class_name = '\\' . $queue_class_name;
                      $class = new $class_name();
                      $class->fire($data);
                  } catch (\Throwable $e) {
                      // NOTE(review): because every Throwable is caught here, the
                      // queue client never sees a failed job, so 'max_attempts' /
                      // 'retry_seconds' and onConsumeFailure do not apply to
                      // exceptions thrown by fire(). Confirm this is intended.
                      Log::write(date('Y-m-d H:i:s') . ',队列有错误:' . $queue_class_name . '_' . $e->getMessage() . '_' . $e->getFile() . '_' . $e->getLine());
                  }
                  $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . " Processed:" . $queue_class_name);
              });
          }

          // Optional callback invoked by the client when consuming a job fails.
          $client->onConsumeFailure(function (\Throwable $exception, $package) use ($output) {
              $output->writeln('[queue]队列 ' . $package[ 'queue' ] . " 消费失败," . $exception->getMessage());
          });

          $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Queue Started.");
      };
  }
  /**
   * Build the six-field cron expression (leading seconds field) for a task.
   *
   * Field layout of the returned expression:
   *
   *   0   1   2   3   4   5
   *   |   |   |   |   |   |
   *   |   |   |   |   |   +------ day of week (0 - 6) (Sunday=0)
   *   |   |   |   |   +---------- month (1 - 12)
   *   |   |   |   +-------------- day of month (1 - 31)
   *   |   |   +------------------ hour (0 - 23)
   *   |   +---------------------- min (0 - 59)
   *   +-------------------------- sec (0 - 59) [optional; without it the
   *                               smallest granularity is one minute]
   *
   * Supported values of $data['type']:
   *   - 'sec'   every $data['sec'] seconds
   *   - 'min'   every $data['min'] minutes
   *   - 'hour'  every $data['hour'] hours, at minute $data['min']
   *   - 'day'   every $data['day'] days, at $data['hour']:$data['min']
   *   - 'week'  weekly on weekday $data['week'], at $data['hour']:$data['min']
   *   - 'month' monthly on day $data['day'], at $data['hour']:$data['min']
   * Any other (or missing) type yields "every minute".
   *
   * @param mixed $data Task time settings; read with array access using the
   *                    keys 'type', 'sec', 'min', 'hour', 'day', 'week'.
   * @return string Cron expression.
   */
  protected function getCrontab($data) : string
  {
      // Fixed-position fields: a missing key means "any".
      $min = $data[ 'min' ] ?? '*';
      $hour = $data[ 'hour' ] ?? '*';
      $day = $data[ 'day' ] ?? '*';
      $week = $data[ 'week' ] ?? '*';

      // "Every N" fields: a step of 0, '' or a missing value would produce
      // '*/0', '*/' or '*/*', none of which is a valid cron step. Clamp the
      // step to an integer >= 1; valid positive integers pass through as-is.
      $every = static function ($value) : string {
          return (string) max(1, (int) $value);
      };

      switch ($data[ 'type' ] ?? '') {
          case 'sec':// every N seconds
              return '*/' . $every($data[ 'sec' ] ?? null) . ' * * * * *';
          case 'min':// every N minutes
              return '0 */' . $every($data[ 'min' ] ?? null) . ' * * * *';
          case 'hour':// every N hours, at the given minute
              return '0 ' . $min . ' */' . $every($data[ 'hour' ] ?? null) . ' * * *';
          case 'day':// every N days, at the given hour and minute
              return '0 ' . $min . ' ' . $hour . ' */' . $every($data[ 'day' ] ?? null) . ' * *';
          case 'week':// once a week, on the given weekday at the given time
              return '0 ' . $min . ' ' . $hour . ' * * ' . $week;
          case 'month':// once a month, on the given day at the given time
              return '0 ' . $min . ' ' . $hour . ' ' . $day . ' * *';
          default:// unknown type: run once a minute
              return '0 */1 * * * *';
      }
  }
  /**
   * Collect the class names of all queue jobs.
   *
   * Scans <root>/app/job and, for every installed addon,
   * <root>/addon/<key>/app/job. Each PHP file found is turned into a class
   * name by dropping the '.php' suffix and converting '/' to '\'.
   *
   * @return array List of job class names without a leading backslash,
   *               each name appearing once.
   */
  public function getAllQueue()
  {
      $root = root_path();

      // Directories to scan: the system job directory first, then one per installed addon.
      $job_dirs = [ $root . 'app' . DIRECTORY_SEPARATOR . 'job' ];
      $addons = ( new CoreAddonService() )->getInstallAddonList();
      foreach ($addons as $addon) {
          $job_dirs[] = $root . 'addon' . DIRECTORY_SEPARATOR . $addon[ 'key' ] . DIRECTORY_SEPARATOR . 'app' . DIRECTORY_SEPARATOR . 'job';
      }

      $class_list = [];
      foreach ($job_dirs as $job_dir) {
          if (!is_dir($job_dir)) {
              continue;
          }

          // search_dir() is a project helper not defined in this file. It is
          // assumed to add the files found (paths relative to the third
          // argument) to the by-reference second argument -- confirm.
          // A fresh, initialised array per directory guarantees that an empty
          // directory yields [] rather than an undefined value, and that files
          // from a previously scanned directory are not picked up again.
          $files = [];
          search_dir($job_dir, $files, $root);

          foreach ($files as $file) {
              // Only PHP files can be job classes; skip anything else.
              if (substr($file, -4) !== '.php') {
                  continue;
              }
              $class_list[] = str_replace('/', '\\', substr($file, 0, -4));
          }
      }

      // Every queue must be subscribed exactly once.
      return array_values(array_unique($class_list));
  }
  180. }