当前位置:   article > 正文

springboot整合rabbitmq合集(xml方式和注解方式)_springboot rabbitmq 注解用法

springboot rabbitmq 注解用法

首先介绍一下rabbitmq三种模式

  1. Direct–路由模式
    任何发送到Direct Exchange的消息都会被转发到RouteKey指定的Queue。
    这种模式下不需要将Exchange进行任何绑定(binding)操作。
    消息传递时需要一个“RouteKey”,可以简单的理解为要发送到的队列名字。
    如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃。

  2. Fanout–发布/订阅模式
    任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。
    这种模式不需要RouteKey。
    这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
    如果接受到消息的Exchange没有与任何Queue绑定,则消息会被抛弃。

  3. Topic–匹配订阅模式
    任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上。
    就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
    这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
    在进行绑定时,要提供一个该队列关心的主题。
    .“#”表示0个或若干个关键字,“*”表示一个关键字。
    同样,如果Exchange没有发现能够与RouteKey匹配的Queue,则会抛弃此消息。

springboot整合rabbitmq基于注解方式(最简单方式)

pom文件配置

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.1.0.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.example</groupId>
	<artifactId>rabbitmq</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<name>demo</name>
	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>

</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

接下来就是配置文件

spring.application.name=springboot-rabbitmq
server.port=8080
//默认地址就是127.0.0.1:5672,如果是服务器的rabbitmq就改下
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
//消息确认模式,还有一种事务模式,这里不讲解,有兴趣自己去查资料
spring.rabbitmq.publisher-confirms=true
//这里我把他看作是虚拟主机目录,相当于数据库的库名
spring.rabbitmq.virtual-host=/
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

第一步配置生产者

package com.example.annotion.demo.sender;

import java.util.Date;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class Sender {

    //rabbitTemplate.convertAndSend(String exchange交换机名称(可省略), String routingKey路由键, Object object传递的消息)
    @Autowired
    private AmqpTemplate rabbitTemplate;

    //direct方式交换机名字随便填,但是不能填direct,会造成两次消费
    public void sendDirect() {
        String msg1 = "hello " + new Date();
        System.out.println("helloSender : " + msg1);
        this.rabbitTemplate.convertAndSend("hello", msg1);
//        this.rabbitTemplate.convertAndSend("direct","hello", msg1);
        String msg2 = "user " + new Date();
        System.out.println("userSender : " + msg2);
        this.rabbitTemplate.convertAndSend("user", msg2);
//        this.rabbitTemplate.convertAndSend("direct","user", msg2);
    }

	//topic方式
    public void sendTopic() {
        String msg1 = "I am topic.mesaage msg======";
        System.out.println("topic.mesaage sender : " + msg1);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);

        String msg2 = "I am topic.mesaages msg########";
        System.out.println("topic.mesaages sender : " + msg2);
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
    }

    //fanout方式routingKey随便填
    public void sendFanout() {
        String msg = "I am fanoutSender msg======";
        System.out.println("fanoutSender : " + msg);
        this.rabbitTemplate.convertAndSend("fanoutExchange", "suibiantian",msg);
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

接下来就是消费者

package com.example.annotion.demo.receiver;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

/**
 *     @RabbitListener()rabbit监听
 *     @QueueBinding()队列绑定 value绑定@Queue,exchange绑定@Exchange,key为路由键
 *     @Queue队列 value:名称;autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除;durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库
 *     @Exchange交换器,type有五种,其余参数同@Queue
 */
@Component
public class Receiver {

    //===============以下是验证direct Exchange的队列==========
//    @RabbitListener(queues = "hello")
    //direct模式,exchange名字随便填
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "hello",autoDelete = "false",declare = "true"),exchange = @Exchange(value = "suibianxie",type = ExchangeTypes.DIRECT),key = "user"
    ))
    @RabbitHandler
    public void processHello(String msg) {
        System.out.println("helloReceiver  : " + msg);
    }

//    @RabbitListener(queues = "user")
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "user",autoDelete = "false"),exchange = @Exchange(value = "suibianxie",type = ExchangeTypes.DIRECT),key = "hello"
    ))
    @RabbitHandler
    public void processUser(String msg) {
        System.out.println("userReceiver  : " + msg);
    }

    //===============以上是验证direct Exchange的队列==========




    //===============以下是验证topic Exchange的队列==========
//    @RabbitListener(queues = "topic.message")
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "topic.message",autoDelete = "false"),exchange = @Exchange(value = "exchange",type = ExchangeTypes.TOPIC),key = "topic.message"
    ))
    @RabbitHandler
    public void processTopicMessage(String msg) {
        System.out.println("topicMessageReceiver  : " + msg);
    }

