当前位置:   article > 正文

五阶段--消息服务、消息中间件(broker)、消息队列/rabbitmq的使用场景/六种工作模式_消息队列 broker

消息队列 broker

目录

一 消息服务、消息中间件(broker)、消息队列

1 搭建 Docker 环境

2 用 Docker 运行 Rabbitmq

 二 Rbbitmq 使用场景

服务解耦

三 rabbitemq的六种工作模式

 1 简单模式--只有一个消费者

2 工作模式

处理方法:合理分发消息

消息持久化

3 发布订阅模式(群发/广播模式)

4 路由模式--通过关键词匹配路由消息

5 主题模式,也叫路由模式,都是通过关键词接收消息

6 RPC模式:远程过程调用

RPC的工作方式 

客户端

回调队列 Callback Queue

消息属性 Message Properties

关联id (correlationId):

拓展:测试阻塞队列

客户端远程调用斐波那契数

服务端:提供处理方法



一 消息服务、消息中间件(broker)、消息队列

搭建 Docker 环境

  1. 克隆 centos-8 或 centos-7: docker-base

  2. 设置ip

  1. ./ip-static
  2. ip: 192.168.64.150
  3. ifconfig

注意:若这里出现error,需要使用这两个命令,再重新设置ip

  1. nmcli n on
  2. systemctl restart NetworkManager

    3.安装 docker,参考csdn笔记: Docker(一) - 离线安装_wanght笔记-CSDN博客_docker离线安装

  • 上传离线安装文件,不用按照笔记在线下载

  • DevOps课前资料\docker\docker-install 文件夹

    上传到 /root/

 

 确认安装结果:

  1. docker info
  2. docker run hello-world

用 Docker 运行 Rabbitmq

  1. 从 docker-base 克隆: rabbitmq

  2. 设置ip

  1. ./ip-static
  2. ip: 192.168.64.140
  3. ifconfig

     3. 按照笔记用docker运行 rabbitmq,过程中需要联网下载 rabbitmq 的镜像

  1. docker pull rabbitmq:management 在线下载镜像
  2. 执行 docker images 查看镜像
  3. 关闭防火墙
  4. systemctl stop firewalld
  5. systemctl disable firewalld
  6. 重启 docker 系统服务
  7. systemctl restart docker
  8. mkdir /etc/rabbitmq
  9. vim /etc/rabbitmq/rabbitmq.conf #也可以先cd //etc/rabbitmq 再vim rabbitmq.conf 进行编辑
  10. # rabbitmq.conf配置文件中添加两行配置:
  11. default_user = admin
  12. default_pass = admin
  13. #启动rabbitmq容器
  14. docker run -d --name rabbit \
  15. -p 5672:5672 \
  16. -p 15672:15672 \
  17. -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
  18. -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
  19. rabbitmq:management
  20. 访问管理控制台 http://192.168.64.140:15672
  21. 用户名密码都是 admin

发送和接受消息的端口都是5672

管理rabbitmq控制台,界面的端口是15672

下次使用时:执行这两个步骤:

 spring默认集成Rabbitmq

腾讯使用的是Tubemq

阿里使用的是Rocketmq

Rabbitmq

 二 Rbbitmq 使用场景

使用场景:服务解耦,流量削峰,异步调用

服务解耦

RabbitMQ_wanght笔记-CSDN博客_rabbitmq

A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可

解耦

三 rabbitemq的六种工作模式

 1 简单模式--只有一个消费者

简单

RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。

  • 发送消息的程序是生产者
  • 队列就代表一个邮箱。虽然消息会流经RbbitMQ和你的应用程序,但消息只能被存储在队列里。队列存储空间只受服务器内存和磁盘限制,它本质上是一个大的消息缓冲区。多个生产者可以向同一个队列发送消息,多个消费者也可以从同一个队列接收消息.
  • 消费者等待从队列接收消息

简单模式

第一步:创建工程

 

