赞
踩
在互联网架构中,mq是一种非常常见的上下游”逻辑解耦+物理解耦“的消息通信服务,使用了mq之后,消息发送上游只需要依赖mq,不需要依赖其他服务。
因为它的三大功能:流量消峰、应用解耦、异步处理
简单举例:
流量消峰:订单系统在平缓期最多处理10000单,但是高峰期会超过这个值,就会使订单系统宕机,所以我们在人点单和订单系统出现订单的中间添加一个mq作为使用消息队列做缓存,这样虽然人下单后出现订单成功会慢一些,但是总比订单系统宕机好使!
应用解耦:订单系统包含在电商应用中,除此之外还有物流系统,支付系统、库存系统,当用户创建后,如果耦合调用物流系统,支付系统、库存系统,任何一个子系统出现故障,都会使下单操作异常,这是我们在中间添加一个mq消息队列,系统调用的问题就会减少,子系统一旦出现故障,需时间修复,在这特殊时间里,子系统的消息会缓存在消息队列中,订单不会出现异常正常执行。
异步处理:在这里使用mq,会节省时间,提高系统的应用性,当a调用b服务后,只需要监听b处理完成的消息,b处理完成后,发送一个消息到mq,mq将消息转发给a服务,这样使a及时得到异步处理成功的消息
ActiveMQ 、kfaka(为大数据而生的消息中间件,适合大量数据互联网服务的收集业务,适合大公司)、RocketMQ (适合金融互联网领域)、RabbitMQ(当前最主流的消息中间件之一,性能较好,支持多种语言,mq功能完备,稳定、跨平台、开源提供的管理界面非常好,社区活跃度高,更新频率高,适合数据量不那么大,中小型公司)
生产者、交换机、队列、消费者
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.7.15</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>com.example</groupId>
- <artifactId>springboot_rabbitmq_demo</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>springboot_rabbitmq_demo</name>
- <description>springboot_rabbitmq_demo</description>
- <properties>
- <java.version>8</java.version>
- </properties>
- <dependencies>
- <!-- rabbitmq依赖客户端-->
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-web</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- <version>2.7.15</version>
- </plugin>
- </plugins>
- </build>
-
- </project>
- spring.rabbitmq.host=localhost
- spring.rabbitmq.port=25672
- spring.rabbitmq.username=admin
- spring.rabbitmq.password=123456
- /**
- * 生产者代码
- */
- public class Producer {
- //队列
- public static final String QUEUE_NAME="HelloWorld";
- //发送消息
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建一个连接工厂
- ConnectionFactory factory=new ConnectionFactory();
- //工厂ip连接rabbitmq的队列
- factory.setHost("localhost");
- factory.setPort(25672);
- //用户名
- factory.setUsername("admin");
- //密码
- factory.setPassword("123456");
- //创建连接
- Connection connection = factory.newConnection();
- //获取信道
- Channel channel = connection.createChannel();
- /**
- * 生产一个队列
- * com.rabbitmq.client.AMQP.Queue.DeclareOk queueDeclare
- * (String var1, boolean var2, boolean var3, boolean var4, Map<String, Object> var5) throws IOException;
- * 1.队列名称
- * 2.队列里面的消息是否持久化,默认消息存放在内存中
- * 3.该队列是否只供一个消费者进行消费,是否进行消费共享,true:表示可以多个消费者消费 false::表示只能一个消费者消费
- * 4.是否自动删除 true自动删除,false不自动删除
- * 5.其它参数
- */
- channel.queueDeclare(QUEUE_NAME,false,false,false,null);
- //发消息
- String message="hello world!";
- /**
- * void basicPublish(String var1, String var2, BasicProperties var3, byte[] var4) throws IOException;
- * 发送一个消息
- * 1.发送到哪个交换机
- * 2.路由的key值
- * 3.其他的参数值
- * 4.发送消息的消息体
- */
- channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
- System.out.println( "消息发送完毕");
-
- }
- }
4.消费者代码
- /**
- * 消费者代码
- */
- public class Consumer {
- //队列
- public static final String QUEUE_NAME = "HelloWorld";
- //接受消息
- public static void main(String[] args) throws IOException, TimeoutException {
- //创建一个连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //工厂ip连接rabbitmq的队列
- factory.setHost("localhost");
- factory.setPort(25672);
- //用户名
- factory.setUsername("admin");
- //密码
- factory.setPassword("123456");
- //创建连接
- Connection connection = factory.newConnection();
- //获取信道
- Channel channel = connection.createChannel();
-
- /**
- * 消费者消费队列
- * 1.消费哪个队列
- * 2.消费成功后是否要自动应答,true:自动应答,false:手动应答
- * 3.消费者未成功消费的回调
- * 4.消费者取消消费的回调
- */
- //监听队列消息, 如果有消息则会回调客户端
- channel.basicConsume(QUEUE_NAME, new DefaultConsumer(channel){
-
- //处理消息函数
- public void handleDelivery(String consumerTag,
- Envelope envelope,
- AMQP.BasicProperties properties,
- byte[] body)
- throws IOException
- {
- channel.basicAck(envelope.getDeliveryTag(),false);
- //我们收到消息后打印消息
- System.out.println( new String( body ) );
- }
-
- });
- }
- }
- @Configuration
- public class TtlQueueConfig {
- //普通交换机
- @Bean("commonExchange")
- public DirectExchange commonExchange(){
- return new DirectExchange("commonExchange");
- }
- //普通队列
- @Bean("commonQueue")
- public Queue commonQueueA(){
- HashMap<String, Object> args = new HashMap<>(3);
- args.put("x-dead-letter-exchange","deadExchange");
- args.put("x-dead-letter-routing-key","dead");
- args.put("x-message-ttl",15000);
- return QueueBuilder.durable("commonQueue").withArguments(args).build();
- }
-
- //死信交换机
- @Bean
- public DirectExchange deadExchange(){
- return new DirectExchange("deadExchange");
- }
- //死信队列
- @Bean("deadQueue")
- public Queue deadQueue(){
- return QueueBuilder.durable("deadQueue").build();
- }
-
- /**
- *关于ttl队列的优化
- */
- @Bean("optimizeQueue")
- public Queue commonQueueB(){
- HashMap<String, Object> args = new HashMap<>(3);
- args.put("x-dead-letter-exchange","deadExchange");
- args.put("x-dead-letter-routing-key","dead");
- return QueueBuilder.durable("optimizeQueue").withArguments(args).build();
- }
- @Bean
- public Binding optimizeQueueBinding(@Qualifier("optimizeQueue") Queue optimizeQueue, @Qualifier("commonExchange") DirectExchange commonExchange){
- return BindingBuilder.bind(optimizeQueue).to(commonExchange).with("optimize");
- }
-
-
- //绑定普通队列和交换机
- @Bean
- public Binding commonQueueBinding(@Qualifier("commonQueue") Queue commonQueue, DirectExchange commonExchange){
- return BindingBuilder.bind(commonQueue).to(commonExchange).with("common");
- }
- //绑定死信队列和交换机
- @Bean
- public Binding deadQueueBinding(@Qualifier("deadQueue") Queue deadQueue, DirectExchange deadExchange){
- return BindingBuilder.bind(deadQueue).to(deadExchange).with("dead");
- }
-
- }
- /**
- * 生产者代码
- */
- @Slf4j
- @RestController
- @RequestMapping("/ttl")
- public class SendMessageController {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- //开始发消息
- @GetMapping("/sendMsg/{message}")
- public void sendMsg(@PathVariable String message){
- log.info("当前时间:{},发送一个消息给一个ttl队列:{}",new Date().toString(),message);
- rabbitTemplate.convertAndSend("commonExchange","common","消息来自ttl为15s的队列:"+message);
- }
-
- /**
- * 优化
- * @param message
- * @param ttlTime
- */
- @GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
- public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
- log.info("当前时间:{},发送一条时长:{}毫秒,ttl信息给optimize队列:{}",
- new Date().toString(),ttlTime,message);
- rabbitTemplate.convertAndSend("commonExchange","optimize",message,msg->{
- msg.getMessageProperties().setExpiration(ttlTime);
- return msg;
- });
- }
- }
- /**
- * 消费者代码
- */
- @Slf4j
- @Component
- public class DeadLetterQueueConsumer {
- @RabbitListener(queues = "deadQueue")
- public void receive(Message message, Channel channel)throws Exception{
- String msg=new String(message.getBody());
- log.info("当前时间:{},收到死信队列的消息:{}",new Date().toString(),msg);
- }
- }
问题1
[CachingConnectionFactory.java:1567]- Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
解决:
- #消费端配置 去掉自动签收功能 #自动签收auto 手动 manual
- spring.rabbitmq.listener.simple.acknowledge-mode=manual
问题2
Shutdown Signal: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - invalid expiration ' ': no_integer, class-id=60, method-id=40)
1、如果队列设置的是客户端是自动创建的,直接删除队列。
2、如果客户端没有配置自动创建队列的话,手动去MQ客户端创建队列,并且设置对应的TTL值。
3、 你所设置的TTL值要真实存在,不能为空
-
- @Configuration
- public class ConfirmConfig {
- @Bean
- public DirectExchange directExchange(){
- return new DirectExchange("com.lc.direct.demo");
- }
- @Bean("directQueueA")
- public Queue directQueueA(){
- return new Queue("directQueueA");
- }
- @Bean("directQueueB")
- public Queue directQueueB(){
- return new Queue("directQueueB");
- }
- @Bean
- public Binding getBindingDirectQueueA(@Qualifier("directQueueA") Queue directQueueA, DirectExchange directExchange){
- return BindingBuilder.bind(directQueueA).to(directExchange).with("北京");
- }
- @Bean
- public Binding getBindingDirectQueueB(@Qualifier("directQueueB") Queue directQueueB, DirectExchange directExchange){
- return BindingBuilder.bind(directQueueB).to(directExchange).with("哈尔滨");
- }
- }
-
-
- @Slf4j
- @RestController
- @RequestMapping("/confirm")
- public class ProducerController {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @GetMapping("/sendMessage/{messsage}")
- public void sendMessageA(@PathVariable String messsage){
- rabbitTemplate.convertAndSend("com.lc.direct.demo","北京",messsage);
- log.info("发送消息内容:{}",messsage);
- }
- @GetMapping("/sendMessage/{messsage}")
- public void sendMessageB(@PathVariable String messsage){
- rabbitTemplate.convertAndSend("com.lc.direct.demo","哈尔滨",messsage);
- log.info("发送消息内容:{}",messsage);
- }
-
- }
- @Slf4j
- @Component
- public class DirectConsumer {
- @RabbitListener(queues = "directQueueA")
- public void receiveConfirmMessageA(Message message){
- String msg = new String(message.getBody());
- log.info("接收到的队列消息:{}",msg);
- }
- @RabbitListener(queues = "directQueueB")
- public void receiveConfirmMessageB(Message message){
- String msg = new String(message.getBody());
- log.info("接收到的队列消息:{}",msg);
- }
- }
spring-boot-starter-amqp
连接rabbitmq时出现报错: Failed to check/redeclare auto-delete queue(s).
这里我是没有开启RabbitMQ,直接连接,当然报错了
Queue declaration failed; retries left=3
只监听了队列,但是没有在config中配置,又或者是链接错误写错符号或者有重复相同连接
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。