https://www.rabbitmq.com/community-plugins.html
wget https://dl.bintray.com/rabbitmq/community-plugins/3.6.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
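After downloading, unzip the archive and copy the plugin into the plugins directory of the RabbitMQ server. For a Linux Debian/RPM deployment this is /usr/local/rabbitmq/plugins (on Windows: <install directory>\rabbitmq_server-version\plugins).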
/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.12/plugins
unzip rabbitmq_delayed_message_exchange-20171215-3.6.x.zip
Enable the plugin with the rabbitmq-plugins enable command:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
The output looks like this:
- The following plugins have been enabled:
- rabbitmq_delayed_message_exchange
Run rabbitmq-plugins list to see the installed plugins:
- Configured: E = explicitly enabled; e = implicitly enabled
- | Status: * = running on rabbit@n1
- |/
- [e*] amqp_client 3.6.12
- [e*] cowboy 1.0.4
- [e*] cowlib 1.0.2
- [ ] rabbitmq_amqp1_0 3.6.12
- [ ] rabbitmq_auth_backend_ldap 3.6.12
- [ ] rabbitmq_auth_mechanism_ssl 3.6.12
- [ ] rabbitmq_consistent_hash_exchange 3.6.12
- [E*] rabbitmq_delayed_message_exchange 20171215-3.6.x
- [ ] rabbitmq_event_exchange 3.6.12
- [ ] rabbitmq_federation 3.6.12
- [ ] rabbitmq_federation_management 3.6.12
- [ ] rabbitmq_jms_topic_exchange 3.6.12
- [E*] rabbitmq_management 3.6.12
- [e*] rabbitmq_management_agent 3.6.12
- [ ] rabbitmq_management_visualiser 3.6.12
- [ ] rabbitmq_mqtt 3.6.12
- [ ] rabbitmq_recent_history_exchange 3.6.12
- [ ] rabbitmq_sharding 3.6.12
- [ ] rabbitmq_shovel 3.6.12
- [ ] rabbitmq_shovel_management 3.6.12
- [ ] rabbitmq_stomp 3.6.12
- [ ] rabbitmq_top 3.6.12
- [E*] rabbitmq_tracing 3.6.12
- [ ] rabbitmq_trust_store 3.6.12
- [e*] rabbitmq_web_dispatch 3.6.12
- [ ] rabbitmq_web_mqtt 3.6.12
- [ ] rabbitmq_web_mqtt_examples 3.6.12
- [ ] rabbitmq_web_stomp 3.6.12
- [ ] rabbitmq_web_stomp_examples 3.6.12
- [ ] sockjs 0.3.4
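The listing above can also be checked programmatically. The following is a minimal sketch, assuming the output format shown above (a status box such as [E*] followed by the plugin name and version). The class and method names are hypothetical and are not part of RabbitMQ.

```java
import java.util.List;

/** Sketch: scans `rabbitmq-plugins list` output for an enabled, running plugin. */
class PluginListCheck {

    /**
     * Returns true when a line marks the plugin as enabled ("E" or "e")
     * and running ("*"), e.g. "[E*] rabbitmq_delayed_message_exchange 20171215-3.6.x".
     */
    static boolean isEnabledAndRunning(List<String> lines, String plugin) {
        for (String raw : lines) {
            String line = raw.trim();
            int open = line.indexOf('[');
            int close = line.indexOf(']');
            if (open < 0 || close < open) {
                continue; // legend lines such as "Configured: ..." have no status box
            }
            String status = line.substring(open + 1, close);
            String[] rest = line.substring(close + 1).trim().split("\\s+");
            boolean enabled = status.contains("E") || status.contains("e");
            boolean running = status.contains("*");
            if (rest[0].equals(plugin) && enabled && running) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> sample = List.of(
            "[e*] amqp_client 3.6.12",
            "[  ] rabbitmq_amqp1_0 3.6.12",
            "[E*] rabbitmq_delayed_message_exchange 20171215-3.6.x");
        System.out.println(isEnabledAndRunning(sample, "rabbitmq_delayed_message_exchange")); // true
        System.out.println(isEnabledAndRunning(sample, "rabbitmq_amqp1_0"));                  // false
    }
}
```

In practice the lines would come from the captured output of the command. The sample list here only mirrors a few rows of the listing.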
Test code:
Producer:
- import java.util.Date;
- import java.util.HashMap;
- import java.util.Map;
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
-
- public class Producer {
- public static void main(String[] args) throws Exception {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setVirtualHost("/");
- connectionFactory.setHost("172.31.1.135");
- connectionFactory.setUsername("xx");
- connectionFactory.setPassword("xx");
- connectionFactory.setPort(5672);
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
- String exchangeName = "delay-exchange";
- String routingkey = "delay.delay";
-
- String queueName = "delay_queueName";
- // Declare an exchange of type x-delayed-message; x-delayed-type selects the routing behaviour (here: direct)
- Map<String,Object> map =new HashMap<>();
- map.put("x-delayed-type", "direct");
- channel.exchangeDeclare(exchangeName, "x-delayed-message", true, false, map);
- // Note: x-delayed-type is an exchange argument; the queue needs no extra arguments (the consumer also declares it with null).
- channel.queueDeclare(queueName, true, false, false, null);
- channel.queueBind(queueName,exchangeName,routingkey);
- for (int i = 0; i < 3; i++) {
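- // The x-delay header set below is the delay, in milliseconds, before the exchange routes the message.
- // Build the text first and encode it once. Concatenating the result of getBytes("UTF-8") with a String, as the run recorded below did, puts the array's identity string (e.g. [B@387c703b) into the body.
- String msg = "delayed payload " + new Date().getTime();
- byte[] messageBodyBytes = msg.getBytes("UTF-8");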
- Map<String, Object> headers = new HashMap<String, Object>();
- headers.put("x-delay", 50000);
- AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder().headers(headers);
- channel.basicPublish(exchangeName, routingkey, props.build(), messageBodyBytes);
- }
- }
- }
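The two pieces the producer assembles for each message, the body bytes and the x-delay header, can be isolated and tested without a broker. This is a minimal sketch using only the JDK. The class and method names are hypothetical, and the int conversion is an assumption made to match the Integer value used in the listing above.

```java
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** Sketch: builds the body and headers for one delayed message. */
class DelayedMessageParts {

    /** Builds the message text first, then encodes it once as UTF-8. */
    static byte[] payload(String text, long sentAtMillis) {
        String msg = text + " " + sentAtMillis;
        return msg.getBytes(StandardCharsets.UTF_8);
    }

    /** Returns a headers map carrying the delay in milliseconds under "x-delay". */
    static Map<String, Object> delayHeaders(Duration delay) {
        if (delay.isNegative()) {
            throw new IllegalArgumentException("delay must not be negative");
        }
        Map<String, Object> headers = new HashMap<>();
        // Assumption: keep the value an int, as in the listing; toIntExact throws on overflow.
        headers.put("x-delay", Math.toIntExact(delay.toMillis()));
        return headers;
    }

    public static void main(String[] args) {
        byte[] body = payload("delayed payload", 1575280086578L);
        System.out.println(new String(body, StandardCharsets.UTF_8)); // delayed payload 1575280086578
        System.out.println(delayHeaders(Duration.ofSeconds(50)));     // {x-delay=50000}
    }
}
```

The returned map is what would be passed to AMQP.BasicProperties.Builder.headers(...), and the byte array is what would be passed as the body to basicPublish.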
Consumer:
-
-
- import java.io.IOException;
- import java.util.Date;
-
- import com.rabbitmq.client.AMQP.BasicProperties;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
-
-
- public class Consumer {
- public static void main(String[] args) throws Exception {
- ConnectionFactory connectionFactory = new ConnectionFactory();
- connectionFactory.setVirtualHost("/");
- connectionFactory.setHost("172.31.1.135");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("xxx");
- connectionFactory.setPassword("xxx");
- Connection connection = connectionFactory.newConnection();
- Channel channel = connection.createChannel();
- String queueName = "delay_queueName";
- channel.queueDeclare(queueName,true,false,false,null);
-
- channel.basicConsume(queueName, false, "myConsumer Tag", new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException {
- String routingKey = envelope.getRoutingKey();
- String convernType = properties.getContentType();
- long deliveryTag = envelope.getDeliveryTag();
- System.out.println("routingKey:"+routingKey+",convernType:"+convernType+",deliveryTag:"+deliveryTag+",Msg body:"+new String(body)+ " "+new Date().getTime());
- channel.basicAck(deliveryTag, false);
- }
-
- });
- }
- }
Execution result:
- routingKey:delay.delay,convernType:null,deliveryTag:13,Msg body:[B@387c703b 1575280086578 1575280136587
- routingKey:delay.delay,convernType:null,deliveryTag:14,Msg body:[B@50cbc42f 1575280086582 1575280136587
- routingKey:delay.delay,convernType:null,deliveryTag:15,Msg body:[B@75412c2f 1575280086582 1575280136587
- routingKey:delay.delay,convernType:null,deliveryTag:16,Msg body:[B@387c703b 1575280078869 1575280178875
- routingKey:delay.delay,convernType:null,deliveryTag:17,Msg body:[B@50cbc42f 1575280078870 1575280178875
- routingKey:delay.delay,convernType:null,deliveryTag:18,Msg body:[B@75412c2f 1575280078871 1575280178875
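Each result line ends with two timestamps: the send time embedded in the body and the receive time appended by the consumer. Their difference is the observed delay. The sketch below computes it; the class name is hypothetical, and it assumes the two timestamps are the last two whitespace-separated fields, as in the lines above.

```java
/** Sketch: derives the observed delay from one consumer output line. */
class ObservedDelay {

    /** Returns receivedAt - sentAt, taken from the last two fields of the line. */
    static long millis(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts.length < 2) {
            throw new IllegalArgumentException("expected two trailing timestamps: " + line);
        }
        long sentAt = Long.parseLong(parts[parts.length - 2]);
        long receivedAt = Long.parseLong(parts[parts.length - 1]);
        return receivedAt - sentAt;
    }

    public static void main(String[] args) {
        String tag13 = "routingKey:delay.delay,convernType:null,deliveryTag:13,"
            + "Msg body:[B@387c703b 1575280086578 1575280136587";
        String tag16 = "routingKey:delay.delay,convernType:null,deliveryTag:16,"
            + "Msg body:[B@387c703b 1575280078869 1575280178875";
        System.out.println(millis(tag13)); // 50009
        System.out.println(millis(tag16)); // 100006
    }
}
```

Delivery tags 13 to 15 arrive about 50 seconds after sending, which matches the x-delay of 50000 in the producer. Tags 16 to 18 arrive about 100 seconds after sending, which suggests that batch was published with a different x-delay value. That is an inference from the timestamps, not something the output states.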
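Note: when the rabbitmq-delayed-message-exchange plugin is in use, the number of messages sent to the queue may not be visible in the web management UI. This does not affect normal operation.
Note: in practice, a RAM node with the rabbitmq-delayed-message-exchange plugin enabled failed to start after a restart. The log showed a timeout exception. The plugin developer explained that the node synchronizes cluster data during startup and that this step times out, and recommended not using RAM nodes.
Plugin developer: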
RAM nodes start blank and need a disk node to sync tables from. In this case it times out.
More importantly, you don’t need RAM nodes. If you’re not sure if you do, you certainly don’t, as don’t 99% of users.
- Error description:
- {could_not_start,rabbit,
- {{case_clause,
- {timeout,
- [rabbit_delayed_messagerabbit@n2,
- rabbit_delayed_messagerabbit@n2_index]}},
- [{rabbit_boot_steps,'-run_step/2-lc$^1/1-1-',1,
- [{file,"src/rabbit_boot_steps.erl"},{line,49}]},
- {rabbit_boot_steps,run_step,2,
- [{file,"src/rabbit_boot_steps.erl"},{line,49}]},
- {rabbit_boot_steps,'-run_boot_steps/1-lc$^0/1-0-',1,
- [{file,"src/rabbit_boot_steps.erl"},{line,26}]},
- {rabbit_boot_steps,run_boot_steps,1,
- [{file,"src/rabbit_boot_steps.erl"},{line,26}]},
- {rabbit,start,2,[{file,"src/rabbit.erl"},{line,733}]},
- {application_master,start_it_old,4,
- [{file,"application_master.erl"},{line,273}]}]}}
-
- Log files (may contain more information):
- /var/log/rabbitmq/rabbit@n2.log
- /var/log/rabbitmq/rabbit@n2-sasl.log
-
- Error: {could_not_start,rabbit,
- {{case_clause,
- {timeout,
- [rabbit_delayed_messagerabbit@n2,
- rabbit_delayed_messagerabbit@n2_index]}},
- [{rabbit_boot_steps,'-run_step/2-lc$^1/1-1-',1,
- [{file,"src/rabbit_boot_steps.erl"},{line,49}]},
- {rabbit_boot_steps,run_step,2,
- [{file,"src/rabbit_boot_steps.erl"},{line,49}]},
- {rabbit_boot_steps,'-run_boot_steps/1-lc$^0/1-0-',1,
- [{file,"src/rabbit_boot_steps.erl"},{line,26}]},
- {rabbit_boot_steps,run_boot_steps,1,
- [{file,"src/rabbit_boot_steps.erl"},{line,26}]},
- {rabbit,start,2,[{file,"src/rabbit.erl"},{line,733}]},
- {application_master,start_it_old,4,
- [{file,"application_master.erl"},{line,273}]}]}}
Joining the cluster as a RAM node:
rabbitmqctl join_cluster rabbit@n1 --ram
- [root@n2 plugins]# rabbitmqctl stop_app
- Stopping rabbit application on node rabbit@n2
- [root@n2 plugins]# rabbitmqctl reset
- Resetting node rabbit@n2
- [root@n2 plugins]# rabbitmqctl join_cluster rabbit@n1
- Clustering node rabbit@n2 with rabbit@n1
- [root@n2 plugins]# rabbitmqctl start_app
- Starting node rabbit@n2
- [root@n2 plugins]#
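## Tip: if installing the plugin crashes the cluster
Usually it is enough to reset the node that is being switched from RAM to disk and rejoin it to the cluster, but doing so loses the messages held on that node.
- Node 2
- 1. rabbitmqctl stop_app
- 2. rabbitmqctl reset
- 3. On node 1, run rabbitmqctl forget_cluster_node rabbit@rbmq92
- 4. rabbitmqctl join_cluster rabbit@rbmq91
- 5. rabbitmqctl change_cluster_node_type disk
- 6. rabbitmqctl start_app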
- 1. Enter the Docker container: docker exec -it rabbit bash
- 2. Run rabbitmq-plugins list to see the installed plugins
- 3. Find the download URL of the delayed message plugin on the plugins page: http://www.rabbitmq.com/community-plugins.html
- 4. Run exit to leave the container and return to the host, then download the plugin: wget https://dl.bintray.com/rabbitmq/community-plugins/3.7.x/rabbitmq_delayed_message_exchange/rabbitmq_delayed_message_exchange-20171201-3.7.x.zip
- 5. Unzip the archive: unzip XXX.zip -d .
- 6. Copy the extracted plugin file into the container: docker cp xxx.ez rabbit:/plugins
- 7. Enter the container again: docker exec -it rabbit bash
- 8. Enable the delayed message plugin: rabbitmq-plugins enable rabbitmq_delayed_message_exchange