project项目名称:rabbitmq

再在下面创建maven module : rabbitmq-api 

第二步:pom文件添加依赖

  1. <dependencies>
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.4.3</version>
  6. </dependency>
  7. </dependencies>
  8. <build>
  9. <plugins>
  10. <plugin>
  11. <groupId>org.apache.maven.plugins</groupId>
  12. <artifactId>maven-compiler-plugin</artifactId>
  13. <version>3.8.0</version>
  14. <configuration>
  15. <source>1.8</source>
  16. <target>1.8</target>
  17. </configuration>
  18. </plugin>
  19. </plugins>
  20. </build>

第三步:创建包名m1,发送者生成消息的类

  1. package m1;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.concurrent.TimeoutException;
  8. //生产者队列,启动后发送消息,会在指定服务器看到发送的消息
  9. public class Producer {
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. //连接
  12. ConnectionFactory f = new ConnectionFactory();
  13. f.setHost("192.168.64.140");//rabbitmq的ip 老师的服务器:wht6.cn
  14. f.setPort(5672);//5672 端口进行接收,发送消息
  15. f.setUsername("admin");
  16. f.setPassword("admin");
  17. Connection con = f.newConnection();
  18. Channel c = con.createChannel();//通信
  19. //创建队列 名为hello-world ,如果队列已经存在,不会重复创建
  20. /**
  21. * 参数:
  22. * 1.队列名
  23. * 2.是否是持久队列
  24. * 3.是否是排他队列,独占队列
  25. * 4.是否自动删除(没有消费者时自动删除)
  26. * 5.其他参数属性
  27. * */
  28. c.queueDeclare("hello-world-666",false,false,false,null);
  29. //向创建的队列,发送消息-先发送到消息服务器-接收者再接收
  30. /**
  31. * 参数:
  32. * 1. 交换机: ""空串是一个默认交换机,Exchange栏中叫AMQ default
  33. * 3. 消息的其他参数属性,键值对数据
  34. * */
  35. c.basicPublish(
  36. "","hello-world-666",
  37. null,"Hello world!".getBytes(StandardCharsets.UTF_8));
  38. }
  39. }

启动服务器,查看发送的消息:

 第四步:创建消费者收到的消息类

  1. package m1;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. //消费者队列
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //连接
  9. ConnectionFactory f = new ConnectionFactory();
  10. f.setHost("192.168.64.140");//rabbitmq的ip 老师的服务器:wht6.cn
  11. f.setPort(5672);//5672 端口进行接收,发送消息
  12. f.setUsername("admin");
  13. f.setPassword("admin");
  14. Connection con = f.newConnection();
  15. Channel c = con.createChannel();//通信
  16. //创建队列 名为hello-world ,如果队列已经存在,不会重复创建
  17. c.queueDeclare("hello-world-666",false,false,false,null);
  18. //创建回调对象
  19. DeliverCallback deliverCallback = new DeliverCallback(){
  20. @Override
  21. public void handle(String consumerTag, Delivery message) throws IOException {
  22. byte[] a = message.getBody();//收到的消息转为数组
  23. String s = new String(a);//转为字符串
  24. System.out.println("收到:"+s);
  25. }
  26. };
  27. CancelCallback cancelCallback = new CancelCallback(){
  28. @Override
  29. public void handle(String message) throws IOException {
  30. }
  31. };
  32. //接收消息,收到的消息会传递到一个回调对象去处理
  33. /**
  34. * 第二个参数: autoAck- auto acknowlegment 自动确认
  35. * true - 自动确认
  36. * 消息一向消费者发送,服务器立即自动确认消息,删除消息
  37. * false - 手动确认
  38. * 消息发出后,服务器会缓存消息,不删除,
  39. * 等待消费者(接收者)发回一个确认消息(回执)才删除,
  40. * 如果消费者处理消息过程中崩溃或离线,
  41. * 服务器会回滚消息,等待重新发送
  42. * */
  43. //c.basicConsume("hello-world-666",true,处理消息的回到对象,null);
  44. c.basicConsume("hello-world-666",true,deliverCallback,cancelCallback);
  45. }
  46. }

