赞
踩
目录
克隆 centos-8 或 centos-7: docker-base
设置ip
- ./ip-static
- ip: 192.168.64.150
-
- ifconfig
注意:若这里出现error,需要使用这两个命令,再重新设置ip
- nmcli n on
-
- systemctl restart NetworkManager
3.安装 docker,参考csdn笔记: Docker(一) - 离线安装_wanght笔记-CSDN博客_docker离线安装
上传离线安装文件,不用按照笔记在线下载
DevOps课前资料\docker\docker-install 文件夹
上传到 /root/
确认安装结果:
- docker info
-
- docker run hello-world
从 docker-base 克隆: rabbitmq
设置ip
- ./ip-static
- ip: 192.168.64.140
-
- ifconfig
3. 按照笔记用docker运行 rabbitmq,过程中需要联网下载 rabbitmq 的镜像
- docker pull rabbitmq:management 在线下载镜像
-
- 执行 docker images 查看镜像
-
- 关闭防火墙
-
- systemctl stop firewalld
- systemctl disable firewalld
-
- 重启 docker 系统服务
-
- systemctl restart docker
-
- mkdir /etc/rabbitmq
-
- vim /etc/rabbitmq/rabbitmq.conf #也可以先cd //etc/rabbitmq 再vim rabbitmq.conf 进行编辑
-
- # rabbitmq.conf配置文件中添加两行配置:
- default_user = admin
- default_pass = admin
-
- #启动rabbitmq容器
- docker run -d --name rabbit \
- -p 5672:5672 \
- -p 15672:15672 \
- -v /etc/rabbitmq/rabbitmq.conf:/etc/rabbitmq/rabbitmq.conf \
- -e RABBITMQ_CONFIG_FILE=/etc/rabbitmq/rabbitmq.conf \
- rabbitmq:management
-
- 访问管理控制台 http://192.168.64.140:15672
- 用户名密码都是 admin
发送和接受消息的端口都是5672
管理rabbitmq控制台,界面的端口是15672
下次使用时:执行这两个步骤:
spring默认集成Rabbitmq
腾讯使用的是Tubemq
阿里使用的是Rocketmq
使用场景:服务解耦,流量削峰,异步调用
RabbitMQ_wanght笔记-CSDN博客_rabbitmq
A服务只需要向消息服务器发送消息,而不用考虑谁需要这些数据;下游服务如果需要数据,自行从消息服务器订阅消息,不再需要数据时则取消订阅即可
RabbitMQ是一个消息中间件,你可以想象它是一个邮局。当你把信件放到邮箱里时,能够确信邮递员会正确地递送你的信件。RabbitMq就是一个邮箱、一个邮局和一个邮递员。
第一步:创建工程
project项目名称:rabbitmq
再在下面创建maven module : rabbitmq-api
第二步:pom文件添加依赖
- <dependencies>
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.4.3</version>
- </dependency>
- </dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-compiler-plugin</artifactId>
- <version>3.8.0</version>
- <configuration>
- <source>1.8</source>
- <target>1.8</target>
- </configuration>
- </plugin>
- </plugins>
- </build>
第三步:创建包名m1,发送者生成消息的类
- package m1;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import java.util.concurrent.TimeoutException;
- //生产者队列,启动后发送消息,会在指定服务器看到发送的消息
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");//rabbitmq的ip 老师的服务器:wht6.cn
- f.setPort(5672);//5672 端口进行接收,发送消息
- f.setUsername("admin");
- f.setPassword("admin");
- Connection con = f.newConnection();
- Channel c = con.createChannel();//通信
- //创建队列 名为hello-world ,如果队列已经存在,不会重复创建
- /**
- * 参数:
- * 1.队列名
- * 2.是否是持久队列
- * 3.是否是排他队列,独占队列
- * 4.是否自动删除(没有消费者时自动删除)
- * 5.其他参数属性
- * */
- c.queueDeclare("hello-world-666",false,false,false,null);
- //向创建的队列,发送消息-先发送到消息服务器-接收者再接收
- /**
- * 参数:
- * 1. 交换机: ""空串是一个默认交换机,Exchange栏中叫AMQ default
- * 3. 消息的其他参数属性,键值对数据
- * */
- c.basicPublish(
- "","hello-world-666",
- null,"Hello world!".getBytes(StandardCharsets.UTF_8));
- }
- }
启动服务器,查看发送的消息:
第四步:创建消费者收到的消息类
- package m1;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- //消费者队列
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");//rabbitmq的ip 老师的服务器:wht6.cn
- f.setPort(5672);//5672 端口进行接收,发送消息
- f.setUsername("admin");
- f.setPassword("admin");
- Connection con = f.newConnection();
- Channel c = con.createChannel();//通信
- //创建队列 名为hello-world ,如果队列已经存在,不会重复创建
- c.queueDeclare("hello-world-666",false,false,false,null);
- //创建回调对象
- DeliverCallback deliverCallback = new DeliverCallback(){
- @Override
- public void handle(String consumerTag, Delivery message) throws IOException {
- byte[] a = message.getBody();//收到的消息转为数组
- String s = new String(a);//转为字符串
- System.out.println("收到:"+s);
- }
- };
- CancelCallback cancelCallback = new CancelCallback(){
- @Override
- public void handle(String message) throws IOException {
-
- }
- };
- //接收消息,收到的消息会传递到一个回调对象去处理
- /**
- * 第二个参数: autoAck- auto acknowlegment 自动确认
- * true - 自动确认
- * 消息一向消费者发送,服务器立即自动确认消息,删除消息
- * false - 手动确认
- * 消息发出后,服务器会缓存消息,不删除,
- * 等待消费者(接收者)发回一个确认消息(回执)才删除,
- * 如果消费者处理消息过程中崩溃或离线,
- * 服务器会回滚消息,等待重新发送
- * */
- //c.basicConsume("hello-world-666",true,处理消息的回到对象,null);
- c.basicConsume("hello-world-666",true,deliverCallback,cancelCallback);
- }
- }
第五步:启动测试结果,接收者可以看到发送者发来的消息
第六步:当发送者服务和接收者服务都打开时,发送者发送消息,接收者就自动接受到
工作队列(即任务队列)背后的主要思想是避免立即执行资源密集型任务,并且必须等待它完成。相反,我们将任务安排在稍后完成。
我们将任务封装为消息并将其发送到队列。后台运行的工作进程将获取任务并最终执行任务。当运行多个消费者时,任务将在它们之间分发。
使用任务队列的一个优点是能够轻松地并行工作。如果我们正在积压工作任务,我们可以添加更多工作进程,这样就可以轻松扩展。
第一步:创建m2包,创建消息发送者,生产者的Producer类
- package m2;
-
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.nio.charset.StandardCharsets;
- import java.util.Scanner;
- import java.util.concurrent.TimeoutException;
-
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");//rabbitmq的ip 老师的服务器:wht6.cn
- f.setPort(5672);//5672 端口进行接收,发送消息
- f.setUsername("admin");
- f.setPassword("admin");
- Connection con = f.newConnection();
- Channel c = con.createChannel();//通信
- //创建队列 名为hello-world-666 ,如果队列已经存在,不会重复创建
- c.queueDeclare("hello-world-666",false,false,false,null);
- //向hello-world-666队列,发送消息
- while (true){
- System.out.println("输入消息:");
- String s = new Scanner(System.in).nextLine();
- c.basicPublish("","hello-world-666",
- null,s.getBytes(StandardCharsets.UTF_8));
- }
- }
- }
第二步:创建消息接收者,消费者的Consumer类,和简单模式的消费者类相同,只是添加了模拟耗时消息
- package m2;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");//rabbitmq的ip 老师的服务器:wht6.cn
- f.setPort(5672);//5672 端口进行接收,发送消息
- f.setUsername("admin");
- f.setPassword("admin");
- Connection con = f.newConnection();
- Channel c = con.createChannel();//通信
- //创建队列 名为hello-world ,如果队列已经存在,不会重复创建
- c.queueDeclare("hello-world-666",false,false,false,null);
- //创建回调对象
- DeliverCallback deliverCallback = new DeliverCallback(){
- @Override
- public void handle(String consumerTag, Delivery message) throws IOException {
- byte[] a = message.getBody();//收到的消息转为数组
- String s = new String(a);//转为字符串
- System.out.println("收到:"+s);//"sdsasdads"
- //模拟耗时消息
- //遍历字符串,找'.'点字符,每找到一个就暂停一秒
- for (int i = 0; i < s.length(); i++) {
- if ('.' == s.charAt(i)){
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- System.out.println("---消息处理结束----");
- }
- };
- CancelCallback cancelCallback = new CancelCallback(){
- @Override
- public void handle(String message) throws IOException {
-
- }
- };
- //c.basicConsume("hello-world-666",true,处理消息的回到对象,null);
- c.basicConsume("hello-world-666",true,deliverCallback,cancelCallback);
- }
- }
第三步:创建多个消息消费者接收者,使一个消息生产者,发送者发送消息时,多个消息消费者接收者随机其中一个接受消息
先启动一个一个消费者启动类,通过Edit Configuration 设置多个消费者
第四步:启动消息生产者类,和两个消息接收者类,发送消息进行接受测试
缺点:接收者1接收者2依次接收消息,但是接收者1接受的消息处理时间很长时,接收者2并不能为接收者1分担消息,而是等待接收者1处理完了,再继续接收消息,造成了接收阻塞状态!!!
第一步:手动ACK ,手动发送回执
测试:
发送几条量大延迟的消息,两个接收者其中有一个会有未接收到的数据,会在rabbitqm的Unacked存储,关掉两个接收者,所有消息会存到rabbitemq的Ready中,再开启其中一个接收者,所有的消息会向这个接收者发送.
第二步:每次只接收一条消息,处理完之前不接受下一条
合理分发消息后,消息会缓存在rabbitmq服务器,一旦服务器崩溃,消息也会消失!所以要对消息持久化
1. 队列持久化
2. 消息持久化
生产者消费者的队列名都改为task_queue ,生产者消息持久化,null改
MessageProperties.PERSISTENT_BASIC
消息消费者接收者改掉队列名task_queue
在前面的例子中,我们任务消息只交付给一个工作进程。在这部分,我们将做一些完全不同的事情——我们将向多个消费者传递同一条消息。这种模式称为“发布/订阅”。
为了说明该模式,我们将构建一个简单的日志系统。它将由两个程序组成——第一个程序将发出日志消息,第二个程序接收它们。
在我们的日志系统中,接收程序的每个运行副本都将获得消息。这样,我们就可以运行一个消费者并将日志保存到磁盘; 同时我们可以运行另一个消费者在屏幕上打印日志。
最终, 消息会被广播到所有消息接受者.
第一步: m3包创建消息生产者,向交换机发送消息
- package m3;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.concurrent.TimeoutException;
-
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- //f.setHost("192.168.64.140");//自己的服务器,也可以连接老师的wht6.cn
- f.setHost("wht6.cn");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //创建 fanout 类型交换机: logs
- //c.exchangeDeclare("logs","fanout");
- c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
- //向交换机发送消息
- while(true){
- System.out.println("输入消息:");
- String s = new Scanner(System.in).nextLine();
- //对于fanout交换机,第二个参数无效
- c.basicPublish("logs","",null,s.getBytes());
- }
- }
- }
第二步:m3包创建消息接收消费者,随机队列和交换机的绑定,从队列接收消息
- package m3;
-
- import com.rabbitmq.client.*;
- import java.lang.String;
-
- import java.io.IOException;
- import java.util.UUID;
- import java.util.concurrent.TimeoutException;
-
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- //f.setHost("192.168.64.140");//自己的服务器,也可以连接老师的wht6.cn
- f.setHost("wht6.cn");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //1.创建随机队列
- String queue = UUID.randomUUID().toString();//UUID产生随机队列名
- c.queueDeclare(queue,false,true,true,null);
- //2.创建交换机-名为logs
- c.exchangeDeclare("logs", BuiltinExchangeType.FANOUT);
- //3.绑定,第三个参数,对于fanout交换机无效,目前设为空串""
- c.queueBind(queue,"logs","");
- //从队列接收消息
- DeliverCallback deliverCallback = new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery message) throws IOException {
- String s = new String(message.getBody());
- System.out.println("收到: "+s);
- }
- };
- CancelCallback cancelCallback = new CancelCallback() {
- @Override
- public void handle(String consumerTag) throws IOException {
- }
- };
- c.basicConsume(queue,true,deliverCallback,cancelCallback);
- }
- }
效果:实时接收消息
在上一小节,我们构建了一个简单的日志系统。我们能够向多个接收者广播日志消息。
在这一节,我们将向其添加一个特性—我们将只订阅所有消息中的一部分。例如,我们只接收关键错误消息并保存到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
第一步: 使用direct直连交换机,携带路由关键词发送消息
- package m4;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.concurrent.TimeoutException;
-
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");
- //f.setHost("wht6.cn");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //创建 direct 类型直连交换机: direct_logs
- c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
- //发送消息,消息上需要携带路由键关键词
- while(true){
- System.out.print("输入消息: ");
- String s = new Scanner(System.in).nextLine();
- System.out.print("输入路由键: ");
- String k = new Scanner(System.in).nextLine();
- //对于默认交换机,使用队列名作为路由键
- c.basicPublish("direct_logs",k,null,s.getBytes());
- }
-
- }
- }
第二步:设置绑定键,接收消息
- package m4;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.concurrent.TimeoutException;
-
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");
- //f.setHost("wht6.cn");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //1.随机队列--由服务器自动提供队列参数格式: (随机命名.false,ture,true)
- String queue = c.queueDeclare().getQueue();//随机命名,getQueue获取随机名
- //2.交换机
- c.exchangeDeclare("direct_logs", BuiltinExchangeType.DIRECT);
- //3.绑定,设置绑定键
- System.out.println("输入绑定键,使用空格隔开:"); //"aa bb cc dd"
- String s = new Scanner(System.in).nextLine();
- String[] a = s.split("\\s+");//第一个斜杠是转义字符 \s是空白字符 +加号指一到多个
- for (String k : a) {
- c.queueBind(queue,"direct_logs",k);//k是用于绑定的键值
- }
- //从随机队列接收消息
- DeliverCallback deliverCallback = new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery message) throws IOException {
- String s = new String(message.getBody());
- //从消息中携带的路由键
- message.getEnvelope().getRoutingKey();
- System.out.println("收到: "+s);
- }
- };
- CancelCallback cancelCallback = new CancelCallback() {
- @Override
- public void handle(String consumerTag) throws IOException {
- }
- };
- c.basicConsume(queue,true,deliverCallback,cancelCallback);
- }
-
- }
第三步:启动一个消息生产者,两个消息接收者,消息生产者发送消息和路由键,,绑定关键词的接收者就能根据关键词获取信息
在上一小节,我们改进了日志系统。我们没有使用只能进行广播的fanout交换机,而是使用Direct交换机,从而可以选择性接收日志。
虽然使用Direct交换机改进了我们的系统,但它仍然有局限性——它不能基于多个标准进行路由。
在我们的日志系统中,我们可能不仅希望根据级别订阅日志,还希望根据发出日志的源订阅日志。
这将给我们带来很大的灵活性——我们可能只想接收来自“cron”的关键错误,但也要接收来自“kern”的所有日志。
要在日志系统中实现这一点,我们需要了解更复杂的Topic主题交换机。
第一步:m5包创建消息生产者类
- package m5;
-
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.concurrent.TimeoutException;
-
- public class Producer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");
- //f.setHost("wht6.cn");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //创建 topic 类型主题交换机: topic_logs
- c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
- //发送消息,消息上需要携带路由键关键词
- while(true){
- System.out.print("输入消息: ");
- String s = new Scanner(System.in).nextLine();
- System.out.print("输入路由键: ");
- String k = new Scanner(System.in).nextLine();
- //对于默认交换机,使用队列名作为路由键
- c.basicPublish("topic_logs",k,null,s.getBytes());
- }
-
- }
- }
第二步:创建消息消费者类,使用topic主题交换机
- package m5;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.concurrent.TimeoutException;
-
- public class Consumer {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140");
- //f.setHost("wht6.cn");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //1.随机队列--由服务器自动提供队列参数格式: (随机命名.false,ture,true)
- String queue = c.queueDeclare().getQueue();//随机命名,getQueue获取随机名
- //2.交换机
- c.exchangeDeclare("topic_logs", BuiltinExchangeType.TOPIC);
- //3.绑定,设置绑定键
- System.out.println("输入绑定键,使用空格隔开:"); //"aa bb cc dd"
- String s = new Scanner(System.in).nextLine();
- String[] a = s.split("\\s+");//第一个斜杠是转义字符 \s是空白字符 +加号指一到多个
- for (String k : a) {
- c.queueBind(queue,"topic_logs",k);//k是用于绑定的键值
- }
- //从随机队列接收消息
- DeliverCallback deliverCallback = new DeliverCallback() {
- @Override
- public void handle(String consumerTag, Delivery message) throws IOException {
- String s = new String(message.getBody());
- //从消息中携带的路由键
- message.getEnvelope().getRoutingKey();
- System.out.println("收到: "+s);
- }
- };
- CancelCallback cancelCallback = new CancelCallback() {
- @Override
- public void handle(String consumerTag) throws IOException {
- }
- };
- c.basicConsume(queue,true,deliverCallback,cancelCallback);
- }
-
- }
第三步:启动两个消费者和一个生产者服务测试:消息消费者绑定键,消息生产者发送消息并发送指定的绑定键,即可根据绑定键使对应的消费者接收到消息
如果我们需要在远程电脑上运行一个方法,并且还要等待一个返回结果该怎么办?这和前面的例子不太一样, 这种模式我们通常称为远程过程调用,即RPC.
在本节中,我们将会学习使用RabbitMQ去搭建一个RPC系统:一个客户端和一个可以升级(扩展)的RPC服务器。为了模拟一个耗时任务,我们将创建一个返回斐波那契数列的虚拟的RPC服务。
在客户端定义一个RPCClient类,并定义一个call()方法,这个方法发送一个RPC请求,并等待接收响应结果
- RPCClient client = new RPCClient();
- String result = client.call("4");
- System.out.println( "第四个斐波那契数是: " + result);
使用RabbitMQ去实现RPC很容易。一个客户端发送请求信息,并得到一个服务器端回复的响应信息。为了得到响应信息,我们需要在请求的时候发送一个“回调”队列地址。我们可以使用默认队列。下面是示例代码:
- //定义回调队列,
- //自动生成对列名,非持久,独占,自动删除
- callbackQueueName = ch.queueDeclare().getQueue();
-
- //用来设置回调队列的参数对象
- BasicProperties props = new BasicProperties
- .Builder()
- .replyTo(callbackQueueName)
- .build();
- //发送调用消息
- ch.basicPublish("", "rpc_queue", props, message.getBytes());
在上面的代码中,我们会为每个RPC请求创建一个回调队列。 这是非常低效的,这里还有一个更好的方法:让我们为每个客户端创建一个回调队列。
这就提出了一个新的问题,在队列中得到一个响应时,我们不清楚这个响应所对应的是哪一条请求。这时候就需要使用关联id(correlationId)。我们将为每一条请求设置唯一的的id值。稍后,当我们在回调队列里收到一条消息的时候,我们将查看它的id属性,这样我们就可以匹配对应的请求和响应。如果我们发现了一个未知的id值,我们可以安全的丢弃这条消息,因为它不属于我们的请求。
- package m6;
-
- import java.util.Scanner;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
-
- //测试BlockingQueue阻塞队列
- public class TestBlockingQueue {
- static BlockingQueue<String> bq =
- new ArrayBlockingQueue<String>(10);//括号内是容量参数
- public static void main(String[] args) {
- //第一个线程:从bq取数据,没有数据会阻塞等待
- new Thread(() -> {
- System.out.println("线程1正在获取数据");
- try {
- String s = bq.take();
- System.out.println("线程1已获取数据:" +s);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }).start();
-
- //第二个线程:向bq放入数据
- new Thread(() -> {
- System.out.println("线程2 -- 输入数据放入集合:");
- String s = new Scanner(System.in).nextLine();
- bq.add(s);
- }).start();
-
- }
- }
- package m6;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.UUID;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.TimeoutException;
-
- //客户端调用斐波那契数
- public class Client {
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- System.out.println("求第几个斐波那契数: ");
- int n = new Scanner(System.in).nextInt();
- long r =f(n);
- System.out.println("第"+ n +"斐波那契数:" + r);
- }
-
- private static long f(int n) throws IOException, TimeoutException, InterruptedException {
- //准备阻塞队列集合
- ArrayBlockingQueue<Long> abq =
- new ArrayBlockingQueue<>(10);
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140"); //wht6.cn
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //创建调用队列: rpc-queue
- c.queueDeclare("rpc-queue", false, false, false, null);
- //创建随机队列,用来获取计算结果
- String replayTo = c.queueDeclare().getQueue();//返回队列名
- //产生一个关联id
- String cid = UUID.randomUUID().toString();
- //发送调用信息, 携带两个参数: 返回队列名,关联id
- AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
- .replyTo(replayTo) //返回队列名
- .correlationId(cid) //关联id
- .build();
- c.basicPublish("", "rpc-queue", prop, (n + "").getBytes());
- //执行其他运算.....
-
- System.out.println("执行其他运算---------");
- //需要结果时,从返回队列接收计算结果
- DeliverCallback deliverCallback = (consumerTag, message) -> {
- // 消费者线程处理计算结果
- //判断message中的关联id,是不是刚才发送的关联id
- if (cid.equals(message.getProperties().getCorrelationId())) {
- String s = new String(message.getBody());
- //把结果放入 BlockingQueue
- abq.add(Long.valueOf(s));
- }
- };
- CancelCallback cancelCallback = consumerTag -> {
- };
- c.basicConsume(replayTo, true, deliverCallback, cancelCallback);
- // 主线程中,从 BlockingQueue 获取数据
- return abq.take();
-
- }
- }
后台发送消息,会阻塞在调用队列rpc-queue中,
- package m6;
-
- import com.rabbitmq.client.*;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- //服务端
- public class Server {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("192.168.64.140"); //wht6.cn
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //创建调用队列: rpc-queue
- c.queueDeclare("rpc-queue", false, false, false, null);
- //从 rpc-queue 接收调用消息(回调对象)
- DeliverCallback deliverCallback =(consumerTag,message) -> {
- //求出斐波那契数
- //把结果发回到返回队列,并携带关联id
- //从 message 取出: n,返回队列名, 关联id
- Integer n = Integer.valueOf(new String(message.getBody()));
- String replyTo = message.getProperties().getReplyTo();
- String cid = message.getProperties().getCorrelationId();
- System.out.println("求第"+n+"个斐波那契数");
- long r = fbnq(n);//调用fbnq方法
-
- AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
- .correlationId(cid)
- .build();
- c.basicPublish("",replyTo,prop,(r+"").getBytes());
- };
- CancelCallback cancelCallback =consumerTag ->{};
- c.basicConsume("rpc-queue",true,deliverCallback,cancelCallback);
-
- }
- //求斐波那契数的方法
- public static long fbnq(int n){
- if (n == 1 || n ==2){
- return 1;
- }
- /*
- *起始:
- * a = 1
- * b = 1
- * 第一次计算后:
- * b= a+b =2
- * a=1
- * 第二次计算后:
- * b=a+b=1+2=3
- * a=2
- * 规律:
- * b =a+b
- * a =b-a
- * */
- long a= 1;
- long b =1;
- for (int i = 3; i <= n; i++) {
- b = a + b;
- a = b-a;
- }
- return b;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。