当前位置:   article > 正文

Rabbitmq_rabbitmq默认账号密码

rabbitmq默认账号密码

认识Rabbitmq

一、什么是Rabbitmq

RabbitMQ简称MQ是一套实现了高级消息队列协议的开源消息代理软件,简单来说就是一个消息中间件。是一种程序对程序的通信方法,其服务器也是以高性能、健壮以及可伸缩性出名的Erlang语言编写而成。

二、 Rabbitmq有什么作用

abbitMQ简单来说就是一个消息队列中间件,用来保存消息和传递消息的一个容器。在此过程中充当一个中间人的作用 而队列的主要目的就是提供正确的路由来保证消息的传递;如果发送消息时消费者不可用的话,默认情况下该消息将会一直被存储在队列中,直到消费者消费为止 那么同时呢,如果设置了消息存活的时间,即消息的有效期。在此有效期间消息如果还没有被消费的话,那么该消息就会变成死信,由死信交换机接收。而绑定死信交换机的队列则称为死信队列

三、RabbitMQ的常见作用

RabbitMQ的常见作用有三种,分别是服务间解耦、实现异步通信、流量削峰

主要实现了消费者和生产者之间的解耦,发送异步消息,高并发访问解决流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

四、RabbitMQ的应用场景

场景一:用户订单,库存处理。【服务间解耦】

使用MQ前:系统正常时,用户下单,订单系统调用库存系统进行删减操作,操作成功,将成返回消息,提醒下单成功。系统异常时,库存系统将无法访问,导致订单删减操作无法执行,最终导致下单失败。

 

使用MQ后:订单系统和库存系统之间不在互相影响,独立运行,达到了应用解耦的目的。订单系统只需要将下单消息写入MQ,就可以直接执行下一步操作。这时即使库存系统出现异常也不会影响订单系统的操作,且下单的库存删减记录,将会被永久保存到MQ中,直到库存系统恢复正常,从MQ中订阅下单消息,进行消费成功为止。

 

场景二:用户注册,发送手机短信,邮件。【实现异步通信】

使用MQ前:整个操作流程,全部在主线程完成。点击用户注册 --》 入库添加用户 --》发送邮件 --》发送短信。每一步都需要等待上一步完成后才能执行。且每一步操作的响应时间不固定,如果请求过多,会导致主线程请求耗时很长,响应慢,甚至会导致死机的情况出现,严重影响了用户的体验。

使用MQ后:我们在大量用户进行秒杀请求时,将那个巨大的流量请求拒在系统业务处理的上层,并将其转移至MQ中,而不是直接涌入我们的接口。在这里MQ消息队列起到了缓存作用。

 

场景三:商品秒杀和抢购。【流量削峰】

流量削峰是消息队列中常用的场景 一般在秒杀或团购活动中使用广泛。

使用MQ前:对于秒杀、抢购活动,用户访问所产生的流量会很大,甚至会在同一时间段出现上万上亿条请求,这股瞬间的流量暴涨,我们的应用系统配置是无法承受的,会导致系统直接崩溃死机。

例如:A系统平时每秒请求100个,系统稳定运行; 但是晚上8点有秒杀活动 ,每秒并发增至1万条 ,系统最大处理每秒1000条 于是导致系统崩溃。

 

使用MQ后:我们在大量用户进行秒杀请求时,将那个巨大的流量请求拒在系统业务处理的上层,并将其转移至MQ中,而不是直接涌入我们的接口。在这里MQ消息队列起到了缓存作用。

例如:100万用户在高峰期,每秒请求5000个,将这5000个请求写入MQ系统每秒只能处理2000请求,因为MySQL只能处理2000个请求 ; 系统每秒拉取2000个请求 不超过自己的处理能力即可。

 

下载安装

Centos7 安装rabbitmq详细教程_江東-H的博客-CSDN博客

1、下载erlang

erlang官网http://www.erlang.org/downloads

准备环境:

yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel

设定安装规则:
./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe --without-javac

安装:

make && make install

配置环境变量:

vim /etc/profile

export PATH=/root/RabbitMQ/erlang/otp_src_21.1/bin:${PATH}
source /etc/profile    #使环境变量生效

检验是否安装成功:

erl

退出用 halt()

2、下载rabbitmq

设置环境变量:export PATH=$PATH:/usr/local/rabbitmq_server-3.7.8/sbin

添加web管理插件命令:rabbitmq-plugins enable rabbitmq_management  启动端口15672

启动rabbitmq

安装目录下 /sbin       rabbitmq-server

登录rabbitmq    初始账号密码都是guest

3、初始化登录报:User can only log in via localhost

原因:

rabbitmq从3.3.0开始禁止使用guest/guest权限通过除localhost外的访问

解决问题:

找到安装目录 /ebin/rabbit.app

将:{loopback_users, [<<”guest”>>]}
改为:{loopback_users, []}

mq可视化界面

1.默认会提供一个默认用户guest,密码也是guest,线上环境需要创建一个新用户,并把guest用户删除。

2.首先切换到Admin标签页,可以查看或添加用户,添加用户时,可指定Tags,相当于角色,会拥有对应的权限:

1、Exhanges 功能

Durable 选择是否持久化

  • Durable 持久化
  • Transient 不持久化

Type-选择交换机类型

  • topic 主机模式
  • fanout 工作者模式
  • direct 路由模式

Auto Delete 是否自动删除

2、交换机详细操作

3、队列

切换到“Queues”标签,可以查看队列信息,点击队列名称,可查看队列所有状态的消息数量和大小等统计信息:

4、队列详细操作

