一。rabbitmq介绍
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue高级消息队列 )的开源实现 主要包含以下组件
1.Server(broker): 接受客户端连接,实现AMQP消息队列和路由功能的进程。
2.Virtual Host:其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host
3.Exchange:接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。
4.Message Queue:消息队列,用于存储还未被消费者消费的消息。
5.Message: 由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。
6.Binding:Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。
7.Connection:连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。
8.Channel:信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。
9.Command:AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。
二。rabbitmq安装
1》安装rabbitmq
安装过程 参考 (http://www.rabbitmq.com/install-rpm.html)
rabbitmq-server 目前安装包被包含在 Fedora rpm仓库中 Fedora是epel库
yum -y install epel-release.noarch
查看是否存在rabbitmq 然后安装
- yum search rabbitmq-server
- yum -y install rabbitmq-server
查看安装包
tail -1000 /var/log/yum.log
安装了很多erlang语言包和一个rabbitmq-server包
- Nov 07 05:05:50 Installed: lksctp-tools-1.0.17-2.el7.x86_64
- Nov 07 05:05:50 Installed: erlang-crypto-R16B-03.18.el7.x86_64
- Nov 07 05:05:51 Installed: erlang-kernel-R16B-03.18.el7.x86_64
- Nov 07 05:05:52 Installed: erlang-stdlib-R16B-03.18.el7.x86_64
- Nov 07 05:05:53 Installed: erlang-erts-R16B-03.18.el7.x86_64
- Nov 07 05:05:53 Installed: erlang-syntax_tools-R16B-03.18.el7.x86_64
- Nov 07 05:05:53 Installed: erlang-compiler-R16B-03.18.el7.x86_64
- Nov 07 05:05:54 Installed: erlang-hipe-R16B-03.18.el7.x86_64
- Nov 07 05:05:55 Installed: erlang-mnesia-R16B-03.18.el7.x86_64
- Nov 07 05:05:55 Installed: erlang-runtime_tools-R16B-03.18.el7.x86_64
- Nov 07 05:05:56 Installed: erlang-snmp-R16B-03.18.el7.x86_64
- Nov 07 05:05:56 Installed: erlang-otp_mibs-R16B-03.18.el7.x86_64
- Nov 07 05:05:56 Installed: erlang-sd_notify-0.1-1.el7.x86_64
- Nov 07 05:05:56 Installed: erlang-xmerl-R16B-03.18.el7.x86_64
- Nov 07 05:05:56 Installed: erlang-asn1-R16B-03.18.el7.x86_64
- Nov 07 05:05:57 Installed: erlang-public_key-R16B-03.18.el7.x86_64
- Nov 07 05:05:57 Installed: erlang-ssl-R16B-03.18.el7.x86_64
- Nov 07 05:05:57 Installed: erlang-inets-R16B-03.18.el7.x86_64
- Nov 07 05:05:57 Installed: erlang-tools-R16B-03.18.el7.x86_64
- Nov 07 05:05:57 Installed: erlang-sasl-R16B-03.18.el7.x86_64
- Nov 07 05:05:58 Installed: erlang-os_mon-R16B-03.18.el7.x86_64
- Nov 07 05:06:00 Installed: rabbitmq-server-3.3.5-34.el7.noarch
查看rabbitmq-server被安装的所有文件的位置
- /etc/logrotate.d/rabbitmq-server
- /etc/rabbitmq
- /etc/rabbitmq/rabbitmq.config
- /usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
- /usr/lib/rabbitmq/bin
- /usr/lib/rabbitmq/bin/rabbitmq-defaults
- /usr/lib/rabbitmq/bin/rabbitmq-env
- /usr/lib/rabbitmq/bin/rabbitmq-plugins
- /usr/lib/rabbitmq/bin/rabbitmq-server
- /usr/lib/rabbitmq/bin/rabbitmqctl
查看rabbitmq-server所有的参考文档
- [root@node3 log]# rpm -qd rabbitmq-server-3.3.5-34.el7.noarch
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-APACHE2-ExplorerCanvas
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-APL2-Stomp-Websocket
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-Apache-Basho
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-BSD-base64js
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-BSD-glMatrix
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-EJS10
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-Flot
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-Mochi
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-Sammy060
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-eldap
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MIT-jQuery164
- /usr/share/doc/rabbitmq-server-3.3.5/LICENSE-MPL-RabbitMQ
- /usr/share/doc/rabbitmq-server-3.3.5/rabbitmq.config.example
- /usr/share/man/man1/rabbitmq-plugins.1.gz
- /usr/share/man/man1/rabbitmq-server.1.gz
- /usr/share/man/man1/rabbitmqctl.1.gz
- /usr/share/man/man5/rabbitmq-env.conf.5.gz
其中比较重要的两个参考文件(这两个只是参考)
运行配置文件:rabbitmq.config.example
环境配置文件:/usr/share/man/man5/rabbitmq-env.conf.5.gz
默认该两文件需要配置在 /etc/rabbitmq目录下 默认只有一个
- [root@node3 log]# cd /etc/rabbitmq && ll
- total 20
- -rw-r--r-- 1 root root 18555 Aug 11 2014 rabbitmq.config
单机默认配置足够 具体配置参考官网 (http://www.rabbitmq.com/configure.html)
rabbitmq默认安装后 会添加账号rabbitmq 默认以该账号运行
- [root@node3 rabbitmq]# more /etc/passwd | grep rabbitmq
- rabbitmq:x:992:990:RabbitMQ messaging server:/var/lib/rabbitmq:/sbin/nologin
2》启动rabbitmq
启动后 查看默认的端口 5672
- [root@node3 rabbitmq]# netstat -aon | grep 5672
- tcp 0 0 0.0.0.0:25672 0.0.0.0:* LISTEN off (0.00/0/0)
- tcp6 0 0 :::5672 :::* LISTEN off (0.00/0/0)
查看运行状态
- [root@node3 rabbitmq]# service rabbitmq-server status
- Redirecting to /bin/systemctl status rabbitmq-server.service
- ● rabbitmq-server.service - RabbitMQ broker
- Loaded: loaded (/usr/lib/systemd/system/rabbitmq-server.service; disabled; vendor preset: disabled)
- Active: active (running) since Tue 2017-11-07 05:52:27 PST; 6min ago
- Main PID: 8507 (beam)
- CGroup: /system.slice/rabbitmq-server.service
- ├─8507 /usr/lib64/erlang/erts-5.10.4/bin/beam -W w -K true -A30 -P 1048576 -- -root /usr/lib64/erlang -progname...
- ├─8522 /usr/lib64/erlang/erts-5.10.4/bin/epmd -daemon
- ├─8579 inet_gethost 4
- └─8580 inet_gethost 4
-
- Nov 07 05:52:24 node3 systemd[1]: Starting RabbitMQ broker...
- Nov 07 05:52:24 node3 systemd[1]: rabbitmq-server.service: Got notification message from PID 8522, but reception on...ID 8507
- Nov 07 05:52:26 node3 rabbitmq-server[8507]: RabbitMQ 3.3.5. Copyright (C) 2007-2014 GoPivotal, Inc.
- Nov 07 05:52:26 node3 rabbitmq-server[8507]: ## ## Licensed under the MPL. See http://www.rabbitmq.com/
- Nov 07 05:52:26 node3 rabbitmq-server[8507]: ## ##
- Nov 07 05:52:26 node3 rabbitmq-server[8507]: ########## Logs: /var/log/rabbitmq/rabbit@node3.log
- Nov 07 05:52:26 node3 rabbitmq-server[8507]: ###### ## /var/log/rabbitmq/rabbit@node3-sasl.log
- Nov 07 05:52:26 node3 rabbitmq-server[8507]: ##########
- Nov 07 05:52:27 node3 systemd[1]: Started RabbitMQ broker.
- Nov 07 05:52:27 node3 rabbitmq-server[8507]: Starting broker... completed with 0 plugins.
- Hint: Some lines were ellipsized, use -l to show in full.
关闭和查看rabiitmq的状态可以使用命令
- [root@node3 system]# rabbitmqctl stop
- Stopping and halting node rabbit@node3 ...
- ...done.
默认的日志目录是(/var/log/rabbitmq)
- [root@node3 rabbitmq]# ll
- total 4
- -rw-r--r-- 1 rabbitmq rabbitmq 3190 Nov 7 06:12 rabbit@node3.log
- -rw-r--r-- 1 rabbitmq rabbitmq 0 Nov 7 05:52 rabbit@node3-sasl.log
rabbitmq默认提供了一个web管理工具(rabbitmq_management)参考官方http://www.rabbitmq.com/management.html 默认已经安装 是一个插件
查看所有插件
- [root@node3 rabbitmq]# rabbitmq-plugins list
- [ ] amqp_client 3.3.5
- [ ] cowboy 0.5.0-rmq3.3.5-git4b93c2d
- [ ] eldap 3.3.5-gite309de4
- [ ] mochiweb 2.7.0-rmq3.3.5-git680dba8
- [ ] rabbitmq_amqp1_0 3.3.5
- [ ] rabbitmq_auth_backend_ldap 3.3.5
- [ ] rabbitmq_auth_mechanism_ssl 3.3.5
- [ ] rabbitmq_consistent_hash_exchange 3.3.5
- [ ] rabbitmq_federation 3.3.5
- [ ] rabbitmq_federation_management 3.3.5
- [ ] rabbitmq_management 3.3.5
- [ ] rabbitmq_management_agent 3.3.5
- [ ] rabbitmq_management_visualiser 3.3.5
- [ ] rabbitmq_mqtt 3.3.5
- [ ] rabbitmq_shovel 3.3.5
- [ ] rabbitmq_shovel_management 3.3.5
- [ ] rabbitmq_stomp 3.3.5
- [ ] rabbitmq_test 3.3.5
- [ ] rabbitmq_tracing 3.3.5
- [ ] rabbitmq_web_dispatch 3.3.5
- [ ] rabbitmq_web_stomp 3.3.5
- [ ] rabbitmq_web_stomp_examples 3.3.5
- [ ] sockjs 0.3.4-rmq3.3.5-git3132eb9
- [ ] webmachine 1.10.3-rmq3.3.5-gite9359c7
启用该插件即可
rabbitmq-plugins enable rabbitmq_management
重启rabbitmq-server
- [root@node3 rabbitmq]# service rabbitmq-server restart
- Redirecting to /bin/systemctl restart rabbitmq-server.service
浏览器访问(开启了端口 15672 当前机器ip是58.150) (http://192.168.58.150:15672/)
输入用户名 guest和guest
该管理界面有几个视图 分别是
三。rabbitmq api调用
参考官方文档 http://www.rabbitmq.com/getstarted.html
rabbitmq 官方操作api提供了n多种语言 前面几种都会 java是本职 所以使用java
rabbitmq支持6种消息接受和转发机制
1》简单模式(http://www.rabbitmq.com/tutorials/tutorial-one-java.html)
单个发送者(生产者) 将消息发送到队列(每个队列都有一个唯一名字) 单个接受者(消费者)获取消息
eclipse添加maven的jar项目添加依赖
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>4.2.0</version>
- </dependency>
生产者代码
- package cn.et.p1;
-
- import java.io.IOException;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- * 消息发送者 - 生产者
- * @author jiaozi
- *
- */
- public class Pub {
- /**
- * 队列名称
- */
- private final static String QUEUE_NAME = "hello";
- public static void main(String[] args) throws Exception {
- //连接远程rabbit-server服务器
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- factory.setPort(5672);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //定义创建一个队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- //发送消息
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); //注意发送和接受段相同字符集否则出现乱码
- System.out.println(" [x] Sent '" + message + "'");
- channel.close();
- connection.close();
- }
-
- }
消费者代码
- package cn.et.p1;
-
- import java.io.IOException;
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- import com.rabbitmq.client.QueueingConsumer;
-
- /**
- * 消息接受者 - 消费者
- *
- * @author jiaozi
- *
- */
- public class Rec {
- /**
- * 获取消息队列名称
- */
- private final static String QUEUE_NAME = "hello";
- /**
- * 异步接收
- * @throws Exception
- */
- public static void asyncRec() throws Exception{
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //消费者也需要定义队列 有可能消费者先于生产者启动
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- //定义回调抓取消息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- }
- };
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
-
- /**
- * 同步接收 消费者定时去抓取
- * 该种方式已过期
- * @throws Exception
- */
- public static void asyncFalseRec() throws Exception{
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(QUEUE_NAME, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" + message + "'");
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- asyncRec();
- }
- }
发布消息到队列可以通过web管理界面查看消息 使用命令行
- [root@node3 rabbitmq]# rabbitmqctl list_queues
- Listing queues ...
- hello 2 队列名称 队列的消息个数
- ...done.
web界面可以模拟添加队列 删除 模拟发送消息 也可以查看队列消息 具体自己研究
2》工作队列模式(http://www.rabbitmq.com/tutorials/tutorial-two-java.html)
工作队列一般用于任务分配 发布者发布任务到队列 多个消息接受者 接受消息 谁接受到某个消息 其他接受者就只能消费其他消息 队列中的
一个消息只能被一个接受者消费(类似12306抢票一样 比如某个车次 就相当于队列 该车次 出来一些座位票 一张票只能被一个人抢到 最终 所有的座位票
都被不同的人抢到 注意: 一个人可以抢多张票)
模拟 两个任务接受者 一个发布者发布6个消息 2个任务接受者抢这6个消息 一般来说都是rr轮询制 基本是平均分配给每个接受者的
发布者代码:
- package cn.et.p2;
-
- import java.io.IOException;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- /**
- * 消息发送者 - 生产者
- * @author jiaozi
- *
- */
- public class Pub {
- /**
- * 队列名称
- */
- private final static String QUEUE_NAME = "mywork";
- public static void main(String[] args) throws Exception {
- //连接远程rabbit-server服务器
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- factory.setPort(5672);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //定义创建一个队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = null;
- //同时发送5条消息
- for(int i=0;i<=5;i++){
- message="发送第"+i+"消息";
- //发送消息
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
- }
-
- System.out.println(" [x] Sent 5 message");
- channel.close();
- connection.close();
- }
-
- }
接受任务者
- package cn.et.p2;
-
- import java.io.IOException;
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- import com.rabbitmq.client.QueueingConsumer;
-
- /**
- * 消息接受者 - 消费者
- *
- * @author jiaozi
- *
- */
- public class Rec {
- /**
- * 获取消息队列名称
- */
- private final static String QUEUE_NAME = "mywork";
- /**
- * 异步接收
- * @throws Exception
- */
- public static void asyncRec() throws Exception{
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //消费者也需要定义队列 有可能消费者先于生产者启动
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- //定义回调抓取消息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- }
- };
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
-
-
-
- public static void main(String[] args) throws Exception {
- asyncRec();
- }
- }
测试先同时启动两个任务接受者
可以通过 webgui 查看 是否出现了两个连接 一个队列(监听同一个队列mywork)
运行 发布者 查看两个接受者的控制台 发现一个接收到三条消息
第一个接受者
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received '发送第1消息'
- [x] Received '发送第3消息'
- [x] Received '发送第5消息'
第二个接受者
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received '发送第0消息'
- [x] Received '发送第2消息'
- [x] Received '发送第4消息'
这个例子中 可以将以下概念弄清 (消息确认,持久化等 )
》》概念1 消息确认
假设 发布者推送了一个消息 消息接受者 接受到了消息 处理过程中出现中断或者异常 就说明消息处理失败 之前的例子中channel.basicConsume(QUEUE_NAME, true, consumer);
第二个参数是true 表示自动确定 服务器推送消息给接受者 一旦推送成功就删除消息 不会管接受者的死活了 这种情况明显存在问题
rabbitmq 提供了 手动确认的功能 等接受者 业务逻辑处理完成后 手动调用确认方法确认后 才会删除消息 否则消息就不会从服务器删除
测试一下 消息的这几种状态 (队列也有状态idle【不管是否有消息没有发布者和接受者在处理消息都是该状态】
和running【发布者或者接受者正在发布和接受消息】)
状态1 新建消息 未被消费
模拟 运行生产者类 发送消息 不启动接受者
状态2 已消费未确认
接受者 设置为手动确定 但是忘记写确认的代码 可能导致消息不会被删除
修改Rec类 (第二个参数改为false 就是手动确认)
channel.basicConsume(QUEUE_NAME, false, consumer);
运行该类
控制台 获取到了6条消息
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received '发送第0消息'
- [x] Received '发送第1消息'
- [x] Received '发送第2消息'
- [x] Received '发送第3消息'
- [x] Received '发送第4消息'
- [x] Received '发送第5消息'
查看webgui
此时 Rec的程序 未关闭的 一直等其他消息 如果该进程一直不关闭 消息的状态一直是unpacked 如果其他Rec进程启动 无法获取该消息的 因为
新进程只能获取Ready的消息
测试 不关闭前面的|Rec进程 再启动一个|Rec 运行 该进程什么消息都没有
[*] Waiting for messages. To exit press CTRL+C
尝试关闭掉第一个Rec进程 因为该消费未确认 已关闭 所以消息会自动变为ready 查看Rec 2个进程发现消息来了
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received '发送第0消息'
- [x] Received '发送第1消息'
- [x] Received '发送第2消息'
- [x] Received '发送第3消息'
- [x] Received '发送第4消息'
- [x] Received '发送第5消息'
此时再将第二个Rec也关掉 发现6条消息 还是变成ready状态了
也就是说 消息不确认的情况下 如果有进程消费 状态变成unacked状态 其他进程无法消息 一旦该进程被关 未确认的消息 重新变成ready状态被其他
接受者消费 这样确保了消息的最终一致性
状态2 已消费确认
再次修改Rec代码
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- //参数2 true表示确认该队列所有消息 false只确认当前消息 每个消息都有一个消息标记
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- channel.basicConsume(QUEUE_NAME, false, consumer);
最后一行false 改为手动确认
回调函数中 channal.basicAck 手动确认 重新测试 发现消费完数据都被删除了
》》概念2 队列和消息持久化
虽然消息的手动确认保证了消息发送的正确性 但是当消息未被消费前的rabittmq-server服务器挂掉 会导致队列和消息的丢失 之前例子测试
Pub发送6条消息 重启rabbitmq-server
service rabbitmq-server restart
查看gui发现队列和消息都丢失了 因为这些数据存储在内存中
如果需要将队列和消息持久化到磁盘 需要修改发布者Pub参数
修改定义队列代码(参数2 true表示持久化 false表示非持久化)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
修改消息发送代码(修改第三个参数 持久化文本)
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
注意 如果某个队列之前是非持久化的 再次使用持久化定义是会出错的 应该先到webgui删除掉该队列或使用新的名字
重新发6条消息 重启 发现队列和消息都没有丢失
注意 如果发布者发消息到rabbit服务保存到磁盘之间 发生了宕机 此时数据还是会丢失 此时需要发布者从新发送确认 (参考https://www.rabbitmq.com/confirms.html)
》》概念3 合理消息转发
假设有消息 1 2 3 4 被发送到队列 两个消息接受者 第一个接受者被分到了1 和3号消息 第二个接受者分到了2和4号消息 看似很公平 假设
1号和3号消息是需要执行很久的消息 2号和4号消息 是毫秒级处理的消息 此时 接受者1 时长处于工作繁忙阶段 接受者2 很轻松 此时是不是不公平
因为rabiitmq 只是知道分发消息 并不知道哪个任务的重要性 解决这个问题 可以让rabbitmq服务器设置当某个接受者 处理完消息 确认后 再发送消息
给他 不就能保证每个工作者都能平均承担工作的时间吗 也就是说在一段时间只发送一条消息给接受者 等他处理完 在发送一条
修改接受者Rec代码 (注意接受者第二个参数也是持久化 true 否则会抛出错误)添加
- channel.queueDeclare(QUEUE_NAME, true, false, false, null);
- channel.basicQos(1);
3》发布订阅模式(http://www.rabbitmq.com/tutorials/tutorial-three-java.html)
生产者发布主题消息 订阅者订阅该主题 订阅该主题的所有订阅者都可以接受消息 (类似于广播 广播有个频道(主题) 听广播的用户就是订阅者
广播中推送的新闻和内容就是消息)
发布订阅模式 引入了交换器的概念 消息发布者发布消息到交换器 订阅者定义一个队列(每个订阅者定义一个)用于接受消息 交换器 起到了
中转消息的作用 这里面还有个routingkey 如果定义了routingkey 交换器只会将消息发给自己的routingkey和订阅者队列绑定routringkey和相同或者相似的队列
比如交换器定义routingkey是 abc 订阅者将自己队列绑定到交换器时指定routingkey是abc 交换器发现routingkey相同所以消息被发送到这个队列
这里 因为所有的订阅者需要获取消息 所以routingkey为空 订阅者产生的队列名称应该为随机字符串就可
交换器 有以下几种类型 direct, topic, headers and fanout
发布订阅模式 应该使用fanout(广播)
通过命令 rabbitmqctl list_exchanges 或者查看webgui 查看exchange
列表中有很多amq.*开头的交换器 第一个是默认 之前的两种简单模式和工作队列模式未定义交换器 使用的是第一个defalt
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
发布消息的第一个参数 就是交换器 为空 所以使用默认的交换器 也就是消息不经过交换器,直接发给给队列 第二个参数就是发送的队列名称
官网日志的例子 非常经典
比如 发布者应用打印了日志 同时有两个订阅者 其中一个订阅者保存日志 另外一个订阅者也输出日志 可以用于分布式统计日志
这里仅仅演示两个订阅者都是打印消息
发布者
- package cn.et.p3;
-
- import java.io.IOException;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
-
- /**
- * 消息发送者 - 生产者
- * @author jiaozi
- *
- */
- public class Pub {
- /**
- * 交换器名称
- */
- private static final String EXCHANGE_NAME = "logs";
- public static void main(String[] args) throws Exception {
- //连接远程rabbit-server服务器
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- factory.setPort(5672);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //定义创建一个交换器 参数1 名称 参数2 交换器类型 参数3表示将交换器信息永久保存在服务器磁盘上 关闭rabbitmqserver也不会丢失
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
- String message = null;
- //同时发送5条消息
- for(int i=0;i<=5;i++){
- message="发送第"+i+"消息";
- //第二个参数就是routingkey 不填 默认会转发给所有的订阅者队列
- channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
- }
-
- System.out.println(" [x] Sent 6 message");
- channel.close();
- connection.close();
- }
-
- }
订阅者
- package cn.et.p3;
-
- import java.io.IOException;
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
- import com.rabbitmq.client.QueueingConsumer;
-
- /**
- * 消息接受者 - 消费者
- * @author jiaozi
- */
- public class Rec {
- /**
- * 交换器名称
- */
- private static final String EXCHANGE_NAME = "logs";
- /**
- * 异步接收
- * @throws Exception
- */
- public static void asyncRec() throws Exception{
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- Connection connection = factory.newConnection();
- final Channel channel = connection.createChannel();
- //消费者也需要定义队列 有可能消费者先于生产者启动
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout",true);
- channel.basicQos(1);
- //产生一个随机的队列 该队列用于从交换器获取消息
- String queueName = channel.queueDeclare().getQueue();
- //将队列和某个交换机丙丁 就可以正式获取消息了 routingkey和交换器的一样都设置成空
- channel.queueBind(queueName, EXCHANGE_NAME, "");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- //定义回调抓取消息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- //参数2 true表示确认该队列所有消息 false只确认当前消息 每个消息都有一个消息标记
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- //参数2 表示手动确认
- channel.basicConsume(queueName, false, consumer);
-
- }
-
-
-
- public static void main(String[] args) throws Exception {
- asyncRec();
- }
- }
发布订阅模式的消息是 发布者发布到交换机后 交换机会将所有消息转发给在线的订阅者队列 发送完毕后消息就被删除 再次启动的的订阅者队列无法获取上一次发送的消息
先启动两个订阅者 Rec
发送者Pub执行
发现两个订阅者都获取到了消息
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received '发送第0消息'
- [x] Received '发送第1消息'
- [x] Received '发送第2消息'
- [x] Received '发送第3消息'
- [x] Received '发送第4消息'
- [x] Received '发送第5消息'
再启动一个订阅Rec 发现前面的消息都获取不到 再次调用Pub发送一次消息 三个都获取到消息
4》路由模式(http://www.rabbitmq.com/tutorials/tutorial-three-java.html)
前面第三种发布订阅模式 每个订阅者都会收到交换器发出的消息 因为发布者发布消息的routingkey是空 订阅者接受消息队列的routingkey也是空
发布者发布的消息 所有的订阅者都能接收到 路由模式表示 发布者发布的消息的routingkey和订阅者接受消息队列的routingkey都不为空 相同的则可以
接受消息 比如发送日志的例子
该种模式 交换器的类型 必须是direct 而不是之前的faout 比如 发布者P 发布的多个消息 使用的多个routingkey 比如error info warning等
比如一条错误消息 routingkey定义为 error 警告消息是warning 如果某个订阅者的临时队列使用的routingkey是error 将接受到error消息
是warning将接受到警告消息 其他消息如果没有队列绑定routingkey 将被丢弃
模拟上图的例子 比如有四类消息 C1订阅者程序 绑定到交换机X routingkey是error 出现error写入到磁盘
C2订阅者程序绑定到交换机X routingkey绑定三个 info error warning 接受到消息只是打印
代码模拟
添加maven依赖
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>4.2.0</version>
- </dependency>
- <!-- https://mvnrepository.com/artifact/commons-io/commons-io -->
- <dependency>
- <groupId>commons-io</groupId>
- <artifactId>commons-io</artifactId>
- <version>2.0</version>
- </dependency>
commons-io用于写入文本信息到文件 模拟日志写入
Pub
- package cn.et.p4;
-
- import java.io.IOException;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
-
- /**
- * 消息发送者 - 生产者
- * @author jiaozi
- *
- */
- public class Pub {
- /**
- * 交换器名称
- */
- private static final String EXCHANGE_NAME = "X";
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- //连接远程rabbit-server服务器
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- factory.setPort(5672);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //定义创建一个交换器 参数1 名称 参数2 交换器类型 参数3表示将交换器信息永久保存在服务器磁盘上 关闭rabbitmqserver也不会丢失
- channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
- String message = args[1];
- //同时发送5条消息
- //第二个参数就是routingkey
- channel.basicPublish(EXCHANGE_NAME, "error", MessageProperties.PERSISTENT_TEXT_PLAIN, "这是错误信息".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "info", MessageProperties.PERSISTENT_TEXT_PLAIN, "这是程序运行信息".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "warning", MessageProperties.PERSISTENT_TEXT_PLAIN, "这是警告".getBytes("UTF-8"));
-
- channel.close();
- connection.close();
- }
-
- }
订阅者Rec
- package cn.et.p4;
-
- import java.io.File;
- import java.io.IOException;
-
- import org.apache.commons.io.FileUtils;
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
-
- /**
- * 消息接受者 - 消费者
- * @author jiaozi
- */
- public class Rec {
- /**
- * 交换器名称
- */
- private static final String EXCHANGE_NAME = "X";
- /**
- * 用一份代码模拟两个程序
- * 参数1:表示订阅者程序类型 有两个值 C1表示需要写入文件系统 C2 表示打印
- * 参数n:订阅者接受消息的routingkey 可以接受多个
- * @param args
- * @throws Exception
- */
- public static void main(final String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- Connection connection = factory.newConnection();
- final Channel channel = connection.createChannel();
- //消费者也需要定义队列 有可能消费者先于生产者启动
- channel.exchangeDeclare(EXCHANGE_NAME, "direct",true);
- channel.basicQos(1);
- //产生一个随机的队列 该队列用于从交换器获取消息
- String queueName = channel.queueDeclare().getQueue();
- //参数设置了多个routingkey 就可以绑定到多个 如果在当前交换机这些routingkey上发消息 都可以接受
- for(int i=1;i<args.length;i++){
- channel.queueBind(queueName, EXCHANGE_NAME, args[i]);
- }
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- //定义回调抓取消息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- String routingKey=envelope.getRoutingKey();
- //如果是错误消息 将他写入到文件系统中
- if("error".equals(routingKey) && "C1".equals(args[0])){
- FileUtils.writeStringToFile(new File("c:/myerror.log"), message,"UTF-8");
- }
- System.out.println(routingKey+":"+message);
- //参数2 true表示确认该队列所有消息 false只确认当前消息 每个消息都有一个消息标记
- channel.basicAck(envelope.getDeliveryTag(), false);
-
- }
- };
- //参数2 表示手动确认
- channel.basicConsume(queueName, false, consumer);
- }
- }
启动两个订阅者
启动第一个订阅者 传递参数 C1 error
启动第二个订阅者 传递参数 C2 error info warning
运行Pub类测试发送消息
C1程序控制台打印
- [*] Waiting for messages. To exit press CTRL+C
- error:这是错误信息
C2程序控制台打印
- [*] Waiting for messages. To exit press CTRL+C
- error:这是错误信息
- info:这是程序运行信息
- warning:这是警告
并且C1程序打印的信息 写入了日志文件 c:/myerror.log
5》Topics路由模式(http://www.rabbitmq.com/tutorials/tutorial-three-java.html)
该种模式和路由模式类似 只是消息的routingkey 是通过.隔开的多个字符组成 订阅者的消息队列绑定的routingkey可以使用通配符通配所有满足
条件的交换机消息 匹配上则接受消息 这种类型的消息 使用的交换器类型是 topic
比如 发布者发布消息(模拟routingkey消息格式是 班级.学生姓名.性别)
- channel.basicPublish(EXCHANGE_NAME, "1610.cherry.girl", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:切瑞".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "1610.qianqian.boy", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:谦谦".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "1611.jiaozi.boy", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:饺子".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "1701.john.boy", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:冏安".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "1701.alise.girl", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:爱丽丝".getBytes("UTF-8"));
接受者接受消息可以设置routingkey为表达式模糊匹配 *匹配一个单词 #匹配多个但是
比如 1610.# 表示 1610班所有学生 获取到 两条
比如 1701.*.girl 表示1701的所有女生 获取到一条
模拟代码
发布者 Pub
- package cn.et.p5;
-
- import java.io.IOException;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
-
- /**
- * 消息发送者 - 生产者
- * @author jiaozi
- *
- */
- public class Pub {
- /**
- * 交换器名称
- */
- private static final String EXCHANGE_NAME = "student";
- /**
- * @param args
- * @throws Exception
- */
- public static void main(String[] args) throws Exception {
- //连接远程rabbit-server服务器
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- factory.setPort(5672);
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //定义创建一个交换器 参数1 名称 参数2 交换器类型 参数3表示将交换器信息永久保存在服务器磁盘上 关闭rabbitmqserver也不会丢失
- channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
- //同时发送5条消息
- //第二个参数就是routingkey
- channel.basicPublish(EXCHANGE_NAME, "1610.cherry.girl", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:切瑞".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "1610.qianqian.boy", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:谦谦".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "1611.jiaozi.boy", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:饺子".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "1701.john.boy", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:冏安".getBytes("UTF-8"));
- channel.basicPublish(EXCHANGE_NAME, "1701.alise.girl", MessageProperties.PERSISTENT_TEXT_PLAIN, "新同学:爱丽丝".getBytes("UTF-8"));
- channel.close();
- connection.close();
- }
-
- }
订阅者Rec
- package cn.et.p5;
-
- import java.io.File;
- import java.io.IOException;
-
- import org.apache.commons.io.FileUtils;
-
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Consumer;
- import com.rabbitmq.client.DefaultConsumer;
- import com.rabbitmq.client.Envelope;
-
- /**
- * 消息接受者 - 消费者
- * @author jiaozi
- */
- public class Rec {
- /**
- * 交换器名称
- */
- private static final String EXCHANGE_NAME = "student";
- /**
- * 用一份代码模拟两个程序
- * 参数1:表示接受的routingkey匹配的表达式 *代表一个单词 # 代表多个单词
- * @param args
- * @throws Exception
- */
- public static void main(final String[] args) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("192.168.58.150");
- Connection connection = factory.newConnection();
- final Channel channel = connection.createChannel();
- //消费者也需要定义队列 有可能消费者先于生产者启动
- channel.exchangeDeclare(EXCHANGE_NAME, "topic",true);
- channel.basicQos(1);
- //产生一个随机的队列 该队列用于从交换器获取消息
- String queueName = channel.queueDeclare().getQueue();
- //参数设置了多个routingkey 就可以绑定到多个 如果在当前交换机这些routingkey上发消息 都可以接受
- channel.queueBind(queueName, EXCHANGE_NAME, args[0]);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- //定义回调抓取消息
- Consumer consumer = new DefaultConsumer(channel) {
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- String message = new String(body, "UTF-8");
- //如果是错误消息 将他写入到文件系统中
- System.out.println(message);
- //参数2 true表示确认该队列所有消息 false只确认当前消息 每个消息都有一个消息标记
- channel.basicAck(envelope.getDeliveryTag(), false);
-
- }
- };
- //参数2 表示手动确认
- channel.basicConsume(queueName, false, consumer);
- }
- }
启动两个Rec 分别传入项目参数 1610.#和 1701.*.girl
运行Pub测试发送数据
1610.#的Rec输出
- [*] Waiting for messages. To exit press CTRL+C
- 新同学:切瑞
- 新同学:谦谦
1701.*.girl的Rec输出
- [*] Waiting for messages. To exit press CTRL+C
- 新同学:爱丽丝
6》RPC模式
该模式是rpc 远程方法调用 这里不介绍 http协议调用更经典