赞
踩
RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通讯的世界里有很多公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),但是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),因此,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。
RabbitMQ的官网
http://www.rabbitmq.com
注意点:
RabbitMQ是采用erlang语言开发的,所以必须有erlang环境才可以运行。
生产者发送消息不会向传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这和我们之前学习Nginx有点类似。
交换机的作用根据具体的路由策略分发到不同的队列中,交换机有四种类型。
Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的
Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列
Topic exchange(主题交换机)队列通过路由键绑定到交换机上,然后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列
Headers exchange(头交换机)类似主题交换机,但是头交换机使用多个消息属性来代替路由键建立路由规则。通过判断消息头的值能否与指定的绑定相匹配来确立路由规则。
后面会讲到springboot怎么去整合RabbitMQ;springboot的整合非常简单,这里先看原生的操作方式,可以更好的去学习它。
首先在pom下引入RabbitMQ的依赖:
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
Rabbmit工具类,后面讲的生产者和消费者都会用到这个工具类:
public class RabbitMQConnection { public static Connection getConnection(){ // 定义连接工厂 ConnectionFactory factory = new ConnectionFactory(); // 设置服务器地址 factory.setHost("127.0.0.1"); // 设置协议端口号 factory.setPort(5672); // 设置vhost factory.setVirtualHost("/admin_test"); // 设置用户名称 factory.setUsername("admin"); // 设置用户密码 factory.setPassword("123456"); // 创建新的连接 Connection newConnection = null; try { newConnection = factory.newConnection(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } return newConnection; } }
是根据消息携带的路由键(routing key)将消息投递给对应队列的。
消息提供者:
private static void MQProviderSimple(){ //队列名称 String QUEUE_NAME = "test_queue"; // 1.获取连接 Connection newConnection = RabbitMQConnection.getConnection(); // 2.创建通道 Channel channel = null; try { channel = newConnection.createChannel(); // 3.创建队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String msg = "哈哈哈"; System.out.println("发送消息:" + msg); // 4.发送消息 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } catch (IOException e) { e.printStackTrace(); }finally { try { channel.close(); newConnection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消息接收者:
private static void MQConsumerSimple(){ //队列名称 String QUEUE_NAME = "test_queue"; Connection newConnection = RabbitMQConnection.getConnection(); // 2.获取通道 Channel channel = null; try { channel = newConnection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msgString); } }; // 3.监听队列 System.out.println("消费者开始监听队列:="+QUEUE_NAME); channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } catch (IOException e) { e.printStackTrace(); } }
其中需要注意的便是提供者和消费者的队列名称一致才能完成消费,
其中channel.queueDeclare()方法中的参数分别意思为:
消费者监听消息默认是自动签收的,并且如果消费者是集群的话,RabbitMQ会以轮训的形式分发给消费者,这样如果集群服务器与服务器之间配置不同的话,配置差的服务器也要处理那么多消息,显然是不合理的,应该让配置高的多承担,配置第的少承担。因此需要,消费者收到消息还没处理完,RabbitMQ就不想它再推送消息,待处理完再推送。
为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收并且处理完毕了。RabbitMQ就可以删除它了。
如果一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理完全,然后交给另一个消费者去重新处理。这样,你就可以确认即使消费者偶尔挂掉也不会丢失任何消息了。
消息提供者:
private static void MQProviderWork(){ //队列名称 String QUEUE_NAME = "test_queue"; // 1.获取连接 Connection newConnection = RabbitMQConnection.getConnection(); // 2.创建通道 Channel channel = null; try { channel = newConnection.createChannel(); // 3.创建队列声明 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 String msg = "哈哈哈"; System.out.println("发送消息:" + msg); // 4.发送消息 channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } catch (IOException e) { e.printStackTrace(); }finally { try { channel.close(); newConnection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消息接收者:
private static void MQConsumerWork(){ //队列名称 String QUEUE_NAME = "test_queue"; Connection newConnection = RabbitMQConnection.getConnection(); try { // 2.获取通道 Channel channel = newConnection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1);// 保证一次只分发一次 限制发送给同一个消费者 不得超过一条消息 DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msgString); // 手动回执消息 channel.basicAck(envelope.getDeliveryTag(), false); } }; // 3.监听队列 System.out.println("消费者开始监听队列:="+QUEUE_NAME); channel.basicConsume(QUEUE_NAME, false, defaultConsumer); } catch (IOException e) { e.printStackTrace(); } }
其中相比简单队列增加了:channel.basicQos(1),表示每次分发不得超过一条消息。
消费者开启监听channel.basicConsume的第二个参数,便代表着应答模式,true的话为自动应答,我们需要手动应答所以为false。
channel.basicAck(envelope.getDeliveryTag(), false)便是告知服务器,我的消息已处理完成,可以接收消息了。
一个生产者发送消息,多个消费者获取消息(同样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。
消息提供者:
private static void MQProviderTopic(){ //交换机的名称 String EXCHANGE_NAME = "exchange"; // 1.获取连接 Connection newConnection = RabbitMQConnection.getConnection(); // 2.创建通道 Channel channel = null; try { channel = newConnection.createChannel(); // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String msg = "fanout_exchange_msg"; // 4.发送消息 channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); } catch (IOException e) { e.printStackTrace(); }finally { try { channel.close(); newConnection.close(); } catch (Exception e) { e.printStackTrace(); } } }
消息消费者:
private static void MQConsumerTopic(){ //队列名称 String QUEUE_NAME = "test_queue"; //交换机的名称 String EXCHANGE_NAME = "exchange"; Connection newConnection = RabbitMQConnection.getConnection(); try { // 2.创建通道 Channel channel = newConnection.createChannel(); // 3.newConnection channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msgString); } }; // 5.消费者监听队列消息 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); } catch (IOException e) { e.printStackTrace(); } }
相比队列形式,发布订阅,多了 channel.queueBind(),其中参数为:
路由模式,和上面发布订阅相差不大,差别多了个routingKey 。
消息提供者:
private static void MQProviderRouter(){ //交换机的名称 String EXCHANGE_NAME = "exchange"; // 1.获取连接 Connection newConnection = RabbitMQConnection.getConnection(); try { // 2.创建通道 Channel channel = newConnection.createChannel(); // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String routingKey = "info"; String msg = "direct_exchange_msg" + routingKey; // 4.发送消息 channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("生产者发送msg:" + msg); } catch (IOException e) { e.printStackTrace(); } }
消费接收者:
private static void MQConsumerRouter(){ //队列名称 String QUEUE_NAME = "test_queue"; //交换机的名称 String EXCHANGE_NAME = "exchange"; // 1.获取连接 Connection newConnection = RabbitMQConnection.getConnection(); try{ Channel channel = newConnection.createChannel(); // 3.消费者关联队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error"); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msgString); } }; // 5.消费者监听队列消息 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); }catch (Exception e){ e.printStackTrace(); } }
此模式是在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;
消息提供者:
private static void MQProviderTop(){ // //交换机的名称 String EXCHANGE_NAME = "exchange"; // 1.获取连接 Connection newConnection = RabbitMQConnection.getConnection(); try { Channel channel = newConnection.createChannel(); // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = "log.info.error"; String msg = "topic_exchange_msg" + routingKey; // 4.发送消息 channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("生产者发送msg:" + msg); } catch (IOException e) { e.printStackTrace(); } }
消息消费者:
private static void MQConsumerTop(){ //队列名称 String QUEUE_NAME = "test_queue"; //交换机的名称 String EXCHANGE_NAME = "exchange"; // 1.获取连接 Connection newConnection = RabbitMQConnection.getConnection(); try{ // 2.创建通道 Channel channel = newConnection.createChannel(); // 3.消费者关联队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#"); // 4.消费者绑定交换机 参数1 队列 参数2交换机 参数3 routingKey DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msgString = new String(body, "UTF-8"); System.out.println("消费者获取消息:" + msgString); } }; // 5.消费者监听队列消息 channel.basicConsume(QUEUE_NAME, true, defaultConsumer); }catch (Exception e){ e.printStackTrace(); } }
注意点:
符号#:匹配一个或者多个词lazy.# 可以匹配lazy.irs或者lazy.irs.cor
符号*:只能匹配一个词lazy.* 可以匹配lazy.irs或者lazy.cor
生产者发送消息出去之后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。有时如果方法中出现错误,发出去的消息需要撤回时,遍需要有事物进行处理;
事务模式:
txSelect 将当前channel设置为transaction模式
txCommit 提交当前事务
txRollback 事务回滚
private static void MQProviderSelect() throws IOException { // //交换机的名称 String EXCHANGE_NAME = "exchange"; // 1.获取连接 Connection newConnection = RabbitMQConnection.getConnection(); Channel channel = newConnection.createChannel(); try { // 3.绑定的交换机 参数1交互机名称 参数2 exchange类型 channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = "log.info.error"; String msg = "topic_exchange_msg" + routingKey; // 4.发送消息 // 将当前管道设置为 txSelect 将当前channel设置为transaction模式 开启事务 channel.txSelect(); channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes()); System.out.println("生产者发送msg:" + msg); channel.txCommit();// 提交事务 } catch (IOException e) { e.printStackTrace(); channel.txRollback();// 回滚事务 } }
前面学习了在java中怎么使用原生api操作消息队列,下面看下SpringBoot中如何集成:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
server.port=8081
####连接地址
spring.rabbitmq.host=127.0.0.1
####端口号
spring.rabbitmq.port=5672
####账号
spring.rabbitmq.username=admin
####密码
spring.rabbitmq.password=123456
### 地址
spring.rabbitmq.virtual-host=/admin_test
这里以Fanout模式演示的
@Component public class FanoutConfig { // 邮件队列 private String FANOUT_EMAIL_QUEUE = "fanout_eamil_queue"; // 短信队列 private String FANOUT_SMS_QUEUE = "fanout_sms_queue"; // 交换机名称 private String EXCHANGE_NAME = "fanoutExchange"; // 1.定义队列邮件 @Bean public Queue fanOutEamilQueue() { return new Queue(FANOUT_EMAIL_QUEUE); } @Bean public Queue fanOutSmsQueue() { return new Queue(FANOUT_SMS_QUEUE); } // 2.定义交换机 @Bean FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_NAME); } // 3.队列与交换机绑定邮件队列 // 参数名称要和定义队列和交换机的名称一致 @Bean Binding bindingExchangeEamil(Queue fanOutEamilQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutEamilQueue).to(fanoutExchange); } // 4.队列与交换机绑定短信队列 @Bean Binding bindingExchangeSms(Queue fanOutSmsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(fanOutSmsQueue).to(fanoutExchange); } }
@Autowired
private AmqpTemplate amqpTemplate;
@GetMapping("/Send/{queueName}")
public void Send(@PathVariable("queueName") String queueName) {
String msg = "my_fanout_msg:" + new Date();
System.out.println(msg + ":" + msg);
amqpTemplate.convertAndSend(queueName, msg);
}
配制和上面相同。
监听为:
@Component
@RabbitListener(queues = "fanout_eamil_queue")
public class FanoutEamilConsumer {
@RabbitHandler
public void process(String msg) throws Exception {
System.out.println("邮件消费者获取生产者消息msg:" + msg);
}
}
参考资料来源:蚂蚁课堂资料。
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。