//    @RabbitListener(queues = "topic.messages")
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "topic.messages",autoDelete = "false"),exchange = @Exchange(value = "exchange",type = ExchangeTypes.TOPIC),key = "topic.#"
    ))
    @RabbitHandler
    public void processTopicMessages(String msg) {
        System.out.println("topicMessagesReceiver  : " + msg);
    }

    //===============以上是验证topic Exchange的队列==========






    //===============以下是验证fanout Exchange的队列==========
//    @RabbitListener(queues = "fanout.A")
    //fanout方式key不用填
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "fanout.A",autoDelete = "false"),exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT)
    ))
    @RabbitHandler
    public void processFanoutA(String msg) {
        System.out.println("fanoutAReceiver  : " + msg);
    }

//    @RabbitListener(queues = "fanout.B")
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "fanout.B",autoDelete = "false"),exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT)
    ))
    @RabbitHandler
    public void processFanoutB(String msg) {
        System.out.println("fanoutBReceiver  : " + msg);
    }

//    @RabbitListener(queues = "fanout.C")
    @RabbitHandler
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "fanout.C",autoDelete = "false"),exchange = @Exchange(value = "fanoutExchange",type = ExchangeTypes.FANOUT)
    ))
    public void processFanoutC(String msg) {
        System.out.println("fanoutCReceiver  : " + msg);
    }

    //===============以上是验证fanout Exchange的队列==========

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99

再来一个controller方便测试

package com.example.annotion.demo.controller;

import com.example.annotion.demo.sender.Sender;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/rabbit")
public class RabbitController {
    
    @Autowired
    private Sender sender;

    
    @GetMapping("/direct")
    public void direct() {
        sender.sendDirect();
    }

    @GetMapping("/topic")
    public void topic() {
        sender.sendTopic();
    }

    @GetMapping("/fanout")
    public void fanout() {
        sender.sendFanout();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

启动项目,分别访问127.0.0.1:8080/rabbit/direct,127.0.0.1:8080/rabbit/topic,127.0.0.1:8080/rabbit/fanout三个地址看看效果。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

springboot整合rabbitmq基于注解方式

这种注解方式其实原理和上面一样,只是消费者的RabbitListener只要配置一个queue的名称,其他配置同意提取到一个配置类中
消费者代码

package com.example.annotion.demo.receiver;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;


@Component
public class Receiver {

    //===============以下是验证direct Exchange的队列==========
    @RabbitListener(queues = "hello")
    @RabbitHandler
    public void processHello(String msg) {
        System.out.println("helloReceiver  : " + msg);
    }

    @RabbitListener(queues = "user")
    @RabbitHandler
    public void processUser(String msg) {
        System.out.println("userReceiver  : " + msg);
    }

    //===============以上是验证direct Exchange的队列==========




    //===============以下是验证topic Exchange的队列==========
    @RabbitListener(queues = "topic.message")
    @RabbitHandler
    public void processTopicMessage(String msg) {
        System.out.println("topicMessageReceiver  : " + msg);
    }

    @RabbitListener(queues = "topic.messages")
    @RabbitHandler
    public void processTopicMessages(String msg) {
        System.out.println("topicMessagesReceiver  : " + msg);
    }

    //===============以上是验证topic Exchange的队列==========






    //===============以下是验证fanout Exchange的队列==========
    @RabbitListener(queues = "fanout.A")
    @RabbitHandler
    public void processFanoutA(String msg) {
        System.out.println("fanoutAReceiver  : " + msg);
    }

    @RabbitListener(queues = "fanout.B")
    @RabbitHandler
    public void processFanoutB(String msg) {
        System.out.println("fanoutBReceiver  : " + msg);
    }

    @RabbitListener(queues = "fanout.C")
    @RabbitHandler
    public void processFanoutC(String msg) {
        System.out.println("fanoutCReceiver  : " + msg);
    }

    //===============以上是验证fanout Exchange的队列==========

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

配置类代码

package com.example.annotion.demo.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

@Component
public class QueueCofig {

    //===============以下是验证direct Exchange的队列==========
    @Bean
    public Queue helloQueue() {
        return new Queue("hello");
    }
    
    @Bean
    public Queue userQueue() {
        return new Queue("user");
    }

    /**
     *注入name为'direct'的DirectExchange,默认名字就是空字符串,可以不注入
     */
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange("direct");
    }

    /**
     * 将队列hello与exchange绑定,binding_key为hello,就是完全匹配
     */
    @Bean
    Binding bindingHelloExchange(Queue helloQueue, DirectExchange exchange) {
        return BindingBuilder.bind(helloQueue).to(exchange).with("hello");
    }

    /**
     * 将队列user与exchange绑定,binding_key为hello,就是完全匹配
     */
    @Bean
    Binding bindingUserExchange(Queue userQueue, DirectExchange exchange) {
        return BindingBuilder.bind(userQueue).to(exchange).with("user");
    }

    //===============以上是验证direct Exchange的队列==========

    //===============以下是验证topic Exchange的队列==========
    @Bean
    public Queue queueMessage() {
        return new Queue("topic.message");
    }

    @Bean
    public Queue queueMessages() {
        return new Queue("topic.messages");
    }


    /**
     *注入name为exchange的TopicExchange
     */
    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    /**
     * 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配
     */
    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    /**
     * 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配
     */
    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }


    //===============以上是验证topic Exchange的队列==========
    
    
    //===============以下是验证Fanout Exchange的队列==========
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }

    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }

    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }


    /**
     * 注入name为fanoutExchange的FanoutExchange
     */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }



    /**
     * 将队列fanout.A与FanoutExchange绑定
     */
    @Bean
    Binding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }

    /**
     * 将队列fanout.B与FanoutExchange绑定
     */
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }

    /**
     * 将队列fanout.C与FanoutExchange绑定
     */
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
    //===============以上是验证Fanout Exchange的队列==========



}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142

