赞
踩
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the tmap2 Spring Boot + RabbitMQ demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- Project coordinates -->
    <groupId>com.ex</groupId>
    <artifactId>tmap2</artifactId>
    <version>1.0-SNAPSHOT</version>

    <!-- Dependency versions below are inherited from the Spring Boot parent -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Disabled: Agora RTM SDK -->
        <!-- <dependency>-->
        <!--     <groupId>io.agora.rtm</groupId>-->
        <!--     <artifactId>agora-rtm-sdk</artifactId>-->
        <!--     <version>1.3</version>-->
        <!-- </dependency>-->

        <!-- RabbitMQ (Spring AMQP) support -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
# RabbitMQ connection and publisher settings for the demo application.
spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: 123456
    virtual-host: /ems
    template:
      retry:                        # retry publishing when sending a message fails
        enabled: true               # turn retry on
        initial-interval: 10000ms   # wait 10 seconds before the first retry
        max-interval: 80000ms       # upper bound for the wait between retries: 80 seconds
        multiplier: 2               # each wait is the previous one multiplied by 2
    # Publisher confirms (broker acknowledges receipt by the exchange).
    # Replaces the deprecated "publisher-confirms: true" of Spring Boot 2.2+.
    publisher-confirm-type: correlated
    publisher-returns: true         # publisher returns (unroutable messages are handed back)
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.connection.ConnectionFactory;
- import org.springframework.amqp.rabbit.connection.CorrelationData;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
- import org.springframework.amqp.support.converter.MessageConverter;
- import org.springframework.context.annotation.Bean;
- import org.springframework.context.annotation.Configuration;
-
- @Slf4j
- @Configuration
- public class RabbitCallbackConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
- @Bean
- RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
- RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
- //rabbitTemplate发送消息json转换配置
- rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
- rabbitTemplate.setMandatory(true);
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnCallback(this);
- return rabbitTemplate;
- }
- /**
- * 配置接收消息json转换为对象
- * @return
- */
- @Bean
- public MessageConverter jsonMessageConverter(){
- return new Jackson2JsonMessageConverter();
- }
-
- // 下边这样写也可以
- // @Autowired
- // private RabbitTemplate rabbitTemplate;
- // @PostConstruct
- // public void init() {
- // rabbitTemplate.setMandatory(true);
- // rabbitTemplate.setReturnCallback(this);
- // rabbitTemplate.setConfirmCallback(this);
- // }
-
- @Override
- public void confirm(CorrelationData correlationData, boolean ack, String cause) {
- if (!ack) {
- log.error("confirm==>发送到broker.exchange失败\r\n" +
- "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
- correlationData, ack, cause);
- } else {
- log.info("confirm==>发送到broker.exchange成功\r\n" +
- "correlationData={}\r\n" + "ack={}\r\n" + "cause={}",
- correlationData, ack, cause);
- }
- }
-
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText,
- String exchange, String routingKey) {
- log.info("returnedMessage==> \r\n" + "message={}\r\n" + "replyCode={}\r\n" +
- "replyText={}\r\n" + "exchange={}\r\n" + "routingKey={}",
- message, replyCode, replyText, exchange, routingKey);
- }
- }
- import com.rabbitmq.client.Channel;
- import lombok.SneakyThrows;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.amqp.rabbit.annotation.Exchange;
- import org.springframework.amqp.rabbit.annotation.Queue;
- import org.springframework.amqp.rabbit.annotation.QueueBinding;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
-
- import java.io.IOException;
- @Slf4j
- @Component
- public class WorkCustomer {
-
-
- @RabbitListener(queuesToDeclare = @Queue(value = "hello",declare = "true"),ackMode ="MANUAL" )
- @SneakyThrows
- public void receive12(Student student, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag){
- try {
- log.info("message: {}",student);
- // 处理实际业务
- // 制造异常
- // int wrongNumber = 1/0;
- // 无异常,确认消息消费成功
- channel.basicAck(deliveryTag, true);
- }catch (IOException | ArithmeticException exception) {
- log.error("处理消息发生异常", exception);
- // 有异常,将消息返回给Queue里,第三个参数requeue可以直接看出来,是否返回到Queue中
- channel.basicNack(deliveryTag, true, true);
- }
-
- }
-
-
- //-----------------工作模式---------------------------------------
- // 生产端没有指定交换机只有routingKey和Object。
- //消费方产生work队列,放在默认的交换机(AMQP default)上。
- //而默认的交换机有一个特点,只要你的routerKey的名字与这个
- //交换机的队列有相同的名字,他就会自动路由上。
- //生产端routingKey 叫work ,消费端生产work队列。
- //他们就路由上了
- @RabbitListener(queuesToDeclare = @Queue("work"))
- public void receive1(String message){
- System.out.println("work message1 = " + message);
- }
-
-
- @RabbitListener(queuesToDeclare = @Queue("work"))
- public void receive2(String message){
- System.out.println("work message2 = " + message);
- }
-
- //-------------------------广播模式--------------------------------------------------
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue, // 创建临时队列
- exchange = @Exchange(name = "logs", type = "fanout")
- ))
- public void fanout1(String message) {
- System.out.println("message1 = " + message);
- }
-
- @RabbitListener(bindings = @QueueBinding(
- value = @Queue, // 创建临时队列
- exchange = @Exchange(name = "logs", type = "fanout")
- ))
- public void fanout2(String message) {
- System.out.println("message2 = " + message);
- }
- //-------------------------路由模式--------------------------------------------------
-
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue(value = "infoQue",declare = "true"), // 创建info队列,declare默认队列持久化
- key = {"info"}, // 路由key
- exchange = @Exchange(type = "direct", name = "directs")
- )})
- public void receive1221(String message) {
- System.out.println("message1 = " + message);
- }
-
-
-
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue(value = "info||error"), // 创建临时队列
- key = {"info", "error"}, // 路由key
- exchange = @Exchange(type = "direct", name = "directs")
- )})
- public void receive11(String message) {
- System.out.println("message1 = " + message);
- }
-
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue,
- key = {"error"},
- exchange = @Exchange(type = "direct", name = "directs")
- )})
- public void receive22(String message) {
- System.out.println("message2 = " + message);
- }
- //-------------------------Topic 订阅模型(动态路由模型)--------------------------------------------------
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue,
- key = {"user.*"},
- exchange = @Exchange(type = "topic",name = "topics")
- )
- })
- public void receive111(String message){
- System.out.println("message1 = " + message);
- }
-
- @RabbitListener(bindings = {
- @QueueBinding(
- value = @Queue,
- key = {"user.#"},
- exchange = @Exchange(type = "topic",name = "topics")
- )
- })
- public void receive222(String message){
- System.out.println("message2 = " + message);
- }
-
-
-
- /**
- * 监听Queue的时候,直接获取消息体.
- * 在注解上开启手动确认, 必须是ackMode的大写.
- * 在进行消息确认的时候,要带上Rabbit MQ Server发送过来头上的tag,可以通过@Header注解获取delivery tag,
- * @param firstTopicQueueMessage 消息体
- * @param channel Broker和Consumer建立的channel
- * @param tag 消息头中的tag
- */
- // @RabbitListener(queues = "${queue.topic.first}", ackMode = "MANUAL")
- // @RabbitHandler
- // @SneakyThrows
- // public void receiveFirstTopicQueueMessage(String firstTopicQueueMessage, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
- // log.info("This is firstTopicQueue received message: {}", firstTopicQueueMessage);
- // try {
- // // 处理实际业务
- // TimeUnit.SECONDS.sleep(5);
- // // 制造异常
- // // int wrongNumber = 1/0;
- // // 无异常,确认消息消费成功
- // channel.basicAck(tag, true);
- // }catch (IOException | ArithmeticException exception) {
- // log.error("处理消息发生异常", exception);
- // // 有异常,将消息返回给Queue里,第三个参数requeue可以直接看出来,是否返回到Queue中
- // channel.basicNack(tag, true, true);
- // }
- // }
-
- }
-
- import org.junit.Test;
- import org.junit.runner.RunWith;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.test.context.SpringBootTest;
- import org.springframework.test.context.junit4.SpringRunner;
-
- @RunWith(SpringRunner.class)
- @SpringBootTest
- public class TTest {
-
- @Autowired
- private RabbitTemplate rabbitTemplate;
-
- //5.2 第一种hello world模型使用
- @Test
- public void test1() {
- Student student = new Student();
- student.setName("小明");
- student.setAge("18");
- student.setAddress("杭州");
- rabbitTemplate.convertAndSend("hello", student);
- // 生产端没有指定交换机只有routingKey和Object。
- //消费方产生hello队列,放在默认的交换机(AMQP default)上。
- //而默认的交换机有一个特点,只要你的routerKey的名字与这个
- //交换机的队列有相同的名字,他就会自动路由上。
- //生产端routingKey 叫hello ,消费端生产hello队列。
- //他们就路由上了
- }
-
- //5.3 第二种work模型使用
- @Test
- public void test2() {
- for (int i = 0; i < 10; i++) {
- rabbitTemplate.convertAndSend("work", "hello work!");
- // 生产端没有指定交换机只有routingKey和Object。
- //消费方产生work队列,放在默认的交换机(AMQP default)上。
- //而默认的交换机有一个特点,只要你的routerKey的名字与这个
- //交换机的队列有相同的名字,他就会自动路由上。
- //生产端routingKey 叫work ,消费端生产work队列。
- //他们就路由上了
- }
- }
- // 5.4 Fanout 广播模型
-
- @Test
- public void test3() {
- rabbitTemplate.convertAndSend("logs", "", "这是日志广播"); // 参数1为交换机,参数2为路由key,“”表示为任意路由,参数3为消息内容
- }
-
- //5.5 Route 路由模型
- @Test
- public void contextLoads() {
- // rabbitTemplate.convertAndSend("directs", "error", "error 的日志信息");
- rabbitTemplate.convertAndSend("directs", "info", "info 的日志信息");
- }
-
- //5.6 Topic 订阅模型(动态路由模型)
- @Test
- public void contextLoads1() {
- rabbitTemplate.convertAndSend("topics", "user.save.findAll", "user.save.findAll 的消息");
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。