赞
踩
安装包:
链接 | https://pan.baidu.com/s/16C_0QQhVdNSWPU5f10JHQQ?pwd=dcsy |
---|---|
提取码 | dcsy |
erlang下载:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
socat下载:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ下载:https://www.rabbitmq.com/install-rpm.html#downloads
[root@localhost opt]# rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
[root@localhost opt]# rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
[root@localhost opt]# rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
[root@localhost opt]# rabbitmq-plugins enable rabbitmq_management
[root@localhost opt]# systemctl start rabbitmq-server.service
[root@localhost opt]# systemctl status rabbitmq-server.service
[root@localhost opt]# systemctl restart rabbitmq-server.service
[root@localhost opt]# systemctl stop rabbitmq-server.service
[root@localhost opt]# ps -ef | grep rabbitmq
关闭防火墙: systemctl stop firewalld
浏览器输入:http://ip:15672
默认帐号密码:guest,guest用户默认不允许远程连接
[root@localhost opt]# rabbitmqctl add_user hyq 123456
[root@localhost opt]# rabbitmqctl set_user_tags hyq administrator
[root@localhost opt]# rabbitmqctl set_permissions -p "/" hyq ".*" ".*" ".*"
[root@localhost opt]# rabbitmqctl list_users
[root@localhost opt]# rabbitmqctl change_password hyq 123123
管理界面介绍
<dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency> </dependencies>
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
package util; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * 专门与RabbitMQ获得连接 */ public class ConnectionUtil { public static Connection getConnection() throws Exception{ //1.创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2.在工厂对象中设置MQ的连接信息(ip,port,vhost,username,password) factory.setHost("192.168.10.128"); factory.setPort(5672); factory.setVirtualHost("/test"); factory.setUsername("hyq"); factory.setPassword("123456"); //3.通过工厂获得与MQ的连接 Connection connection = factory.newConnection(); return connection; } public static void main(String[] args) throws Exception{ Connection connection = getConnection(); System.out.println("connection = " + connection); connection.close(); } }
下面引用官网的一段介绍:
Introduction
RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as
a post office: when you put the mail that you want posting in a post box, you can be sure
that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. In this analogy,
RabbitMQ is a post box, a post office and a postman.
译文:RabbitMQ是一个消息代理:它接收和转发消息。你可以把它想象成一个邮局:当你把你想要
寄的邮件放到一个邮箱里,你可以确定邮递员先生或女士最终会把邮件送到你的收件人那里。在
这个类比中,RabbitMQ是一个邮箱、一个邮局和一个邮递员。
RabbitMQ本身只是接收,存储和转发消息,并不会对信息进行处理!
类似邮局,处理信件的应该是收件人而不是邮局!
package simplest; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import util.ConnectionUtil; /** * 消息生产者 */ public class Sender { public static void main(String[] args) throws Exception { String msg = "hyq:Hello,RabbitMQ!"; // 1.获得连接 Connection connection = ConnectionUtil.getConnection(); // 2.在连接中创建通道(信道) Channel channel = connection.createChannel(); // 3.创建消息队列(1,2,3,4,5) /* 参数1:队列的名称 参数2:队列中的数据是否持久化 参数3:是否排外(是否支持扩展,当前队列只能自己用,不能给别人用) 参数4:是否自动删除(当队列的连接数为0时,队列会销毁,不管队列是否还存保存数据) 参数5:队列参数(没有参数为null) */ channel.queueDeclare("queue1",false,false,false,null); // 4.向指定的队列发送消息(1,2,3,4) /* 参数1:交换机名称,当前是简单模式,也就是P2P模式,没有交换机,所以名称为"" 参数2:目标队列的名称 参数3:设置消息的属性(没有属性则为null) 参数4:消息的内容(只接收字节数组) */ channel.basicPublish("","queue1",null,msg.getBytes()); System.out.println("发送:" + msg); // 5.释放资源 channel.close(); connection.close(); } }
启动生产者,即可前往管理端查看队列中的信息,会有一条信息没有处理和确认
package com.hyq.simplest; import com.hyq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; /** * 消息接收者 */ public class Recer { public static void main(String[] args) throws Exception { // 1 获取连接 Connection connection = ConnectionUtil.getConnection(); // 2 获得信道(通道) Channel channel = connection.createChannel(); // 3 从信道中获取消息 DefaultConsumer consumer = new DefaultConsumer(channel){ /** * 交付处理 * @param consumerTag 收件人信息 * @param envelope 包裹上的标签 * @param properties 协议的配置 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 就是从队列中获取的消息 String s = new String(body); System.out.println("接收 = " + s ); } }; // 4 监听队列 true : 自动消息确认 channel.basicConsume("queue1" , true , consumer); } }
启动消费者,前往管理端查看队列中的信息,所有信息都已经处理和确认,显示0
// false:手动消息确认
channel.basicConsume("queue1", false, consumer);
package com.hyq.simplest; import com.hyq.utils.ConnectionUtil; import com.rabbitmq.client.*; import java.io.IOException; /** * 消息接收者 - 手动确认消息 */ public class RecerByACK { public static void main(String[] args) throws Exception{ // 1 获取连接 Connection connection = ConnectionUtil.getConnection(); // 2 获得信道(通道) Channel channel = connection.createChannel(); // 3 从信道中获取消息 DefaultConsumer consumer = new DefaultConsumer(channel){ /** * 交付处理 * @param consumerTag 收件人信息 * @param envelope 包裹上的标签 * @param properties 协议的配置 * @param body 消息 * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { // body 就是从队列中获取的消息 String s = new String(body); System.out.println("接收 = " + s ); // 手动确认 (收件人信息 , 是否同时确认多个消息) channel.basicAck(envelope.getDeliveryTag(), false); } }; // 4 监听队列 , false : 手动消息确认 channel.basicConsume("queue1" , false , consumer); } }
public class MessageSender {
public static void main(String[] args) throws Exception{
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列(此处为生产者,创建队列)注明出餐口位置,通知大家来排队
channel.queueDeclare("test_work_queue",false,false,false,null);
for(int i = 1 ; i<=100 ; i++) {
String msg = "羊肉串 --> "+i;
channel.basicPublish("", "test_work_queue", null, msg.getBytes());
System.out.println("师傅烤好:" + msg);
}
channel.close();
connection.close();
}
}
public class MessageReceiver1 { static int i = 1; // 记录执行次数 public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队 channel.queueDeclare("test_work_queue",false,false,false,null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("【顾客1】吃掉 " + msg+" ! 共吃【"+i++ +"】串"); // 撸一会,有延迟 try { Thread.sleep(200); }catch (InterruptedException e){ } channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("test_work_queue", false, consumer); } }
public class MessageReceiver2 { static int i = 1; // 记录执行次数 public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtil.getConnection(); final Channel channel = connection.createChannel(); // 声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队 channel.queueDeclare("test_work_queue",false,false,false,null); Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body); System.out.println("【顾客2】吃掉 " + msg+" ! 共吃【"+i++ +"】串"); // 撸一会,有延迟 try { Thread.sleep(200); }catch (InterruptedException e){ } channel.basicAck(envelope.getDeliveryTag(),false); } }; channel.basicConsume("test_work_queue", false, consumer); } }
先运行2个消费者,排队等候消费(取餐),再运行生产者开始生产消息(烤肉串)
虽然两个消费者的消费速度不一致(线程休眠时间),但是消费的数量却是一致的,各消费50个消息
例如:工作中,A同学编码速率高,B同学编码速率低,两个人同时开发一个项目,A10天完成,B30天完成,A完成自己的编码部分,就无所事事了,等着B完成就可以了,这样是不可以的,应该遵循“能者多劳”
效率高的多干点,效率低的少干点
看下面官网是如何给出解决思路的:
公平的分配
您可能已经注意到分派仍然不能完全按照我们的要求工作。例如,如果有两个员工,当所有
奇怪的消息都很重,甚至消息都很轻时,一个员工会一直很忙,而另一个人几乎什么工作都
不做。好吧,RabbitMQ对此一无所知,它仍然会均匀地分派消息。
这是因为RabbitMQ只在消息进入队列时发送消息。它不查看用户未确认消息的数量。它
只是盲目地将每条第n个消息分派给第n个消费者。
为了克服这个问题,我们可以使用设置为prefetchCount = 1的basicQos方法。这告诉
RabbitMQ一次不要给一个worker发送一条以上的消息。或者,换句话说,在worker处理并
确认前一个消息之前,不要向它发送新消息。相反,它将把它分派到下一个不繁忙的
worker。
// 声明队列(此处为消费者,不是声明创建队列,而且获取,二者代码相同)出餐口排队
channel.queueDeclare("test_work_queue",false,false,false,null);
// 可以理解为:快递一个一个送,送完一个再送下一个,速度快的送件就多
channel.basicQos(1);
能者多劳必须要配合手动的ACK机制才生效
看官网:
Publish/Subscribe
In the previous tutorial we created a work queue. The assumption behind a work queue is that each task is delivered to exactly one worker. In this part we’ll do something completely different – we’ll deliver a message to multiple consumers. This pattern is known as “publish/subscribe”.
To illustrate the pattern, we’re going to build a simple logging system. It will consist of two programs – the first will emit log messages and the second will receive and print them.
In our logging system every running copy of the receiver program will get the messages. That way we’ll be able to run one receiver and direct the logs to disk; and at the same time we’ll be able to run another receiver and see the logs on the screen.
Essentially, published log messages are going to be broadcast to all the receivers.
发布-订阅
在上一篇教程中,我们创建了一个工作队列。工作队列背后的假设是,每个任务都被准确地交
付给一个工作者。在这一部分中,我们将做一些完全不同的事情——将消息传递给多个消费者。
此模式称为“发布/订阅”。
为了演示这个模式,我们将构建一个简单的日志记录系统。它将由两个程序组成——第一个将
发送日志消息,第二个将接收和打印它们。
在我们的日志系统中,接收程序的每一个正在运行的副本都将获得消息。这样我们就可以运行
一个接收器并将日志指向磁盘;与此同时,我们可以运行另一个接收器并在屏幕上看到日志。
基本上,发布的日志消息将广播到所有接收方。
生活中的案例:就是玩抖音快手,众多粉丝关注一个视频主,视频主发布视频,所有粉丝都可以得到视频通知
上图中,X就是视频主,红色的队列就是粉丝。binding是绑定的意思(关注)
P生产者发送信息给X路由,X将信息转发给绑定X的队列
X队列将信息通过信道发送给消费者,从而进行消费
整个过程,必须先创建路由
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明路由(路由名,路由类型)
// fanout:不处理路由键(只需要将队列绑定到路由上,发送到路由的消息都会被转发到与该路由绑定的所有队列上)
channel.exchangeDeclare("test_exchange_fanout", "fanout");
String msg = "hello,大家好!";
channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes());
System.out.println("生产者:" + msg);
channel.close();
connection.close();
}
}
public class Recer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("test_exchange_fanout_queue_1",false,false,false,null); // 绑定路由(关注) /* 参数1:队列名 参数2:交换器名称 参数3:路由key(暂时无用,""即可) */ channel.queueBind("test_exchange_fanout_queue_1", "test_exchange_fanout", ""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " + s); } }; // 4.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_fanout_queue_1", true,consumer); } }
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明路由(路由名,路由类型)
// direct:根据路由键进行定向分发消息
channel.exchangeDeclare("test_exchange_direct", "direct");
String msg = "用户注册,【userid=S101】";
channel.basicPublish("test_exchange_direct", "insert", null,
msg.getBytes());
System.out.println("[用户系统]:" + msg);
channel.close();
connection.close();
}
}
public class Recer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("test_exchange_direct_queue_1",false,false,false,null); // 绑定路由(如果路由键的类型是 添加,删除,修改 的话,绑定到这个队列1上) channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "insert"); channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "update"); channel.queueBind("test_exchange_direct_queue_1", "test_exchange_direct", "delete"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " + s); } }; // 4.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_direct_queue_1", true,consumer); } }
public class Recer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("test_exchange_direct_queue_2",false,false,false,null); // 绑定路由(如果路由键的类型是 查询 的话,绑定到这个队列2上) channel.queueBind("test_exchange_direct_queue_2", "test_exchange_direct", "select"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者2】 = " + s); } }; // 4.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_direct_queue_2", true,consumer); } }
quick.orange.rabbit # Q1 Q2
lazy.orange.elephant # Q1 Q2
quick.orange.fox # Q1
lazy.brown.fox # Q2
lazy.pink.rabbit # Q2
quick.brown.fox # 无
orange # 无
quick.orange.male.rabbit # 无
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明路由(路由名,路由类型)
// topic:模糊匹配的定向分发
channel.exchangeDeclare("test_exchange_topic", "topic");
String msg = "商品降价";
channel.basicPublish("test_exchange_topic", "product.price", null,
msg.getBytes());
System.out.println("[用户系统]:" + msg);
channel.close();
connection.close();
}
}
public class Recer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("test_exchange_topic_queue_1",false,false,false,null); // 绑定路由(绑定 用户相关 的消息) channel.queueBind("test_exchange_topic_queue_1", "test_exchange_topic", "user.#"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " + s); } }; // 4.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_topic_queue_1", true,consumer); } }
public class Recer2 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列 channel.queueDeclare("test_exchange_topic_queue_2",false,false,false,null); // 绑定路由(绑定 商品和订单相关 的消息) channel.queueBind("test_exchange_topic_queue_2", "test_exchange_topic", "product.#"); channel.queueBind("test_exchange_topic_queue_2", "test_exchange_topic", "order.#"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者2】 = " + s); } }; // 4.监听队列 true:自动消息确认 channel.basicConsume("test_exchange_topic_queue_2", true,consumer); } }
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明路由(路由名,路由类型,持久化)
channel.exchangeDeclare("test_exchange_topic", "topic",true);
String msg = "商品降价";
// 发送消息(第三个参数作用是让消息持久化)
channel.basicPublish("test_exchange_topic", "product.price",
MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
System.out.println("[用户系统]:" + msg);
channel.close();
connection.close();
}
}
public class Recer1 { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); // 声明队列( 第二个参数为true:支持持久化) channel.queueDeclare("test_exchange_topic_queue_1",true,false,false,null); channel.queueBind("test_exchange_topic_queue_1", "test_exchange_topic", "user.#"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者1】 = " + s); } }; channel.basicConsume("test_exchange_topic_queue_1", true,consumer); } }
<dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit</artifactId> <version>2.0.1.RELEASE</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.9</version> </dependency>
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> <!-- 1.配置连接 --> <rabbit:connection-factory id="connectionFactory" host="192.168.10.128" port="5672" username="hyq" password="123456" virtual-host="/test"/> <!-- 2.配置队列 --> <rabbit:queue name="test_spring_queue_1"/> <!-- 3.配置rabbitAdmin:主要用于在Java代码中对理队和队列进行管理,用于创建、绑定、删除队列与交换机,发送消息等 --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 4.配置topic类型exchange;队列绑定到交换机 --> <rabbit:topic-exchange name="spring_topic_exchange"> <rabbit:bindings> <rabbit:binding queue="test_spring_queue_1" pattern="msg.#"/> </rabbit:bindings> </rabbit:topic-exchange> <!-- 5. 配置消息对象json转换类 --> <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/> <!-- 6. 配置RabbitTemplate(消息生产者) --> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter"/> </beans>
public class Sender {
public static void main(String[] args) {
// 1.创建spring容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
// 2.从容器中获取对象
RabbitTemplate template = context.getBean(RabbitTemplate.class);
// 3.发送消息
Map<String, String> map = new HashMap();
map.put("name", "tom");
map.put("email", "123456@qq.com");
template.convertAndSend("msg.user", map);
context.close();
}
}
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:rabbit="http://www.springframework.org/schema/rabbit" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!-- 1. 配置连接 --> <rabbit:connection-factory id="connectionFactory" host="192.168.10.128" port="5672" username="hyq" password="123456" virtual-host="/test"/> <!-- 2. 配置队列 --> <rabbit:queue name="test_spring_queue_1"/> <!-- 3.配置rabbitAdmin --> <rabbit:admin connection-factory="connectionFactory"/> <!-- 4.springIOC注解扫描包--> <context:component-scan base-package="listener"/> <!-- 5.配置监听 --> <rabbit:listener-container connection-factory="connectionFactory"> <rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1" /> </rabbit:listener-container> </beans>
消费者
@Component public class ConsumerListener implements MessageListener { // jackson提供序列化和反序列中使用最多的类,用来转换json的 private static final ObjectMapper MAPPER = new ObjectMapper(); public void onMessage(Message message) { try { // 将message对象转换成json JsonNode jsonNode = MAPPER.readTree(message.getBody()); String name = jsonNode.get("name").asText(); String email = jsonNode.get("email").asText(); System.out.println("从队列中获取:【"+name+"的邮箱是:"+email+"】"); } catch (Exception e){ e.printStackTrace(); } } }
启动项目
public class TestRunner {
public static void main(String[] args) throws Exception {
// 获得容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
// 让程序一直运行,别终止
System.in.read();
}
}
在实际场景下,有的生产者发送的消息是必须保证成功发送到消息队列中,那么如何保证成功投递呢?
public class Sender { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("test_transaction", "topic"); channel.txSelect(); // 开启事务 try { channel.basicPublish("test_transaction", "product.price", null, "商品降价1".getBytes()); // System.out.println(1/0); // 模拟异常! channel.basicPublish("test_transaction", "product.price", null, "商品降价2".getBytes()); System.out.println("消息全部发出!"); channel.txCommit(); // 事务提交 }catch (Exception e){ System.out.println("由于系统异常,消息全部撤回!"); channel.txRollback(); // 事务回滚 e.printStackTrace(); }finally { channel.close(); connection.close(); } } }
public class Recer { public static void main(String[] args) throws Exception { Connection connection = ConnectionUtil.getConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("test_transaction_queue",false,false,false,null); channel.queueBind("test_transaction_queue", "test_transaction", "product.#"); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String s = new String(body); System.out.println("【消费者】 = " + s); } }; channel.basicConsume("test_transaction_queue", true,consumer); } }
RabbitMQ为了保证消息的成功投递,采用通过AMQP协议层面为我们提供事务机制的方案,但是
采用事务会大大降低消息的吞吐量
我本机SSD硬盘测试结果10w条消息未开启事务,大约8s发送完毕;而开启了事务后,需要将
近310s,差了30多倍。
接着翻阅官网,发现官网中已标注
Using standard AMQP 0-9-1, the only way to guarantee that a message isn’t lost is by
using transactions – make the channel transactional then for each message or set of
messages publish, commit. In this case, transactions are unnecessarily heavyweight
and decrease throughput by a factor of 250. To remedy this, a confirmation
mechanism was introduced. It mimics the consumer acknowledgements mechanism
already present in the protocol.
关键性译文:开启事务性能最大损失超过250倍
那么有没有更加高效的解决方式呢?答案就是采用Confirm模式。
事务效率为什么会这么低呢?试想一下:10条消息,前9条成功,如果第10条失败,那么9条消息要全部撤销回滚。太太太浪费
而confirm模式则采用补发第10条的措施来完成10条消息的送达
spring-rabbitmq-producer.xml
<!--1.配置连接,启动生产者确认机制: publisher-confirms="true"--> <rabbit:connection-factory id="connectionFactory" host="192.168.10.128" port="5672" username="hyq" password="123456" virtual-host="/test" publisher-confirms="true"/> <!--6.配置rabbitmq的模版,添加确认回调处理类:confirmcallback="msgSendConfirmCallback"--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="spring_topic_exchange" message-converter="jsonMessageConverter" confirm-callback="msgSendConfirmCallback"/> <!--7.确认机制处理类--> <bean id="msgSendConfirmCallback" class="confirm.MsgSendConfirmCallback"/>
package confirm; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.support.CorrelationData; import org.springframework.stereotype.Component; import java.io.IOException; /** * 确认机制 */ @Component public class MsgSendConfirmCallback implements RabbitTemplate.ConfirmCallback { public void confirm(CorrelationData correlationData, boolean b, String s) { if (b){ System.out.println("消息确认成功!!"); } else { System.out.println("消息确认失败。。。"); // 如果本条消息一定要发送到队列中,例如下订单消息,我们可以采用消息补发 // 采用递归(固定次数,不可无限)或 redis+定时任务 } } }
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rabbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l%m%n
log4j.rootLogger=debug, stdout,file
public class Sender { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); Map<String,String> map = new HashMap<String, String>(); map.put("name", "吕布"); map.put("email", "666@qq.com"); // 第一个参数是路由名称, // 不写,则使用spring容器中创建的路由 // 乱写一个,因为路由名错误导致报错,则进入消息确认失败流程 rabbitTemplate.convertAndSend("x","msg.user",map); System.out.println("ok"); context.close(); } }
在沙漠中行走,3天不喝水,突然喝水,如果使劲喝,容易猝死,要一口一口慢慢喝
我们 Rabbitmq 服务器积压了成千上万条未处理的消息,然后随便打开一个消费者客户端,就会出现这样的情况: 巨量的消息瞬间全部喷涌推送过来,但是单个客户端无法同时处理这么多数据,就会被压垮崩溃
所以,当数据量特别大的时候,我们对生产端限流肯定是不科学的,因为有时候并发量就是特别大,有时候并发量又特别少,这是用户的行为,我们是无法约束的所以我们应该对消费端限流,用于保持消费端的稳定
例如:汽车企业不停的生产汽车,4S店有好多库存车卖不出去,但是也不会降价处理,就是要保证市值的稳定,如果生产多少台,就卖多少台,不管价格的话,市场就乱了,所以我们要用不变的价格来稳住消费者购车,才能平稳发展
RabbitMQ 提供了一种 Qos (Quality of Service,服务质量)服务质量保证功能即在非自动确认消息的前提下,如果一定数目的消息未被确认前,不再进行消费新的消息
生产者使用循环发出多条消息
for(int i = 1;i<=10;i++) {
rabbitTemplate.convertAndSend("msg.user", map);
System.out.println("消息已发出...");
}
生产10条堆积未处理的消息
消费者进行限流处理
<!--5.配置监听-->
<!-- prefetch="3" 一次性消费的消息数量。会告诉 RabbitMQ 不要同时给一个消费者推送多于
N 个消息,一旦有 N 个消息还没有ack,则该 consumer 将阻塞,直到消息被ack-->
<!-- acknowledge-mode: manual 手动确认-->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1" />
</rabbit:listener-container>
// AbstractAdaptableMessageListener用于在spring容器接收到消息后用于处理消息的抽象基类 @Component public class ConsumerListener extends AbstractAdaptableMessageListener { // jackson提供序列化和反序列中使用最多的类,用来转换json的 private static final ObjectMapper MAPPER = new ObjectMapper(); public void onMessage(Message message, Channel channel) throws Exception { try { // String str = new String(message.getBody()); // 将message对象转换成json JsonNode jsonNode = MAPPER.readTree(message.getBody()); String name = jsonNode.get("name").asText(); String email = jsonNode.get("email").asText(); System.out.println("从队列中获取:【"+name+"的邮箱是:"+email+"】"); long deliveryTag = message.getMessageProperties().getDeliveryTag(); //确认收到(参数1,参数2) /* 参数1:RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递 增的正整数,delivery_tag 的范围仅限于 Channel 参数2:为了减少网络流量,手动确认可以被批处理,当该参数为 true 时,则可以 一次性确认 delivery_tag 小于等于传入值的所有消息 */ channel.basicAck(deliveryTag , true); Thread.sleep(3000); System.out.println("休息三秒然后再接收消息"); } catch (Exception e){ e.printStackTrace(); } } }
每次确认接收3条消息
<!--2.重新配置一个队列,同时,对队列中的消息设置过期时间-->
<rabbit:queue name="test_spring_queue_ttl" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value-type="long" value="5000"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!--2.配置队列-->
<rabbit:queue name="test_spring_queue_ttl_2">
package test; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.support.ClassPathXmlApplicationContext; import java.util.HashMap; import java.util.Map; /** * 生产者 */ public class Sender2 { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml"); RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class); // 创建消息配置对象 MessageProperties messageProperties = new MessageProperties(); // 设置消息过期时间 messageProperties.setExpiration("6000"); // 创建消息 Message message = new Message("6秒后自动删除".getBytes(), messageProperties); // 发送消息 rabbitTemplate.convertAndSend("msg.user", message); System.out.println("消息已发出..."); context.close(); } }
<!--1.配置连接--> <rabbit:connection-factory id="connectionFactory" host="192.168.10.128" port="5672" username="hyq" password="123456" virtual-host="/test"/> <!--3.配置rabbitAdmin:主要用于在java代码中对队列的管理,用来创建,绑定,删除队列与交换机,发送消息等--> <rabbit:admin connection-factory="connectionFactory"/> <!--6.配置rabbitmq的模版--> <rabbit:template id="rabbitTemplate" connection-factory="connectionFactory" exchange="my_exchange"/> <!--####################################################################################################################--> <!--死信队列--> <rabbit:queue name="dlx_queue"/> <!--定向死信交换机--> <rabbit:direct-exchange name="dlx_exchange" > <rabbit:bindings> <rabbit:binding key="dlx_ttl" queue="dlx_queue"></rabbit:binding> <rabbit:binding key="dlx_max" queue="dlx_queue"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange> <!--测试超时的队列--> <rabbit:queue name="test_ttl_queue"> <rabbit:queue-arguments> <!--队列ttl为6秒--> <entry key="x-message-ttl" value-type="long" value="6000"/> <!--超时 消息 投递给 死信交换机--> <entry key="x-dead-letter-exchange" value="dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> <!--测试超长度的队列--> <rabbit:queue name="test_max_queue"> <rabbit:queue-arguments> <!--队列ttl为6秒--> <entry key="x-max-length" value-type="long" value="2"/> <!--超时 消息 投递给 死信交换机--> <entry key="x-dead-letter-exchange" value="dlx_exchange"/> </rabbit:queue-arguments> </rabbit:queue> <!--定向测试交换机--> <rabbit:direct-exchange name="my_exchange" > <rabbit:bindings> <rabbit:binding key="dlx_ttl" queue="test_ttl_queue"></rabbit:binding> <rabbit:binding key="dlx_max" queue="test_max_queue"></rabbit:binding> </rabbit:bindings> </rabbit:direct-exchange>
public class SenderDLX {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new
ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// rabbitTemplate.convertAndSend("dlx_ttl", "测试超时".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "测试长度1".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "测试长度2".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "测试长度3".getBytes());
System.out.println("消息已发出...");
context.close();
}
}
沿用上面死信队列案例的超时测试,超时时间改为订单关闭时间即可
spring-rabbitmq-consumer.xml
<!-- 监听死信队列 -->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="dlx_queue" />
</rabbit:listener-container>
rabbitmq有3种模式,但集群模式是2种。详细如下:
前置条件:准备两台linux,并安装好rabbitmq
127.0.0.1 A localhost localhost.localdomain localhost4
localhost4.localdomain4
::1 A localhost localhost.localdomain localhost6
localhost6.localdomain6
192.168.10.128 A
192.168.10.129 B
127.0.0.1 B localhost localhost.localdomain localhost4
localhost4.localdomain4
::1 B localhost localhost.localdomain localhost6
localhost6.localdomain6
192.168.10.128 A
192.168.10.129 B
[root@A opt]# scp /var/lib/rabbitmq/.erlang.cookie
192.168.204.142:/var/lib/rabbitmq
[root@A ~]# systemctl stop firewalld
[root@A ~]# systemctl start rabbitmq-server
[root@B ~]# rabbitmqctl stop_app
[root@B ~]# rabbitmqctl join_cluster rabbit@A
[root@B ~]# rabbitmqctl start_app
[root@B ~]# rabbitmqctl cluster_status
[root@A ~]# rabbitmqctl add_user hyq 123456
[root@A ~]# rabbitmqctl set_user_tags hyq administrator
[root@A ~]# rabbitmqctl set_permissions -p "/" hyq ".*" ".*" ".*"
将所有队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态一致
语法:set_policy {name} {pattern} {definition}
all:表示在集群中所有的节点上进行镜像
exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指定
[root@A ~]# rabbitmqctl set_policy xall "^" '{"ha-mode":"all"}'
通过管理端设置镜像策略
OSI:(Open System Interconnection:开放式系统互联 是把网络通信的工作分为7层,分别是物理层,数据链路层,网络层,传输层,会话层,表示层和应用层)
HAProxy下载:http://www.haproxy.org/download/1.8/src/haproxy-1.8.12.tar.gz
解压
[root@localhost opt]# tar -zxvf haproxy-1.8.12.tar.gz
make时需要使用 TARGET 指定内核及版本
[root@localhost ~]# uname -r
3.10.0-1160.el7.x86_64
根据内核版本选择编译参数:
进入目录,编译和安装
[root@localhost opt]# cd haproxy-1.8.12
[root@localhost haproxy-1.8.12]# make TARGET=linux2628 PREFIX=/usr/local/haproxy
[root@localhost haproxy-1.8.12]# make install PREFIX=/usr/local/haproxy
安装成功后,查看版本
[root@localhost haproxy-1.8.12]# /usr/local/haproxy/sbin/haproxy -v
配置启动文件,复制haproxy文件到/usr/sbin下 ,复制haproxy脚本,到/etc/init.d下
[root@localhost haproxy-1.8.12]# cp /usr/local/haproxy/sbin/haproxy /usr/sbin/
[root@localhost haproxy-1.8.12]# cp ./examples/haproxy.init /etc/init.d/haproxy
[root@localhost haproxy-1.8.12]# chmod 755 /etc/init.d/haproxy
创建系统账号
[root@localhost haproxy-1.8.12]# useradd -r haproxy
haproxy.cfg 配置文件需要自行创建
[root@localhost haproxy-1.8.12]# mkdir /etc/haproxy
[root@localhost haproxy-1.8.12]# vim /etc/haproxy/haproxy.cfg
添加配置信息到haproxy.cfg
#全局配置 global #设置日志 log 127.0.0.1 local0 info #当前工作目录 chroot /usr/local/haproxy #用户与用户组 user haproxy group haproxy #运行进程ID uid 99 gid 99 #守护进程启动 daemon #最大连接数 maxconn 4096 #默认配置 defaults #应用全局的日志配置 log global #默认的模式mode {tcp|http|health},TCP是4层,HTTP是7层,health只返回OK mode tcp #日志类别tcplog option tcplog #不记录健康检查日志信息 option dontlognull #3次失败则认为服务不可用 retries 3 #每个进程可用的最大连接数 maxconn 2000 #连接超时 timeout connect 5s #客户端超时30秒,ha就会发起重新连接 timeout client 30s #服务端超时15秒,ha就会发起重新连接 timeout server 15s #绑定配置 listen rabbitmq_cluster bind 192.168.204.143:5672 #配置TCP模式 mode tcp #简单的轮询 balance roundrobin #RabbitMQ集群节点配置,每隔5秒对mq集群做检查,2次正确证明服务可用,3次失败证明服务不可用 server A 192.168.10.128:5672 check inter 5000 rise 2 fall 3 server B 192.168.10.129:5672 check inter 5000 rise 2 fall 3 #haproxy监控页面地址 listen monitor bind 192.168.204.143:8100 mode http option httplog stats enable # 监控页面地址 http://192.168.10.130:8100/monitor stats uri /monitor stats refresh 5s
启动HAProxy
[root@localhost haproxy]# service haproxy start
访问监控中心:http://192.168.10.130:8100/monitor
记得关闭防火墙: systemctl stop firewalld
项目发消息,只需要将服务器地址修改为130即可,其余不变
所有的请求都会交给HAProxy,其负载均衡给每个rabbitmq服务器
现在的最后一个问题暴露出来了,如果HAProxy服务器宕机,rabbitmq服务器就不可用了。所以我们需要对HAProxy也要做高可用的集群
修改hosts文件的地址映射
IP | 用途 | 主机名 |
---|---|---|
192.168.204.143 | KeepAlived HAProxy | C |
192.168.204.144 | KeepAlived HAProxy | C |
安装 keepalived
[root@C ~]# yum install -y keepalived
修改配置文件(内容大改,不如删掉,重新创建)
[root@C ~]# rm -rf /etc/keepalived/keepalived.conf
[root@C ~]# vim /etc/keepalived/keepalived.conf
! Configuration File for keepalived global_defs { router_id C ## 非常重要,标识本机的hostname } vrrp_script chk_haproxy{ script "/etc/keepalived/haproxy_check.sh" ## 执行的脚本位置 interval 2 ## 检测时间间隔 weight -20 ## 如果条件成立则权重减20 } vrrp_instance VI_1 { state MASTER ## 非常重要,标识主机,备用机143改为 BACKUP interface ens33 ## 非常重要,网卡名(ifconfig查看) virtual_router_id 66 ## 非常重要,自定义,虚拟路由ID号(主备节点要相同) priority 100 ## 优先级(0-254),一般主机的大于备机 advert_int 1 ## 主备信息发送间隔,两个节点必须一致,默认1秒 authentication { ## 认证匹配,设置认证类型和密码,MASTER和BACKUP必须使用相同的密码才能正常通信 auth_type PASS auth_pass 1111 } track_script { chk_haproxy ## 检查haproxy健康状况的脚本 } virtual_ipaddress { ## 简称“VIP” 192.168.204.66/24 ## 非常重要,虚拟ip,可以指定多个,以后连接mq就用这个虚拟ip } } virtual_server 192.168.204.66 5672 { ## 虚拟ip的详细配置 delay_loop 6 # 健康检查间隔,单位为秒 lb_algo rr # lvs调度算法rr|wrr|lc|wlc|lblc|sh|dh lb_kind NAT # 负载均衡转发规则。一般包括DR,NAT,TUN 3种 protocol TCP # 转发协议,有TCP和UDP两种,一般用TCP real_server 192.168.204.143 5672 { ## 本机的真实ip weight 1 # 默认为1,0为失效 } }
创建执行脚本 /etc/keepalived/haproxy_check.sh
#!/bin/bash
COUNT=`ps -C haproxy --no-header |wc -l`
if [ $COUNT -eq 0 ];then
/usr/local/haproxy/sbin/haproxy -f /etc/haproxy/haproxy.cfg
sleep 2
if [ `ps -C haproxy --no-header |wc -l` -eq 0 ];then
killall keepalived
fi
fi
Keepalived 组之间的心跳检查并不能察觉到 HAproxy 负载是否正常,所以需要使用此脚本。在 Keepalived 主机上,开启此脚本检测 HAproxy 是否正常工作,如正常工作,记录日志。如进程不存在,则尝试重启 HAproxy ,2秒后检测,如果还没有,则关掉主 Keepalived ,此时备Keepalived 检测到主 Keepalived 挂掉,接管VIP,继续服务
授权,否则不能执行
[root@C etc]# chmod +x /etc/keepalived/haproxy_check.sh
启动keepalived(两台都启动)
[root@C etc]# systemctl stop firewalld
[root@C etc]# service keepalived start | stop | status | restart
查看状态
[root@C etc]# ps -ef | grep haproxy
[root@C etc]# ps -ef | grep keepalived
查看ip情况 ip addr 或 ip a
[root@C etc]# ip a
此时,安装完毕,按照上面的步骤就可以安装第二台了(服务器hostname和ip注意要修改)
常见的网络错误:子网掩码、网关等信息要一致
[root@A ~]# curl 192.168.204.66:5672
AMQP ## 正常提供AMQP服务,表示通过vip访问mq服务正常
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。