结果如下
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

springboot整合rabbitmq基于xml方式

这种方式比较麻烦,但是呢,有些老项目可能是这么用的,所以在这里也做一个demo。
pom文件和上面一样,配置文件把mq的配置去掉,第一步设置配置类,加载xml文件

package com.example.xml.demo.config;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.ImportResource;

/**
 * 实例化xml文件中定义的bean
 **/
@Configuration
@EnableRabbit
@ImportResource({ "classpath:config/applicationContext-*.xml" })
public class XmlConfig {

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

第二步就是配置文件,这里我分了两个配置文件,把不改变的连接信息之类的放在单独的配置文件,这部分文件配置即可发送mq消息

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
                            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- 读取配置文件, 多个properties文件可以用英文逗号分隔 -->
    <context:property-placeholder ignore-resource-not-found="true" location="classpath*:/rabbitmq.properties" file-encoding="UTF-8"/>
    <!-- 公共部分 -->
    <!-- 创建连接类 连接安装好的 rabbitmq -->
    <bean id="connectionFactory"  class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
        <constructor-arg value="localhost" />
        <!-- username,访问RabbitMQ服务器的账户,默认是guest -->
        <property name="username" value="${rmq.manager.user}" />
        <!-- username,访问RabbitMQ服务器的密码,默认是guest -->
        <property name="password" value="${rmq.manager.password}" />
        <!-- host,RabbitMQ服务器地址,默认值"localhost" -->
        <property name="host" value="${rmq.ip}" />
        <!-- port,RabbitMQ服务端口,默认值为5672 -->
        <property name="port" value="${rmq.port}" />
    </bean>
    <bean id="amqpAdmin" class="org.springframework.amqp.rabbit.core.RabbitAdmin">
        <constructor-arg ref="connectionFactory" />
    </bean>

    <bean id="rabbitTemplate" class="org.springframework.amqp.rabbit.core.RabbitTemplate">
        <constructor-arg ref="connectionFactory"></constructor-arg>
    </bean>


    <!-- 声明消息转换器为SimpleMessageConverter -->
    <bean id="messageConverter"
          class="org.springframework.amqp.support.converter.SerializerMessageConverter">
    </bean>
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

引入了rabbitmq.properties

rmq.ip=127.0.0.1
rmq.port=5672
rmq.manager.user=guest
rmq.manager.password=guest
  • 1
  • 2
  • 3
  • 4

接下来就是消费者的一些监听绑定的xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xmlns:rabbit="http://www.springframework.org/schema/rabbit"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
						http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
	<!-- name:队列名称;autoDelete:是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除;durable: 是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库 -->
	<!-- 声明Queue并设定Queue的名称 -->
	<rabbit:queue name="user" durable="true" auto-delete="false"/>
	<rabbit:queue name="hello" durable="true" auto-delete="false"/>
<!-- direct 模式可不配置direct-exchange -->
<!--	<rabbit:direct-exchange name="direct" xmlns="http://www.springframework.org/schema/rabbit" durable="true">-->
<!--		<bindings>-->
<!--			<binding queue="user" key="user" />-->
<!--			<binding queue="hello" key="hello" />-->
<!--		</bindings>-->
<!--	</rabbit:direct-exchange>-->

