
RabbitMQ: Seven Examples

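Installing RabbitMQ

I. Install the Erlang runtime, which RabbitMQ requires, before installing RabbitMQ

Baidu Netdisk link: https://pan.baidu.com/s/1sFVkN0I7w52TFPrVXfjtxg (extraction code: wrdj)
Direct download: http://erlang.org/download/otp_win64_18.3.exe
After installation, configure the environment variables.
Create a new system variable:
Variable name: ERLANG_HOME
Variable value: D:\softInstall\erl7.3 (the Erlang installation directory)
Add to PATH: %ERLANG_HOME%\bin;

II. Install RabbitMQ Server

Baidu Netdisk link: https://pan.baidu.com/s/1tSIusKTdEhZtedGZ-WN1TA (extraction code: fps4)
Direct download: http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.9/rabbitmq-server-3.6.9.exe
After installation, configure the environment variables.
Create a new system variable:
Variable name: RABBITMQ_SERVER
Variable value: D:\softInstall\rabbitMQ\rabbitmq_server-3.6.9 (the RabbitMQ Server installation directory)
Add to PATH: %RABBITMQ_SERVER%\sbin;

III. Run cmd.exe as administrator and change to the directory

D:\softInstall\rabbitMQ\rabbitmq_server-3.6.9\sbin (the sbin folder of the RabbitMQ Server installation directory),
then run the following command to enable the management plugin:
rabbitmq-plugins.bat enable rabbitmq_management

IV. Run cmd.exe as administrator and use the following commands:

net start RabbitMQ
net stop RabbitMQ
The first command starts the RabbitMQ service and the second stops it. With the server started, open http://localhost:15672 in a browser.
Log in with the default account guest and password guest to reach the RabbitMQ management UI.
This completes the RabbitMQ Server installation.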
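Before moving on, it can be useful to confirm that the broker is really listening. The sketch below is not part of the original setup: it only uses the JDK's java.net.Socket to probe the AMQP port 5672 and the management port 15672 mentioned in this article. The class name PortProbe and the 500 ms timeout are assumptions made for illustration.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical helper (not from the original article): checks whether a TCP port accepts connections.
class PortProbe {

    // Returns true if a TCP connection to host:port succeeds within timeoutMs milliseconds.
    static boolean isOpen(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused, timed out, or host unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("AMQP port 5672 open: " + isOpen("localhost", 5672, 500));
        System.out.println("Management port 15672 open: " + isOpen("localhost", 15672, 500));
    }
}
```

If the second line reports false while the first reports true, the management plugin is probably not enabled yet.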

Setting up the springboot-rabbitmq project


II. Add the Spring Boot and RabbitMQ dependencies

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.springboot.rabbitmq</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>springboot-rabbitmq</name>
    <description>springboot-rabbitmq</description>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.1.RELEASE</version>
    </parent>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
<!--            <version>5.2.6.RELEASE</version>-->
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-test</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>


III. Add application.properties under src/main/resources

spring.application.name=springboot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.virtual-host=/

IV. Create the Spring Boot main class Application

This class creates the queues and exchanges at startup and binds the queues to the exchanges.

package com.rabbit;

import org.springframework.amqp.core.*;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * @author kongfanjun
 * @version 1.0
 * @date 2020/5/28 0028 16:35
 */
@SpringBootApplication
public class Application {
    final static String queueName = "hello";

    //=============== Begin: queues for the Direct examples (one-to-one: each message is consumed by only one consumer) ==========
    @Bean
    public Queue helloQueue() {
//        return new Queue("hello");
        return new Queue("helloQueue");
    }

    @Bean
    public Queue userQueue() {
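        // The name must match the queue used by UserProducer and UserConsumer ("userQueue");
        // with "user" the listener fails with 404 NOT_FOUND - no queue 'userQueue'.
        return new Queue("userQueue");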
    }
    //=============== End: queues for the Direct examples ==========

    //=============== Begin: queues for the Topic examples (one-to-many: one message can be consumed by several consumers) ==========
    @Bean
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }
    //=============== End: queues for the Topic examples ==========


    //=============== Begin: queues for the Fanout examples (broadcast) ==========
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
    //=============== End: queues for the Fanout examples ==========


    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }

    /**
     * Binds the queue topic.message to the exchange with binding_key topic.message, i.e. an exact match
     * @param queueMessage
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * Binds the queue topic.messages to the exchange with binding_key topic.#, i.e. a wildcard match
     * @param queueMessages
     * @param exchange
     * @return
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }

    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }


    public static void main(String[] args) throws Exception {
        SpringApplication.run(Application.class, args);
    }
}

V. Implementing the scenarios

1. The simplest hello producer and consumer (one producer, one consumer)

Producer:

package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;

/**
 * @description: message queue sender
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:54
 */
@Component
public class HelloProducer1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String sendMsg = "hello1 " + LocalDate.now();
        System.out.println("生产者1 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
    }

}


Consumer:

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @description: message queue receiver
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloConsumer1 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("接收者1  : " + hello);
    }

}


controller:

package com.rabbit.controller;

import com.rabbit.hello.HelloProducer1;
import com.rabbit.hello.HelloProducer2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitTest {

    @Autowired
    private HelloProducer1 helloProducer1;
    @Autowired
    private HelloProducer2 helloProducer2;

    @PostMapping("/hello")
    public void hello() {
        helloProducer1.send();
    }
}


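Start the application.
Call http://127.0.0.1:8080/rabbit/hello (send the request as a POST, for example with Postman).

Error:
Caused by: com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no queue 'helloQueue' in vhost '/', class-id=50, method-id=10)
The exception is raised because the queue did not yet exist on the broker when the RabbitMQ consumer started listening on it.
Fix: declare the queue in Application under exactly the name the consumer listens on: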

    @Bean
    public Queue helloQueue() {
//        return new Queue("hello");
        return new Queue("helloQueue");
    }

Output:

生产者1 : hello1 2020-06-04
接收者1  : hello1 2020-06-04

2. One producer, multiple consumers

Producer:

package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.util.Date;

/**
 * @description: message queue sender
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:54
 */
@Component
public class HelloProducer1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

//    public void send() {
//        String sendMsg = "hello1 " + LocalDate.now();
//        System.out.println("生产者1 : " + sendMsg);
//        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
//    }

    public void send(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("生产者1 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
    }

}


Consumer 1:

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @description: message queue receiver
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloConsumer1 {

//    @RabbitHandler
//    public void process(String hello) {
//        System.out.println("接收者1  : " + hello);
//    }

    @RabbitHandler
    public void process(String hello) {
        System.out.println("消费者1  : " + hello);
    }
}


Consumer 2:

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @description: message queue receiver
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloConsumer2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("消费者2  : " + hello);
    }

}


controller:

package com.rabbit.controller;

import com.rabbit.hello.HelloProducer1;
import com.rabbit.hello.HelloProducer2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitTest {

    @Autowired
    private HelloProducer1 helloProducer1;
    @Autowired
    private HelloProducer2 helloProducer2;

    /**
     * One producer, one consumer
     */
//    @PostMapping("/hello")
//    public void hello() {
//        helloProducer1.send();
//    }

    /**
     * One producer, multiple consumers
     */
    @PostMapping("/oneToMany")
    public void oneToMany() {
        for(int i=0;i<10;i++){
            helloProducer1.send("hellomsg:"+i);
        }

    }
}


Send a POST request to:

http://127.0.0.1:8080/rabbit/oneToMany

Result:

生产者1 : hellomsg:0Thu Jun 04 10:53:37 CST 2020
生产者1 : hellomsg:1Thu Jun 04 10:53:37 CST 2020
生产者1 : hellomsg:2Thu Jun 04 10:53:37 CST 2020
生产者1 : hellomsg:3Thu Jun 04 10:53:37 CST 2020
生产者1 : hellomsg:4Thu Jun 04 10:53:37 CST 2020
生产者1 : hellomsg:5Thu Jun 04 10:53:37 CST 2020
生产者1 : hellomsg:6Thu Jun 04 10:53:37 CST 2020
生产者1 : hellomsg:7Thu Jun 04 10:53:37 CST 2020
生产者1 : hellomsg:8Thu Jun 04 10:53:37 CST 2020
生产者1 : hellomsg:9Thu Jun 04 10:53:37 CST 2020
消费者1  : hellomsg:0Thu Jun 04 10:53:37 CST 2020
消费者2  : hellomsg:1Thu Jun 04 10:53:37 CST 2020
消费者2  : hellomsg:2Thu Jun 04 10:53:37 CST 2020
消费者1  : hellomsg:3Thu Jun 04 10:53:37 CST 2020
消费者2  : hellomsg:4Thu Jun 04 10:53:37 CST 2020
消费者1  : hellomsg:5Thu Jun 04 10:53:37 CST 2020
消费者1  : hellomsg:7Thu Jun 04 10:53:37 CST 2020
消费者2  : hellomsg:6Thu Jun 04 10:53:37 CST 2020
消费者1  : hellomsg:8Thu Jun 04 10:53:37 CST 2020
消费者2  : hellomsg:9Thu Jun 04 10:53:37 CST 2020

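As the output shows, the 10 messages sent by the producer were split between the two consumers: each message was delivered to exactly one of them, five to each.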
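The split between the two consumers can be approximated with a small plain-Java model. By default RabbitMQ hands the messages of one queue to its consumers in turn (round-robin). The sketch below only simulates that idea in memory and is not RabbitMQ or Spring code; the class name RoundRobinQueueSketch and the consumer labels are made up for illustration. The real broker also takes prefetch and acknowledgement timing into account, which is presumably why the log above is not strictly alternating.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model (an assumption for illustration, not RabbitMQ's implementation):
// one queue, several competing consumers, each message delivered to exactly one of them.
class RoundRobinQueueSketch {

    // Distributes the messages over the consumers in turn and returns what each consumer received.
    // The consumers list must not be empty.
    static Map<String, List<String>> dispatch(List<String> messages, List<String> consumers) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("hellomsg:" + i);
        }
        List<String> consumers = new ArrayList<>();
        consumers.add("consumer1");
        consumers.add("consumer2");
        dispatch(messages, consumers)
                .forEach((consumer, got) -> System.out.println(consumer + " -> " + got));
    }
}
```

In this model every message appears in exactly one consumer's list, which is the property the one-to-many test demonstrates.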

3. Multiple producers, multiple consumers

Producer 1:

package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.util.Date;

/**
 * @description: message queue sender
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:54
 */
@Component
public class HelloProducer1 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

//    public void send() {
//        String sendMsg = "hello1 " + LocalDate.now();
//        System.out.println("生产者1 : " + sendMsg);
//        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
//    }

    public void send(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("生产者1 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
    }

}


Producer 2:

package com.rabbit.hello;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.time.LocalDate;
import java.util.Date;

/**
 * @description: message queue sender
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:54
 */
@Component
public class HelloProducer2 {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send(String msg) {
        String sendMsg = msg + new Date();
        System.out.println("生产者2 : " + sendMsg);
        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
    }

}


Consumer 1:

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @description: message queue receiver
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloConsumer1 {

//    @RabbitHandler
//    public void process(String hello) {
//        System.out.println("接收者1  : " + hello);
//    }

    @RabbitHandler
    public void process(String hello) {
        System.out.println("消费者1  : " + hello);
    }
}


Consumer 2:

package com.rabbit.hello;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @description: message queue receiver
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@Component
@RabbitListener(queues = "helloQueue")
public class HelloConsumer2 {

    @RabbitHandler
    public void process(String hello) {
        System.out.println("消费者2  : " + hello);
    }

}


controller:

package com.rabbit.controller;

import com.rabbit.hello.HelloProducer1;
import com.rabbit.hello.HelloProducer2;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitTest {

    @Autowired
    private HelloProducer1 helloProducer1;
    @Autowired
    private HelloProducer2 helloProducer2;

    /**
     * One producer, one consumer
     */
//    @PostMapping("/hello")
//    public void hello() {
//        helloProducer1.send();
//    }

    /**
     * One producer, multiple consumers
     */
//    @PostMapping("/oneToMany")
//    public void oneToMany() {
//        for(int i=0;i<10;i++){
//            helloProducer1.send("hellomsg:"+i);
//        }
//    }

    /**
     * Multiple producers, multiple consumers
     */
    @PostMapping("/manyToMany")
    public void manyToMany() {
        for(int i=0;i<10;i++){
            helloProducer1.send("hellomsg:"+i);
            helloProducer2.send("hellomsg:"+i);
        }

    }
}


Send a POST request to:

http://127.0.0.1:8080/rabbit/manyToMany

Result:

生产者1 : hellomsg:0Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:0Thu Jun 04 11:01:43 CST 2020
生产者1 : hellomsg:1Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:1Thu Jun 04 11:01:43 CST 2020
生产者1 : hellomsg:2Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:2Thu Jun 04 11:01:43 CST 2020
生产者1 : hellomsg:3Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:3Thu Jun 04 11:01:43 CST 2020
生产者1 : hellomsg:4Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:4Thu Jun 04 11:01:43 CST 2020
生产者1 : hellomsg:5Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:5Thu Jun 04 11:01:43 CST 2020
生产者1 : hellomsg:6Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:6Thu Jun 04 11:01:43 CST 2020
生产者1 : hellomsg:7Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:7Thu Jun 04 11:01:43 CST 2020
生产者1 : hellomsg:8Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:8Thu Jun 04 11:01:43 CST 2020
生产者1 : hellomsg:9Thu Jun 04 11:01:43 CST 2020
生产者2 : hellomsg:9Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:0Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:0Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:1Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:1Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:2Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:2Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:3Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:3Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:4Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:4Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:5Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:5Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:6Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:6Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:7Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:7Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:8Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:8Thu Jun 04 11:01:43 CST 2020
消费者2  : hellomsg:9Thu Jun 04 11:01:43 CST 2020
消费者1  : hellomsg:9Thu Jun 04 11:01:43 CST 2020

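As in the one-to-many case, the messages are still distributed evenly across the consumers: each of the two consumers received 10 of the 20 messages.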

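4. Sending entity objects

Spring Boot supports sending and receiving objects out of the box; no extra configuration is needed. The default message converter relies on Java serialization, which is why the entity class has to implement Serializable.

Entity class (must implement the Serializable interface):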

package com.rabbit.user;

import java.io.Serializable;

/**
 * @author kongfanjun
 * @version 1.0
 * @description:
 * @create 2020/6/4 0004 11:07
 */
public class User implements Serializable {
    private String name;
    private String pass;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getPass() {
        return pass;
    }
    public void setPass(String pass) {
        this.pass = pass;
    }
}
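
The reason for the Serializable requirement can be shown without RabbitMQ at all. Spring AMQP's default SimpleMessageConverter uses standard Java serialization for payloads that are neither String nor byte[]. The sketch below demonstrates only that underlying JDK mechanism; it uses its own stand-in class DemoUser (a hypothetical name) instead of the article's User, and the helper names toBytes and fromBytes are likewise assumptions for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationSketch {

    // Stand-in for the article's User entity (hypothetical name, same two fields).
    static class DemoUser implements Serializable {
        private static final long serialVersionUID = 1L;
        String name;
        String pass;

        DemoUser(String name, String pass) {
            this.name = name;
            this.pass = pass;
        }
    }

    // Turns an object into bytes, roughly what happens to a message body on the producer side.
    static byte[] toBytes(Serializable value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    // Rebuilds the object from bytes, roughly what happens on the consumer side.
    static Object fromBytes(byte[] body) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] body = toBytes(new DemoUser("JayChou", "123456789"));
        DemoUser copy = (DemoUser) fromBytes(body);
        System.out.println(copy.name + "/" + copy.pass);
    }
}
```

Without `implements Serializable`, writeObject throws java.io.NotSerializableException, so such an object could not be converted into a message body this way.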


Producer:

package com.rabbit.user;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description:
 * @create 2020/6/4 0004 11:10
 */
@Component
public class UserProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        User user=new User();
        user.setName("JayChou");
        user.setPass("123456789");
        System.out.println("user发送信息 : " + user.getName()+"/"+user.getPass());
        this.rabbitTemplate.convertAndSend("userQueue", user);
    }

}

Consumer:

package com.rabbit.user;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description:
 * @create 2020/6/4 0004 11:10
 */
@Component
@RabbitListener(queues = "userQueue")
public class UserConsumer {

    @RabbitHandler
    public void process(User user) {
        System.out.println("user接收信息  : " + user.getName()+"/"+user.getPass());
    }

}


controller:

package com.rabbit.controller;

import com.rabbit.hello.HelloProducer1;
import com.rabbit.hello.HelloProducer2;
import com.rabbit.user.UserProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitTest {

    @Autowired
    private HelloProducer1 helloProducer1;
    @Autowired
    private HelloProducer2 helloProducer2;
    @Autowired
    private UserProducer userProducer;
    /**
     * One producer, one consumer
     */
//    @PostMapping("/hello")
//    public void hello() {
//        helloProducer1.send();
//    }

    /**
     * One producer, multiple consumers
     */
//    @PostMapping("/oneToMany")
//    public void oneToMany() {
//        for(int i=0;i<10;i++){
//            helloProducer1.send("hellomsg:"+i);
//        }
//    }

    /**
     * Multiple producers, multiple consumers
     */
//    @PostMapping("/manyToMany")
//    public void manyToMany() {
//        for(int i=0;i<10;i++){
//            helloProducer1.send("hellomsg:"+i);
//            helloProducer2.send("hellomsg:"+i);
//        }
//    }

    /**
     * Entity object transfer test
     */
    @PostMapping("/userTest")
    public void userTest() {
        userProducer.send();
    }
}


Send a POST request to:

http://127.0.0.1:8080/rabbit/userTest

Result:

user发送信息 : JayChou/123456789
user接收信息  : JayChou/123456789

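5. Topic exchange example

Topic is the most flexible exchange type in RabbitMQ: queues are bound with binding_key patterns, and a message is routed to every queue whose pattern matches its routing_key. In a binding_key, * matches exactly one word and # matches zero or more words, where words are separated by dots.

First the topic rules are configured. Two queues are used for the test, topic.message and topic.messages, which are created and bound in the Application class:
topic.message is bound with the binding_key "topic.message";
topic.messages is bound with the binding_key "topic.#".
Producer: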

package com.rabbit.topic;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description:
 * @create 2020/6/4 0004 10:17
 */
@Component
public class topicProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String msg1 = "I am topic.mesaage msg======";
        System.out.println("生产者1 : " + msg1);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);

        String msg2 = "I am topic.mesaages msg########";
        System.out.println("生产者2 : " + msg2);
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
    }

}

Consumer 1 (topic.message)

package com.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description:
 * @create 2020/6/4 0004 10:17
 */
@Component
@RabbitListener(queues = "topic.message")
public class topicMessageConsumer {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topicMessage消费者  : " +msg);
    }

}

Consumer 2 (topic.messages)

package com.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description:
 * @create 2020/6/4 0004 10:17
 */
@Component
@RabbitListener(queues = "topic.messages")
public class topicMessagesConsumer {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topicMessages消费者  : " +msg);
    }

}

controller:

package com.rabbit.controller;

import com.rabbit.hello.HelloProducer1;
import com.rabbit.hello.HelloProducer2;
import com.rabbit.topic.topicProducer;
import com.rabbit.user.UserProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitTest {

    @Autowired
    private HelloProducer1 helloProducer1;
    @Autowired
    private HelloProducer2 helloProducer2;
    @Autowired
    private UserProducer userProducer;
    @Autowired
    private topicProducer topicProducer;
    /**
     * One producer, one consumer
     */
//    @PostMapping("/hello")
//    public void hello() {
//        helloProducer1.send();
//    }

    /**
     * One producer, multiple consumers
     */
//    @PostMapping("/oneToMany")
//    public void oneToMany() {
//        for(int i=0;i<10;i++){
//            helloProducer1.send("hellomsg:"+i);
//        }
//    }

    /**
     * Multiple producers, multiple consumers
     */
//    @PostMapping("/manyToMany")
//    public void manyToMany() {
//        for(int i=0;i<10;i++){
//            helloProducer1.send("hellomsg:"+i);
//            helloProducer2.send("hellomsg:"+i);
//        }
//    }

    /**
     * Entity object transfer test
     */
//    @PostMapping("/userTest")
//    public void userTest() {
//        userProducer.send();
//    }

    /**
     * Topic exchange test
     */
    @PostMapping("/topicTest")
    public void topicTest() {
        topicProducer.send();
    }
}


Trigger it with a POST request:

http://127.0.0.1:8080/rabbit/topicTest
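
The endpoint is mapped with @PostMapping, so opening the URL in a browser (a GET) will not trigger it. One way to send the POST is the JDK's built-in java.net.http.HttpClient (Java 11 or later). This is only a minimal sketch: the class name PostTrigger and the helper buildPost are illustrative and not part of the original project, and any other HTTP tool works just as well.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper, not part of the original project.
final class PostTrigger {

    // Builds a body-less POST request for the given URL.
    static HttpRequest buildPost(String url) {
        return HttpRequest.newBuilder(URI.create(url))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws InterruptedException {
        HttpRequest request = buildPost("http://127.0.0.1:8080/rabbit/topicTest");
        try {
            HttpResponse<String> response = HttpClient.newHttpClient()
                    .send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println("HTTP status: " + response.statusCode());
        } catch (IOException e) {
            // Most likely the Spring Boot application is not running on port 8080.
            System.out.println("Request failed: " + e.getMessage());
        }
    }
}
```

The same helper can be pointed at the other /rabbit/... endpoints in this article by changing the URL.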

The output is:

生产者1 : I am topic.mesaage msg======
生产者2 : I am topic.mesaages msg########
topicMessage消费者  : I am topic.mesaage msg======
topicMessages消费者  : I am topic.mesaage msg======
topicMessages消费者  : I am topic.mesaages msg########

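From this output we can see:
sender1 publishes with the routing key "topic.message".
Two binding keys on the exchange match it: "topic.message" (an exact match)
and the wildcard "topic.#".
Both queues therefore receive sender1's message.

sender2 publishes with the routing key "topic.messages".
Only the binding key "topic.#" matches it,
so only the queue topic.messages receives sender2's message.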
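
The matching rule behind this result can be sketched in plain Java. The sketch follows the topic exchange convention that words are separated by ".", "*" stands for exactly one word and "#" for zero or more words. TopicMatcher is a hypothetical illustration written for this article; it is not RabbitMQ's or Spring AMQP's own implementation.

```java
// Hypothetical illustration of topic-exchange matching; not broker code.
final class TopicMatcher {

    // Returns true when routingKey satisfies the bindingKey pattern.
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int i, String[] words, int j) {
        if (i == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == words.length;
        }
        if (pattern[i].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int k = j; k <= words.length; k++) {
                if (match(pattern, i + 1, words, k)) {
                    return true;
                }
            }
            return false;
        }
        if (j == words.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (pattern[i].equals("*") || pattern[i].equals(words[j])) {
            return match(pattern, i + 1, words, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] bindingKeys = {"topic.message", "topic.#"};
        String[] routingKeys = {"topic.message", "topic.messages"};
        for (String routingKey : routingKeys) {
            for (String bindingKey : bindingKeys) {
                System.out.println(routingKey + " vs " + bindingKey + " : "
                        + matches(bindingKey, routingKey));
            }
        }
    }
}
```

With the two binding keys used in this section, "topic.message" satisfies both patterns while "topic.messages" satisfies only "topic.#", which is the behaviour seen in the output above.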

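6. Fanout exchange example

Fanout is the familiar broadcast (publish/subscribe) mode: a message sent to a fanout exchange is delivered to every queue bound to that exchange.

This example uses three queues, fanout.A, fanout.B and fanout.C, which are created in the Application class and bound there to the fanoutExchange exchange.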
Producer:

package com.rabbit.fanout;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description: fanout producer
 * @create 2020/6/4 0004 10:16
 */
@Component
public class FanoutProducer {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void send() {
        String msgString="fanout生产者 :hello i am kfj";
        System.out.println(msgString);
        this.rabbitTemplate.convertAndSend("fanoutExchange","anyway", msgString);
    }

}

Consumer A:

package com.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description: consumer for queue fanout.A
 * @create 2020/6/4 0004 10:16
 */
@Component
@RabbitListener(queues = "fanout.A")
public class FanoutConsumerA {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutConsumerA  : " + msg);
    }

}

Consumer B:

package com.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description: consumer for queue fanout.B
 * @create 2020/6/4 0004 10:16
 */
@Component
@RabbitListener(queues = "fanout.B")
public class FanoutConsumerB {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutConsumerB  : " + msg);
    }

}

Consumer C:

package com.rabbit.fanout;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description: consumer for queue fanout.C
 * @create 2020/6/4 0004 10:16
 */
@Component
@RabbitListener(queues = "fanout.C")
public class FanoutConsumerC {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("FanoutConsumerC  : " + msg);
    }

}

controller:

package com.rabbit.controller;

import com.rabbit.fanout.FanoutProducer;
import com.rabbit.hello.HelloProducer1;
import com.rabbit.hello.HelloProducer2;
import com.rabbit.topic.topicProducer;
import com.rabbit.user.UserProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitTest {

    @Autowired
    private HelloProducer1 helloProducer1;
    @Autowired
    private HelloProducer2 helloProducer2;
    @Autowired
    private UserProducer userProducer;
    @Autowired
    private topicProducer topicProducer;
    @Autowired
    private FanoutProducer fanoutProducer;
    /**
     * Single producer, single consumer
     */
//    @PostMapping("/hello")
//    public void hello() {
//        helloProducer1.send();
//    }

    /**
     * Single producer, multiple consumers
     */
//    @PostMapping("/oneToMany")
//    public void oneToMany() {
//        for(int i=0;i<10;i++){
//            helloProducer1.send("hellomsg:"+i);
//        }
//    }

    /**
     * Multiple producers, multiple consumers
     */
//    @PostMapping("/manyToMany")
//    public void manyToMany() {
//        for(int i=0;i<10;i++){
//            helloProducer1.send("hellomsg:"+i);
//            helloProducer2.send("hellomsg:"+i);
//        }
//    }

    /**
     * Entity (object) transfer test
     */
//    @PostMapping("/userTest")
//    public void userTest() {
//        userProducer.send();
//    }

    /**
     * Topic exchange test
     */
//    @PostMapping("/topicTest")
//    public void topicTest() {
//        topicProducer.send();
//    }

    /**
     * Fanout exchange test
     */
    @PostMapping("/fanoutTest")
    public void fanoutTest() {
        fanoutProducer.send();
    }
}


Trigger it with a POST request:

http://127.0.0.1:8080/rabbit/fanoutTest

The output is:

fanout生产者 :hello i am kfj
FanoutConsumerA  : fanout生产者 :hello i am kfj
FanoutConsumerB  : fanout生产者 :hello i am kfj
FanoutConsumerC  : fanout生产者 :hello i am kfj

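This shows that even though FanoutProducer specified the routing key "anyway" when sending, every consumer still received the message: a fanout exchange ignores the routing key.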
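
The broadcast behaviour can be modelled in a few lines of plain Java. FanoutSketch below is a hypothetical in-memory stand-in for the exchange and its bound queues, written only to illustrate that the routing key plays no part in delivery; it does not talk to a real broker.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not RabbitMQ code.
final class FanoutSketch {

    // Queue name -> messages delivered to that queue.
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    // Binds a queue to the exchange; a fanout binding needs no binding key.
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    // Delivers a copy of the message to every bound queue.
    // The routing key is accepted but deliberately never inspected.
    void publish(String routingKey, String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> messagesIn(String queueName) {
        return boundQueues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.A");
        exchange.bind("fanout.B");
        exchange.bind("fanout.C");
        exchange.publish("anyway", "hello i am kfj");
        for (String name : new String[] {"fanout.A", "fanout.B", "fanout.C"}) {
            System.out.println(name + " : " + exchange.messagesIn(name));
        }
    }
}
```

Whatever routing key is passed to publish, each of the three queues ends up with its own copy of the message.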

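7. Sending messages with a callback

This example adds confirm-callback handling. Instead of relying on the default auto-configuration driven by application.properties, the program explicitly reads the settings from that file and builds the connection itself. No new queue or exchange is created: the example reuses the topic.messages queue and the exchange named "exchange" from section 5, and the consumer is also topicMessagesConsumer from section 5.

RabbitMQ configuration class: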

package com.rabbit.callback;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Scope;

/**
 * @author kongfanjun
 * @version 1.0
 * @description: builds the RabbitMQ connection factory and template from the spring.rabbitmq.* properties
 * @create 2020/6/4 0004 10:14
 */
// Without @Configuration Spring would not process the @Bean and @Value annotations in this class
@Configuration
public class RabbitConfig {

    @Value("${spring.rabbitmq.host}")
    private String addresses;

    @Value("${spring.rabbitmq.port}")
    private String port;

    @Value("${spring.rabbitmq.username}")
    private String username;

    @Value("${spring.rabbitmq.password}")
    private String password;

    @Value("${spring.rabbitmq.virtual-host}")
    private String virtualHost;

    @Value("${spring.rabbitmq.publisher-confirms}")
    private boolean publisherConfirms;

    @Bean
    public ConnectionFactory connectionFactory() {

        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses+":"+port);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost(virtualHost);
        // Publisher confirms must be enabled (true) for the confirm callback to be invoked
        connectionFactory.setPublisherConfirms(publisherConfirms);
        return connectionFactory;
    }

    @Bean
    /** Prototype scope because each producer sets its own confirm callback; a shared singleton template can hold only one */
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplatenew() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        return template;
    }

}


Producer:

package com.rabbit.callback;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
 * @author kongfanjun
 * @version 1.0
 * @description:
 * @create 2020/6/4 0004 10:14
 */
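@Component
public class CallBackProducer implements RabbitTemplate.ConfirmCallback {

    // Prototype-scoped template from RabbitConfig, so this callback is not shared with other producers
    @Autowired
    private RabbitTemplate rabbitTemplatenew;

    public void send() {
        // Register this class as the confirm callback before publishing
        rabbitTemplatenew.setConfirmCallback(this);
        String msg = "callback发送者 : I am callback sender";
        System.out.println(msg);
        // The correlation ID is handed back to confirm(), which lets us match a confirm to this send
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
        System.out.println("callback发送者 UUID: " + correlationData.getId());
        this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);
    }

    /**
     * Invoked when the broker confirms (ack) or rejects (nack) a published message.
     * cause describes the failure when ack is false.
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("callbakck confirm: " + correlationData.getId());
    }
}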

Consumer: topicMessagesConsumer from section 5

package com.rabbit.topic;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author kongfanjun
 * @version 1.0
 * @description:
 * @create 2020/6/4 0004 10:17
 */
@Component
@RabbitListener(queues = "topic.messages")
public class topicMessagesConsumer {

    @RabbitHandler
    public void process(String msg) {
        System.out.println("topicMessages消费者  : " +msg);
    }

}

controller:

package com.rabbit.controller;

import com.rabbit.callback.CallBackProducer;
import com.rabbit.fanout.FanoutProducer;
import com.rabbit.hello.HelloProducer1;
import com.rabbit.hello.HelloProducer2;
import com.rabbit.topic.topicProducer;
import com.rabbit.user.UserProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author kongfanjun
 * @version 1.0
 * @create 2020/5/28 0028 16:55
 */
@RestController
@RequestMapping("/rabbit")
public class RabbitTest {

    @Autowired
    private HelloProducer1 helloProducer1;
    @Autowired
    private HelloProducer2 helloProducer2;
    @Autowired
    private UserProducer userProducer;
    @Autowired
    private topicProducer topicProducer;
    @Autowired
    private FanoutProducer fanoutProducer;
    @Autowired
    private CallBackProducer callBackProducer;

    /**
     * Single producer, single consumer
     */
//    @PostMapping("/hello")
//    public void hello() {
//        helloProducer1.send();
//    }

    /**
     * Single producer, multiple consumers
     */
//    @PostMapping("/oneToMany")
//    public void oneToMany() {
//        for(int i=0;i<10;i++){
//            helloProducer1.send("hellomsg:"+i);
//        }
//    }

    /**
     * Multiple producers, multiple consumers
     */
//    @PostMapping("/manyToMany")
//    public void manyToMany() {
//        for(int i=0;i<10;i++){
//            helloProducer1.send("hellomsg:"+i);
//            helloProducer2.send("hellomsg:"+i);
//        }
//    }

    /**
     * Entity (object) transfer test
     */
//    @PostMapping("/userTest")
//    public void userTest() {
//        userProducer.send();
//    }

    /**
     * Topic exchange test
     */
//    @PostMapping("/topicTest")
//    public void topicTest() {
//        topicProducer.send();
//    }

    /**
     * Fanout exchange test
     */
//    @PostMapping("/fanoutTest")
//    public void fanoutTest() {
//        fanoutProducer.send();
//    }

    /**
     * Publisher confirm callback test
     */
    @PostMapping("/callback")
    public void callback() {
        callBackProducer.send();
    }
}


Trigger it with a POST request:

http://127.0.0.1:8080/rabbit/callback

The output is:

callback发送者 : I am callback sender
callback发送者 UUID: fcec0e19-ed2b-4f76-9abc-7dbb4d65b562
topicMessages消费者  : callback发送者 : I am callback sender
callbakck confirm: fcec0e19-ed2b-4f76-9abc-7dbb4d65b562

For the confirm callback to fire, publisher confirms must be set to true
in the configuration file: spring.rabbitmq.publisher-confirms=true
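As the output shows, the UUID that CallBackProducer attached to the message came back in the confirm callback, so the producer can tell which send was acknowledged.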
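
In practice the returned ID is usually matched against a record of what was sent, so that a negative confirm can trigger a resend. The sketch below shows one way to do that in plain Java. ConfirmTracker and its method names are hypothetical; the onConfirm parameters merely mirror the ack and cause arguments of the confirm method shown in CallBackProducer. Note that a positive confirm means the broker accepted the message, not that a consumer has processed it.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical bookkeeping for outstanding publisher confirms; not Spring AMQP API.
final class ConfirmTracker {

    // Correlation ID -> message body still waiting for a confirm.
    private final Map<String, String> pending = new ConcurrentHashMap<>();

    // Records a message before it is published and returns its correlation ID.
    String register(String message) {
        String id = UUID.randomUUID().toString();
        pending.put(id, message);
        return id;
    }

    // Handles a confirm. Returns the message to resend when the broker
    // rejected it (ack == false), or null when nothing needs to be done.
    String onConfirm(String correlationId, boolean ack, String cause) {
        String message = pending.remove(correlationId);
        if (message == null) {
            return null; // unknown ID or already confirmed
        }
        if (!ack) {
            System.out.println("nack for " + correlationId + ", cause: " + cause);
            return message;
        }
        return null;
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ConfirmTracker tracker = new ConfirmTracker();
        String id = tracker.register("I am callback sender");
        System.out.println("pending before confirm: " + tracker.pendingCount());
        tracker.onConfirm(id, true, null);
        System.out.println("pending after confirm: " + tracker.pendingCount());
    }
}
```

A ConcurrentHashMap is used because confirms arrive on a different thread from the one that publishes.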
