当前位置:   article > 正文

RabbitMQ通信方式之RPC_rabbitmq rpc

rabbitmq rpc

RabbitMQ通信方式之RPC

RPC通信方式结构图

在RabbitMQ中,使用RPC通信方式的应用程序被称为RPC客户端,而提供服务的应用程序被称为RPC服务器。当RPC客户端发送请求时,消息属性要设置上reply_to的回调队列,且将一个唯一的标识符(correlation_id)作为消息的一部分发送给RPC服务器。RPC服务器在处理请求并生成响应时,将使用该唯一标识符将响应发送回RPC客户端。

  1. reply_to:当客户端发送一个RPC请求时,它需要告诉RPC服务器如何将响应消息发送回客户端。reply_to字段用于指定一个用于接收响应的队列名称,RPC服务器将在该队列上发送响应消息。如果客户端没有指定reply_to字段,则RPC服务器将无法发送响应消息。

  1. correlation_id:correlation_id字段用于将RPC请求和响应消息进行匹配。当客户端发送一个RPC请求时,它应该生成一个唯一的correlation_id值,并将其包含在请求消息中。当RPC服务器生成响应消息时,它应该使用相同的correlation_id值将响应消息发送回客户端。客户端将使用correlation_id值来将响应消息与先前发送的请求消息进行匹配。

  1. 添加RabbitMQ和Junit工具栏的maven依赖

  1. <!--rabbitmq-->
  2. <dependency>
  3. <groupId>com.rabbitmq</groupId>
  4. <artifactId>amqp-client</artifactId>
  5. <version>5.12.0</version>
  6. </dependency>
  7. <!--junit-->
  8. <dependency>
  9. <groupId>junit</groupId>
  10. <artifactId>junit</artifactId>
  11. <version>4.12</version>
  12. </dependency>
  1. 创建一个构建连接的工具类

  1. public class RabbitMQConnectionUtil {
  2. public static final String RABBITMQ_HOST = "xxx.xx.xxx.xxx"; //设置你自己的RabbitMQ服务ip
  3. public static final int RABBITMQ_PORT = 5672;
  4. public static final String RABBITMQ_USERNAME = "guest";
  5. public static final String RABBITMQ_PASSWORD = "guest";
  6. public static final String RABBITMQ_VIRTUAL_HOST = "/";
  7. /**
  8. * 构建RabbitMQ的连接对象
  9. */
  10. public static Connection getConnection() throws Exception {
  11. //1. 创建Connection工厂
  12. ConnectionFactory factory = new ConnectionFactory();
  13. //2. 设置RabbitMQ的连接信息
  14. factory.setHost(RABBITMQ_HOST);
  15. factory.setPort(RABBITMQ_PORT);
  16. factory.setUsername(RABBITMQ_USERNAME);
  17. factory.setPassword(RABBITMQ_PASSWORD);
  18. factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
  19. //3. 返回连接对象
  20. Connection connection = factory.newConnection();
  21. return connection;
  22. }
  1. 创建客户端并启动

  1. public class Producer {
  2. public static final String QUEUE_PRODUCER = "rpc_producer";
  3. public static final String QUEUE_CONSUMER = "rpc_consumer";
  4. @Test
  5. public void producer() throws Exception{
  6. //1. 获取连接对象
  7. Connection connection = RabbitMQConnectionUtil.getConnection();
  8. //2. 构建Channel
  9. Channel channel = connection.createChannel();
  10. //3. 构建队列
  11. channel.queueDeclare(QUEUE_PRODUCER,false,false,false,null);
  12. channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
  13. //4. 发布消息
  14. String message = "Hello RPC!";
  15. String uuid = UUID.randomUUID().toString();
  16. AMQP.BasicProperties props = new AMQP.BasicProperties()
  17. .builder()
  18. .replyTo(QUEUE_CONSUMER)
  19. .correlationId(uuid)
  20. .build();
  21. channel.basicPublish("",QUEUE_PRODUCER,props,message.getBytes());
  22. //5. 监听回调的队列
  23. channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){
  24. @Override
  25. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  26. String id = properties.getCorrelationId();
  27. if(id != null && id.equalsIgnoreCase(uuid)){
  28. System.out.println("接收到服务端的响应: " + new String(body,"UTF-8"));
  29. }
  30. channel.basicAck(envelope.getDeliveryTag(),false);
  31. }
  32. });
  33. System.out.println("消息发送成功!");
  34. System.in.read();
  35. }
  36. }

  1. 创建服务端并启动

  1. public class Consumer {
  2. public static final String QUEUE_PUBLISHER = "rpc_producer";
  3. public static final String QUEUE_CONSUMER = "rpc_consumer";
  4. @Test
  5. public void consume() throws Exception {
  6. //1. 获取连接对象
  7. Connection connection = RabbitMQConnectionUtil.getConnection();
  8. //2. 构建Channel
  9. Channel channel = connection.createChannel();
  10. //3. 构建队列
  11. channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
  12. channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
  13. //4. 监听消息
  14. DefaultConsumer callback = new DefaultConsumer(channel){
  15. @Override
  16. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  17. System.out.println("消费者获取到消息: " + new String(body,"UTF-8"));
  18. String resp = "获取到了client发出的请求,这里是响应的信息";
  19. String respQueueName = properties.getReplyTo();
  20. String uuid = properties.getCorrelationId();
  21. AMQP.BasicProperties props = new AMQP.BasicProperties()
  22. .builder()
  23. .correlationId(uuid)
  24. .build();
  25. channel.basicPublish("",respQueueName,props,resp.getBytes());
  26. channel.basicAck(envelope.getDeliveryTag(),false);
  27. }
  28. };
  29. channel.basicConsume(QUEUE_PUBLISHER,false,callback);
  30. System.out.println("开始监听队列");
  31. System.in.read();
  32. }
  33. }

  1. 再看客户端的控制台,可以看到已接收到服务端返回的内容

虽然RabbitMQ的RPC通信方式可以实现客户端和服务端之间的解耦,实现远程调用,但也会涉及一些缺点:

  1. 性能开销:RPC通信方式需要在客户端和服务端之间进行多次网络通信,而每次网络通信都会带来一定的性能开销,因此在高并发场景下,RPC通信可能会影响系统的性能。

  1. 实现复杂度:RPC通信方式需要客户端和服务端共同定义一套RPC协议,包括消息格式、参数类型和返回值类型等,这需要一定的实现复杂度,尤其是在跨语言和跨平台的场景中。

  1. 依赖可靠的消息代理:RPC通信需要可靠的消息代理来实现消息传递和消息保证,而RabbitMQ等消息代理的可靠性需要进行专门的配置和管理,增加了运维成本。

  1. 安全性:由于RPC通信涉及到网络传输,可能会面临数据被劫持、篡改和窃听等安全风险,因此需要采取相应的安全措施,如加密、认证和授权等。

该模式作为了解即可,实际应用并不广泛。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Li_阴宅/article/detail/921222
推荐阅读
相关标签
  

闽ICP备14008679号