第五步:启动测试结果,接收者可以看到发送者发来的消息

 

 第六步:当发送者服务和接收者服务都打开时,发送者发送消息,接收者就自动接受到

2 工作模式

工作

工作模式

工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。

我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。

使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。

第一步:创建m2包,创建消息发送者,生产者的Producer类 

  1. package m2;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.nio.charset.StandardCharsets;
  7. import java.util.Scanner;
  8. import java.util.concurrent.TimeoutException;
  9. public class Producer {
  10. public static void main(String[] args) throws IOException, TimeoutException {
  11. //连接
  12. ConnectionFactory f = new ConnectionFactory();
  13. f.setHost("192.168.64.140");//rabbitmq的ip 老师的服务器:wht6.cn
  14. f.setPort(5672);//5672 端口进行接收,发送消息
  15. f.setUsername("admin");
  16. f.setPassword("admin");
  17. Connection con = f.newConnection();
  18. Channel c = con.createChannel();//通信
  19. //创建队列 名为hello-world-666 ,如果队列已经存在,不会重复创建
  20. c.queueDeclare("hello-world-666",false,false,false,null);
  21. //向hello-world-666队列,发送消息
  22. while (true){
  23. System.out.println("输入消息:");
  24. String s = new Scanner(System.in).nextLine();
  25. c.basicPublish("","hello-world-666",
  26. null,s.getBytes(StandardCharsets.UTF_8));
  27. }
  28. }
  29. }

  第二步:创建消息接收者,消费者的Consumer类,和简单模式的消费者类相同,只是添加了模拟耗时消息

  1. package m2;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. public class Consumer {
  6. public static void main(String[] args) throws IOException, TimeoutException {
  7. //连接
  8. ConnectionFactory f = new ConnectionFactory();
  9. f.setHost("192.168.64.140");//rabbitmq的ip 老师的服务器:wht6.cn
  10. f.setPort(5672);//5672 端口进行接收,发送消息
  11. f.setUsername("admin");
  12. f.setPassword("admin");
  13. Connection con = f.newConnection();
  14. Channel c = con.createChannel();//通信
  15. //创建队列 名为hello-world ,如果队列已经存在,不会重复创建
  16. c.queueDeclare("hello-world-666",false,false,false,null);
  17. //创建回调对象
  18. DeliverCallback deliverCallback = new DeliverCallback(){
  19. @Override
  20. public void handle(String consumerTag, Delivery message) throws IOException {
  21. byte[] a = message.getBody();//收到的消息转为数组
  22. String s = new String(a);//转为字符串
  23. System.out.println("收到:"+s);//"sdsasdads"
  24. //模拟耗时消息
  25. //遍历字符串,找'.'点字符,每找到一个就暂停一秒
  26. for (int i = 0; i < s.length(); i++) {
  27. if ('.' == s.charAt(i)){
  28. try {
  29. Thread.sleep(1000);
  30. } catch (InterruptedException e) {
  31. e.printStackTrace();
  32. }
  33. }
  34. }
  35. System.out.println("---消息处理结束----");
  36. }
  37. };
  38. CancelCallback cancelCallback = new CancelCallback(){
  39. @Override
  40. public void handle(String message) throws IOException {
  41. }
  42. };
  43. //c.basicConsume("hello-world-666",true,处理消息的回到对象,null);
  44. c.basicConsume("hello-world-666",true,deliverCallback,cancelCallback);
  45. }
  46. }

第三步:创建多个消息消费者接收者,使一个消息生产者,发送者发送消息时,多个消息消费者接收者随机其中一个接受消息

先启动一个一个消费者启动类,通过Edit Configuration 设置多个消费者

