当前位置:   article > 正文

laravel实现AMQP(rabbitmq)生产者以及消费者_laravel amqp

laravel amqp

基于php-amqplib/php-amqplib组件适配laravel框架的amqp封装库

支持便捷可配置的队列工作模式 官网详情

在此基础上可支持延迟消息、死信队列等机制。

环境要求:

PHP版本: ^7.3|^8.0

需要开启的扩展: socket

其他:

  1. 如果需要实现延迟任务需要安装对应版本的rabbitmq延迟插件,以rabbitmq3.9.0版本为例:
  1. wget https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/3.9.0/rabbitmq_delayed_message_exchange-3.9.0.ez
  2. cp rabbitmq_delayed_message_exchange-3.9.0.ez /opt/rabbitmq/plugins/
  3. rabbitmq-plugins enable rabbitmq_delayed_message_exchange

用法:

第一步 安装组件:
composer require sai97/laravel-amqp
第二步 发布服务以及配置:
php artisan vendor:publish --provider="Sai97\LaravelAmqp\AmqpQueueProviders"

执行完后会分别在app/config目录下生成amqp.php(amqp连接配置等)、app/QueueJob/DefaultQueueJob.php(默认队列任务)

amqp.php

  1. <?php
  2. use App\QueueJob\DefaultQueueJob;
  3. return [
  4. "connection" => [
  5. "default" => [
  6. "host" => env("AMQP_HOST", "127.0.0.1"),
  7. "port" => env("AMQP_PORT", 5672),
  8. "user" => env("AMQP_USER", "root"),
  9. "password" => env("AMQP_PASSWORD", "root")
  10. ]
  11. ],
  12. "event" => [
  13. "default" => DefaultQueueJob::class,
  14. ]
  15. ];

connection为amqp连接配置,可根据自身业务去调整,完全对应php-amqplib/php-amqplib相关配置项, event是队列实例标识,最好和connection用相同的key以便管理。

目前可支持相关接口项:
  1. //获取连接名称
  2. public function getConnectName(): string;
  3. //获取交换机名称
  4. public function getExchangeName(): string;
  5. //获取交换机类型
  6. public function getExchangeType(): string;
  7. //获取队列名称
  8. public function getQueueName(): string;
  9. //获取路由KEY
  10. public function getRoutingKey(): string;
  11. //获取ContentType
  12. public function getContentType(): string;
  13. //是否开启死信模式
  14. public function isDeadLetter(): bool;
  15. //获取死信交换机名称
  16. public function getDeadLetterExchangeName(): string;
  17. //获取死信路由KEY
  18. public function getDeadLetterRoutingKey(): string;
  19. //获取死信队列名称
  20. public function getDeadLetterQueueName(): string;
  21. //是否开启延迟任务
  22. public function isDelay(): bool;
  23. //获取延迟任务过期时长
  24. public function getDelayTTL(): int;
  25. //获取队列附加参数
  26. public function getQueueArgs(): array;
  27. //获取回调函数
  28. public function getCallback(): callable;
  29. //是否自动提交ACK
  30. public function isAutoAck(): bool;

当然你也可以自定义队列实例,只要继承Sai97\LaravelAmqp\Queue基类即可,具体功能配置参数参考Sai97\LaravelAmqp\QueueInterface。

代码示例:

生产者:
  1. $message = "This is message...";
  2. $amqpQueueServices = new AmqpQueueServices(QueueFactory::getInstance(DefaultQueueJob::class));
  3. $amqpQueueServices->producer($message);
消费者:

利用laravel自带的Command去定义一个RabbitMQWorker自定义命令行,仅需要定义一次,后续只需要更改amqp.php配置文件添加不同的队列实例绑定关系即可,以下是RabbitMQWorker演示代码:

  1. <?php
  2. namespace App\Console\Commands;
  3. use Illuminate\Console\Command;
  4. use Sai97\LaravelAmqp\AmqpQueueServices;
  5. use Sai97\LaravelAmqp\QueueFactory;
  6. class RabbitMQWorker extends Command
  7. {
  8. /**
  9. * The name and signature of the console command.
  10. *
  11. * @var string
  12. */
  13. protected $signature = 'rabbitmq:worker {event}';
  14. /**
  15. * The console command description.
  16. *
  17. * @var string
  18. */
  19. protected $description = 'rabbitmq worker 消费进程';
  20. /**
  21. * Create a new command instance.
  22. *
  23. * @return void
  24. */
  25. public function __construct()
  26. {
  27. parent::__construct();
  28. }
  29. /**
  30. * Execute the console command.
  31. *
  32. * @return mixed
  33. */
  34. public function handle()
  35. {
  36. try {
  37. $event = $this->argument("event");
  38. $eventConfig = config("amqp.event");
  39. if (!isset($eventConfig[$event]) || empty($entity = $eventConfig[$event])) {
  40. return $this->error("未知的事件: {$event}");
  41. }
  42. $this->info("rabbitmq worker of event[{$event}] process start ...");
  43. $amqpQueueServices = new AmqpQueueServices(QueueFactory::getInstance($entity));
  44. $amqpQueueServices->consumer();
  45. } catch (\Throwable $throwable) {
  46. $event = $event ?? "";
  47. $this->error($throwable->getFile() . " [{$throwable->getLine()}]");
  48. return $this->error("rabbitmq worker of event[{$event}] process error:{$throwable->getMessage()}");
  49. }
  50. $this->info("rabbitmq worker of event[{$event}] process stop ...");
  51. }
  52. }

完成RabbitMQWorker消费者命令后,我们只需执行php artisan rabbitmq:worker default 完成监听,其中default是可变的,请根据的amqp.php配置中的队列实例绑定标识去输入。

因为队列的消费者都需要是守护进程,所以我们可以依托supervisord进程管理器去定义RabbitMQWorker消费者命令,这样可以保证进程可后台允许以及重启启动等,以下是supervisord.conf配置文件示例:

  1. [program:rabbitmq-worker-default]
  2. #process_name=%(program_name)s_%(process_num)d
  3. process_name=worker_%(process_num)d
  4. numprocs=3
  5. command=/usr/local/bin/php /app/www/laravel8/artisan rabbitmq:worker default
  6. autostart=true
  7. autorestart=true
  8. startretries=3
  9. priority=3
  10. stdout_logfile=/var/log/rabbitmq-worker-default.log
  11. redirect_stderr=true

搭配supervisord来进行管理消费者进程有许多便捷的方面:

  1. 如果需要新增一个队列实例,只需要按照上述格式复制一个program,可以在不影响其他进程的情况下进程更新supervisord配置:
supervisorctl update

    2. 通过配置numprocs参数来设定需要开启多少个相同配置项的消费者worker,这在任务分发、并行处理等场景十分适用,大大提高消费者执行效率。

这里不详细叙述supervisord相关操作,具体可查看supervisord官方文档

参考链接:https://github.com/Z-Sai/laravel-amqp

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Monodyee/article/detail/424459
推荐阅读
相关标签
  

闽ICP备14008679号