一、常用的mq工具:

  • Rabbitmq
  • Activemq
  • Rocketmq
  • Kafka
  • Tubemq

二、mq的作用:

RabbitMQ是一种消息中间件,用于处理来自客户端的异步消息。服务端将要发送的消息放入到队列池中。接收端可以根据RabbitMQ配置的转发机制接收服务端发来的消息。RabbitMQ依据指定的转发规则进行消息的转发、缓冲和持久化操作,主要用在多服务器间或单服务器的子系统间进行通信,是分布式系统标准的配置。

rabbitmq

三、rabbitmq六种工作模式

Ⅰ、简单模式

简单

  • 发送消息的程序是生产者
  • 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
  • 消费者等待从队列接收消息

 简单模式

使用过程

1、创建springboot工程,添加依赖:

  1. <dependency> 
  2. <groupId>com.rabbitmq</groupId> 
  3. <artifactId>amqp-client</artifactId> 
  4. <version>5.4.3</version> 
  5. </dependency>

2、生产者发送消息

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.MessageProperties;
  5. import java.io.IOException;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer {
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //1、连接
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("192.168.64.140");
  12. factory.setPort(5672);
  13. factory.setPassword("admin");
  14. factory.setUsername("admin");
  15. Connection connection = factory.newConnection();
  16. Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  17. //2、在服务器上创建helloworld队列(mq里面创建队列)
  18. /* 注意事项:队列如果已经存在,不会重复创建,对于队列的生产者和消费者,都要创建队列,保证谁先启动谁先创建
  19. channel.queueDeclare(队列名,
  20. 是否是持久队列(一直保存在本地),
  21. 是否是独占(排他)队列(多个消费者能不能从同一个队列接收消息)false表示不是,
  22. 是否是自动删除的队列(没有消费者时服务器是否自动删除)false表示不自动删除,
  23. 队列的其他参数属性,使用键值对方式
  24. );*/
  25. channel.queueDeclare("helloworld",false,false,false,null);
  26. //3、向helloworld队列发送消息,参数4为发送的消息,并且只能为byte数组
  27. /*channel.basicPublish(交换机(为空表示使用默认交换机),
  28. 队列名字,
  29. 消息的其他属性配置(键值对设置消息属性,没有使用null值),
  30. 队列的内容(byte数组));
  31. * */
  32. channel.basicPublish("","helloworld", MessageProperties.PERSISTENT_BASIC,"hello world".getBytes());
  33. //现成的键值对
  34. }
  35. }

注意事项:

消费者与生产者都建立连接的原因,由于相同队列不会重复创建,所以,当谁先启动谁就先创建消息队列,节省时间

3、消费者接收消息

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class Consumer {
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. //1、连接
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("192.168.64.140");
  9. factory.setPort(5672);
  10. factory.setPassword("admin");
  11. factory.setUsername("admin");
  12. Connection connection = factory.newConnection();
  13. Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  14. //2、在服务器上创建helloworld队列(mq里面创建队列)
  15. //注意事项:队列如果已经存在,不会重复创建
  16. channel.queueDeclare("helloworld",false,false,false,null);
  17. //3.1创建处理消息的回调对象
  18. DeliverCallback deliverCallback =new DeliverCallback() {
  19. public void handle(String s, Delivery delivery) throws IOException {
  20. byte[] a = delivery.getBody(); //获取队列消息,byte类型
  21. String s1 = new String(a); //将byte类型转换成字符串类型
  22. }
  23. };
  24. //3.2取消消息处理的回调对象
  25. CancelCallback cancelCallback = new CancelCallback() {
  26. public void handle(String s) throws IOException {
  27. }
  28. };
  29. /*3、从helloworld队列接收消息,把消息传递到回调对象处理
  30. channel.basicConsume(队列的名字,
  31. 确认方式 ACK(Acknowledgment)true自动确认 false手动确认,
  32. 处理消息的回调对象,
  33. 取消消息处理的回调对象);*/
  34. channel.basicConsume("helloworld",true,deliverCallback,cancelCallback);
  35. }
  36. }

Ⅱ、工作模式

工作

 工作模式

消费者有多个,当生产者发送数据时,多个消费者接收消息默认采用轮询策略,这就导致了当某个消费者处理消息的回调对象内的业务处理不完,造成阻塞,其他闲置的消费者也没办法接收到生产者的信息,所以要更改配置,变为每个消费者处理完自己的消息的回调对象内数据,再接收生产者的信息,避免堵塞。

需要修改如下配置参数:

        ①、设置qos=1 每次3只接收一条信息,处理完之前不接收下一条          

        channel.basicQos(1);//3、设置Qos   每次只收一条,处理完之前不好收下一条

        ②、当处理完消息,告诉服务器自己已经处理完消息  autoAck=false

        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //2、回执,告诉服务器已经处理完消息

         ③、还要将ACK修改为false手动模式

        其他配置         

        ④、由于数据都在rabbitmq消息中间件,如果该服务器崩溃,会造成传递信息丢失,持

                久化到本地磁盘的方法,该参数为true      

        channel.queueDeclare("helloworld1",true,false,false,null);

注释:这些修改项已经在代码中标注

使用步骤

1、添加如上依赖

  1. <dependency> 
  2. <groupId>com.rabbitmq</groupId> 
  3. <artifactId>amqp-client</artifactId> 
  4. <version>5.4.3</version> 
  5. </dependency>