第四步:启动消息生产者类,和两个消息接收者类,发送消息进行接受测试

 

缺点:接收者1接收者2依次接收消息,但是接收者1接受的消息处理时间很长时,接收者2并不能为接收者1分担消息,而是等待接收者1处理完了,再继续接收消息,造成了接收阻塞状态!!!

处理方法:合理分发消息

第一步:手动ACK ,手动发送回执

 测试:

发送几条量大延迟的消息,两个接收者其中有一个会有未接收到的数据,会在rabbitqm的Unacked存储,关掉两个接收者,所有消息会存到rabbitemq的Ready中,再开启其中一个接收者,所有的消息会向这个接收者发送.

第二步:每次只接收一条消息,处理完之前不接受下一条

 合理分发消息后,消息会缓存在rabbitmq服务器,一旦服务器崩溃,消息也会消失!所以要对消息持久化

消息持久化

1. 队列持久化

2. 消息持久化

生产者消费者的队列名都改为task_queue  ,生产者消息持久化,null改

MessageProperties.PERSISTENT_BASIC

消息消费者接收者改掉队列名task_queue

3 发布订阅模式(群发/广播模式)

发布订阅

发布订阅

在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。

为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序接收它们。

在我们的日志系统中,接收程序的每个运行副本都将获得消息。这样,我们就可以运行一个消费者并将日志保存到磁盘; 同时我们可以运行另一个消费者在屏幕上打印日志。

最终, 消息会被广播到所有消息接受者.

  • 消息生产者向交换机(Fanout)发送一条消息,交换机把消息广播给多个消息接收者
  • 如果没有消息接收者,消息会被丢弃
  • 消费者,生产者都要绑定交换机
  • Fanout 叫做扇出交换机
     

第一步: m3包创建消息生产者,向交换机发送消息

  1. package m3;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.Scanner;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer {
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. //连接
  11. ConnectionFactory f = new ConnectionFactory();
  12. //f.setHost("192.168.64.140");//自己的服务器,也可以连接老师的wht6.cn
  13. f.setHost("wht6.cn");
  14. f.setPort(5672);
  15. f.setUsername("admin");
  16. f.setPassword("admin");
  17. Channel c = f.newConnection().createChannel();
  18. //创建 fanout 类型交换机: logs
  19. //c.exchangeDeclare("logs","fanout");
  20. c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
  21. //向交换机发送消息
  22. while(true){
  23. System.out.println("输入消息:");
  24. String s = new Scanner(System.in).nextLine();
  25. //对于fanout交换机,第二个参数无效
  26. c.basicPublish("logs","",null,s.getBytes());
  27. }
  28. }
  29. }

第二步:m3包创建消息接收消费者,随机队列和交换机的绑定,从队列接收消息

  1. package m3;
  2. import com.rabbitmq.client.*;
  3. import java.lang.String;
  4. import java.io.IOException;
  5. import java.util.UUID;
  6. import java.util.concurrent.TimeoutException;
  7. public class Consumer {
  8. public static void main(String[] args) throws IOException, TimeoutException {
  9. //连接
  10. ConnectionFactory f = new ConnectionFactory();
  11. //f.setHost("192.168.64.140");//自己的服务器,也可以连接老师的wht6.cn
  12. f.setHost("wht6.cn");
  13. f.setPort(5672);
  14. f.setUsername("admin");
  15. f.setPassword("admin");
  16. Channel c = f.newConnection().createChannel();
  17. //1.创建随机队列
  18. String queue = UUID.randomUUID().toString();//UUID产生随机队列名
  19. c.queueDeclare(queue,false,true,true,null);
  20. //2.创建交换机-名为logs
  21. c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
  22. //3.绑定,第三个参数,对于fanout交换机无效,目前设为空串""
  23. c.queueBind(queue,"logs","");
  24. //从队列接收消息
  25. DeliverCallback deliverCallback = new DeliverCallback() {
  26. @Override
  27. public void handle(String consumerTag, Delivery message) throws IOException {
  28. String s = new String(message.getBody());
  29. System.out.println("收到: "+s);
  30. }
  31. };
  32. CancelCallback cancelCallback = new CancelCallback() {
  33. @Override
  34. public void handle(String consumerTag) throws IOException {
  35. }
  36. };
  37. c.basicConsume(queue,true,deliverCallback,cancelCallback);
  38. }
  39. }