	<rabbit:queue name="topic.message" durable="true" auto-delete="false"/>
	<rabbit:queue name="topic.messages" durable="true" auto-delete="false"/>

	<!-- topic主题 -->
	<rabbit:topic-exchange name="exchange" xmlns="http://www.springframework.org/schema/rabbit" durable="true">
		<!-- 交换器绑定queue ,pattern就是routeKey路由键 -->
		<bindings>
			<binding queue="topic.message" pattern="topic.message" />
			<binding queue="topic.messages" pattern="topic.#" />
		</bindings>
	</rabbit:topic-exchange>

	<rabbit:queue name="fanout.A" durable="true" auto-delete="false"/>
	<rabbit:queue name="fanout.B" durable="true" auto-delete="false"/>
	<rabbit:queue name="fanout.C" durable="true" auto-delete="false"/>

	<!--fanout主题  没有routeKey路由键-->
	<rabbit:fanout-exchange  name="fanoutExchange" durable="true">

		<rabbit:bindings>
			<rabbit:binding queue="fanout.A" />
			<rabbit:binding queue="fanout.B" />
			<rabbit:binding queue="fanout.C" />
		</rabbit:bindings>
	</rabbit:fanout-exchange>



	<bean id="receiver" class="com.example.xml.demo.receiver.Receiver" />

	<!-- 把监听器配置进rabbit监听容器,ref引用bean,method监听方法,queues监听队列名字(上面配置的queue的name)  -->
	<rabbit:listener-container connection-factory="connectionFactory">
		<rabbit:listener ref="receiver" method="processUser" queues="user" />
		<rabbit:listener ref="receiver" method="processHello" queues="hello" />

		<rabbit:listener ref="receiver" method="processTopicMessage" queues="topic.message" />
		<rabbit:listener ref="receiver" method="processTopicMessages" queues="topic.messages" />
		<rabbit:listener ref="receiver" method="processFanoutA" queues="fanout.A" />
		<rabbit:listener ref="receiver" method="processFanoutB" queues="fanout.B" />
		<rabbit:listener ref="receiver" method="processFanoutC" queues="fanout.C" />
	</rabbit:listener-container>
</beans>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

接下来我们的接受类com.example.xml.demo.receiver.Receiver

package com.example.xml.demo.receiver;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

public class Receiver {


    public void processHello(String msg) {
        System.out.println("helloReceiver  : " + msg);
    }


    public void processUser(String msg) {
        System.out.println("userReceiver  : " + msg);
    }


    public void processTopicMessage(String msg) {
        System.out.println("topicMessageReceiver  : " + msg);
    }


    public void processTopicMessages(String msg) {
        System.out.println("topicMessagesReceiver  : " + msg);
    }


    public void processFanoutA(String msg) {
        System.out.println("fanoutAReceiver  : " + msg);
    }


    public void processFanoutB(String msg) {
        System.out.println("fanoutBReceiver  : " + msg);
    }


    public void processFanoutC(String msg) {
        System.out.println("fanoutCReceiver  : " + msg);
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

生产者类代码

package com.example.xml.demo.sender;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitTemplate;

    public void sendQueue() {
        String msg1 = "hello " + new Date();
        System.out.println("helloSender : " + msg1);
        this.rabbitTemplate.convertAndSend("hello", msg1);
//        this.rabbitTemplate.convertAndSend("direct","hello", msg1);
        String msg2 = "user " + new Date();
        System.out.println("userSender : " + msg2);
        this.rabbitTemplate.convertAndSend("user", msg2);
//        this.rabbitTemplate.convertAndSend("direct","user", msg1);
    }

    public void sendTopic() {
        String msg1 = "I am topic.mesaage msg======";
        System.out.println("topic.mesaage sender : " + msg1);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);

        String msg2 = "I am topic.mesaages msg########";
        System.out.println("topic.mesaages sender : " + msg2);
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);
    }

    public void sendFanout() {
        String msg = "I am fanoutSender msg======";
        System.out.println("fanoutSender : " + msg);
        this.rabbitTemplate.convertAndSend("fanoutExchange","keysuibiantian", msg);
    }


}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43

接下来看结果
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/139264
推荐阅读
相关标签
  

闽ICP备14008679号