2、生产者发送消息

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.io.IOException;
  5. import java.util.Scanner;
  6. import java.util.concurrent.TimeoutException;
  7. public class Producer2 {
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //1、连接
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("192.168.64.140");
  12. factory.setPort(5672);
  13. factory.setPassword("admin");
  14. factory.setUsername("admin");
  15. Connection connection = factory.newConnection();
  16. Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  17. /*2、 在服务器上创建helloworld队列(mq里面创建队列)
  18. 注意事项:队列如果已经存在,不会重复创建,对于队列的生产者和消费者,都要创建队列,保证谁先启动谁先创建
  19. channel.queueDeclare(队列名,
  20. 是否是持久队列(一直保存在本地),
  21. 是否是独占(排他)队列(多个消费者能不能从同一个队列接收消息)false表示不是,
  22. 是否是自动删除的队列(没有消费者时服务器是否自动删除)false表示不自动删除,
  23. 队列的其他参数属性,使用键值对方式
  24. );*/
  25. channel.queueDeclare("helloworld1",true,false,false,null);
  26. //④ ④ ④ ④ ④
  27. //循环向队列写入信息
  28. while(true){
  29. System.out.println("输入消息");
  30. String s = new Scanner(System.in).nextLine();
  31. //3、向helloworld队列发送消息,参数4为发送的消息,并且只能为byte数组
  32. /*channel.basicPublish(交换机(为空表示使用默认交换机),
  33. 队列名字,
  34. 消息的其他属性配置(键值对设置消息属性,没有使用null值),
  35. 队列的内容(byte数组));
  36. * */
  37. channel.basicPublish("","helloworld1",null,s.getBytes());
  38. }
  39. }
  40. }

3、消费者接收消息

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.concurrent.TimeoutException;
  4. public class Consumer2 {
  5. public static void main(String[] args) throws IOException, TimeoutException {
  6. //1、连接
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("192.168.64.140");
  9. factory.setPort(5672);
  10. factory.setPassword("admin");
  11. factory.setUsername("admin");
  12. Connection connection = factory.newConnection();
  13. final Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  14. //2、在服务器上创建helloworld队列(mq里面创建队列)
  15. //注意事项:队列如果已经存在,不会重复创建
  16. channel.queueDeclare("helloworld1", true, false, false, null);
  17. //3.1、创建处理消息的回调对象
  18. DeliverCallback deliverCallback = new DeliverCallback() {
  19. public void handle(String s, Delivery delivery) throws IOException {
  20. String str = new String(delivery.getBody());
  21. System.out.println(str); //打印消息队列
  22. for (int i = 0; i < str.length(); i++) {
  23. if (str.charAt(i) == '.') {
  24. try {
  25. Thread.sleep(5000);
  26. } catch (InterruptedException e) {
  27. System.out.println(e);
  28. }
  29. }
  30. }
  31. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //2、回执,处理完成回执数据
  32. //② ② ② ② ② ②
  33. }
  34. };
  35. //3.1、取消消息处理的回调对象
  36. CancelCallback cancelCallback = new CancelCallback() {
  37. public void handle(String s) throws IOException {
  38. }
  39. };
  40. channel.basicQos(1);//3、设置Qos 每次只收一条,处理完之前不好收下一条
  41. //①①①①①①①①①①①①①①①
  42. /*3、从helloworld队列接收消息,把消息传递到回调对象处理
  43. channel.basicConsume(队列的名字,
  44. 确认方式 ACK(Acknowledgment)true自动确认(如果消息在确认之前消费者崩溃,这条消息作废) false手动确认(如果消息在确认之前消费者崩溃,客户端消息回滚,重新接收),
  45. 处理消息的回调对象,
  46. 取消消息处理的回调对象);*/
  47. channel.basicConsume("helloworld1", false, deliverCallback, cancelCallback);
  48. //③③③③③③③③
  49. }
  50. }

Ⅲ、发布订阅模式

发布订阅

发布订阅

类似于电台和听众之间的关系 

向多个消费者传递同一条消息。这种模式称为“发布/订阅”。 

为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序接收它们(开启多个实例)。

首次使用到了交换机(Exchanges)

使用步骤

1、生产者发送消息

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.Scanner;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer3 {
  9. //发布订阅模式(将消息广播出去,所有消费者都能接收到)(如果没有消费者接收,发出去的消息直接失效丢失)
  10. //模拟各个服务向消费者发送发送,中间mq转发
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. //1、连接
  13. ConnectionFactory factory = new ConnectionFactory();
  14. factory.setHost("192.168.64.140");
  15. factory.setPort(5672);
  16. factory.setPassword("admin");
  17. factory.setUsername("admin");
  18. Connection connection = factory.newConnection();
  19. Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  20. //2、创建一个 Fanout 类型的交换机,并且起名字叫logs(交换机在消费者和生产者都要创建)
  21. //channel.exchangeDeclare(交换机的名字, 交换机的类型);
  22. channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
  23. //3、循环向队列写入信息
  24. while(true){
  25. System.out.println("输入消息");
  26. String s = new Scanner(System.in).nextLine();
  27. //4、向helloworld队列发送消息,参数4为发送的消息,并且只能为byte数组
  28. /*channel.basicPublish(交换机(为空表示使用默认交换机),
  29. 队列名字,
  30. 消息的其他属性配置(键值对设置消息属性,没有使用null值),
  31. 队列的内容(byte数组));
  32. * */
  33. channel.basicPublish("logs","",null,s.getBytes());
  34. //对于Fanout,参数二写不写都无效
  35. }
  36. }
  37. }