效果:实时接收消息

4 路由模式--通过关键词匹配路由消息

路由

在上一小节,我们构建了一个简单的日志系统。我们能够向多个接收者广播日志消息。

在这一节,我们将向其添加一个特性—我们将只订阅所有消息中的一部分。例如,我们只接收关键错误消息并保存到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。

  •  可以绑定重复的关键词,带有这个关键词的消费者都可以接收到该关键词下的消息
  • 使消费者可以根据自己感兴趣的关键词绑定,来接受相应的消息

第一步: 使用direct直连交换机,携带路由关键词发送消息

  1. package m4;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.Scanner;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer {
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. //连接
  11. ConnectionFactory f = new ConnectionFactory();
  12. f.setHost("192.168.64.140");
  13. //f.setHost("wht6.cn");
  14. f.setPort(5672);
  15. f.setUsername("admin");
  16. f.setPassword("admin");
  17. Channel c = f.newConnection().createChannel();
  18. //创建 direct 类型直连交换机: direct_logs
  19. c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
  20. //发送消息,消息上需要携带路由键关键词
  21. while(true){
  22. System.out.print("输入消息: ");
  23. String s = new Scanner(System.in).nextLine();
  24. System.out.print("输入路由键: ");
  25. String k = new Scanner(System.in).nextLine();
  26. //对于默认交换机,使用队列名作为路由键
  27. c.basicPublish("direct_logs",k,null,s.getBytes());
  28. }
  29. }
  30. }

第二步:设置绑定键,接收消息

  1. package m4;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.Scanner;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //连接
  9. ConnectionFactory f = new ConnectionFactory();
  10. f.setHost("192.168.64.140");
  11. //f.setHost("wht6.cn");
  12. f.setPort(5672);
  13. f.setUsername("admin");
  14. f.setPassword("admin");
  15. Channel c = f.newConnection().createChannel();
  16. //1.随机队列--由服务器自动提供队列参数格式: (随机命名.false,ture,true)
  17. String queue = c.queueDeclare().getQueue();//随机命名,getQueue获取随机名
  18. //2.交换机
  19. c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
  20. //3.绑定,设置绑定键
  21. System.out.println("输入绑定键,使用空格隔开:"); //"aa bb cc dd"
  22. String s = new Scanner(System.in).nextLine();
  23. String[] a = s.split("\\s+");//第一个斜杠是转义字符 \s是空白字符 +加号指一到多个
  24. for (String k : a) {
  25. c.queueBind(queue,"direct_logs",k);//k是用于绑定的键值
  26. }
  27. //从随机队列接收消息
  28. DeliverCallback deliverCallback = new DeliverCallback() {
  29. @Override
  30. public void handle(String consumerTag, Delivery message) throws IOException {
  31. String s = new String(message.getBody());
  32. //从消息中携带的路由键
  33. message.getEnvelope().getRoutingKey();
  34. System.out.println("收到: "+s);
  35. }
  36. };
  37. CancelCallback cancelCallback = new CancelCallback() {
  38. @Override
  39. public void handle(String consumerTag) throws IOException {
  40. }
  41. };
  42. c.basicConsume(queue,true,deliverCallback,cancelCallback);
  43. }
  44. }

第三步:启动一个消息生产者,两个消息接收者,消息生产者发送消息和路由键,,绑定关键词的接收者就能根据关键词获取信息

 

5 主题模式,也叫路由模式,都是通过关键词接收消息

主题

 

 

