赞
踩
目录
rabbitmq有五种工作模式;按照有无交换机分为两大类
无交换机的:简单队列(一对一,单生产单消费)、工作队列(工作队列有轮训分发和公平分发两种模式)
有交换机:发布-订阅、路由模式、主题模式
安装rabbitmq,并成功启动
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
生产端项目结构:
逻辑:生产者只对交换机进行生产,至于队列绑定等放在消费端进行执行
BusinessConfig
定义了三个不同类型的交换机
direct类型:(当生产者往该交换机发送消息时,他必须指定固定的routingkey,当routingkey值为空,他也会匹配routingkey为空的队列)
fanout类型:(当生产者往该交换机发送消息时,他所绑定的队列都会收到消息,routingkey即使写了也会忽略,一般为空字符串)
Topic类型:(当生产者往该交换机发送消息时,他并不像direct指定固定的routingkey,可以进行模糊匹配,当该routingkey为空时,他会匹配routingkey为空的队列)
- package com.zsp.quartz.queue;
-
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.context.annotation.Configuration;
-
-
- /**
- * @Author: ZhangSP
- * @Date: 2023/12/7 14:05
- */
- public class BusinessConfig {
- // 声明direct交换机
- public static final String EXCHANGE_DIRECT= "exchange_direct_inform";
-
- // 声明fanout交换机
- public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";
-
- // 声明topic交换机
- public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
- }
TestProducer
生产消息
- package com.zsp.quartz.queue;
-
- import com.alibaba.fastjson.JSON;
- import com.zsp.quartz.entity.User;
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
-
- @SpringBootTest
- @RunWith(SpringRunner.class)
- public class TestProducer {
- @Autowired
- RabbitTemplate rabbitTemplate;
-
- @Test
- public void Producer_topics_springbootTest() {
-
- //使用rabbitTemplate发送消息
- String message = "";
- User user = new User();
- user.setName("张三");
- user.setEmail("anjduahsd");
- message = JSON.toJSONString(user);
-
- // direct
- rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_DIRECT,"",message);
-
- // fanout
- rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_FANOUT,"",message);
-
- // topic
- rabbitTemplate.convertAndSend(BusinessConfig.EXCHANGE_TOPIC,"",message);
- }
- }
消费者目录结构:
BusinessConfig内容解析:
①定义交换机类型
②配置交换机与队列的绑定关系
③通过容器工厂声明队列
- package com.zsp.consumer.queue;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.connection.Connection;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Configuration;
-
- import javax.annotation.PostConstruct;
-
- /**
- * @Author: ZhangSP
- * @Date: 2023/12/7 14:05
- */
- @Slf4j
- @Configuration
- public class BusinessConfig {
- // 声明direct
- public static final String EXCHANGE_DIRECT= "exchange_direct_inform";
- public static final String QUEUE_DIRECT_EMAIL = "queue_direct_inform_email";
- public static final String QUEUE_DIRECT_SMS = "queue_direct_inform_sms";
- public void BindDirectEmail(Channel channel) {
- try {
- channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);
- channel.queueDeclare(QUEUE_DIRECT_EMAIL, true, false, false, null);
- channel.queueBind(QUEUE_DIRECT_EMAIL, EXCHANGE_DIRECT, "");
- } catch (Exception e) {
- log.error("声明Direct->email队列时失败", e);
- }
- }
- public void BindDirectSms(Channel channel) {
- try {
- channel.exchangeDeclare(EXCHANGE_DIRECT, BuiltinExchangeType.DIRECT.getType(), true);
- channel.queueDeclare(QUEUE_DIRECT_SMS, true, false, false, null);
- channel.queueBind(QUEUE_DIRECT_SMS, EXCHANGE_DIRECT, "123");
- } catch (Exception e) {
- log.error("声明Direct->sms失败", e);
- }
- }
- // 声明fanout
- public static final String EXCHANGE_FANOUT= "exchange_fanout_inform";
- public static final String QUEUE_FANOUT_EMAIL = "queue_fanout_inform_email";
- public static final String QUEUE_FANOUT_SMS = "queue_fanout_inform_sms";
- public void BindFanoutEmail(Channel channel) {
- try {
- channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);
- channel.queueDeclare(QUEUE_FANOUT_EMAIL, true, false, false, null);
- channel.queueBind(QUEUE_FANOUT_EMAIL, EXCHANGE_FANOUT, "");
- } catch (Exception e) {
- log.error("声明Fanout->email队列时失败", e);
- }
- }
- public void BindFanoutSms(Channel channel) {
- try {
- channel.exchangeDeclare(EXCHANGE_FANOUT, BuiltinExchangeType.FANOUT.getType(), true);
- channel.queueDeclare(QUEUE_FANOUT_SMS, true, false, false, null);
- channel.queueBind(QUEUE_FANOUT_SMS, EXCHANGE_FANOUT,"");
- } catch (Exception e) {
- log.error("声明Fanout->sms失败", e);
- }
- }
-
- // 声明topic
- public static final String EXCHANGE_TOPIC= "exchange_topic_inform";
- public static final String QUEUE_TOPIC_EMAIL = "queue_topic_inform_email";
- public static final String QUEUE_TOPIC_SMS = "queue_topic_inform_sms";
- public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
- public static final String ROUTINGKEY_SMS="inform.#.sms.#";
-
- public void BindTopicEmail(Channel channel) {
- try {
- channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);
- channel.queueDeclare(QUEUE_TOPIC_EMAIL, true, false, false, null);
- channel.queueBind(QUEUE_TOPIC_EMAIL, EXCHANGE_TOPIC, ROUTINGKEY_EMAIL);
- } catch (Exception e) {
- log.error("声明Topic->email队列时失败", e);
- }
- }
- public void BindTopicSms(Channel channel) {
- try {
- channel.exchangeDeclare(EXCHANGE_TOPIC, BuiltinExchangeType.TOPIC.getType(),true);
- channel.queueDeclare(QUEUE_TOPIC_SMS, true, false, false, null);
- channel.queueBind(QUEUE_TOPIC_SMS, EXCHANGE_TOPIC,"");
- } catch (Exception e) {
- log.error("声明Topic->sms失败", e);
- }
- }
-
-
-
- // 声明队列
- @Autowired
- @Qualifier(value = "zspConnectionFactory")
- private ConnectionFactory connectionFactory;
- @PostConstruct
- public void shengmingQueue() {
- try {
- Connection connection = connectionFactory.createConnection();
- Channel channel = connection.createChannel(false);
- BindDirectEmail(channel);
- BindDirectSms(channel);
- BindFanoutEmail(channel);
- BindFanoutSms(channel);
- BindTopicEmail(channel);
- BindTopicSms(channel);
- } catch (Exception e) {
- log.error("业务实例声明绑定队列报错:",e);
- }
- }
- }
RabbitFactory内容解析:
①创建自定义连接工厂
②通过@Qualifier准确注入连接工厂,创建个性化容器工厂
- package com.zsp.consumer.queue;
-
- import org.springframework.amqp.rabbit.annotation.EnableRabbit;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- @EnableRabbit
- public class RabbitFactory {
-
- @Bean("zspConnectionFactory")
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- // 设置RabbitMQ的连接信息,如主机名、端口号、用户名和密码等
- connectionFactory.setHost("localhost");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("root");
- connectionFactory.setPassword("root");
- return connectionFactory;
- }
-
- @Bean("rabbitListenerContainerFactory")
- public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("zspConnectionFactory") ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setConcurrentConsumers(5);
- factory.setMaxConcurrentConsumers(10);
- return factory;
- }
- }
ReceiveHandler内容解析:
监听绑定的队列消息
- package com.zsp.consumer.queue;
-
- import com.alibaba.fastjson.JSONObject;
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
-
- @Component
- public class ReceiveHandler {
- //监听自定义的Direct队列
- @RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_SMS, containerFactory = "rabbitListenerContainerFactory")
- public void directSMS(String msg, Message message, Channel channel) {
- JSONObject jsonObject = JSONObject.parseObject(msg);
- System.out.println("Direct队列->sms队列" + jsonObject);
- }
-
- @RabbitListener(queues = BusinessConfig.QUEUE_DIRECT_EMAIL, containerFactory = "rabbitListenerContainerFactory")
- public void directEmail(String msg, Message message, Channel channel) {
- JSONObject jsonObject = JSONObject.parseObject(msg);
- System.out.println("Direct队列->email队列" + jsonObject);
- }
-
- //监听自定义的Fanout队列
- @RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_SMS, containerFactory = "rabbitListenerContainerFactory")
- public void FanoutSMS(String msg, Message message, Channel channel) {
- JSONObject jsonObject = JSONObject.parseObject(msg);
- System.out.println("Fanout队列->sms队列" + jsonObject);
- }
-
- @RabbitListener(queues = BusinessConfig.QUEUE_FANOUT_EMAIL, containerFactory = "rabbitListenerContainerFactory")
- public void FanoutEmail(String msg, Message message, Channel channel) {
- JSONObject jsonObject = JSONObject.parseObject(msg);
- System.out.println("Fanout队列->email队列" + jsonObject);
- }
-
- //监听自定义的Topic队列
- @RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_SMS, containerFactory = "rabbitListenerContainerFactory")
- public void TopicSMS(String msg, Message message, Channel channel) {
- JSONObject jsonObject = JSONObject.parseObject(msg);
- System.out.println("Topic队列->sms队列" + jsonObject);
- }
-
- @RabbitListener(queues = BusinessConfig.QUEUE_TOPIC_EMAIL, containerFactory = "rabbitListenerContainerFactory")
- public void TopicEmail(String msg, Message message, Channel channel) {
- JSONObject jsonObject = JSONObject.parseObject(msg);
- System.out.println("Topic队列->email队列" + jsonObject);
- }
- }
先启动消费者端,然后执行TestProducer
1.向routingkey为空的队列发消息
我们在消费者端配置了routingkey为空的队列,叫做 QUEUE_DIRECT_EMAIL
因此会打印出下面这条记录
2.向routingkey为123的队列发消息
我们在消费者端配置了routingkey为123的队列,叫做 QUEUE_DIRECT_SMS
因此会打出下面这条记录
谁跟我绑定了,我都发
模糊匹配routingkey
匹配sms队列
会把下面这个打印出来
需要注意的是如果我们没有自定义容器工厂的话,这个containerFactory可以不写
简单理解就是实例,也就是rabbitmq服务地址是在哪里,实例包括了域名、端口、账号、密码等。
消费者监听队列时增加消息确认ack,改动两个地方
1.自定义容器工厂,增加手动确认机制
RabbitFactory
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- package com.zsp.consumer.queue;
-
- import org.springframework.amqp.core.AcknowledgeMode;
- import org.springframework.amqp.rabbit.annotation.EnableRabbit;
- import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
- import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.beans.factory.annotation.Qualifier;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Configuration
- @EnableRabbit
- public class RabbitFactory {
-
- @Bean("zspConnectionFactory")
- public ConnectionFactory connectionFactory() {
- CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
- // 设置RabbitMQ的连接信息,如主机名、端口号、用户名和密码等
- connectionFactory.setHost("localhost");
- connectionFactory.setPort(5672);
- connectionFactory.setUsername("root");
- connectionFactory.setPassword("zsproot");
- return connectionFactory;
- }
-
- @Bean("rabbitListenerContainerFactory")
- public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(@Qualifier("zspConnectionFactory") ConnectionFactory connectionFactory) {
- SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
- factory.setConnectionFactory(connectionFactory);
- factory.setConcurrentConsumers(5);
- factory.setMaxConcurrentConsumers(10);
- factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
- return factory;
- }
- }
2.监听队列增加try{}catch(){}finally{}代码块
以DirectSMS举例
- public void DirectSMS(String msg, Message message, Channel channel) {
- try {
- JSONObject jsonObject = JSONObject.parseObject(msg);
- System.out.println("Direct队列->sms队列" + jsonObject);
- } catch (Exception e) {
- e.printStackTrace();
- log.info("[referral],directSMS队列error", e);
- } finally {
- try {
- channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
解释:
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
是用于手动确认消息的方法。在RabbitMQ中,当消费者成功处理消息后,需要向RabbitMQ服务器发送确认消息,告诉服务器可以将该消息从队列中移除。这个方法就是用来发送这个确认消息的。
在这行代码中,channel
是用来与RabbitMQ服务器通信的通道对象,basicAck
是发送确认消息的方法。message.getMessageProperties().getDeliveryTag()
是获取消息的交付标签,用于告诉RabbitMQ服务器是哪条消息需要被确认。false
是一个布尔值,表示是否批量确认,这里是单条确认,所以是false
。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。