123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- <?php
- namespace app\command\workerman;
- use app\command\WorkerCommand;
- use app\dict\schedule\ScheduleDict;
- use app\model\sys\SysSchedule;
- use app\service\core\addon\CoreAddonService;
- use app\service\core\schedule\CoreScheduleService;
- use think\console\Command;
- use think\console\Input;
- use think\console\input\Argument;
- use think\console\input\Option;
- use think\console\Output;
- use think\facade\Log;
- use Workerman\Crontab\Crontab;
- use Workerman\RedisQueue\Client;
- use Workerman\Timer;
- use Workerman\Worker;
- class Workerman extends Command
- {
- use WorkerCommand;
- public function configure()
- {
-
- $this->setName('workerman')
- ->addArgument('action', Argument::OPTIONAL, "start|stop|restart|reload|status|connections", 'start')
- ->addOption('mode', 'm', Option::VALUE_OPTIONAL, 'Run the workerman server in daemon mode.')
- ->setDescription('Workerman,高性能PHP应用容器');
- }
-
- protected function execute(Input $input, Output $output)
- {
- $this->resetCli($input, $output);
-
- Worker::$pidFile = runtime_path() . 'workerman_schedule.pid';
- $worker = new Worker();
- $worker->name = 'schedule_work';
- $worker->count = 1;
-
- date_default_timezone_set('PRC');
- $worker->onWorkerStart = function() use ($output) {
- $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Schedule Starting...");
- new Crontab('*/10 * * * * *', function() {
- $file = root_path('runtime') . '.schedule';
- file_put_contents($file, time());
- });
- $core_schedule_service = new CoreScheduleService();
-
- $task_list = $core_schedule_service->getList([ 'status' => ScheduleDict::ON ]);
- foreach ($task_list as $item) {
-
- new Crontab($this->getCrontab($item[ 'time' ]), function() use ($core_schedule_service, $item, $output) {
- if (!empty($item[ 'class' ])) {
- $core_schedule_service->execute($item, $output);
- }
- });
- }
- $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Schedule Started.");
- };
-
- Worker::$pidFile = runtime_path() . 'workerman_queue.pid';
- Worker::$logFile = runtime_path() . 'workerman.log';
- $worker = new Worker();
- $worker->name = 'queue_work';
- $worker->onWorkerStart = function() use ($output) {
- $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Queue Starting...");
-
- Timer::add(30, function() use ($output) {
- ( new SysSchedule() )->select();
- });
- $redis_option = [
- 'connect_timeout' => 10,
- 'max_attempts' => 3,
- 'retry_seconds' => 5,
- 'prefix' => md5(root_path()),
- ];
- if (!empty(env('redis.redis_password'))) {
- $redis_option[ 'auth' ] = env('redis.redis_password');
- }
- $redis_option[ 'db' ] = env('redis.select');
- $client = new Client('redis://' . env('redis.redis_hostname') . ':' . env('redis.port'), $redis_option);
- $queue_list = $this->getAllQueue();
- foreach ($queue_list as $queue_class_name) {
- $queue_class_name = str_replace('.php', '', $queue_class_name);
-
- $client->subscribe($queue_class_name, function($data) use ($queue_class_name, $output) {
- $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . " Processing:" . $queue_class_name);
- try {
- $class_name = '\\' . $queue_class_name;
- $class = new $class_name();
- $class->fire($data);
- } catch (\Throwable $e) {
- Log::write(date('Y-m-d H:i:s') . ',队列有错误:' . $queue_class_name . '_' . $e->getMessage() . '_' . $e->getFile() . '_' . $e->getLine());
- }
- $output->writeln('[queue][' . date('Y-m-d H:i:s') . ']' . " Processed:" . $queue_class_name);
- });
- }
-
- $client->onConsumeFailure(function(\Throwable $exception, $package) use ($output) {
- $output->writeln('[queue]队列 ' . $package[ 'queue' ] . " 消费失败," . $exception->getMessage());
- });
- $output->writeln('[' . date('Y-m-d H:i:s') . ']' . " Queue Started.");
- };
- Worker::runAll();
- }
-
- protected function getCrontab($data) : string
- {
- $sec = $data[ 'sec' ] ?? '*';
- $min = $data[ 'min' ] ?? '*';
- $hour = $data[ 'hour' ] ?? '*';
- $day = $data[ 'day' ] ?? '*';
- $week = $data[ 'week' ] ?? '*';
- $type = $data[ 'type' ] ?? '';
- switch ($type) {
- case 'sec':
- $crontab = '*/' . $sec . ' * * * * *';
- break;
- case 'min':
- $crontab = '0 */' . $min . ' * * * *';
- break;
- case 'hour':
- $crontab = '0 ' . $min . ' */' . $hour . ' * * *';
- break;
- case 'day':
- $crontab = '0 ' . $min . ' ' . $hour . ' */' . $day . ' * *';
- break;
- case 'week':
- $crontab = '0 ' . $min . ' ' . $hour . ' * * ' . $week;
- break;
- case 'month':
- $crontab = '0 ' . $min . ' ' . $hour . ' ' . $day . ' * *';
- break;
- }
- return $crontab ?? '0 */1 * * * *';
- }
-
- public function getAllQueue()
- {
- $class_list = [];
- $system_dir = root_path() . 'app' . DIRECTORY_SEPARATOR . 'job';
- $addon_dir = root_path() . 'addon' . DIRECTORY_SEPARATOR;
- if (is_dir($system_dir)) {
- search_dir($system_dir, $app_data, root_path());
- $class_list = array_merge($class_list, $app_data);
- }
- $addons = ( new CoreAddonService() )->getInstallAddonList();
- foreach ($addons as $v) {
- $addon_path = $addon_dir . $v[ 'key' ] . DIRECTORY_SEPARATOR . 'app' . DIRECTORY_SEPARATOR . 'job';
- if (is_dir($addon_path)) {
- search_dir($addon_path, $addon_data, root_path());
- $class_list = array_merge($class_list, $addon_data);
- }
- }
- foreach ($class_list as &$v) {
- $v = str_replace('.php', '', $v);
- $v = str_replace('/', '\\', $v);
- }
- return $class_list;
- }
- }
|