在上一小节,我们改进了日志系统。我们没有使用只能进行广播的fanout交换机,而是使用Direct交换机,从而可以选择性接收日志。

虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。

在我们的日志系统中,我们可能不仅希望根据级别订阅日志,还希望根据发出日志的源订阅日志。

这将给我们带来很大的灵活性——我们可能只想接收来自“cron”的关键错误,但也要接收来自“kern”的所有日志。

要在日志系统中实现这一点,我们需要了解更复杂的Topic主题交换机。

  • 关键词是a.b.c 点点点的格式输入
  • 使用主题交换机topic -- BuiltinExchangeType.TOPIC

第一步:m5包创建消息生产者类

  1. package m5;
  2. import com.rabbitmq.client.BuiltinExchangeType;
  3. import com.rabbitmq.client.Channel;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.Scanner;
  7. import java.util.concurrent.TimeoutException;
  8. public class Producer {
  9. public static void main(String[] args) throws IOException, TimeoutException {
  10. //连接
  11. ConnectionFactory f = new ConnectionFactory();
  12. f.setHost("192.168.64.140");
  13. //f.setHost("wht6.cn");
  14. f.setPort(5672);
  15. f.setUsername("admin");
  16. f.setPassword("admin");
  17. Channel c = f.newConnection().createChannel();
  18. //创建 topic 类型主题交换机: topic_logs
  19. c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
  20. //发送消息,消息上需要携带路由键关键词
  21. while(true){
  22. System.out.print("输入消息: ");
  23. String s = new Scanner(System.in).nextLine();
  24. System.out.print("输入路由键: ");
  25. String k = new Scanner(System.in).nextLine();
  26. //对于默认交换机,使用队列名作为路由键
  27. c.basicPublish("topic_logs",k,null,s.getBytes());
  28. }
  29. }
  30. }

第二步:创建消息消费者类,使用topic主题交换机

  1. package m5;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.Scanner;
  5. import java.util.concurrent.TimeoutException;
  6. public class Consumer {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //连接
  9. ConnectionFactory f = new ConnectionFactory();
  10. f.setHost("192.168.64.140");
  11. //f.setHost("wht6.cn");
  12. f.setPort(5672);
  13. f.setUsername("admin");
  14. f.setPassword("admin");
  15. Channel c = f.newConnection().createChannel();
  16. //1.随机队列--由服务器自动提供队列参数格式: (随机命名.false,ture,true)
  17. String queue = c.queueDeclare().getQueue();//随机命名,getQueue获取随机名
  18. //2.交换机
  19. c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
  20. //3.绑定,设置绑定键
  21. System.out.println("输入绑定键,使用空格隔开:"); //"aa bb cc dd"
  22. String s = new Scanner(System.in).nextLine();
  23. String[] a = s.split("\\s+");//第一个斜杠是转义字符 \s是空白字符 +加号指一到多个
  24. for (String k : a) {
  25. c.queueBind(queue,"topic_logs",k);//k是用于绑定的键值
  26. }
  27. //从随机队列接收消息
  28. DeliverCallback deliverCallback = new DeliverCallback() {
  29. @Override
  30. public void handle(String consumerTag, Delivery message) throws IOException {
  31. String s = new String(message.getBody());
  32. //从消息中携带的路由键
  33. message.getEnvelope().getRoutingKey();
  34. System.out.println("收到: "+s);
  35. }
  36. };
  37. CancelCallback cancelCallback = new CancelCallback() {
  38. @Override
  39. public void handle(String consumerTag) throws IOException {
  40. }
  41. };
  42. c.basicConsume(queue,true,deliverCallback,cancelCallback);
  43. }
  44. }

第三步:启动两个消费者和一个生产者服务测试:消息消费者绑定键,消息生产者发送消息并发送指定的绑定键,即可根据绑定键使对应的消费者接收到消息

 

6 RPC模式:远程过程调用

