
rabbitmq_delayed_message_exchange 3.6.x

 

 

1. Download

https://www.rabbitmq.com/community-plugins.html

wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip

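After downloading, unzip the archive and copy the plugin file into the plugins directory of the RabbitMQ server, for example /usr/local/rabbitmq/plugins (on Windows: installation directory\rabbitmq_server-version\plugins). With a Linux Debian/RPM package installation, the directory is typically:

/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins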

2. Unzip

unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
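Before enabling the plugin, it can help to confirm that the plugin file really ended up in the plugins directory. The following is a minimal sketch and not part of the original procedure: the class `PluginFileCheck` and the method `findPluginArchives` are hypothetical names, and it assumes that the unzipped plugin is a file whose name starts with `rabbitmq_delayed_message_exchange` and ends with `.ez`.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class PluginFileCheck {
    // Assumption: the plugin archive is named rabbitmq_delayed_message_exchange*.ez
    static final String PLUGIN_GLOB = "rabbitmq_delayed_message_exchange*.ez";

    /** Returns the sorted file names in pluginsDir that match the plugin glob. */
    static List<String> findPluginArchives(Path pluginsDir) throws IOException {
        List<String> found = new ArrayList<>();
        try (DirectoryStream<Path> entries = Files.newDirectoryStream(pluginsDir, PLUGIN_GLOB)) {
            for (Path entry : entries) {
                found.add(entry.getFileName().toString());
            }
        }
        Collections.sort(found);
        return found;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.out.println("usage: java PluginFileCheck <plugins directory>");
            return;
        }
        Path dir = Paths.get(args[0]);
        if (!Files.isDirectory(dir)) {
            System.out.println("Not a directory: " + dir);
            return;
        }
        List<String> found = findPluginArchives(dir);
        System.out.println(found.isEmpty()
                ? "No delayed-message plugin archive found in " + dir
                : "Found: " + found);
    }
}
```

It would be run with the plugins directory as its only argument, for instance the Debian/RPM path shown above.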

3. Enable the plugin

Enable the plugin with the command rabbitmq-plugins enable rabbitmq_delayed_message_exchange:

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

The output is as follows:

    The following plugins have been enabled:
      rabbitmq_delayed_message_exchange

Use rabbitmq-plugins list to view the list of installed plugins:

    Configured: E = explicitly enabled; e = implicitly enabled
    | Status: * = running on rabbit@n1
    |/
    [e*] amqp_client 3.6.12
    [e*] cowboy 1.0.4
    [e*] cowlib 1.0.2
    [ ] rabbitmq_amqp1_0 3.6.12
    [ ] rabbitmq_auth_backend_ldap 3.6.12
    [ ] rabbitmq_auth_mechanism_ssl 3.6.12
    [ ] rabbitmq_consistent_hash_exchange 3.6.12
    [E*] rabbitmq_delayed_message_exchange 20171215-3.6.x
    [ ] rabbitmq_event_exchange 3.6.12
    [ ] rabbitmq_federation 3.6.12
    [ ] rabbitmq_federation_management 3.6.12
    [ ] rabbitmq_jms_topic_exchange 3.6.12
    [E*] rabbitmq_management 3.6.12
    [e*] rabbitmq_management_agent 3.6.12
    [ ] rabbitmq_management_visualiser 3.6.12
    [ ] rabbitmq_mqtt 3.6.12
    [ ] rabbitmq_recent_history_exchange 3.6.12
    [ ] rabbitmq_sharding 3.6.12
    [ ] rabbitmq_shovel 3.6.12
    [ ] rabbitmq_shovel_management 3.6.12
    [ ] rabbitmq_stomp 3.6.12
    [ ] rabbitmq_top 3.6.12
    [E*] rabbitmq_tracing 3.6.12
    [ ] rabbitmq_trust_store 3.6.12
    [e*] rabbitmq_web_dispatch 3.6.12
    [ ] rabbitmq_web_mqtt 3.6.12
    [ ] rabbitmq_web_mqtt_examples 3.6.12
    [ ] rabbitmq_web_stomp 3.6.12
    [ ] rabbitmq_web_stomp_examples 3.6.12
    [ ] sockjs 0.3.4
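The listing above can also be checked programmatically, for example in a deployment script. The sketch below parses text in the format shown (flags in square brackets, then plugin name and version) and reports whether a plugin is enabled and running. `PluginListParser` and its methods are hypothetical names introduced for illustration, and the parsing relies only on the legend printed in the listing (`E`/`e` = enabled, `*` = running).

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class PluginListParser {
    // Matches e.g. "[E*] rabbitmq_delayed_message_exchange 20171215-3.6.x":
    // group 1 = flags inside the brackets, group 2 = plugin name.
    private static final Pattern LINE =
            Pattern.compile("^\\s*\\[([^\\]]*)\\]\\s+(\\S+)\\s+\\S+\\s*$");

    /** Returns the flags for the plugin (e.g. "E*"), or null if it is not listed. */
    static String flags(String listOutput, String plugin) {
        for (String line : listOutput.split("\\r?\\n")) {
            Matcher m = LINE.matcher(line);
            if (m.matches() && m.group(2).equals(plugin)) {
                return m.group(1);
            }
        }
        return null;
    }

    /** True if the plugin is explicitly (E) or implicitly (e) enabled. */
    static boolean isEnabled(String listOutput, String plugin) {
        String f = flags(listOutput, plugin);
        return f != null && (f.contains("E") || f.contains("e"));
    }

    /** True if the plugin is marked as running (*). */
    static boolean isRunning(String listOutput, String plugin) {
        String f = flags(listOutput, plugin);
        return f != null && f.contains("*");
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "[e*] amqp_client 3.6.12",
                "[ ] rabbitmq_amqp1_0 3.6.12",
                "[E*] rabbitmq_delayed_message_exchange 20171215-3.6.x");
        String plugin = "rabbitmq_delayed_message_exchange";
        System.out.println(plugin + " enabled=" + isEnabled(sample, plugin)
                + " running=" + isRunning(sample, plugin));
    }
}
```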

 

Test code:

Producer:

    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    public class Producer {
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setVirtualHost("/");
            connectionFactory.setHost("172.31.1.135");
            connectionFactory.setUsername("xx");
            connectionFactory.setPassword("xx");
            connectionFactory.setPort(5672);
            Connection connection = connectionFactory.newConnection();
            Channel channel = connection.createChannel();

            String exchangeName = "delay-exchange";
            String routingkey = "delay.delay";
            String queueName = "delay_queueName";

            // Declare the x-delayed-message exchange.
            // Note: x-delayed-type is an argument of the exchange (it selects the
            // routing behaviour after the delay); it has no meaning on the queue.
            Map<String, Object> map = new HashMap<>();
            map.put("x-delayed-type", "direct");
            channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, map);
            channel.queueDeclare(queueName, true, false, false, null);
            channel.queueBind(queueName, exchangeName, routingkey);

            for (int i = 0; i < 3; i++) {
                // Caution: byte[] + String appends the array's default toString()
                // (e.g. "[B@387c703b"), not the text; the output below shows this.
                // Drop the getBytes("UTF-8") call to send the readable text instead.
                String msg = "delayed payload".getBytes("UTF-8") + " " + new Date().getTime();
                byte[] messageBodyBytes = msg.getBytes();
                // x-delay header: delay in milliseconds before the message is routed
                Map<String, Object> headers = new HashMap<String, Object>();
                headers.put("x-delay", 50000);
                AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
                channel.basicPublish(exchangeName, routingkey, props.build(), messageBodyBytes);
            }

            // Close the channel and connection so the JVM can exit
            channel.close();
            connection.close();
        }
    }
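The producer hard-codes the delay as a raw number of milliseconds in the `x-delay` header. As a small illustration, the sketch below builds the headers map from a `java.time.Duration` and rejects values outside a plausible range. `DelayHeaders` and `withDelay` are hypothetical names, not part of the RabbitMQ client. The upper bound of (2^32)-1 milliseconds is taken from my reading of the plugin README, and storing the value as a `Long` (instead of the `int` used above) is an assumption, so verify both against the plugin version in use.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

class DelayHeaders {
    // Assumption: the plugin accepts delays up to (2^32)-1 milliseconds.
    static final long MAX_DELAY_MS = (1L << 32) - 1;

    /** Builds a headers map containing only the x-delay header, in milliseconds. */
    static Map<String, Object> withDelay(Duration delay) {
        long ms = delay.toMillis();
        if (ms < 0 || ms > MAX_DELAY_MS) {
            throw new IllegalArgumentException("x-delay out of range: " + ms + " ms");
        }
        Map<String, Object> headers = new HashMap<>();
        // Assumption: a 64-bit integer header value is treated like the int used in the producer.
        headers.put("x-delay", ms);
        return headers;
    }

    public static void main(String[] args) {
        // 50 seconds, the same delay as in the producer above
        System.out.println(withDelay(Duration.ofSeconds(50))); // prints {x-delay=50000}
    }
}
```

The returned map could then be passed to `new AMQP.BasicProperties.Builder().headers(...)` exactly as the producer does.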

 

Consumer:

    import java.io.IOException;
    import java.util.Date;

    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    public class Consumer {
        public static void main(String[] args) throws Exception {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setVirtualHost("/");
            connectionFactory.setHost("172.31.1.135");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("xxx");
            connectionFactory.setPassword("xxx");
            Connection connection = connectionFactory.newConnection();
            final Channel channel = connection.createChannel();

            String queueName = "delay_queueName";
            channel.queueDeclare(queueName, true, false, false, null);

            // autoAck = false: every message is acknowledged manually in handleDelivery
            channel.basicConsume(queueName, false, "myConsumer Tag", new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
                    String routingKey = envelope.getRoutingKey();
                    String contentType = properties.getContentType();
                    long deliveryTag = envelope.getDeliveryTag();
                    // The body ends with the send timestamp; the receive timestamp is appended here
                    System.out.println("routingKey:" + routingKey + ",convernType:" + contentType + ",deliveryTag:" + deliveryTag + ",Msg body:" + new String(body) + " " + new Date().getTime());
                    channel.basicAck(deliveryTag, false);
                }
            });
        }
    }

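Execution result

    routingKey:delay.delay,convernType:null,deliveryTag:13,Msg body:[B@387c703b 1575280086578 1575280136587
    routingKey:delay.delay,convernType:null,deliveryTag:14,Msg body:[B@50cbc42f 1575280086582 1575280136587
    routingKey:delay.delay,convernType:null,deliveryTag:15,Msg body:[B@75412c2f 1575280086582 1575280136587
    routingKey:delay.delay,convernType:null,deliveryTag:16,Msg body:[B@387c703b 1575280078869 1575280178875
    routingKey:delay.delay,convernType:null,deliveryTag:17,Msg body:[B@50cbc42f 1575280078870 1575280178875
    routingKey:delay.delay,convernType:null,deliveryTag:18,Msg body:[B@75412c2f 1575280078871 1575280178875

Each line ends with two timestamps in milliseconds: the send time carried in the message body and the receive time appended by the consumer. For the first three messages the difference is about 50 s, which matches x-delay = 50000. For the last three it is about 100 s, which suggests they were published in a separate run with a larger x-delay.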
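To check the delay that was actually observed, the two trailing timestamps of each output line can be subtracted. The sketch below does this for lines in the format printed by the consumer; `ObservedDelay` and `observedDelayMillis` are hypothetical names used only for this illustration.

```java
class ObservedDelay {
    /**
     * Returns receive time minus send time, taken from the last two
     * whitespace-separated fields of a consumer output line.
     */
    static long observedDelayMillis(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length < 2) {
            throw new IllegalArgumentException("expected two trailing timestamps: " + line);
        }
        long received = Long.parseLong(parts[parts.length - 1]);
        long sent = Long.parseLong(parts[parts.length - 2]);
        return received - sent;
    }

    public static void main(String[] args) {
        String first = "routingKey:delay.delay,convernType:null,deliveryTag:13,"
                + "Msg body:[B@387c703b 1575280086578 1575280136587";
        String fourth = "routingKey:delay.delay,convernType:null,deliveryTag:16,"
                + "Msg body:[B@387c703b 1575280078869 1575280178875";
        System.out.println(observedDelayMillis(first));  // prints 50009
        System.out.println(observedDelayMillis(fourth)); // prints 100006
    }
}
```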

 

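Note: when the rabbitmq-delayed-message-exchange plugin is used, the number of messages sent to the queue may not be visible in the web management UI. This does not affect normal operation.

Note: in practice, a RAM node with the rabbitmq-delayed-message-exchange plugin enabled failed to start when it was restarted. The log showed a timeout exception. The plugin developer explained that the node synchronizes cluster data during startup, which caused the startup to time out, and advised against using RAM nodes.

Plugin developer: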
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.

More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.

The error message is as follows:

    Error description:
    {could_not_start,rabbit,
     {{case_clause,
       {timeout,
        [rabbit_delayed_messagerabbit@n2,
         rabbit_delayed_messagerabbit@n2_index]}},
      [{rabbit_boot_steps,'-run_step/2-lc$^1/1-1-',1,
        [{file,"src/rabbit_boot_steps.erl"},{line,49}]},
       {rabbit_boot_steps,run_step,2,
        [{file,"src/rabbit_boot_steps.erl"},{line,49}]},
       {rabbit_boot_steps,'-run_boot_steps/1-lc$^0/1-0-',1,
        [{file,"src/rabbit_boot_steps.erl"},{line,26}]},
       {rabbit_boot_steps,run_boot_steps,1,
        [{file,"src/rabbit_boot_steps.erl"},{line,26}]},
       {rabbit,start,2,[{file,"src/rabbit.erl"},{line,733}]},
       {application_master,start_it_old,4,
        [{file,"application_master.erl"},{line,273}]}]}}
    Log files (may contain more information):
    /var/log/rabbitmq/rabbit@n2.log
    /var/log/rabbitmq/rabbit@n2-sasl.log
    Error: {could_not_start,rabbit,
     {{case_clause,
       {timeout,
        [rabbit_delayed_messagerabbit@n2,
         rabbit_delayed_messagerabbit@n2_index]}},
      [{rabbit_boot_steps,'-run_step/2-lc$^1/1-1-',1,
        [{file,"src/rabbit_boot_steps.erl"},{line,49}]},
       {rabbit_boot_steps,run_step,2,
        [{file,"src/rabbit_boot_steps.erl"},{line,49}]},
       {rabbit_boot_steps,'-run_boot_steps/1-lc$^0/1-0-',1,
        [{file,"src/rabbit_boot_steps.erl"},{line,26}]},
       {rabbit_boot_steps,run_boot_steps,1,
        [{file,"src/rabbit_boot_steps.erl"},{line,26}]},
       {rabbit,start,2,[{file,"src/rabbit.erl"},{line,733}]},
       {application_master,start_it_old,4,
        [{file,"application_master.erl"},{line,273}]}]}}
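When scanning a startup log for this failure, the interesting part is the list of tables named inside the `{timeout, [...]}` term. The sketch below extracts those names with a regular expression. It assumes the crash report has exactly the shape shown above, and `TimeoutTables` and `timedOutTables` are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TimeoutTables {
    // Matches "{timeout, [name1, name2, ...]}", including across line breaks.
    private static final Pattern TIMEOUT =
            Pattern.compile("\\{timeout,\\s*\\[([^\\]]*)\\]\\}");

    /** Returns the distinct names listed in all {timeout,[...]} terms, in order of appearance. */
    static List<String> timedOutTables(String logText) {
        Set<String> names = new LinkedHashSet<>();
        Matcher m = TIMEOUT.matcher(logText);
        while (m.find()) {
            for (String name : m.group(1).split(",")) {
                String trimmed = name.trim();
                if (!trimmed.isEmpty()) {
                    names.add(trimmed);
                }
            }
        }
        return new ArrayList<>(names);
    }

    public static void main(String[] args) {
        String log = "{could_not_start,rabbit,\n"
                + " {{case_clause,\n"
                + "   {timeout,\n"
                + "    [rabbit_delayed_messagerabbit@n2,\n"
                + "     rabbit_delayed_messagerabbit@n2_index]}},\n";
        System.out.println(timedOutTables(log));
    }
}
```

If the returned names start with `rabbit_delayed_message`, the timeout concerns the tables of the delayed-message plugin, as in the report above.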

A node joins the cluster as a RAM node with:

rabbitmqctl join_cluster rabbit@n1 --ram

Rejoining n2 without --ram, that is, as a disk node:

    [root@n2 plugins]# rabbitmqctl stop_app
    Stopping rabbit application on node rabbit@n2
    [root@n2 plugins]# rabbitmqctl reset
    Resetting node rabbit@n2
    [root@n2 plugins]# rabbitmqctl join_cluster rabbit@n1
    Clustering node rabbit@n2 with rabbit@n1
    [root@n2 plugins]# rabbitmqctl start_app
    Starting node rabbit@n2
    [root@n2 plugins]#

 

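## Tip: if installing the plugin crashes the cluster

Usually it is enough to reset the affected node (RAM or disk) and have it rejoin the cluster. Be aware that resetting a node causes message loss.

On node 2 (the node being reset), except where noted:

1. rabbitmqctl stop_app
2. rabbitmqctl reset
3. On node 1, run: rabbitmqctl forget_cluster_node rabbit@rbmq92
4. rabbitmqctl join_cluster rabbit@rbmq91
5. rabbitmqctl change_cluster_node_type disk
6. rabbitmqctl start_app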
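The order of these commands and the node each one runs on are easy to mix up, so the sequence can be written down as data before it is executed. The sketch below only builds and prints the plan, and it does not run anything. `RejoinPlan`, `Step` and `plan` are hypothetical names, and the node names are the ones used in the steps above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RejoinPlan {
    /** One rabbitmqctl invocation and the node it has to be run on. */
    static final class Step {
        final String runOn;
        final List<String> command;

        Step(String runOn, String... command) {
            this.runOn = runOn;
            this.command = Arrays.asList(command);
        }

        @Override
        public String toString() {
            return "[" + runOn + "] " + String.join(" ", command);
        }
    }

    /** Builds the six steps listed above for resetting brokenNode and rejoining it via healthyNode. */
    static List<Step> plan(String brokenNode, String healthyNode) {
        List<Step> steps = new ArrayList<>();
        steps.add(new Step(brokenNode, "rabbitmqctl", "stop_app"));
        steps.add(new Step(brokenNode, "rabbitmqctl", "reset"));
        // The only step that runs on the healthy node
        steps.add(new Step(healthyNode, "rabbitmqctl", "forget_cluster_node", brokenNode));
        steps.add(new Step(brokenNode, "rabbitmqctl", "join_cluster", healthyNode));
        steps.add(new Step(brokenNode, "rabbitmqctl", "change_cluster_node_type", "disk"));
        steps.add(new Step(brokenNode, "rabbitmqctl", "start_app"));
        return steps;
    }

    public static void main(String[] args) {
        for (Step step : plan("rabbit@rbmq92", "rabbit@rbmq91")) {
            System.out.println(step);
        }
    }
}
```

Keeping the plan as a plain list makes it straightforward to review or log the commands before running them by hand on the respective hosts.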

 


Installing the plugin in a Docker container:

1. Enter the Docker container: docker exec -it rabbit bash
2. Run rabbitmq-plugins list to see the installed plugins.
3. Find the download address of the delay plugin on the plugins page: http://www.rabbitmq.com/community-plugins.html
4. Leave the container with exit and, back on the host, download the plugin: wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
5. Unzip it: unzip XXX.zip -d .
6. Copy the plugin file into the container: docker cp xxx.ez rabbit:/plugins
7. Enter the container again: docker exec -it rabbit bash
8. Enable the delay plugin so that it takes effect: rabbitmq-plugins enable rabbitmq_delayed_message_exchange