赞
踩
官网下载-需要手动安装Erlang环境
brew install --cask --appdir=/Applications docker
$ docker run -d --hostname my-rabbit --name some-rabbit rabbitmq:3
- 1
全称:高级消息队列协议,由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
特性:
1:分布式事务支持。
2:消息的持久化支持。
3:高性能和高可靠的消息处理优势。
ActieveMQ | RabbitMQ | kafka | RockeetMQ | |
---|---|---|---|---|
发布订阅 | 支持 | 支持 | 支持 | 支持 |
轮询分发 | 支持 | 支持 | 支持 | / |
公平分发 | / | 支持 | 支持 | / |
重发 | 支持 | 支持 | / | 支持 |
消息拉取 | / | 支持 | 支持 | 支持 |
Connection :连接,应用程序与Broker的网络连接TCP/IP三次握手和四次握手
Channel :网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立多个Channel。每个Channel代表一个会话任务
Virtual Host:虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机里有若干个Exchange和queue,同一个虚拟主机里面不能有相同名字的Exchange
bindings:Exchange和Queue之间的虚拟连接,bingding中可以保护多个routing key
routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息
发布与订阅
原生代码
交换机和队列在可视化界面手动绑定情况下
public class Producer { public static void main(String[] args) { //所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp //1.创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(""); factory.setPort(5672); factory.setUsername(""); factory.setPassword(""); factory.setVirtualHost("/"); Connection connection = null; Channel channel = null; //2.创建连接Connection try { connection = factory.newConnection("生产者"); //3.通过连接获取通道Channel connection.createChannel(); //4.通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接受消息 String queueName = "queue1"; /** * @Params1 队列名称 * @Params2 是否要持久化durable =false 所谓持久化消息是否存盘 * @Params3 排他性,是否是独占独立 * @Params4 是否自动删除,随着最后一个消费者消息完毕以后是否把队列自动删除 * @Params5 携带附属参数 */ channel.queueDeclare(queueName, false, false, false, null); //5.准备消息内容 String message = "Hello kun"; //6.准备交换机 String exchange ="fanout-exchange"; //7.定义路由key String routingkey = ""; //8.指定交换机的类型 String exchangeType ="direct"; channel.basicPublish(exchange, queueName, null, message.getBytes()); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { //7.关闭连接 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } //8.关闭通道 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
public class Consumer { public static void main(String[] args) { //所有的中间件技术都是基于tcp/ip协议基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp //1.创建连接工程 ConnectionFactory factory = new ConnectionFactory(); factory.setHost(""); factory.setPort(5672); factory.setUsername(""); factory.setPassword(""); factory.setVirtualHost("/"); Connection connection = null; final Channel channel = null; //2.创建连接Connection try { connection = factory.newConnection("生产者"); //3.通过连接获取通道Channel connection.createChannel(); //4.通过创建交换机,声明队列,绑定关系,路由key,发送消息,和接受消息 channel.basicConsume("queue1", true, new DeliverCallback() { public void handle(String consumerTag, Delivery message) throws IOException { System.out.println("收到消息是" + new String(message.getBody(), "UTF-8")); } }, new CancelCallback() { public void handle(String consumerTag) throws IOException { System.out.println("接受消息失败"); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } finally { //7.关闭连接 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } //8.关闭通道 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
绑定关系代码实现----原生代码
//声明交换机
channel.exchangeDeclare(exchange,exchangeType,true);
//声明队列
channel.queueDeclare("queue",true,false,false,null);
//绑定队列
channel.queueBind("queue",exchange,exchangeType);
路由模式
主题模式
lazy.#. #代表0个或者一个或者多个
*.orange. * *代表至少要满足一个
#.order.#
#.user.*
轮训分发
公平分发
//指标定义出来,qos=1
xxxx.basicQos("")
解耦、削峰、异步
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
fanout模式
package com.kun.order.config; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitmqConfiguration { //1.声明交换机 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange("direct_order_exchange", true, false); } //2.声明队列 sms.fanout.queue/email.fanout.queue @Bean public Queue smsQueue() { return new Queue("smsQueue", true); } @Bean public Queue emailQueue() { return new Queue("emailQueue", true); } //3.绑定关系 @Bean public Binding smsBinding() { return BindingBuilder.bind(smsQueue()).to(fanoutExchange()); } }
direct 模式
//3.绑定关系
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("sms");
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(smsQueue()).to(directExchange()).with("email");
}
package com.kun.order.topic; import org.springframework.amqp.core.ExchangeTypes; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component; @Component @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "sms.topic.queue",durable = "true",autoDelete = "false"), exchange = @Exchange(value = "topic_order_exchange",type = ExchangeTypes.TOPIC), key = "#.sms.*" )) public class TopicEmailConsumer { @RabbitHandler public void reviceMessage(String message) { System.out.println(message); } }
package com.kun.order.service; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import java.util.UUID; @Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; /** * 模拟用户下单 * * @param userId * @param productId * @param num */ public void makeOrder(String userId, String productId, int num) { String orderId = UUID.randomUUID().toString(); String exchangeName = "direct"; String routingKey = ""; rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId); } }
package com.kun.order.service; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component @RabbitListener(queues = "email.fanout.queue") public class FanoutEmailConsumer { @RabbitHandler public void reviceMessage(String message) { System.out.println(message); } }
过期时间TTL表示可以对消息设置预期的时间,目前有两种方法可以设置
1.通过队列属性设置,队列中所有消息都有相同的过期时间
2.通过消息进行单独设置,每条消息TTL可以不同
如果两种方法同时使用,则消息的过期时间以两者之间TTL较小的为准
@Bean
public Queue smsQueue() {
HashMap<String, Object> map = new HashMap<>();
map.put("x-message-ttl",5000);
return new Queue("smsQueue", true,false,false,map);
}
public void makeOrder(String userId, String productId, int num) { String orderId = UUID.randomUUID().toString(); String exchangeName = "direct"; String routingKey = ""; MessagePostProcessor processor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId,processor); }
@Bean
public Queue smsQueue() {
HashMap<String, Object> map = new HashMap<>();
map.put("x-message-ttl",5000);
//x-dead-letter-exchange -固定写法
//x-dead-letter-routing-key
map.put("x-dead-letter-exchange","exchangeName");
map.put("x-dead-letter-routing-key","routingName");
return new Queue("smsQueue", true,false,false,map);
}
具体实现
@Service public class OrderMQService { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private JdbcTemplate jdbcTemplate; //@PostConstruct注解好多人以为是Spring提供的。其实是Java自己的注解。 //Java中该注解的说明:@PostConstruct该注解被用来修饰一个非静态的void()方法。被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行, // 并且只会被服务器执行一次。PostConstruct在构造函数之后执行,init()方法之前执行。 @PostConstruct public void regCallback() { // 消息发送成功以后,给予生产者的消息回执,来确保生产者的可靠性 rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { System.out.println("cause:"+cause); // 如果ack为true代表消息已经收到 String orderId = correlationData.getId(); if (!ack) { // 这里可能要进行其他的方式进行存储 System.out.println("MQ队列应答失败,orderId是:" + orderId); return; } try { String updatesql = "update ksd_order_message set status = 1 where order_id = ?"; int count = jdbcTemplate.update(updatesql, orderId); if (count == 1) { System.out.println("本地消息状态修改成功,消息成功投递到消息队列中..."); } } catch (Exception ex) { System.out.println("本地消息状态修改失败,出现异常:" + ex.getMessage()); } } }); }
可靠消费问题
如果死信队列报错就进行人工处理
@Service public class DeadMqConsumer { @Autowired private DispatchService dispatchService; // 解决消息重试的集中方案: // 1: 控制重发的次数 + 死信队列 // 2: try+catch+手动ack // 3: try+catch+手动ack + 死信队列处理 + 人工干预 @RabbitListener(queues = {"dead.order.queue"}) public void messageconsumer(String ordermsg, Channel channel, CorrelationData correlationData, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { try { // 1:获取消息队列的消息 System.out.println("收到MQ的消息是: " + ordermsg ); // 2: 获取订单服务的信息 Order order = JsonUtil.string2Obj(ordermsg, Order.class); // 3: 获取订单id String orderId = order.getOrderId(); // 幂等性问题 //int count = countOrderById(orderId); // 4:保存运单 //if(count==0)dispatchService.dispatch(orderId); //if(count>0)dispatchService.updateDispatch(orderId); dispatchService.dispatch(orderId); // 3:手动ack告诉mq消息已经正常消费 channel.basicAck(tag, false); } catch (Exception ex) { System.out.println("人工干预"); System.out.println("发短信预警"); System.out.println("同时把消息转移别的存储DB"); channel.basicNack(tag, false,false); } } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。