RPC

如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC.

在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。

RPC的工作方式 

  • 对于RPC请求,客户端发送一条带有两个属性的消息:replyTo,设置为仅为请求创建的匿名独占队列,和correlationId,设置为每个请求的惟一id值。
  • 请求被发送到rpc_queue队列。
  • RPC工作进程(即:服务器)在队列上等待请求。当一个请求出现时,它执行任务,并使用replyTo字段中的队列将结果发回客户机。
  • 客户机在回应消息队列上等待数据。当消息出现时,它检查correlationId属性。如果匹配请求中的值,则向程序返回该响应数据.

客户端


在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果

  1. RPCClient client = new RPCClient();
  2. String result = client.call("4");
  3. System.out.println( "第四个斐波那契数是: " + result);


回调队列 Callback Queue


使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:

  1. //定义回调队列,
  2. //自动生成对列名,非持久,独占,自动删除
  3. callbackQueueName = ch.queueDeclare().getQueue();
  4. //用来设置回调队列的参数对象
  5. BasicProperties props = new BasicProperties
  6.                             .Builder()
  7.                             .replyTo(callbackQueueName)
  8.                             .build();
  9. //发送调用消息
  10. ch.basicPublish("", "rpc_queue", props, message.getBytes());


消息属性 Message Properties

  1. AMQP 0-9-1协议定义了消息的14个属性。大部分属性很少使用,下面是比较常用的4个:
  2. deliveryMode:将消息标记为持久化(值为2)或非持久化(任何其他值)。
  3. contentType:用于描述mime类型。例如,对于经常使用的JSON格式,将此属性设置为:application/json。
  4. replyTo:通常用于指定回调队列。
  5. correlationId:将RPC响应与请求关联起来非常有用。

关联id (correlationId):


在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。

这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。

拓展:测试阻塞队列

  1. package m6;
  2. import java.util.Scanner;
  3. import java.util.concurrent.ArrayBlockingQueue;
  4. import java.util.concurrent.BlockingQueue;
  5. //测试BlockingQueue阻塞队列
  6. public class TestBlockingQueue {
  7. static BlockingQueue<String> bq =
  8. new ArrayBlockingQueue<String>(10);//括号内是容量参数
  9. public static void main(String[] args) {
  10. //第一个线程:从bq取数据,没有数据会阻塞等待
  11. new Thread(() -> {
  12. System.out.println("线程1正在获取数据");
  13. try {
  14. String s = bq.take();
  15. System.out.println("线程1已获取数据:" +s);
  16. } catch (InterruptedException e) {
  17. e.printStackTrace();
  18. }
  19. }).start();
  20. //第二个线程:向bq放入数据
  21. new Thread(() -> {
  22. System.out.println("线程2 -- 输入数据放入集合:");
  23. String s = new Scanner(System.in).nextLine();
  24. bq.add(s);
  25. }).start();
  26. }
  27. }
  • 启动测试:

