赞
踩
1.pom文件
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the Spring Boot + RabbitMQ sample. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.blj</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <version>1.0-SNAPSHOT</version>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Web starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- Test starter, test scope only -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- AMQP starter used for the RabbitMQ integration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2.application.properties
# Application name (fixed the "spirng" misspelling)
spring.application.name=spring-boot-rabbitmq
# Broker IP address
spring.rabbitmq.host=127.0.0.1
# RabbitMQ default port
spring.rabbitmq.port=5672
# Account username and password
spring.rabbitmq.username=root
spring.rabbitmq.password=root1234
# Virtual host to connect to
spring.rabbitmq.virtual-host=/test
3.主启动类
package com.blj.springbootrabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Main application class of the RabbitMQ sample.
 *
 * @author BaiLiJun on 2019/10/30
 */
@SpringBootApplication
public class SpringbootRabbitmqApplication {

    /**
     * Starts the Spring application using this class as the primary source.
     *
     * @param args command-line arguments, handed to {@link SpringApplication#run} unchanged
     */
    public static void main(String[] args) {
        final Class<?> primarySource = SpringbootRabbitmqApplication.class;
        SpringApplication.run(primarySource, args);
    }
}
4.RabbitConfig 配置类
package com.blj.springbootrabbitmq.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.config.BindingFactoryBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; /** * RabbitConfig 配置类 * * @author BaiLiJun on 2019/10/30 */ @Configuration public class RabbitConfig { //topic 模式 public static final String TOPIC_QUEUE1 = "topic.queue1"; public static final String TOPIC_QUEUE2 = "topic.queue2"; public static final String TOPIC_EXCHANGE = "topic.exchange"; //fanout 模式 public static final String FANOUT_QUEUE1 = "fandout.queue1"; public static final String FANOUT_QUEUE2 = "fandout.queue2"; public static final String FANOUT_EXCHANGE = "fandout.exchange"; //direct模式 public static final String DIRECT_QUEUE1 = "direct.queue1"; public static final String DIRECT_QUEUE2 = "direct.queue2"; public static final String DIRECT_EXCHANGE = "direct.exchange"; //Headers 模式 public static final String HEADER_QUEUE1 = "header.queue1"; public static final String HEADER_QUEUE2 = "header.queue2"; public static final String HEADER_EXCHANGE = "header.exchange"; /** * topic 模式 * */ /** * 声明队列topicQueue1 * * @return */ @Bean public Queue topicQueue1() { return new Queue(TOPIC_QUEUE1); } /** * 声明队列topicQueue2 * * @return */ @Bean public Queue topicQueue2() { return new Queue(TOPIC_QUEUE2); } /** * 声明主题模式交换机 TopicExchange * * @return */ @Bean public TopicExchange topicExchange() { return new TopicExchange(TOPIC_EXCHANGE); } /** * 将队列topicQueue1绑定到交换机 * * @return */ @Bean public Binding topicBinding1() { return BindingBuilder.bind(topicQueue1()).to(topicExchange()).with("user.add"); } /** * 将队列topicQueue2绑定到交换机 * * @return */ @Bean public Binding topicBinding2() { return BindingBuilder.bind(topicQueue2()).to(topicExchange()).with("user.#"); } /** * fanout 模式 *Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息 */ /** * 声明队列 fanoutQueue1 * * 
@return */ @Bean public Queue fanoutQueue1() { return new Queue(FANOUT_QUEUE1); } /** * 声明队列 fanoutQueue2 * * @return */ @Bean public Queue fanoutQueue2() { return new Queue(FANOUT_QUEUE2); } /** * 声明FanoutExchange * * @return */ @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } /** * 将队列fanoutQueue1 绑定到交换机 * * @return */ @Bean public Binding fanoutBinding1() { return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange()); } /** * 将队列fanoutQueue2 绑定到交换机 * * @return */ @Bean public Binding fanoutBinding2() { return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange()); } /** * direct模式 * 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配 * @return */ /** * 声明队列DIRECT_QUEUE1 * * @return */ @Bean public Queue directQueue1() { return new Queue(DIRECT_QUEUE1); } /** * 声明队列DIRECT_QUEUE2 * * @return */ @Bean public Queue directQueue2() { return new Queue(DIRECT_QUEUE2); } /** * 声明directExchange * @return */ @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); } /** * 将队列1绑定到交换机 * @return */ @Bean public Binding directBinding1(){ return BindingBuilder.bind(directQueue1()).to(directExchange()).with("goods.update"); } /** * 将队列2绑定到交换机 * @return */ @Bean public Binding directBinding2(){ return BindingBuilder.bind(directQueue2()).to(directExchange()).with("goods.delete"); } /** * Headers 模式 * 设置header attribute参数类型的交换机,相较于 direct 和 topic 固定地使用 routing_key , * headers 则是一个自定义匹配规则的类型. 
在队列与交换器绑定时, 会设定一组键值对规则, * 消息中也包括一组键值对( headers 属性), 当这些键值对有一对, 或全部匹配时, 消息被投送到对应队列 * */ @Bean public Queue headerQueue1(){ return new Queue(HEADER_QUEUE1,true); } @Bean public Queue headerQueue2(){ return new Queue(HEADER_QUEUE2,true); } @Bean public HeadersExchange headersExchange(){ return new HeadersExchange(HEADER_EXCHANGE); } @Bean public Binding headerBinding1(){ Map<String, Object> map = new HashMap<>(); map.put("header1","value1"); map.put("header2","value2"); return BindingBuilder.bind(headerQueue1()).to(headersExchange()).whereAll(map).match(); } @Bean public Binding headerBinding2(){ Map<String, Object> map = new HashMap<>(); map.put("header1","value1"); map.put("header2","value2"); return BindingBuilder.bind(headerQueue2()).to(headersExchange()).whereAll(map).match(); } }
5.实体类
package com.blj.springbootrabbitmq.entity; import java.io.Serializable; /** * 用户实体类 * * @author BaiLiJun on 2019/10/30 */ public class User implements Serializable { private String id; private String name; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return "User{" + "id='" + id + '\'' + ", name='" + name + '\'' + '}'; } }
6.生产者(依次分别为:Fanout 模式生产者,Direct 模式生产者,Topic 模式 生产者,Headers 模式生产者)
package com.blj.springbootrabbitmq.sender; import com.blj.springbootrabbitmq.config.RabbitConfig; import com.blj.springbootrabbitmq.entity.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Fanout 模式 * 生产者 * * @author BaiLiJun on 2019/10/30 */ @Component public class FanoutSender { @Autowired private AmqpTemplate amqpTemplate; public void send(User user) { //第一个参数:Exchange名字 //第二个参数:Route-Key //第三个参数:要发送的内容 amqpTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE,"",user); } }
package com.blj.springbootrabbitmq.sender; import com.blj.springbootrabbitmq.config.RabbitConfig; import com.blj.springbootrabbitmq.entity.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Direct 模式 * 生产者 * * @author BaiLiJun on 2019/10/30 */ @Component public class DirectSender { @Autowired private AmqpTemplate amqpTemplate; public void send(User user){ amqpTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"goods.update",user); //amqpTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"goods.delete",user); } }
package com.blj.springbootrabbitmq.sender; import com.blj.springbootrabbitmq.config.RabbitConfig; import com.blj.springbootrabbitmq.entity.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Topic 模式 * 生产者 * * @author BaiLiJun on 2019/10/30 */ @Component public class TopicSender { @Autowired private AmqpTemplate amqpTemplate; public void send(User user) { //第一个参数:Exchange名字 //第二个参数:Route-Key //第三个参数:要发送的内容 amqpTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,"user.add",user); //amqpTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE,"user.delete",user); } }
package com.blj.springbootrabbitmq.sender; import com.blj.springbootrabbitmq.config.RabbitConfig; import com.blj.springbootrabbitmq.entity.User; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageProperties; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * Headers 模式 * 生产者 * * @author BaiLiJun on 2019/10/30 */ @Component public class HeaderSender { @Autowired private AmqpTemplate amqpTemplate; public void send(Object massage) { //第一个参数:Exchange名字 //第二个参数:Route-Key //第三个参数:要发送的内容 String msg=(String)massage; MessageProperties properties = new MessageProperties(); properties.setHeader("header1","value1"); properties.setHeader("header2","value2"); Message message = new Message(msg.getBytes(), properties); amqpTemplate.convertAndSend(RabbitConfig.HEADER_EXCHANGE,"",message); } }
7.消费者(分别为:Fanout模式消费者,Direct 模式消费者,Topic 模式消费者,Header 模式消费者)
package com.blj.springbootrabbitmq.receiver; import com.blj.springbootrabbitmq.config.RabbitConfig; import com.blj.springbootrabbitmq.entity.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Fanout模式 * 消费者 * * @author BaiLiJun on 2019/10/30 */ @Component public class FanoutReceiver { //queues是指要监听的队列的名字 @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE1) public void receiveFanout1(User user){ System.out.println("[receiveFanout1] receive message"+user); } @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE2) public void receiveFanout2(User user){ System.out.println("[receiveFanout2] receive message"+user); } }
package com.blj.springbootrabbitmq.receiver; import com.blj.springbootrabbitmq.config.RabbitConfig; import com.blj.springbootrabbitmq.entity.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Direct 模式 * 消费者 * * @author BaiLiJun on 2019/10/30 */ @Component public class DirectReceiver { //queues是指要监听的队列的名字 @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE1) public void receiveDirect1(User user){ System.out.println("[receiveDirect1] receive message"+user); } @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE2) public void receiveDirect2(User user){ System.out.println("[receiveDirect2] receive message"+user); } }
package com.blj.springbootrabbitmq.receiver; import com.blj.springbootrabbitmq.config.RabbitConfig; import com.blj.springbootrabbitmq.entity.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Topic 模式 * 消费者 * * @author BaiLiJun on 2019/10/30 */ @Component public class TopicReceiver { //queues是指要监听的队列的名字 @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE1) public void receiveTopic1(User user){ System.out.println("[receiveTopic1] receive message"+user.toString()); } @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE2) public void receiveTopic2(User user){ System.out.println("[receiveTopic2] receive message"+user.toString()); } }
package com.blj.springbootrabbitmq.receiver; import com.blj.springbootrabbitmq.config.RabbitConfig; import com.blj.springbootrabbitmq.entity.User; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * Header 模式 * 消费者 * * @author BaiLiJun on 2019/10/30 */ @Component public class HeaderReceiver { //queues是指要监听的队列的名字 @RabbitListener(queues = RabbitConfig.HEADER_QUEUE1) public void receiveHeader1(byte[] message){ System.out.println("[receiveHeader1] receive message "+new String(message)); } @RabbitListener(queues = RabbitConfig.HEADER_QUEUE2) public void receiveHeader2(byte[] message){ System.out.println("[receiveHeader2] receive message "+new String(message)); } }
8.测试
package com.blj.springbootrabbitmq.test; import com.blj.springbootrabbitmq.entity.User; import com.blj.springbootrabbitmq.receiver.FanoutReceiver; import com.blj.springbootrabbitmq.sender.DirectSender; import com.blj.springbootrabbitmq.sender.FanoutSender; import com.blj.springbootrabbitmq.sender.HeaderSender; import com.blj.springbootrabbitmq.sender.TopicSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * 测试类 * * @author BaiLiJun on 2019/10/30 */ @SpringBootTest @RunWith(SpringRunner.class) public class SpringbootRabbitmqTest { @Autowired private FanoutSender fanoutSender; @Autowired private DirectSender directSender; @Autowired private TopicSender topicSender; @Autowired private HeaderSender headerSender; @Test public void testFanoutSend(){ User user = new User(); user.setId("1"); user.setName("Fanout"); fanoutSender.send(user); } @Test public void testDirectSend(){ User user = new User(); user.setId("2"); user.setName("Direct"); directSender.send(user); } @Test public void testTopicSend(){ User user = new User(); user.setId("3"); user.setName("Topic"); topicSender.send(user); } @Test public void testHeaderSend(){ headerSender.send("hello header Exchange"); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。