2、消费者接收消息

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.UUID;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer3 {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //1、连接
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("192.168.64.140");
  10. factory.setPort(5672);
  11. factory.setPassword("admin");
  12. factory.setUsername("admin");
  13. Connection connection = factory.newConnection();
  14. final Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  15. //2、在服务器上创建XXXX随机队列{独占(排他),自动删除,非持久}
  16. //注意事项:消费者需要绑定生产者的频段,才能接收到消息。队列的创建采用随机名字(实际工作中消费者成千上百)
  17. String str = UUID.randomUUID().toString();
  18. channel.queueDeclare(str, false, true, true, null);
  19. //3、创建一个 Fanout 类型的交换机,并且起名字叫logs
  20. channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
  21. //4、交换机和队列绑定
  22. channel.queueBind(str,"logs","");
  23. //5.1、创建处理消息的回调对象
  24. DeliverCallback deliverCallback = new DeliverCallback() {
  25. public void handle(String s, Delivery delivery) throws IOException {
  26. String str = new String(delivery.getBody()); //接收队列里的消息
  27. System.out.println("收到" + str); //处理消息
  28. }
  29. };
  30. //5.2取消消息处理的回调对象
  31. CancelCallback cancelCallback = new CancelCallback() {
  32. public void handle(String s) throws IOException {
  33. }
  34. };
  35. /*5、从随机队列接收消息,把消息传递到回调对象处理
  36. channel.basicConsume(队列的名字,
  37. 确认方式 ACK(Acknowledgment)true自动确认(如果消息在确认之前消费者崩溃,这条消息作废) false手动确认(如果消息在确认之前消费者崩溃,客户端消息回滚,重新接收),
  38. 处理消息的回调对象,
  39. 取消消息处理的回调对象);*/
  40. channel.basicConsume(str, true, deliverCallback, cancelCallback);
  41. }
  42. }

Ⅳ、路由模式

生产者携带路由关键字与接收的消费者关键字做比对,消费者决定该消息是否接收

路由

路由模式

使用步骤

1、生产者发送消息

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.sql.SQLOutput;
  7. import java.util.Scanner;
  8. import java.util.concurrent.TimeoutException;
  9. public class Producer4 {
  10. //路由模式根据绑定的关键词匹配
  11. //模拟各个服务向消费者发送发送日志,携带关键字,中间mq转发
  12. public static void main(String[] args) throws IOException, TimeoutException {
  13. //1、连接
  14. ConnectionFactory factory = new ConnectionFactory();
  15. factory.setHost("192.168.64.140");
  16. factory.setPort(5672);
  17. factory.setPassword("admin");
  18. factory.setUsername("admin");
  19. factory.setVirtualHost("kongjuian1"); //指定使用空间名字,默认空间名为 “/”
  20. Connection connection = factory.newConnection();
  21. Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  22. //2、创建一个 Direct 类型的交换机,并且起名字叫direct_logs(交换机在消费者和生产者都要创建)
  23. channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
  24. //3、发送消息,消息上携带关键字信息
  25. while(true){
  26. System.out.println("输入消息");
  27. String s = new Scanner(System.in).nextLine();
  28. System.out.println("请输入路由键: ");
  29. String k = new Scanner(System.in).nextLine();
  30. //4、向helloworld队列发送消息,参数4为发送的消息,并且只能为byte数组
  31. /*channel.basicPublish(交换机(为空表示使用默认交换机),
  32. 队列名字,现在表示为携带的关键词
  33. 消息的其他属性配置(键值对设置消息属性,没有使用null值),
  34. 队列的内容(byte数组));
  35. * */
  36. channel.basicPublish("direct_logs",k,null,s.getBytes());
  37. }
  38. }
  39. }

2、消费者接收消息

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.Scanner;
  4. import java.util.UUID;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer4 {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //1、连接
  9. ConnectionFactory factory = new ConnectionFactory();
  10. factory.setHost("192.168.64.140");
  11. factory.setPort(5672);
  12. factory.setPassword("admin");
  13. factory.setUsername("admin");
  14. factory.setVirtualHost("kongjuian1"); //指定使用空间名字,默认空间名为 “/”
  15. Connection connection = factory.newConnection();
  16. final Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  17. //2、在服务器上创建XXXX随机队列{独占(排他),自动删除,非持久}
  18. //让服务器自动命名,自动提供队列参数
  19. //channel.queueDeclare(随机队列名, false, true, true, null);
  20. channel.queueDeclare(); //服务器自动提供的参数为上面所示参数
  21. String name = channel.queueDeclare().getQueue(); //得到服务器创建的随机名字
  22. //3、创建一个 DIRECT 类型的交换机,并且起名字叫logs
  23. channel.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
  24. //4、设置绑定键
  25. System.out.println("请输入绑定键,用空格隔开: ");//aa bb cc 获取到,根据空格拆分放到一个数组里面
  26. String s = new Scanner(System.in).nextLine();
  27. String[] a = s.split("\\s+"); // \s是空白字符,相当于空格 +表示1到多个
  28. for (String k : a){
  29. channel.queueBind(name,"direct_logs",k);
  30. }
  31. //5.1、创建处理消息的回调对象
  32. DeliverCallback deliverCallback = new DeliverCallback() {
  33. public void handle(String s, Delivery delivery) throws IOException {
  34. String str = new String(delivery.getBody()); //接收队列里的消息
  35. System.out.println("收到" + str); //处理消息
  36. }
  37. };
  38. //5.2取消消息处理的回调对象
  39. CancelCallback cancelCallback = new CancelCallback() {
  40. public void handle(String s) throws IOException {
  41. }
  42. };
  43. /*5、从随机队列接收消息,把消息传递到回调对象处理
  44. channel.basicConsume(队列的名字,
  45. 确认方式 ACK(Acknowledgment)true自动确认(如果消息在确认之前消费者崩溃,这条消息作废) false手动确认(如果消息在确认之前消费者崩溃,客户端消息回滚,重新接收),
  46. 处理消息的回调对象,
  47. 取消消息处理的回调对象);*/
  48. channel.basicConsume(name, true, deliverCallback, cancelCallback);
  49. }
  50. }

