当前位置:   article > 正文

RabbitMQ消息队列的入门实践_@rabbitlistener(queues = "#{

@rabbitlistener(queues = "#{

需求

客户端向RabbitMQ中的调用队列发送消息,服务端从调用队列取出消息并将结果返回给RabbitMQ中的返回队列,假如客户端的消息请求是计算第几位菲波拉契数的值。

入门版

添加依赖

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <parent>
  6. <groupId>org.springframework.boot</groupId>
  7. <artifactId>spring-boot-starter-parent</artifactId>
  8. <version>2.3.2.RELEASE</version>
  9. <relativePath/> <!-- lookup parent from repository -->
  10. </parent>
  11. <groupId>cn.tedu</groupId>
  12. <artifactId>rabbitmq-spring</artifactId>
  13. <version>0.0.1-SNAPSHOT</version>
  14. <name>rabbitmq-spring</name>
  15. <description>Demo project for Spring Boot</description>
  16. <properties>
  17. <java.version>1.8</java.version>
  18. </properties>
  19. <dependencies>
  20. <dependency>
  21. <groupId>org.springframework.boot</groupId>
  22. <artifactId>spring-boot-starter-amqp</artifactId>
  23. </dependency>
  24. <dependency>
  25. <groupId>org.springframework.boot</groupId>
  26. <artifactId>spring-boot-starter-test</artifactId>
  27. <scope>test</scope>
  28. </dependency>
  29. <dependency>
  30. <groupId>org.springframework.amqp</groupId>
  31. <artifactId>spring-rabbit-test</artifactId>
  32. <scope>test</scope>
  33. </dependency>
  34. </dependencies>
  35. <build>
  36. <plugins>
  37. <plugin>
  38. <groupId>org.springframework.boot</groupId>
  39. <artifactId>spring-boot-maven-plugin</artifactId>
  40. </plugin>
  41. </plugins>
  42. </build>
  43. </project>

application.yml配置文件

  1. spring:
  2. rabbitmq:
  3. host: wht6.cn
  4. port: 5672
  5. username: admin
  6. password: admin
  7. virtual-host: vh-zl

注意:如果用的是别人的rabbitmq服务的话,则需要添加 virtual-host,并且还需要去服务里面手动添加配置。

客户端

只是创建了普通队列

  1. package cn.tedu.sp09.m6;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.Scanner;
  5. import java.util.UUID;
  6. import java.util.concurrent.ArrayBlockingQueue;
  7. import java.util.concurrent.TimeoutException;
  8. /**
  9. * @Author 作者:小龙猿
  10. * @Project 项目:springcloud1
  11. * @Time 时间:2021/9/18 10:45
  12. */
  13. public class Client {
  14. public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
  15. System.out.println("求第几个菲波拉契数:");
  16. int n = new Scanner(System.in).nextInt();
  17. long r = f(n);
  18. System.out.println("第"+n+"个菲波拉契数:"+r);
  19. }
  20. private static long f(int n) throws IOException, TimeoutException, InterruptedException {
  21. //客户端准备阻塞队列集合
  22. ArrayBlockingQueue<Long> abq = new ArrayBlockingQueue<>(10);
  23. //连接
  24. ConnectionFactory f = new ConnectionFactory();
  25. f.setHost("wht6.cn");
  26. f.setPort(5672);
  27. f.setUsername("admin");
  28. f.setPassword("admin");
  29. Channel c = f.newConnection().createChannel();
  30. //创建调用队列,调用队列和返回队列是消息中间件里面的
  31. c.queueDeclare("rpc-queue", false, false, false, null);
  32. //创建返回队列,队列名随机,存放返回结果
  33. String replayTo = c.queueDeclare().getQueue();
  34. //产生一个关联id,用于指定将结果返回给哪个用户
  35. String cid = UUID.randomUUID().toString();
  36. //发送调用信息,携带两个参数:返回队列名和关联id
  37. AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder().replyTo(replayTo).correlationId(cid).build();
  38. c.basicPublish("", "rpc-queue",prop, (n+"").getBytes());
  39. //执行其它运算
  40. System.out.println("执行其它运算");
  41. //需要结果时,从返回队列接收计算结果
  42. DeliverCallback deliverCallback = (s, delivery) -> {
  43. //消费者线程处理计算结果
  44. // 判断delivery中的关联id是不是刚才发送的关联id
  45. if (cid.equals(delivery.getProperties().getCorrelationId())){
  46. String s1 = new String(delivery.getBody());//获取返回结果
  47. //把结果放入 BlockingQueue
  48. abq.add(Long.valueOf(s1));
  49. }
  50. };
  51. CancelCallback cancelCallback = s -> {
  52. };
  53. c.basicConsume(replayTo, true,deliverCallback,cancelCallback);
  54. //主线程中,从BlockingQueue 获取数据
  55. return abq.take();//返回客户端阻塞队列里面的结果
  56. }
  57. }

服务端

  1. package cn.tedu.sp09.m6;
  2. import com.rabbitmq.client.*;
  3. import java.io.IOException;
  4. import java.util.concurrent.TimeoutException;
  5. /**
  6. * @Author 作者:小龙猿
  7. * @Project 项目:springcloud1
  8. * @Time 时间:2021/9/18 14:04
  9. */
  10. public class Server {
  11. public static void main(String[] args) throws IOException, TimeoutException {
  12. //连接
  13. ConnectionFactory f = new ConnectionFactory();
  14. f.setHost("wht6.cn");
  15. f.setPort(5672);
  16. f.setUsername("admin");
  17. f.setPassword("admin");
  18. Channel c = f.newConnection().createChannel();
  19. //创建调用队列rpc-queue,调用队列和返回队列是消息中间件里面的
  20. c.queueDeclare("rpc-queue", false, false, false, null);
  21. //从rpc-queue 接收调用信息
  22. DeliverCallback deliverCallback = (s,delivery)->{
  23. //求出菲波拉契数
  24. //从delivery取出: n,返回队列名,关联id
  25. Integer n = Integer.valueOf(new String(delivery.getBody()));
  26. String replyTo = delivery.getProperties().getReplyTo();
  27. String cid = delivery.getProperties().getCorrelationId();
  28. System.out.println("求第"+n+"个菲波拉契数");
  29. long r = fbnq(n);
  30. //把结果发回到返回队列,携带关联id参数
  31. AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
  32. .correlationId(cid)
  33. .build();
  34. c.basicPublish("", replyTo, prop, (r+"").getBytes());
  35. };
  36. CancelCallback cancelCallback = s -> {};
  37. c.basicConsume("rpc-queue", true,deliverCallback,cancelCallback);
  38. }
  39. public static long fbnq(int n){
  40. if (n == 1 || n == 2){
  41. return 1;
  42. }
  43. long a = 1;
  44. long b = 1;
  45. for (int i = 3; i < n; i++) {
  46. b = a + b;
  47. a = b - a;
  48. }
  49. return b;
  50. }
  51. }

进阶版

入门版都是自己创建队列并携带参数信息,进阶版就把这些操作交给了spring去处理。

主程序

  1. package cn.tedu.rabbitmqspring.m6;
  2. import org.springframework.amqp.core.Queue;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.boot.SpringApplication;
  5. import org.springframework.boot.autoconfigure.SpringBootApplication;
  6. import org.springframework.context.annotation.Bean;
  7. import javax.annotation.PostConstruct;
  8. import java.util.Scanner;
  9. import java.util.UUID;
  10. /**
  11. * @Author 作者:小龙猿
  12. * @Project 项目:rabbitmq-api
  13. * @Time 时间:2021/9/18 14:58
  14. */
  15. @SpringBootApplication
  16. public class Main {
  17. public static void main(String[] args) {
  18. SpringApplication.run(Main.class, args);
  19. }
  20. /*
  21. 这个对象放入spring容器,对象名叫:rndQueue
  22. SPEL --- Spring Expression Language
  23. 直接访问 spring 容器中的对象
  24. #{rndQueue.name}
  25. OGNL --- Object Graph Navigation Language
  26. Struts2 中的一种标记
  27. ${}
  28. */
  29. @Bean
  30. public Queue rndQueue(){//创建随机队列
  31. return new Queue(UUID.randomUUID().toString(),false,true,true);
  32. }
  33. @Autowired
  34. private Client client;
  35. //创建测试方法
  36. @PostConstruct
  37. public void test(){
  38. new Thread(()->{
  39. while (true){
  40. System.out.print("求第几个菲波拉契数:");
  41. int n = new Scanner(System.in).nextInt();
  42. client.send(n);
  43. }
  44. }).start();
  45. }
  46. }

客户端

  1. package cn.tedu.rabbitmqspring.m6;
  2. import org.springframework.amqp.core.AmqpTemplate;
  3. import org.springframework.amqp.core.MessageProperties;
  4. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5. import org.springframework.amqp.support.AmqpHeaders;
  6. import org.springframework.beans.factory.annotation.Autowired;
  7. import org.springframework.beans.factory.annotation.Value;
  8. import org.springframework.messaging.handler.annotation.Header;
  9. import org.springframework.stereotype.Component;
  10. import java.util.UUID;
  11. /**
  12. * @Author 作者:小龙猿
  13. * @Project 项目:rabbitmq-api
  14. * @Time 时间:2021/9/18 14:58
  15. */
  16. @Component
  17. public class Client {
  18. @Autowired
  19. private AmqpTemplate t;
  20. //使用 SPEL 表达式获取随机队列名: "#{rndQueue.name}"
  21. @Value("#{rndQueue.name}")
  22. private String replyTo;
  23. //发送调用数据时, 携带随机队列名和correlationId
  24. public void send(int n){
  25. t.convertAndSend("rpc-queue", n,(message)->{
  26. MessageProperties p = message.getMessageProperties();
  27. p.setReplyTo(replyTo);
  28. p.setCorrelationId(UUID.randomUUID().toString());
  29. return message;
  30. });
  31. }
  32. //从随机队列接收计算结果,从随机队列接收调用结果, 并获取correlationId
  33. @RabbitListener(queues = "#{rndQueue.name}")
  34. public void receive(long r, @Header(name = AmqpHeaders.CORRELATION_ID)String cid){
  35. System.out.println("结果:"+r);
  36. }
  37. }

服务端

  1. package cn.tedu.rabbitmqspring.m6;
  2. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.stereotype.Component;
  5. /**
  6. * @Author 作者:小龙猿
  7. * @Project 项目:rabbitmq-api
  8. * @Time 时间:2021/9/18 15:26
  9. */
  10. @Component
  11. public class Server {
  12. //如果处理消息的方法不是void,有返回值,
  13. //那么spring会把返回值通过返回队列发回到客户端,并携带关联id
  14. @RabbitListener(queues = "rpc-queue")
  15. public long receive(int n){
  16. long r = f(n);
  17. return r;
  18. }
  19. private long f(int n) {
  20. if (n ==1 || n == 2)
  21. return 1;
  22. return f(n-1)+f(n-2);
  23. }
  24. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/凡人多烦事01/article/detail/728259
推荐阅读
相关标签
  

闽ICP备14008679号