赞
踩
客户端向RabbitMQ中的调用队列发送消息,服务端从调用队列取出消息并将结果返回给RabbitMQ中的返回队列,假如客户端的消息请求是计算第几位菲波拉契数的值。
- <?xml version="1.0" encoding="UTF-8"?>
- <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-parent</artifactId>
- <version>2.3.2.RELEASE</version>
- <relativePath/> <!-- lookup parent from repository -->
- </parent>
- <groupId>cn.tedu</groupId>
- <artifactId>rabbitmq-spring</artifactId>
- <version>0.0.1-SNAPSHOT</version>
- <name>rabbitmq-spring</name>
- <description>Demo project for Spring Boot</description>
- <properties>
- <java.version>1.8</java.version>
- </properties>
- <dependencies>
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-amqp</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-starter-test</artifactId>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.springframework.amqp</groupId>
- <artifactId>spring-rabbit-test</artifactId>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.springframework.boot</groupId>
- <artifactId>spring-boot-maven-plugin</artifactId>
- </plugin>
- </plugins>
- </build>
-
- </project>
- spring:
- rabbitmq:
- host: wht6.cn
- port: 5672
- username: admin
- password: admin
- virtual-host: vh-zl
注意:如果用的是别人的rabbitmq服务的话,则需要添加 virtual-host,并且还需要去服务里面手动添加配置。
只是创建了普通队列。
- package cn.tedu.sp09.m6;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.Scanner;
- import java.util.UUID;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.TimeoutException;
- /**
- * @Author 作者:小龙猿
- * @Project 项目:springcloud1
- * @Time 时间:2021/9/18 10:45
- */
- public class Client {
- public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
- System.out.println("求第几个菲波拉契数:");
- int n = new Scanner(System.in).nextInt();
- long r = f(n);
- System.out.println("第"+n+"个菲波拉契数:"+r);
-
- }
-
- private static long f(int n) throws IOException, TimeoutException, InterruptedException {
- //客户端准备阻塞队列集合
- ArrayBlockingQueue<Long> abq = new ArrayBlockingQueue<>(10);
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("wht6.cn");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //创建调用队列,调用队列和返回队列是消息中间件里面的
- c.queueDeclare("rpc-queue", false, false, false, null);
- //创建返回队列,队列名随机,存放返回结果
- String replayTo = c.queueDeclare().getQueue();
- //产生一个关联id,用于指定将结果返回给哪个用户
- String cid = UUID.randomUUID().toString();
- //发送调用信息,携带两个参数:返回队列名和关联id
- AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder().replyTo(replayTo).correlationId(cid).build();
- c.basicPublish("", "rpc-queue",prop, (n+"").getBytes());
- //执行其它运算
- System.out.println("执行其它运算");
- //需要结果时,从返回队列接收计算结果
- DeliverCallback deliverCallback = (s, delivery) -> {
- //消费者线程处理计算结果
- // 判断delivery中的关联id是不是刚才发送的关联id
- if (cid.equals(delivery.getProperties().getCorrelationId())){
- String s1 = new String(delivery.getBody());//获取返回结果
- //把结果放入 BlockingQueue
- abq.add(Long.valueOf(s1));
- }
- };
- CancelCallback cancelCallback = s -> {
-
- };
- c.basicConsume(replayTo, true,deliverCallback,cancelCallback);
- //主线程中,从BlockingQueue 获取数据
- return abq.take();//返回客户端阻塞队列里面的结果
- }
- }
- package cn.tedu.sp09.m6;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
- /**
- * @Author 作者:小龙猿
- * @Project 项目:springcloud1
- * @Time 时间:2021/9/18 14:04
- */
- public class Server {
- public static void main(String[] args) throws IOException, TimeoutException {
- //连接
- ConnectionFactory f = new ConnectionFactory();
- f.setHost("wht6.cn");
- f.setPort(5672);
- f.setUsername("admin");
- f.setPassword("admin");
- Channel c = f.newConnection().createChannel();
- //创建调用队列rpc-queue,调用队列和返回队列是消息中间件里面的
- c.queueDeclare("rpc-queue", false, false, false, null);
- //从rpc-queue 接收调用信息
- DeliverCallback deliverCallback = (s,delivery)->{
- //求出菲波拉契数
- //从delivery取出: n,返回队列名,关联id
- Integer n = Integer.valueOf(new String(delivery.getBody()));
- String replyTo = delivery.getProperties().getReplyTo();
- String cid = delivery.getProperties().getCorrelationId();
- System.out.println("求第"+n+"个菲波拉契数");
- long r = fbnq(n);
- //把结果发回到返回队列,携带关联id参数
- AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
- .correlationId(cid)
- .build();
- c.basicPublish("", replyTo, prop, (r+"").getBytes());
- };
- CancelCallback cancelCallback = s -> {};
- c.basicConsume("rpc-queue", true,deliverCallback,cancelCallback);
- }
- public static long fbnq(int n){
- if (n == 1 || n == 2){
- return 1;
- }
- long a = 1;
- long b = 1;
- for (int i = 3; i < n; i++) {
- b = a + b;
- a = b - a;
- }
- return b;
- }
- }
入门版都是自己创建队列并携带参数信息,进阶版就把这些操作交给了spring去处理。
- package cn.tedu.rabbitmqspring.m6;
- import org.springframework.amqp.core.Queue;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.SpringApplication;
- import org.springframework.boot.autoconfigure.SpringBootApplication;
- import org.springframework.context.annotation.Bean;
- import javax.annotation.PostConstruct;
- import java.util.Scanner;
- import java.util.UUID;
- /**
- * @Author 作者:小龙猿
- * @Project 项目:rabbitmq-api
- * @Time 时间:2021/9/18 14:58
- */
- @SpringBootApplication
- public class Main {
- public static void main(String[] args) {
- SpringApplication.run(Main.class, args);
- }
- /*
- 这个对象放入spring容器,对象名叫:rndQueue
- SPEL --- Spring Expression Language
- 直接访问 spring 容器中的对象
- #{rndQueue.name}
- OGNL --- Object Graph Navigation Language
- Struts2 中的一种标记
- ${}
- */
- @Bean
- public Queue rndQueue(){//创建随机队列
- return new Queue(UUID.randomUUID().toString(),false,true,true);
- }
- @Autowired
- private Client client;
- //创建测试方法
- @PostConstruct
- public void test(){
- new Thread(()->{
- while (true){
- System.out.print("求第几个菲波拉契数:");
- int n = new Scanner(System.in).nextInt();
- client.send(n);
- }
- }).start();
- }
- }
- package cn.tedu.rabbitmqspring.m6;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.core.MessageProperties;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.amqp.support.AmqpHeaders;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
- import java.util.UUID;
- /**
- * @Author 作者:小龙猿
- * @Project 项目:rabbitmq-api
- * @Time 时间:2021/9/18 14:58
- */
- @Component
- public class Client {
- @Autowired
- private AmqpTemplate t;
- //使用 SPEL 表达式获取随机队列名: "#{rndQueue.name}"
- @Value("#{rndQueue.name}")
- private String replyTo;
- //发送调用数据时, 携带随机队列名和correlationId
- public void send(int n){
- t.convertAndSend("rpc-queue", n,(message)->{
- MessageProperties p = message.getMessageProperties();
- p.setReplyTo(replyTo);
- p.setCorrelationId(UUID.randomUUID().toString());
- return message;
- });
- }
- //从随机队列接收计算结果,从随机队列接收调用结果, 并获取correlationId
- @RabbitListener(queues = "#{rndQueue.name}")
- public void receive(long r, @Header(name = AmqpHeaders.CORRELATION_ID)String cid){
- System.out.println("结果:"+r);
- }
- }
- package cn.tedu.rabbitmqspring.m6;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Component;
- /**
- * @Author 作者:小龙猿
- * @Project 项目:rabbitmq-api
- * @Time 时间:2021/9/18 15:26
- */
- @Component
- public class Server {
- //如果处理消息的方法不是void,有返回值,
- //那么spring会把返回值通过返回队列发回到客户端,并携带关联id
- @RabbitListener(queues = "rpc-queue")
- public long receive(int n){
- long r = f(n);
- return r;
- }
- private long f(int n) {
- if (n ==1 || n == 2)
- return 1;
- return f(n-1)+f(n-2);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。