赞
踩
RPC通信方式结构图
在RabbitMQ中,使用RPC通信方式的应用程序被称为RPC客户端,而提供服务的应用程序被称为RPC服务器。当RPC客户端发送请求时,消息属性要设置上reply_to的回调队列,且将一个唯一的标识符(correlation_id)作为消息的一部分发送给RPC服务器。RPC服务器在处理请求并生成响应时,将使用该唯一标识符将响应发送回RPC客户端。
reply_to:当客户端发送一个RPC请求时,它需要告诉RPC服务器如何将响应消息发送回客户端。reply_to字段用于指定一个用于接收响应的队列名称,RPC服务器将在该队列上发送响应消息。如果客户端没有指定reply_to字段,则RPC服务器将无法发送响应消息。
correlation_id:correlation_id字段用于将RPC请求和响应消息进行匹配。当客户端发送一个RPC请求时,它应该生成一个唯一的correlation_id值,并将其包含在请求消息中。当RPC服务器生成响应消息时,它应该使用相同的correlation_id值将响应消息发送回客户端。客户端将使用correlation_id值来将响应消息与先前发送的请求消息进行匹配。
添加RabbitMQ和Junit工具栏的maven依赖
- <!--rabbitmq-->
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.12.0</version>
- </dependency>
- <!--junit-->
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.12</version>
- </dependency>
创建一个构建连接的工具类
- public class RabbitMQConnectionUtil {
- public static final String RABBITMQ_HOST = "xxx.xx.xxx.xxx"; //设置你自己的RabbitMQ服务ip
- public static final int RABBITMQ_PORT = 5672;
- public static final String RABBITMQ_USERNAME = "guest";
- public static final String RABBITMQ_PASSWORD = "guest";
- public static final String RABBITMQ_VIRTUAL_HOST = "/";
- /**
- * 构建RabbitMQ的连接对象
- */
- public static Connection getConnection() throws Exception {
- //1. 创建Connection工厂
- ConnectionFactory factory = new ConnectionFactory();
-
- //2. 设置RabbitMQ的连接信息
- factory.setHost(RABBITMQ_HOST);
- factory.setPort(RABBITMQ_PORT);
- factory.setUsername(RABBITMQ_USERNAME);
- factory.setPassword(RABBITMQ_PASSWORD);
- factory.setVirtualHost(RABBITMQ_VIRTUAL_HOST);
-
- //3. 返回连接对象
- Connection connection = factory.newConnection();
- return connection;
- }
创建客户端并启动
- public class Producer {
-
- public static final String QUEUE_PRODUCER = "rpc_producer";
- public static final String QUEUE_CONSUMER = "rpc_consumer";
-
- @Test
- public void producer() throws Exception{
- //1. 获取连接对象
- Connection connection = RabbitMQConnectionUtil.getConnection();
-
- //2. 构建Channel
- Channel channel = connection.createChannel();
-
- //3. 构建队列
- channel.queueDeclare(QUEUE_PRODUCER,false,false,false,null);
- channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
-
- //4. 发布消息
- String message = "Hello RPC!";
- String uuid = UUID.randomUUID().toString();
-
- AMQP.BasicProperties props = new AMQP.BasicProperties()
- .builder()
- .replyTo(QUEUE_CONSUMER)
- .correlationId(uuid)
- .build();
- channel.basicPublish("",QUEUE_PRODUCER,props,message.getBytes());
- //5. 监听回调的队列
- channel.basicConsume(QUEUE_CONSUMER,false,new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- String id = properties.getCorrelationId();
- if(id != null && id.equalsIgnoreCase(uuid)){
- System.out.println("接收到服务端的响应: " + new String(body,"UTF-8"));
- }
- channel.basicAck(envelope.getDeliveryTag(),false);
- }
- });
- System.out.println("消息发送成功!");
-
- System.in.read();
- }
-
- }
创建服务端并启动
- public class Consumer {
-
- public static final String QUEUE_PUBLISHER = "rpc_producer";
- public static final String QUEUE_CONSUMER = "rpc_consumer";
-
- @Test
- public void consume() throws Exception {
- //1. 获取连接对象
- Connection connection = RabbitMQConnectionUtil.getConnection();
-
- //2. 构建Channel
- Channel channel = connection.createChannel();
-
- //3. 构建队列
- channel.queueDeclare(QUEUE_PUBLISHER,false,false,false,null);
- channel.queueDeclare(QUEUE_CONSUMER,false,false,false,null);
-
-
- //4. 监听消息
- DefaultConsumer callback = new DefaultConsumer(channel){
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- System.out.println("消费者获取到消息: " + new String(body,"UTF-8"));
- String resp = "获取到了client发出的请求,这里是响应的信息";
- String respQueueName = properties.getReplyTo();
- String uuid = properties.getCorrelationId();
- AMQP.BasicProperties props = new AMQP.BasicProperties()
- .builder()
- .correlationId(uuid)
- .build();
- channel.basicPublish("",respQueueName,props,resp.getBytes());
- channel.basicAck(envelope.getDeliveryTag(),false);
- }
- };
- channel.basicConsume(QUEUE_PUBLISHER,false,callback);
- System.out.println("开始监听队列");
-
- System.in.read();
- }
- }
再看客户端的控制台,可以看到已接收到服务端返回的内容
虽然RabbitMQ的RPC通信方式可以实现客户端和服务端之间的解耦,实现远程调用,但也会涉及一些缺点:
性能开销:RPC通信方式需要在客户端和服务端之间进行多次网络通信,而每次网络通信都会带来一定的性能开销,因此在高并发场景下,RPC通信可能会影响系统的性能。
实现复杂度:RPC通信方式需要客户端和服务端共同定义一套RPC协议,包括消息格式、参数类型和返回值类型等,这需要一定的实现复杂度,尤其是在跨语言和跨平台的场景中。
依赖可靠的消息代理:RPC通信需要可靠的消息代理来实现消息传递和消息保证,而RabbitMQ等消息代理的可靠性需要进行专门的配置和管理,增加了运维成本。
安全性:由于RPC通信涉及到网络传输,可能会面临数据被劫持、篡改和窃听等安全风险,因此需要采取相应的安全措施,如加密、认证和授权等。
该模式作为了解即可,实际应用并不广泛。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。