客户端远程调用斐波那契数

  1. package m6;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.Scanner;
  5. import java.util.UUID;
  6. import java.util.concurrent.ArrayBlockingQueue;
  7. import java.util.concurrent.TimeoutException;
  8. //客户端调用斐波那契数
  9. public class Client {
  10. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  11. System.out.println("求第几个斐波那契数: ");
  12. int n = new Scanner(System.in).nextInt();
  13. long r =f(n);
  14. System.out.println("第"+ n +"斐波那契数:" + r);
  15. }
  16. private static long f(int n) throws IOException, TimeoutException, InterruptedException {
  17. //准备阻塞队列集合
  18. ArrayBlockingQueue<Long> abq =
  19. new ArrayBlockingQueue<>(10);
  20. //连接
  21. ConnectionFactory f = new ConnectionFactory();
  22. f.setHost("192.168.64.140"); //wht6.cn
  23. f.setPort(5672);
  24. f.setUsername("admin");
  25. f.setPassword("admin");
  26. Channel c = f.newConnection().createChannel();
  27. //创建调用队列: rpc-queue
  28. c.queueDeclare("rpc-queue", false, false, false, null);
  29. //创建随机队列,用来获取计算结果
  30. String replayTo = c.queueDeclare().getQueue();//返回队列名
  31. //产生一个关联id
  32. String cid = UUID.randomUUID().toString();
  33. //发送调用信息, 携带两个参数: 返回队列名,关联id
  34. AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
  35. .replyTo(replayTo) //返回队列名
  36. .correlationId(cid) //关联id
  37. .build();
  38. c.basicPublish("", "rpc-queue", prop, (n + "").getBytes());
  39. //执行其他运算.....
  40. System.out.println("执行其他运算---------");
  41. //需要结果时,从返回队列接收计算结果
  42. DeliverCallback deliverCallback = (consumerTag, message) -> {
  43. // 消费者线程处理计算结果
  44. //判断message中的关联id,是不是刚才发送的关联id
  45. if (cid.equals(message.getProperties().getCorrelationId())) {
  46. String s = new String(message.getBody());
  47. //把结果放入 BlockingQueue
  48. abq.add(Long.valueOf(s));
  49. }
  50. };
  51. CancelCallback cancelCallback = consumerTag -> {
  52. };
  53. c.basicConsume(replayTo, true, deliverCallback, cancelCallback);
  54. // 主线程中,从 BlockingQueue 获取数据
  55. return abq.take();
  56. }
  57. }
  • 启动测试:

后台发送消息,会阻塞在调用队列rpc-queue中,

服务端:提供处理方法

  1. package m6;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. //服务端
  6. public class Server {
  7. public static void main(String[] args) throws IOException, TimeoutException {
  8. //连接
  9. ConnectionFactory f = new ConnectionFactory();
  10. f.setHost("192.168.64.140"); //wht6.cn
  11. f.setPort(5672);
  12. f.setUsername("admin");
  13. f.setPassword("admin");
  14. Channel c = f.newConnection().createChannel();
  15. //创建调用队列: rpc-queue
  16. c.queueDeclare("rpc-queue", false, false, false, null);
  17. //从 rpc-queue 接收调用消息(回调对象)
  18. DeliverCallback deliverCallback =(consumerTag,message) -> {
  19. //求出斐波那契数
  20. //把结果发回到返回队列,并携带关联id
  21. //从 message 取出: n,返回队列名, 关联id
  22. Integer n = Integer.valueOf(new String(message.getBody()));
  23. String replyTo = message.getProperties().getReplyTo();
  24. String cid = message.getProperties().getCorrelationId();
  25. System.out.println("求第"+n+"个斐波那契数");
  26. long r = fbnq(n);//调用fbnq方法
  27. AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
  28. .correlationId(cid)
  29. .build();
  30. c.basicPublish("",replyTo,prop,(r+"").getBytes());
  31. };
  32. CancelCallback cancelCallback =consumerTag ->{};
  33. c.basicConsume("rpc-queue",true,deliverCallback,cancelCallback);
  34. }
  35. //求斐波那契数的方法
  36. public static long fbnq(int n){
  37. if (n == 1 || n ==2){
  38. return 1;
  39. }
  40. /*
  41. *起始:
  42. * a = 1
  43. * b = 1
  44. * 第一次计算后:
  45. * b= a+b =2
  46. * a=1
  47. * 第二次计算后:
  48. * b=a+b=1+2=3
  49. * a=2
  50. * 规律:
  51. * b =a+b
  52. * a =b-a
  53. * */
  54. long a= 1;
  55. long b =1;
  56. for (int i = 3; i <= n; i++) {
  57. b = a + b;
  58. a = b-a;
  59. }
  60. return b;
  61. }
  62. }
  • 启动客户端,服务端测试:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/693279
推荐阅读
相关标签
  

闽ICP备14008679号