mq里面可以创建不同的空间,不同空间相互隔离,互不影响

创建方法:在服务器上创建

  factory.setVirtualHost("kongjuian1");  //指定使用空间名字,默认空间名为 “/” 

Ⅴ、主题模式

主题

主题模式可以理解为路由模式的升级,绑定的路由关键字,可以使用通配符代替(类似于正则表达式)


*通配一个字符            #通配多个字符
 

使用步骤

1、生产者发送消息

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.Scanner;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer5 {
  9. //主题模式根据绑定的关键词匹配(和路由的区别在于关键词格式,和交换机不一样)
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. //1、连接
  12. ConnectionFactory factory = new ConnectionFactory();
  13. factory.setHost("192.168.64.140");
  14. factory.setPort(5672);
  15. factory.setPassword("admin");
  16. factory.setUsername("admin");
  17. factory.setVirtualHost("kongjuian1"); //指定使用空间名字,默认空间名为 “/”
  18. Connection connection = factory.newConnection();
  19. Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  20. //2、创建一个 Topic 类型的交换机,并且起名字叫direct_logs(交换机在消费者和生产者都要创建)
  21. channel.exchangeDeclare("Topic_logs", BuiltinExchangeType.TOPIC);
  22. //3、发送消息,消息上携带关键字信息
  23. while(true){
  24. System.out.println("输入消息");
  25. String s = new Scanner(System.in).nextLine();
  26. System.out.println("请输入路由键: ");
  27. String k = new Scanner(System.in).nextLine();
  28. //4、向helloworld队列发送消息,参数4为发送的消息,并且只能为byte数组
  29. /*channel.basicPublish(交换机(为空表示使用默认交换机),
  30. 队列名字,现在表示为携带的关键词
  31. 消息的其他属性配置(键值对设置消息属性,没有使用null值),
  32. 队列的内容(byte数组));
  33. * */
  34. channel.basicPublish("Topic_logs",k,null,s.getBytes());
  35. }
  36. }
  37. }

2、消费者接收消息

  1. import com.rabbitmq.client.*;
  2. import java.io.IOException;
  3. import java.util.Scanner;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer5 {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //1、连接
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("192.168.64.140");
  10. factory.setPort(5672);
  11. factory.setPassword("admin");
  12. factory.setUsername("admin");
  13. factory.setVirtualHost("kongjuian1"); //指定使用空间名字,默认空间名为 “/”
  14. Connection connection = factory.newConnection();
  15. final Channel channel = connection.createChannel(); //与rabbitmq通讯的通道
  16. //2、在服务器上创建XXXX随机队列{独占(排他),自动删除,非持久}
  17. //让服务器自动命名,自动提供队列参数
  18. //channel.queueDeclare(随机队列名, false, true, true, null);
  19. channel.queueDeclare(); //服务器自动提供的参数为上面所示参数
  20. String name = channel.queueDeclare().getQueue(); //得到服务器创建的随机名字
  21. //3、创建一个 DIRECT 类型的交换机,并且起名字叫logs
  22. channel.exchangeDeclare("Topic_logs", BuiltinExchangeType.TOPIC);
  23. //4、设置绑定键
  24. System.out.println("请输入绑定键,用空格隔开: ");//aa.ab.bc bb.bb cc.dd.gg 获取到,根据空格拆分放到一个数组里面
  25. String s = new Scanner(System.in).nextLine();
  26. String[] a = s.split("\\s+"); // \s是空白字符,相当于空格 +表示1到多个
  27. for (String k : a){
  28. channel.queueBind(name,"Topic_logs",k);
  29. }
  30. //5.1、创建处理消息的回调对象
  31. DeliverCallback deliverCallback = new DeliverCallback() {
  32. public void handle(String s, Delivery delivery) throws IOException {
  33. String str = new String(delivery.getBody()); //接收队列里的消息
  34. System.out.println("收到" + str); //处理消息
  35. }
  36. };
  37. //5.2取消消息处理的回调对象
  38. CancelCallback cancelCallback = new CancelCallback() {
  39. public void handle(String s) throws IOException {
  40. }
  41. };
  42. /*5、从随机队列接收消息,把消息传递到回调对象处理
  43. channel.basicConsume(队列的名字,
  44. 确认方式 ACK(Acknowledgment)true自动确认(如果消息在确认之前消费者崩溃,这条消息作废) false手动确认(如果消息在确认之前消费者崩溃,客户端消息回滚,重新接收),
  45. 处理消息的回调对象,
  46. 取消消息处理的回调对象);*/
  47. channel.basicConsume(name, true, deliverCallback, cancelCallback);
  48. }
  49. }

四、消息服务案例


 

前两个案例的地址:(1条消息) Spring cloud Netflix_asddasddeedd的博客-CSDN博客

  1. BUS配置刷新 刷新指令消息发送到Rabbitmq ,其他模块接收执行,执行刷新操作 主题模式
  2. sleuth + zipkin 链路跟踪 链路跟踪日志,通过 Rabbitmq 中转发送到 Zipkin 服务器 简单模式
  3. 订单的流量削峰 购物系统产生的订单,不直接存储到数据库,而是发送到 Rabbitmq, 后台的消费者模块接收订单,再向数据库存储, 短时间大量订单并发存储,变成顺序存储 简单模式 / 工作模式。

案例三:

案例简介:

