Queue.php 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. <?php
  2. /**
  3. * ----------------------------------------------------------------------------
  4. * 行到水穷处,坐看云起时
  5. * 开发软件,找贵阳云起信息科技,官网地址:https://www.56q7.com/
  6. * ----------------------------------------------------------------------------
  7. * Author: 老成
  8. * email:85556713@qq.com
  9. */
  10. declare(strict_types=1);
  11. namespace app\admin\command;
  12. use app\common\model\Queue as QueueModel;
  13. use think\console\Command;
  14. use think\console\Input;
  15. use think\console\Output;
  16. class Queue extends Command
  17. {
  18. protected $output;
  19. private static $EventTime=[];
  20. public static $timetxt=__DIR__.DIRECTORY_SEPARATOR.'queueEvent'.DIRECTORY_SEPARATOR.'time.txt';
  21. public static $locktxt=__DIR__.DIRECTORY_SEPARATOR.'queueEvent'.DIRECTORY_SEPARATOR.'lock.txt';
  22. private $breath=0;
  23. //定义更新时间
  24. const refreshTime=300;
  25. protected function configure()
  26. {
  27. $this->setName('Event')->setDescription('队列任务');
  28. }
  29. protected function execute(Input $input, Output $output)
  30. {
  31. $this->output=$output;
  32. $this->getQueueList();
  33. $this->output('启动队列服务');
  34. while(true){
  35. sleep(1);
  36. $r=intval(file_get_contents(self::$timetxt));
  37. if($r==0){
  38. $this->output('关闭轮询任务服务');
  39. (new QueueModel())->saveAll(self::$EventTime);
  40. break;
  41. }
  42. if(file_exists(self::$locktxt)){
  43. $r=intval(file_get_contents(self::$locktxt));
  44. $this->output('更新了轮询任务');
  45. unlink(self::$locktxt);
  46. }
  47. foreach (self::$EventTime as &$value){
  48. $function=$value['function'];
  49. $delay=$value['delay'];
  50. $filter=$value['filter'];
  51. if($value['limit']!==0 && $value['times']>=$value['limit']){
  52. continue;
  53. }
  54. if($value['status']!='normal'){
  55. continue;
  56. }
  57. $lasttime='';
  58. if($value['lasttime']){
  59. $lasttime=strtotime($value['lasttime']);
  60. }
  61. try {
  62. if($this->runEvent($function,$delay,$filter,$lasttime)){
  63. $value['times']++;
  64. $value['error']='';
  65. $value['lasttime']=date('Y-m-d H:i:s');
  66. }
  67. }catch (\Exception $e) {
  68. $this->output('执行出错:'.$e->getMessage());
  69. $value['times']++;
  70. $value['lasttime']=date('Y-m-d H:i:s');
  71. $value['error']=$e->getMessage();
  72. $value['status']='hidden';
  73. }
  74. if($value['filter']){
  75. $value['filter']=json_encode($value['filter']);
  76. }
  77. }
  78. //每5分钟更新一次数据库
  79. if($r%self::refreshTime===0){
  80. (new QueueModel())->saveAll(self::$EventTime);
  81. $this->getQueueList();
  82. }
  83. $this->breath++;
  84. file_put_contents(self::$timetxt,$this->breath);
  85. }
  86. }
  87. private function getQueueList()
  88. {
  89. $list=QueueModel::alias('queue')->whereRaw("queue.limit=0 or queue.limit>queue.times")->select()->toArray();
  90. self::$EventTime=$list;
  91. }
  92. private function runEvent($event,$time,$filter,$lasttime):bool
  93. {
  94. $now=time();
  95. if($filter){
  96. foreach ($filter as $key=>$fx){
  97. if(date($key,$now)!=$fx){
  98. return false;
  99. }
  100. }
  101. }
  102. if($lasttime && $lasttime+$time>$now){
  103. return false;
  104. }
  105. $class="\\app\\admin\\command\\queueEvent\\".$event;
  106. if(!class_exists($class)){
  107. throw new \Exception('处理类'.$event.'不存在');
  108. }else{
  109. $class::handle($this->output);
  110. return true;
  111. }
  112. }
  113. private function output($msg)
  114. {
  115. $this->output->info(date('Y-m-d H:i:s').'-'.$msg);
  116. }
  117. }