订单的流量削峰 购物系统产生的订单,不直接存储到数据库,而是发送到 Rabbitmq, 后台的消费者模块接收订单,再向数据库存储, 短时间大量订单并发存储,变成顺序存储 简单模式 / 工作模式。


订单发送到 Rabbitmq

1、添加 Rabbitmq 依赖和 Rabbitmq 连接配置

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>
  1. spring:
  2. rabbitmq:
  3. host: 192.168.64.140
  4. port: 5672
  5. username: admin
  6. password: admin
  7. virtual-host: /

2、在启动类(或者自定义自动配置类)中设置队列的参数: orderQueue, true, false, false

  1. import org.mybatis.spring.annotation.MapperScan;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. @SpringBootApplication
  7. @MapperScan("com.pd.mapper")
  8. public class RunPdAPP{
  9. public static void main(String[] args) {
  10. SpringApplication.run(RunPdAPP.class, args);
  11. }
  12. //新建spring的Queue实例用来封装队列的参数
  13. //rabbitmq的自动配置类会自动发现这个Queue实例
  14. //根据其中的参数自动在服务器上创建队列
  15. @Bean
  16. public Queue orderQueue(){//持久,非独占,不自动删除
  17. return new Queue("orderQueue",true,false,false);
  18. }
  19. }

 3、修改需要发送消息的模块

  1. @Autowired
  2. private AmqpTemplate t; //注入对象: AmqpTemplate(用来封装发送消息代码的工具)发送消息
  3. public String saveOrder(PdOrder pdOrder) throws Exception {
  4. String orderId = generateId(); // 生成订单id(该方法随机生成时间作为id订单编号)
  5. pdOrder.setOrderId(orderId); // 订单id放入订单对象
  6. // 订单id、地址id、用户id、购买的商品id
  7. // 转换并发送,先把数据转成byte[]再发送
  8. t.convertAndSend("orderQueue", pdOrder);


订单的消费者

1、Rabbitmq基础配置,准备队列参数

        ①、添加 Rabbitmq 依赖和 Rabbitmq 连接配置

        ③、在启动类(或者自定义自动配置类)中设置队列的参数: orderQueue, true, false, false


2、新建消费者类: OrderConsumer
3、用注解配置接收消息

  1. import com.pd.pojo.PdOrder;
  2. import com.pd.service.OrderService;
  3. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.beans.factory.annotation.Autowired;
  6. import org.springframework.stereotype.Component;
  7. //队列名字
  8. @RabbitListener(queues = "orderQueue") //通过注解配置就可以接收信息,不需要写代码
  9. @Component
  10. public class OrderConsumer {
  11. @Autowired
  12. private OrderService orderService;
  13. @RabbitHandler //指定使用哪个方法处理该队列消息,并且该方法只能有一个
  14. public void receive(PdOrder order) throws Exception {
  15. orderService.saveOrder(order);
  16. System.out.println("=============================订单已经完成==================");
  17. }
  18. }


4、将接收到的消息储存到数据库

五、RabbitMQ - Spring boot 整合

 添加RabbitMQ依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-amqp</artifactId>
  4. </dependency>

配置yml,配置RabbitMQ连接信息

  1. spring:
  2. rabbitmq:
  3. host: 192.168.64.140
  4. port: 5672
  5. username: admin
  6. password: admin
  7. virtual-host: /

1、简单模式整合

①、配置启动类

  1. import org.springframework.amqp.core.Queue;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. import javax.annotation.PostConstruct;
  7. @SpringBootApplication
  8. public class Main1 {
  9. public static void main(String[] args) {
  10. SpringApplication.run(Main1.class,args);
  11. }
  12. //设置队列参数
  13. @Bean
  14. public Queue helloWorld(){
  15. //参数简介 :队列名称,非持久,非独占,不自动删除
  16. return new Queue("helloworld",false,false,false);
  17. }
  18. //调用生产者发送消息
  19. @Autowired
  20. private Producer1 producer;
  21. //spring的执行流程
  22. //自动扫描创建实例--》完成依赖注入--》@PostConstruct-->后续步骤
  23. @PostConstruct
  24. public void test(){
  25. //开启一个新的线程,不阻塞程序主线程执行
  26. new Thread(new Runnable() {
  27. @Override
  28. public void run() {
  29. try {
  30. Thread.sleep(3000); //等待helloworld队列被创建
  31. } catch (InterruptedException e) {
  32. e.printStackTrace();
  33. }
  34. producer.send();
  35. }
  36. }).start();
  37. }
  38. }

②、创建生产者类

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class Producer1 {
  6. //发送消息的封装工具
  7. //
  8. @Autowired
  9. private AmqpTemplate t;
  10. public void send(){
  11. //向helloworld队列发送消息 //自动转换成byter数组
  12. //相当于: channel.basicPublish("logs","",null,s.getBytes());
  13. t.convertAndSend("helloworld","hello world");
  14. }
  15. }

③、创建消费者类

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Component;
  3. //@RabbitListener(queues = "helloworld")
  4. //接收队列helloworld,
  5. //如果放在类上,要与 @RabbitHandler配合使用
  6. //也可以直接梵高方法上。
  7. @Component
  8. public class Consumer1 {
  9. @RabbitListener(queues = "helloworld")
  10. public void receive(String msg){
  11. System.out.println("收到消息:" + msg);
  12. }
  13. }

2、工作模式

修改yml,每次抓取消息修改为1

  1. spring:
  2. rabbitmq:
  3. host: 192.168.64.140
  4. port: 5672
  5. username: admin
  6. password: admin
  7. virtual-host: /
  8. listener:
  9. simple:
  10. prefetch: 1 #每次抓取一条消息,处理完之前不收下一条,默认抓取250条

①、配置启动类

  1. import org.springframework.amqp.core.Queue;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. import javax.annotation.PostConstruct;
  7. import java.util.Scanner;
  8. /*
  9. 合理分发
  10. 1. autoAck=false,手动确认
  11. spring封装的rabbitmq,默认就是手动确认,
  12. spring会自动执行发送回执
  13. 2. qos=1,预抓取消息数量(配置类中修改)
  14. yml 添加 pre-fetch=1,默认是250
  15. 消息持久化
  16. 1. 队列持久化
  17. new Queue("", true)
  18. 2. 消息数据的持久化
  19. spring发送的消息,默认就是持久消息
  20. */
  21. @SpringBootApplication
  22. public class Main2 {
  23. public static void main(String[] args) {
  24. SpringApplication.run(Main2.class, args);
  25. }
  26. // 设置队列参数
  27. @Bean
  28. public Queue taskQueue() {
  29. // 持久,非独占,不自动删除
  30. return new Queue("task_queue"); //只给队列名,其他参数是下面的默认值
  31. // return new Queue("task_queue",true,false,false);
  32. }
  33. @Autowired
  34. private Producer2 p;
  35. /*
  36. spring的主线程执行流程
  37. 自动扫描创建实例 --> 完成依赖注入 --> @PostConstruct --> 后续步骤
  38. */
  39. @PostConstruct
  40. public void test() {
  41. // 在新线程中执行自己的运算,不阻塞 spring 主线程执行
  42. new Thread(() -> {
  43. while (true){
  44. System.out.print("输入消息:");
  45. String s = new Scanner(System.in).nextLine();
  46. p.send(s);
  47. }
  48. }).start();
  49. }
  50. }

②创建生产者类

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class Producer2 {
  6. // 发送消息的封装工具
  7. // RabbitAutoConfiguration 中创建了 AmqpTemplate 实例
  8. @Autowired
  9. private AmqpTemplate t;
  10. public void send(String s) {
  11. // 向 helloworld 队列发送消息
  12. // 队列的参数在启动类中设置
  13. t.convertAndSend("task_queue", s);
  14. /*
  15. t.convertAndSend("task_queue", s, 消息预处理对象);
  16. 在预处理对象中,可以对消息的参数进行调整,
  17. 可以把持久化参数设置成非持久
  18. */
  19. }
  20. }

③创建消费者类

  1. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  2. import org.springframework.stereotype.Component;
  3. //每个@RabbitListener都会注册成为一个消费者
  4. @Component
  5. public class Consumer2 {
  6. @RabbitListener(queues = "task_queue")
  7. public void receive1(String msg) {
  8. System.out.println("消费者1收到:"+msg);
  9. }
  10. @RabbitListener(queues = "task_queue")
  11. public void receive2(String msg) {
  12. System.out.println("消费者2收到:"+msg);
  13. }
  14. }

3、发布订阅模式

启动类

  1. import org.springframework.amqp.core.FanoutExchange;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. import javax.annotation.PostConstruct;
  7. import java.util.Scanner;
  8. @SpringBootApplication
  9. public class Main3 {
  10. public static void main(String[] args) {
  11. SpringApplication.run(Main3.class,args);
  12. }
  13. //创建FANOUT交换机
  14. @Bean
  15. public FanoutExchange logs(){
  16. return new FanoutExchange("logs",false,false);
  17. }
  18. //开始调用生产者发送消息
  19. @Autowired
  20. private Producer3 producer3;
  21. //spring的执行流程
  22. //自动扫描创建实例--》完成依赖注入--》@PostConstruct-->后续步骤
  23. @PostConstruct
  24. public void test(){
  25. //开启一个新的线程,不阻塞程序主线程执行
  26. new Thread(new Runnable() {
  27. @Override
  28. public void run() {
  29. while(true){
  30. System.out.println("输入消息: ");
  31. String s = new Scanner(System.in).nextLine();
  32. producer3.send(s);
  33. }
  34. }
  35. }).start();
  36. }
  37. }

生产者

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class Producer3 {
  6. //发送消息的封装工具
  7. @Autowired
  8. private AmqpTemplate t;
  9. public void send(String s){
  10. //向helloworld队列发送消息 //自动转换成byter数组
  11. t.convertAndSend("logs","",s);
  12. //相当于: channel.basicPublish("logs","",null,s.getBytes());
  13. }
  14. }

消费者

  1. import org.springframework.amqp.rabbit.annotation.Exchange;
  2. import org.springframework.amqp.rabbit.annotation.Queue;
  3. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class Consumer3 {
  8. //创建队列,并且和交换机进行绑定
  9. //参数为绑定配置
  10. @RabbitListener(bindings = @QueueBinding(
  11. value = @Queue, //创建队列,该注解不给参数表示创建随机队列,
  12. exchange = @Exchange(name = "logs" ,declare = "false") //设置交换机
  13. //false表示不创建新的交换机,使用已经存在的交换机
  14. ))
  15. public void receive1(String msg){
  16. System.out.println("1号收到消息:" + msg);
  17. }
  18. @RabbitListener(bindings = @QueueBinding(
  19. value = @Queue,
  20. exchange = @Exchange(name = "logs" ,declare = "false")
  21. ))
  22. public void receive2(String msg){
  23. System.out.println("2号收到消息:" + msg);
  24. }
  25. }

4、主题模式

启动类

  1. import org.springframework.amqp.core.DirectExchange;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. import javax.annotation.PostConstruct;
  7. import java.util.Scanner;
  8. @SpringBootApplication
  9. public class Main4 {
  10. public static void main(String[] args) {
  11. SpringApplication.run(Main4.class,args);
  12. }
  13. //创建Direct交换机
  14. @Bean
  15. public DirectExchange logs(){
  16. return new DirectExchange("direct_log",false,false);
  17. }
  18. //开始调用生产者发送消息
  19. @Autowired
  20. private Producer4 producer4;
  21. //spring的执行流程
  22. //自动扫描创建实例--》完成依赖注入--》@PostConstruct-->后续步骤
  23. @PostConstruct
  24. public void test(){
  25. //开启一个新的线程,不阻塞程序主线程执行
  26. new Thread(new Runnable() {
  27. @Override
  28. public void run() {
  29. while(true){
  30. System.out.println("输入消息: ");
  31. String s = new Scanner(System.in).nextLine();
  32. System.out.println("输入路由键: ");
  33. String k = new Scanner(System.in).nextLine();
  34. producer4.send(k,s);
  35. }
  36. }
  37. }).start();
  38. }
  39. }

生产者

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class Producer4 {
  6. //发送消息的封装工具
  7. @Autowired
  8. private AmqpTemplate t;
  9. public void send(String k , String s){
  10. //向队列发送消息 //自动转换成byter数组
  11. //相当于: channel.basicPublish("logs","",null,s.getBytes());
  12. t.convertAndSend("direct_log",k,s);
  13. }
  14. }

消费者

  1. import org.springframework.amqp.rabbit.annotation.Exchange;
  2. import org.springframework.amqp.rabbit.annotation.Queue;
  3. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class Consumer4 {
  8. //创建队列,并且和交换机进行绑定
  9. //参数为绑定配置
  10. @RabbitListener(bindings = @QueueBinding(
  11. value = @Queue, //创建队列,该注解不给参数表示创建随机队列,
  12. exchange = @Exchange(name = "direct_log" ,declare = "false"), //设置交换机
  13. //表示不创建新的交换机,使用已经存在的交换机
  14. key = {"error"}
  15. ))
  16. public void receive1(String msg){
  17. System.out.println("1号收到消息:" + msg);
  18. }
  19. @RabbitListener(bindings = @QueueBinding(
  20. value = @Queue,
  21. exchange = @Exchange(name = "direct_log" ,declare = "false"),
  22. key = {"info","error","warning"}
  23. ))
  24. public void receive2(String msg){
  25. System.out.println("2号收到消息:" + msg);
  26. }
  27. }

5、路由模式

启动类

  1. import org.springframework.amqp.core.TopicExchange;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.boot.SpringApplication;
  4. import org.springframework.boot.autoconfigure.SpringBootApplication;
  5. import org.springframework.context.annotation.Bean;
  6. import javax.annotation.PostConstruct;
  7. import java.util.Scanner;
  8. @SpringBootApplication
  9. public class Main5 {
  10. public static void main(String[] args) {
  11. SpringApplication.run(Main5.class,args);
  12. }
  13. //创建Topic交换机
  14. @Bean
  15. public TopicExchange topict(){
  16. return new TopicExchange("topict_log",false,false);
  17. }
  18. //开始调用生产者发送消息
  19. @Autowired
  20. private Producer5 producer5;
  21. //spring的执行流程
  22. //自动扫描创建实例--》完成依赖注入--》@PostConstruct-->后续步骤
  23. @PostConstruct
  24. public void test(){
  25. //开启一个新的线程,不阻塞程序主线程执行
  26. new Thread(new Runnable() {
  27. @Override
  28. public void run() {
  29. while(true){
  30. System.out.println("输入消息: ");
  31. String s = new Scanner(System.in).nextLine();
  32. System.out.println("输入路由键: ");
  33. String k = new Scanner(System.in).nextLine();
  34. producer5.send(k,s);
  35. }
  36. }
  37. }).start();
  38. }
  39. }

生产者

  1. import org.springframework.amqp.core.AmqpTemplate;
  2. import org.springframework.beans.factory.annotation.Autowired;
  3. import org.springframework.stereotype.Component;
  4. @Component
  5. public class Producer5 {
  6. //发送消息的封装工具
  7. @Autowired
  8. private AmqpTemplate t;
  9. public void send(String k , String s){
  10. //向队列发送消息 //自动转换成byter数组
  11. //相当于: channel.basicPublish("logs","",null,s.getBytes());
  12. t.convertAndSend("topic_log",k,s);
  13. }
  14. }

消费者

  1. import org.springframework.amqp.rabbit.annotation.Exchange;
  2. import org.springframework.amqp.rabbit.annotation.Queue;
  3. import org.springframework.amqp.rabbit.annotation.QueueBinding;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class Consumer5 {
  8. //创建队列,并且和交换机进行绑定
  9. //参数为绑定配置
  10. @RabbitListener(bindings = @QueueBinding(
  11. value = @Queue, //创建队列,该注解不给参数表示创建随机队列,
  12. exchange = @Exchange(name = "topic_log" ,declare = "false"), //设置交换机
  13. //表示不创建新的交换机,使用已经存在的交换机
  14. key = {"*.orange.*"}
  15. ))
  16. public void receive1(String msg){
  17. System.out.println("1号收到消息:" + msg);
  18. }
  19. @RabbitListener(bindings = @QueueBinding(
  20. value = @Queue,
  21. exchange = @Exchange(name = "topict_log" ,declare = "false"),
  22. key = {"*.*.rabbit","lazy.#"}
  23. ))
  24. public void receive2(String msg){
  25. System.out.println("2号收到消息:" + msg);
  26. }
  27. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/酷酷是懒虫/article/detail/769746
推荐阅读
相关标签
  

闽ICP备14008679号