当前位置:   article > 正文

[RabbitMQ]Java客户端:源码概览_rabbitmq java客户端

rabbitmq java客户端

本文简要介绍RabbitMQ提供的Java客户端中最基本的功能性接口/类及相关源码。

Mavan依赖:

  1. <dependency>
  2. <groupId>com.rabbitmq</groupId>
  3. <artifactId>amqp-client</artifactId>
  4. <version>5.13.1</version>
  5. </dependency>

AMQP#

com.rabbitmq.client.AMQP接口将AMQP(Advanced Message Queue Protocol,高级消息队列协议)中的方法和消息属性封装成Java对象,便于以面向对象的思维模式进行编程。

该接口类图简要如下:

AMQP接口中包含许多内部类,大体可以分成三类:

0.1 协议信息#

PROTOCOL内部类,保存了AMQP的协议版本等信息。

  1. public static class PROTOCOL {
  2. public static final int MAJOR = 0;
  3. public static final int MINOR = 9;
  4. public static final int REVISION = 1;
  5. public static final int PORT = 5672;
  6. }

0.2 方法#

包括ConnectionChannelAccessExchangeQueueBasicTxConfirm内部类,分别封装了向Broker发送的不同方法的基本数据格式和内容。

它们都实现了com.rabbitmq.client.impl.Method抽象类(后续会介绍),在发送请求时,通过Method抽象类的toFrame()方法可以转换成Frame(帧),然后com.rabbitmq.client.impl.AMQConnection将其以二进制数据的方式通过TCP协议发送给Broker

它们都提供了各自的Builder,便于实例化方法对象(建造者模式)。例如,最常用的Publish方法类图简要如下:

通过如下代码实例化出Publish对象:

AMQP.Basic.Publish publish = new AMQP.Basic.Publish.Builder().exchange("myExchange").routingKey("myRoutingKey").mandatory(true).build();

在发送给Broker前可以通过如下代码将Publish对象转成帧:

  1. Method method = (Method) publish;
  2. Frame frame = method.toFrame(1);

com.rabbitmq.client.impl.AMQConnection对象管理着与Broker的连接,它通过如下代码将方法发送给Broker

  1. AMQConnection connection = channel.getConnection();
  2. connection.writeFrame(frame);

0.3 消息属性#

BasicProperties内部类,封装了消息的基本属性。

它也提供了Builder,我们在发送消息时可以使用BasicProperties实例携带消息头信息,类图如下:

通过如下代码实例化出BasicProperties对象,并发送消息:

  1. Map<String, Object> headers = new HashMap<>();
  2. headers.put("color", "blue");
  3. AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().headers(headers).expiration("10000").build();
  4. channel.basicPublish("myExchange", "myRoutingKey", true, properties, "hello".getBytes());

BasicProperties对象在发送前最终会被转换成com.rabbitmq.client.impl.AMQContentHeader对象,代表AMQ消息内容的头。

AMQContentHeadertoFrame()方法也可以将其转换成Frame(帧),然后com.rabbitmq.client.impl.AMQConnection将其以二进制数据的方式通过TCP协议发送给Broker

  1. AMQConnection connection = channel.getConnection();
  2. Frame headerFrame = contentHeader.toFrame(channelNumber, body.length);
  3. connection.writeFrame(headerFrame);

ConnectionFactory#

com.rabbitmq.client.ConnectionFactory类是用来创建与RabbitMQ服务器连接(com.rabbitmq.client.Connection)的工厂类。简要类图如下:

ConnectionFactory内部封装了许多属性,用来设置与ConnectionSocket相关的连接信息。

它还提供了一套默认配置:

  1. public static final String DEFAULT_VHOST = "/";
  2. public static final String DEFAULT_HOST = "localhost";
  3. public static final int DEFAULT_AMQP_PORT = AMQP.PROTOCOL.PORT; // 5672
  4. public static final int DEFAULT_AMQP_OVER_SSL_PORT = 5671;
  5. public static final String DEFAULT_PASS = "guest"; // CredentialsProvider.username
  6. public static final String DEFAULT_USER = "guest"; // CredentialsProvider.password
  7. private boolean nio = false;
  8. private boolean automaticRecovery = true;

ConnectionFactory的基本使用如下:

  1. ConnectionFactory connectionFactory = new ConnectionFactory();
  2. Connection connection = connectionFactory.newConnection();//返回RecoveryAwareAMQConnection或AMQConnection对象

底层会创建出java.net.Socketjava.nio.channels.SocketChannel,代表与RabbitMQ服务器的TCP连接:

  1. Socket socket = SocketFactory.getDefault().createSocket();
  2. SocketChannel channel = SocketChannel.open();

SocketSocketChannel会被封装到com.rabbitmq.client.impl.FrameHandler中:

  1. // nio==false,使用Socket(默认值)
  2. SocketFrameHandler frameHandler = new SocketFrameHandler(sock, this.shutdownExecutor);
  3. // nio==true,使用SocketChannel
  4. SocketChannelFrameHandlerState state = new SocketChannelFrameHandlerState(channel, nioLoopContext, nioParams, sslEngine);
  5. SocketChannelFrameHandler frameHandler = new SocketChannelFrameHandler(state);

FrameHandler中提供了readFrame()writeFrame(),分别可以从Socket/SocketChannel中读取或写入数据。

FrameHandler又会被封装到com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectioncom.rabbitmq.client.impl.AMQConnection中:

  1. // automaticRecovery==true,默认值
  2. FrameHandler frameHandler = factory.create(addr, connectionName());
  3. RecoveryAwareAMQConnection conn = createConnection(params, frameHandler, metricsCollector);
  4. // automaticRecovery==false
  5. FrameHandler handler = fhFactory.create(addr, clientProvidedName);
  6. AMQConnection conn = createConnection(params, handler, metricsCollector);

因此,我们可以使用返回的Connection对象与RabbitMQ服务器进行交互。

Connection#

com.rabbitmq.client.Connection接口代表与RabbitMQ服务器的TCP连接,类图简要如下:

Connection主要提供了createChannel()openChannel()方法,用来创建Channel。后者提供了几乎所有与RabbitMQ进行交互的方法,是项目中使用频率最高的一个接口。

Connection的基本使用如下:

Channe channel = connection.createChannel();

Connection的实现类主要包括以下几种,分别代表不同类型的连接:

  • AMQConnection类:代表最基本的与RabbitMQ服务器的连接。内部持有FrameHandler等成员变量,用来与服务器交互。
  • RecoveryAwareAMQConnection接口:代表自动重连的连接,内部没有方法,类似与标志性接口。
  • AutorecoveringConnection类:自动重连Connection的实现类,在非正常断开情况下会自动重连,例如I/O异常。它持有RecoveryAwareAMQConnection对象作为代理,从而间接可以使用FrameHandler对象与服务器进行交互。重连时,内部组件也会按如下顺序自动重连:
    • Exchanges
    • Queues
    • Bindings (both queue and exchange-to-exchange)
    • Consumers
  • RecoveryAwareAMQConnection:它是对AMQConnection的修改,主要用作AutorecoveringConnection的成员变量。它与AMQConnection主要区别在于它内部使用com.rabbitmq.client.impl.recovery.RecoveryAwareChannelN作为Channel

在项目中使用的实现类主要为AMQConnectionAutorecoveringConnection(根据ConnectionFactoryautomaticRecovery成员变量进行选择)。

AMQConnection.createChannel()方法会使用ChannelManager创建出ChannelN类型的通道:

  1. ChannelManager cm = _channelManager;
  2. Channel channel = cm.createChannel(this);
  3. // 底层:
  4. new ChannelN(connection, channelNumber, workService, this.metricsCollector);

AutorecoveringConnection.createChannel()方法会使用RecoveryAwareAMQConnection创建出RecoveryAwareChannelN类型的通道,并使用AutorecoveringChannel包装:

  1. RecoveryAwareChannelN ch = (RecoveryAwareChannelN) delegate.createChannel();
  2. final AutorecoveringChannel channel = new AutorecoveringChannel(this, delegateChannel);

ChannelManagerAMQConnection中一个十分重要的成员变量,它管理着AMQConnection对象所属的所有Channel对象(key为通道编号,取值范围为1~_channelMax):

private final Map<Integer, ChannelN> _channelMap = new HashMap<Integer, ChannelN>();

AMQConnection中有一个十分重要的方法writeFrame(),可以将数据发送给RabbitMQ服务器:

  1. public void writeFrame(Frame f) throws IOException {
  2. _frameHandler.writeFrame(f);
  3. _heartbeatSender.signalActivity();
  4. }
  5. // SocketFrameHandler
  6. public void writeFrame(Frame frame) throws IOException {
  7. synchronized (_outputStream) {
  8. frame.writeTo(_outputStream);
  9. }
  10. }
  11. public void writeTo(DataOutputStream os) throws IOException {
  12. os.writeByte(type);
  13. os.writeShort(channel);
  14. if (accumulator != null) {
  15. os.writeInt(accumulator.size());
  16. accumulator.writeTo(os);
  17. } else {
  18. os.writeInt(payload.length);
  19. os.write(payload);
  20. }
  21. os.write(AMQP.FRAME_END);
  22. }

AMQConnection中有一个十分重要的方法startMainLoop(),可以创建新线程监听RabbitMQ服务器发送来的消息:

  1. public void startMainLoop() {
  2. MainLoop loop = new MainLoop();
  3. final String name = "AMQP Connection " + getHostAddress() + ":" + getPort();
  4. mainLoopThread = Environment.newThread(threadFactory, loop, name);
  5. mainLoopThread.start();
  6. }

其核心在于MainLoop内部类,核心步骤如下:

  1. 调用Frame frame = _frameHandler.readFrame()读取RabbitMQ服务器发送来的消息。
  2. 调用readFrame(frame)处理消息。
  1. private class MainLoop implements Runnable {
  2. @Override
  3. public void run() {
  4. boolean shouldDoFinalShutdown = true;
  5. try {
  6. while (_running) {
  7. Frame frame = _frameHandler.readFrame();
  8. readFrame(frame);
  9. }
  10. } catch (Throwable ex) {
  11. if (ex instanceof InterruptedException) {
  12. shouldDoFinalShutdown = false;
  13. } else {
  14. handleFailure(ex);
  15. }
  16. } finally {
  17. if (shouldDoFinalShutdown) {
  18. doFinalShutdown();
  19. }
  20. }
  21. }
  22. }

readFrame(frame)方法会从ChannelManager成员变量中获取该消息对应的通道,然后调用channel.handleFrame(frame)方法进行业务处理(最终会调用channel.processAsync(Command command)方法):

  1. private void readFrame(Frame frame) throws IOException {
  2. if (frame != null) {
  3. _missedHeartbeats = 0;
  4. if (frame.type == AMQP.FRAME_HEARTBEAT) {
  5. } else {
  6. if (frame.channel == 0) {
  7. _channel0.handleFrame(frame);
  8. } else {
  9. if (isOpen()) {
  10. ChannelManager cm = _channelManager;
  11. if (cm != null) {
  12. ChannelN channel;
  13. try {
  14. channel = cm.getChannel(frame.channel);
  15. } catch(UnknownChannelException e) {
  16. LOGGER.info("Received a frame on an unknown channel, ignoring it");
  17. return;
  18. }
  19. channel.handleFrame(frame); // 业务处理
  20. }
  21. }
  22. }
  23. }
  24. } else {
  25. handleSocketTimeout();
  26. }
  27. }

Channel#

com.rabbitmq.client.Channel中封装了与RabbitMQ服务器交互的API,简要类图如下:

Channel的基本使用方式如下:

  1. // 声明交换机
  2. channel.exchangeDeclare("myExchange", BuiltinExchangeType.DIRECT);
  3. // 声明队列
  4. channel.queueDeclare("myQueue", true, false, false, null);
  5. // 声明绑定
  6. channel.exchangeBind("myQueue", "myExchange", "myRoutingKey");
  7. // 发送消息
  8. channel.basicPublish("myExchange", "myRoutingKey", true, null, "hello".getBytes());
  9. // 订阅消息
  10. channel.basicConsume("myQueue", new DefaultConsumer(channel) {
  11. @Override
  12. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  13. System.out.println(new String(body));
  14. }
  15. });
  16. // 拉取消息
  17. channel.basicGet("myQueue", true);

Channel是实现类包括以下几种:

  • ChannelNAMQP协议功能API的主要实现类。
  • RecoveryAwareChannelN:重写了basicAck()basicReject()basicNack()方法,对ChannelN功能进行扩展,实时跟踪delivery tag,对最新的tag进行响应。
  • AutorecoveringChannel:在connection重连时会自动恢复的通道,内部通过持有RecoveryAwareChannelN代理对象来实现具体操作。

3.1 ChannelN#

com.rabbitmq.client.impl.ChannelN是对AMQP协议功能性API的主要实现类,它除了实现Channel中定义的AMQP协议功能性API,还继承了AMQChannel抽象类,通过其_connection成员变量可以在底层调用到SocketSocketChannel向RabbitMQ服务器进行读写操作。

除此之外,为了实现AMQP协议的特定功能,如消息确认机制。ChannelN内部封装了如下成员变量:

  • _consumers:消息消费者,以consumerTag作为key,用于监听消息。
  • returnListeners:监听RabbitMQ服务器找不到对应交换机时的返回消息(basicPublish方法发送消息时设置mandatoryimmediate)。
  • confirmListeners:监听RabbitMQ服务器的确认消息(acknack)。
  • defaultConsumer:默认的消息消费者。
  • dispatcher:启动线程执行_consumers中的任务。

ChannelN中监听消息的核心源码如下:

  1. public boolean processAsync(Command command) throws IOException {
  2. Method method = command.getMethod();
  3. if (method instanceof Channel.Close) {
  4. asyncShutdown(command);
  5. return true;
  6. }
  7. if (isOpen()) {
  8. // 根据不同方法类型调用对应的处理方法
  9. if (method instanceof Basic.Deliver) {
  10. processDelivery(command, (Basic.Deliver) method);
  11. return true;
  12. } else if (method instanceof Basic.Return) {
  13. callReturnListeners(command, (Basic.Return) method);
  14. return true;
  15. } else if (method instanceof Channel.Flow) {
  16. Channel.Flow channelFlow = (Channel.Flow) method;
  17. synchronized (_channelMutex) {
  18. _blockContent = !channelFlow.getActive();
  19. transmit(new Channel.FlowOk(!_blockContent));
  20. _channelMutex.notifyAll();
  21. }
  22. return true;
  23. } else if (method instanceof Basic.Ack) {
  24. Basic.Ack ack = (Basic.Ack) method;
  25. callConfirmListeners(command, ack);
  26. handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false);
  27. return true;
  28. } else if (method instanceof Basic.Nack) {
  29. Basic.Nack nack = (Basic.Nack) method;
  30. callConfirmListeners(command, nack);
  31. handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true);
  32. return true;
  33. } else if (method instanceof Basic.RecoverOk) {
  34. for (Map.Entry<String, Consumer> entry : Utility.copy(_consumers).entrySet()) {
  35. this.dispatcher.handleRecoverOk(entry.getValue(), entry.getKey());
  36. }
  37. return false;
  38. } else if (method instanceof Basic.Cancel) {
  39. Basic.Cancel m = (Basic.Cancel)method;
  40. String consumerTag = m.getConsumerTag();
  41. Consumer callback = _consumers.remove(consumerTag);
  42. if (callback == null) {
  43. callback = defaultConsumer;
  44. }
  45. if (callback != null) {
  46. try {
  47. this.dispatcher.handleCancel(callback, consumerTag);
  48. } catch (WorkPoolFullException e) {
  49. throw e;
  50. } catch (Throwable ex) {
  51. getConnection().getExceptionHandler().handleConsumerException(this,
  52. ex,
  53. callback,
  54. consumerTag,
  55. "handleCancel");
  56. }
  57. } else {
  58. LOGGER.warn("Could not cancel consumer with unknown tag {}", consumerTag);
  59. }
  60. return true;
  61. } else {
  62. return false;
  63. }
  64. } else {
  65. if (method instanceof Channel.CloseOk) {
  66. return false;
  67. } else {
  68. return true;
  69. }
  70. }
  71. }

可见,该方法类似于SpringMVC中的DispatcherServlet,它会根据监听到Command对象的方法类型进行分发处理。接下来介绍的各成员变量方法调用的入口都在这个方法中。

3.1.1 ConsumerDispatcher#

com.rabbitmq.client.impl.ConsumerDispatcher的作用是从线程池中获取空闲线程处理消息。它的主要作用是开启线程,而实际处理消息的业务逻辑在具体Consumer代理对象中处理。

例如,在处理生产者发布的消息时,ConsumerDispatcher会进行如下处理:

  1. public void handleDelivery(final Consumer delegate,
  2. final String consumerTag,
  3. final Envelope envelope,
  4. final AMQP.BasicProperties properties,
  5. final byte[] body) throws IOException {
  6. executeUnlessShuttingDown(
  7. new Runnable() {
  8. @Override
  9. public void run() {
  10. try {
  11. delegate.handleDelivery(consumerTag,
  12. envelope,
  13. properties,
  14. body);
  15. } catch (Throwable ex) {
  16. connection.getExceptionHandler().handleConsumerException(
  17. channel,
  18. ex,
  19. delegate,
  20. consumerTag,
  21. "handleDelivery");
  22. }
  23. }
  24. });
  25. }

3.1.2 Consumer#

com.rabbitmq.client.Consumer接口中定义了不同消息的处理方法,实例对象则表示消息消费者。

com.rabbitmq.client.DefaultConsumer是默认实现类,它实现了接口中的所有方法(空方法)。我们可以采取匿名内部类的方式,实现具体某个需要的方法,而不是实现所有方法。

我们可以使用如下代码添加消费者:

  1. channel.basicConsume("myQueue", new DefaultConsumer(channel) {
  2. @Override
  3. public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
  4. System.out.println(new String(body));
  5. }
  6. });

ChannelN中调用消费者处理消息方法(handleDelivery())的源码如下:

  1. protected void processDelivery(Command command, Basic.Deliver method) {
  2. Basic.Deliver m = method;
  3. Consumer callback = _consumers.get(m.getConsumerTag());
  4. if (callback == null) {
  5. if (defaultConsumer == null) {
  6. throw new IllegalStateException("Unsolicited delivery -" +
  7. " see Channel.setDefaultConsumer to handle this" +
  8. " case.");
  9. }
  10. else {
  11. callback = defaultConsumer;
  12. }
  13. }
  14. Envelope envelope = new Envelope(m.getDeliveryTag(),
  15. m.getRedelivered(),
  16. m.getExchange(),
  17. m.getRoutingKey());
  18. try {
  19. metricsCollector.consumedMessage(this, m.getDeliveryTag(), m.getConsumerTag());
  20. this.dispatcher.handleDelivery(callback,
  21. m.getConsumerTag(),
  22. envelope,
  23. (BasicProperties) command.getContentHeader(),
  24. command.getContentBody());
  25. } catch (WorkPoolFullException e) {
  26. throw e;
  27. } catch (Throwable ex) {
  28. getConnection().getExceptionHandler().handleConsumerException(this,
  29. ex,
  30. callback,
  31. m.getConsumerTag(),
  32. "handleDelivery");
  33. }
  34. }

需要注意的是,在调用this.dispatcher.handleDelivery()之前,会首先调用Consumer callback = _consumers.get(m.getConsumerTag())根据consumerTag获取对应的消费者。因此,消费者处理消息是一对一的。

消费者其他方法的调用也可以在ChannelN.processAsync()中找到。

3.1.3 ReturnListener#

com.rabbitmq.client.ReturnListener接口中定义了监听返回消息的通用方法handleReturn(),主要用于消息发布者监听返回消息。

消息发布者通过basicPublish方法发送消息时设置mandatoryimmediate,但RabbitMQ服务器找不到对应交换机时会返回消息。消息发布者通过往Channel对象中添加ReturnListener实现类,即可监听到返回消息:

  1. channel.addReturnListener(new ReturnListener() {
  2. @Override
  3. public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
  4. System.out.println("return message: " + new String(body));
  5. }
  6. });

ChannelN中处理返回消息的源码如下:

  1. private void callReturnListeners(Command command, Basic.Return basicReturn) {
  2. try {
  3. for (ReturnListener l : this.returnListeners) {
  4. l.handleReturn(basicReturn.getReplyCode(),
  5. basicReturn.getReplyText(),
  6. basicReturn.getExchange(),
  7. basicReturn.getRoutingKey(),
  8. (BasicProperties) command.getContentHeader(),
  9. command.getContentBody());
  10. }
  11. } catch (Throwable ex) {
  12. getConnection().getExceptionHandler().handleReturnListenerException(this, ex);
  13. } finally {
  14. metricsCollector.basicPublishUnrouted(this);
  15. }
  16. }

ReturnListener是针对ChannelN级别的。接收到返回消息后,所有添加到ChannelN对象的ReturnListener监听器都会被调用。

3.1.4 ConfirmListener#

com.rabbitmq.client.ConfirmListener接口中定义的监听RabbitMQ服务器确认消息(acknack)的回调方法,主要用于消息发布者使用。

基本使用代码如下:

  1. channel.addConfirmListener(new ConfirmListener() {
  2. @Override
  3. public void handleAck(long deliveryTag, boolean multiple) throws IOException {
  4. // 业务处理
  5. }
  6. @Override
  7. public void handleNack(long deliveryTag, boolean multiple) throws IOException {
  8. // 业务处理
  9. }
  10. });

ChannelN中处理返回消息的源码如下:

  1. private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Ack ack) {
  2. try {
  3. for (ConfirmListener l : this.confirmListeners) {
  4. l.handleAck(ack.getDeliveryTag(), ack.getMultiple());
  5. }
  6. } catch (Throwable ex) {
  7. getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
  8. } finally {
  9. metricsCollector.basicPublishAck(this, ack.getDeliveryTag(), ack.getMultiple());
  10. }
  11. }
  12. private void callConfirmListeners(@SuppressWarnings("unused") Command command, Basic.Nack nack) {
  13. try {
  14. for (ConfirmListener l : this.confirmListeners) {
  15. l.handleNack(nack.getDeliveryTag(), nack.getMultiple());
  16. }
  17. } catch (Throwable ex) {
  18. getConnection().getExceptionHandler().handleConfirmListenerException(this, ex);
  19. } finally {
  20. metricsCollector.basicPublishNack(this, nack.getDeliveryTag(), nack.getMultiple());
  21. }
  22. }

ConfirmListener是针对ChannelN级别的。接收到确认消息后,所有添加到ChannelN对象的ConfirmListener监听器都会被调用。

3.1.5 basicPublish()#

前几小节讲述的都是RabbitMQ客户端监听从服务器响应的消息,本小节简要分析客户端发送消息的流程。

发送消息的基本方式如下:

channel.basicPublish("myExchange", "myRoutingKey", null, "hello".getBytes());
  1. ChannelN中的basicPublish()方法中执行如下代码,核心步骤如下:
    1. 将形参转换成AMQCommand对象中的CommandAssembler成员变量:exchangeroutingKeyBasic.Publish方法对象(Method),propertiesAMQContentHeader对象,bodyList<byte[]>对象。
    2. 调用transmit(command)方法,发送命令。
  1. public void basicPublish(String exchange, String routingKey,
  2. boolean mandatory, boolean immediate,
  3. BasicProperties props, byte[] body)
  4. throws IOException
  5. {
  6. if (nextPublishSeqNo > 0) {
  7. unconfirmedSet.add(getNextPublishSeqNo());
  8. nextPublishSeqNo++;
  9. }
  10. if (props == null) {
  11. props = MessageProperties.MINIMAL_BASIC;
  12. }
  13. AMQCommand command = new AMQCommand(
  14. new Basic.Publish.Builder()
  15. .exchange(exchange)
  16. .routingKey(routingKey)
  17. .mandatory(mandatory)
  18. .immediate(immediate)
  19. .build(), props, body);
  20. try {
  21. transmit(command);
  22. } catch (IOException e) {
  23. metricsCollector.basicPublishFailure(this, e);
  24. throw e;
  25. }
  26. metricsCollector.basicPublish(this);
  27. }
  1. ChannelN中执行transmit()quiescingTransmit()方法,最终会调用AMQCommand.transmit()方法:
  1. public void transmit(AMQCommand c) throws IOException {
  2. synchronized (_channelMutex) {
  3. ensureIsOpen();
  4. quiescingTransmit(c);
  5. }
  6. }
  7. public void quiescingTransmit(AMQCommand c) throws IOException {
  8. synchronized (_channelMutex) {
  9. if (c.getMethod().hasContent()) {
  10. while (_blockContent) {
  11. try {
  12. _channelMutex.wait();
  13. } catch (InterruptedException ignored) {
  14. Thread.currentThread().interrupt();
  15. }
  16. ensureIsOpen();
  17. }
  18. }
  19. this._trafficListener.write(c);
  20. c.transmit(this);
  21. }
  22. }
  1. AMQCommand中执行transmit()方法,核心步骤如下:
    1. 获取AMQConnection对象。
    2. 分别将AMQContentHeaderMethodList<byte[]>对象转换成Frame对象。
    3. 通过AMQConnection对象发送数据。
  1. public void transmit(AMQChannel channel) throws IOException {
  2. int channelNumber = channel.getChannelNumber();
  3. AMQConnection connection = channel.getConnection();
  4. synchronized (assembler) {
  5. Method m = this.assembler.getMethod();
  6. if (m.hasContent()) {
  7. byte[] body = this.assembler.getContentBody();
  8. Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
  9. int frameMax = connection.getFrameMax();
  10. boolean cappedFrameMax = frameMax > 0;
  11. int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
  12. if (cappedFrameMax && headerFrame.size() > frameMax) {
  13. String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
  14. throw new IllegalArgumentException(msg);
  15. }
  16. connection.writeFrame(m.toFrame(channelNumber));
  17. connection.writeFrame(headerFrame);
  18. for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
  19. int remaining = body.length - offset;
  20. int fragmentLength = (remaining < bodyPayloadMax) ? remaining
  21. : bodyPayloadMax;
  22. Frame frame = Frame.fromBodyFragment(channelNumber, body,
  23. offset, fragmentLength);
  24. connection.writeFrame(frame);
  25. }
  26. } else {
  27. connection.writeFrame(m.toFrame(channelNumber));
  28. }
  29. }
  30. connection.flush();
  31. }

3.1.6 basicGet()#

除了添加Comsumer监听器,我们还可以主动调用basicGet()向RabbitMQ服务器“拉取”消息。

basicGet()方法本质上是向RabbitMQ服务器发送一个Basic.Get请求,然后等待响应。

basicGet()方法的基本使用如下:

  1. // 自动确认模式
  2. GetResponse message = channel.basicGet("myQueue", true);
  3. System.out.println(new String(message.getBody()));
  4. // 手动确认模式
  5. GetResponse myQueue = channel.basicGet("myQueue", false);
  6. System.out.println(new String(message.getBody()));
  7. channel.basicAck(myQueue.getEnvelope().getDeliveryTag(), false);
  1. ChannelN中的basicGet()方法中执行如下代码,核心步骤如下:

    1. 将形参转换成AMQCommand对象中的CommandAssembler成员变量:queueBasic.Get方法对象(Method),propertiesAMQContentHeader对象,bodyList<byte[]>对象。
    2. 调用exnWrappingRpc(command)方法,发送命令。
    3. 等待响应replyCommand,并封装成GetResponse对象返回,
  1. public GetResponse basicGet(String queue, boolean autoAck) throws IOException{
  2. validateQueueNameLength(queue);
  3. AMQCommand replyCommand = exnWrappingRpc(new Basic.Get.Builder()
  4. .queue(queue)
  5. .noAck(autoAck)
  6. .build());
  7. Method method = replyCommand.getMethod();
  8. if (method instanceof Basic.GetOk) {
  9. Basic.GetOk getOk = (Basic.GetOk)method;
  10. Envelope envelope = new Envelope(getOk.getDeliveryTag(),
  11. getOk.getRedelivered(),
  12. getOk.getExchange(),
  13. getOk.getRoutingKey());
  14. BasicProperties props = (BasicProperties)replyCommand.getContentHeader();
  15. byte[] body = replyCommand.getContentBody();
  16. int messageCount = getOk.getMessageCount();
  17. metricsCollector.consumedMessage(this, getOk.getDeliveryTag(), autoAck);
  18. return new GetResponse(envelope, props, body, messageCount);
  19. } else if (method instanceof Basic.GetEmpty) {
  20. return null;
  21. } else {
  22. throw new UnexpectedMethodError(method);
  23. }
  24. }
  25. public AMQCommand exnWrappingRpc(Method m) throws IOException {
  26. try {
  27. return privateRpc(m);
  28. } catch (AlreadyClosedException ace) {
  29. throw ace;
  30. } catch (ShutdownSignalException ex) {
  31. throw wrap(ex);
  32. }
  33. }
  1. ChannelN中的privateRpc()方法中执行如下代码,核心步骤如下:

    1. 实例化SimpleBlockingRpcContinuation对象,用于获取响应。
    2. 调用rpc(m, k)方法,发送Basic.Get请求。
    3. 调用k.getReply()方法,等待响应并返回。
  1. private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException{
  2. SimpleBlockingRpcContinuation k = new SimpleBlockingRpcContinuation(m);
  3. rpc(m, k); // 发送请求
  4. if(_rpcTimeout == NO_RPC_TIMEOUT) {
  5. return k.getReply(); // 等待响应
  6. } else {
  7. try {
  8. return k.getReply(_rpcTimeout);
  9. } catch (TimeoutException e) {
  10. throw wrapTimeoutException(m, e);
  11. }
  12. }
  13. }
  14. public void rpc(Method m, RpcContinuation k) throws IOException {
  15. synchronized (_channelMutex) {
  16. ensureIsOpen();
  17. quiescingRpc(m, k);
  18. }
  19. }
  20. public void quiescingRpc(Method m, RpcContinuation k) throws IOException {
  21. synchronized (_channelMutex) {
  22. enqueueRpc(k);
  23. quiescingTransmit(m);
  24. }
  25. }
  26. public void enqueueRpc(RpcContinuation k) {
  27. doEnqueueRpc(() -> new RpcContinuationRpcWrapper(k));
  28. }
  29. public void quiescingTransmit(Method m) throws IOException {
  30. synchronized (_channelMutex) {
  31. quiescingTransmit(new AMQCommand(m));
  32. }
  33. }
  34. public void quiescingTransmit(AMQCommand c) throws IOException {
  35. synchronized (_channelMutex) {
  36. if (c.getMethod().hasContent()) {
  37. while (_blockContent) {
  38. try {
  39. _channelMutex.wait();
  40. } catch (InterruptedException ignored) {
  41. Thread.currentThread().interrupt();
  42. }
  43. ensureIsOpen();
  44. }
  45. }
  46. this._trafficListener.write(c);
  47. c.transmit(this);
  48. }
  49. }
  1. 最终,在AMQCommand中执行transmit()方法,核心步骤如下:

    1. 获取AMQConnection对象。
    2. 分别将AMQContentHeaderMethodList<byte[]>对象转换成Frame对象。
    3. 通过AMQConnection对象发送数据。
  1. public void transmit(AMQChannel channel) throws IOException {
  2. int channelNumber = channel.getChannelNumber();
  3. AMQConnection connection = channel.getConnection();
  4. synchronized (assembler) {
  5. Method m = this.assembler.getMethod();
  6. if (m.hasContent()) {
  7. byte[] body = this.assembler.getContentBody();
  8. Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
  9. int frameMax = connection.getFrameMax();
  10. boolean cappedFrameMax = frameMax > 0;
  11. int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
  12. if (cappedFrameMax && headerFrame.size() > frameMax) {
  13. String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
  14. throw new IllegalArgumentException(msg);
  15. }
  16. connection.writeFrame(m.toFrame(channelNumber));
  17. connection.writeFrame(headerFrame);
  18. for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
  19. int remaining = body.length - offset;
  20. int fragmentLength = (remaining < bodyPayloadMax) ? remaining
  21. : bodyPayloadMax;
  22. Frame frame = Frame.fromBodyFragment(channelNumber, body,
  23. offset, fragmentLength);
  24. connection.writeFrame(frame);
  25. }
  26. } else {
  27. connection.writeFrame(m.toFrame(channelNumber));
  28. }
  29. }
  30. connection.flush();
  31. }

AMQCommand#

com.rabbitmq.client.impl.AMQCommand类实现了com.rabbitmq.client.Command接口,其成员变量CommandAssembler对象是AMQP规范中methodheaderbody的容器。

AMQCommand中提供了一个十分重要的方法:transmit(AMQChannel)。调用该方法能够将methodheaderbody通过Connection发送给RabbitMQ服务器。该方法在前几个小节都有介绍,核心步骤如下:

  1. 获取AMQConnection对象。
  2. 分别将AMQContentHeaderMethodList<byte[]>对象转换成Frame对象。
  3. 通过AMQConnection对象发送数据。
  1. public void transmit(AMQChannel channel) throws IOException {
  2. int channelNumber = channel.getChannelNumber();
  3. AMQConnection connection = channel.getConnection();
  4. synchronized (assembler) {
  5. Method m = this.assembler.getMethod();
  6. if (m.hasContent()) {
  7. byte[] body = this.assembler.getContentBody();
  8. Frame headerFrame = this.assembler.getContentHeader().toFrame(channelNumber, body.length);
  9. int frameMax = connection.getFrameMax();
  10. boolean cappedFrameMax = frameMax > 0;
  11. int bodyPayloadMax = cappedFrameMax ? frameMax - EMPTY_FRAME_SIZE : body.length;
  12. if (cappedFrameMax && headerFrame.size() > frameMax) {
  13. String msg = String.format("Content headers exceeded max frame size: %d > %d", headerFrame.size(), frameMax);
  14. throw new IllegalArgumentException(msg);
  15. }
  16. connection.writeFrame(m.toFrame(channelNumber));
  17. connection.writeFrame(headerFrame);
  18. for (int offset = 0; offset < body.length; offset += bodyPayloadMax) {
  19. int remaining = body.length - offset;
  20. int fragmentLength = (remaining < bodyPayloadMax) ? remaining
  21. : bodyPayloadMax;
  22. Frame frame = Frame.fromBodyFragment(channelNumber, body,
  23. offset, fragmentLength);
  24. connection.writeFrame(frame);
  25. }
  26. } else {
  27. connection.writeFrame(m.toFrame(channelNumber));
  28. }
  29. }
  30. connection.flush();
  31. }

4.1 CommandAssembler#

com.rabbitmq.client.impl.CommandAssembler类中封装了AMQP规范中methodheaderbody

4.1.1 Method#

com.rabbitmq.client.impl.Method抽象类代表AMQP规范中method,我们平常所使用的com.rabbitmq.client.AMQP接口中的ConnectionChannelAccessExchangeQueueBasicTxConfirm等内部类都实现了该抽象类。

我们调用channel.basicPublish()等方法向RabbitMQ服务器发送消息,或者从通过注册Consumer监听RabbitMQ服务器的消息时,都会将method数据段转换成Method实现类进行处理。

Method.toFrame()方法则能将自己转换成Frame对象,进行发送。

4.1.2 AMQContentHeader#

com.rabbitmq.client.impl.AMQContentHeader抽象类代表AMQP规范中headerchannel.basicPublish()方法形参BasicProperties实现了该抽象类,我们可以通过该对象为消息设置属性。

我们调用channel.basicPublish()等方法向RabbitMQ服务器发送消息,或者从通过注册Consumer监听RabbitMQ服务器的消息时,都会将method数据段转换成AMQContentHeader实现类进行处理。

AMQContentHeader.toFrame()方法则能将自己转换成Frame对象,进行发送。

Frame#

com.rabbitmq.client.impl.Frame代表AMQP wire-protocol frame(帧),主要包含以下成员变量:

  • type:帧类型。
  • channel:所属通道。
  • payload:输入载荷。
  • accumulator:输出载荷。

Frame还提供了静态方法readFrom(),可以从输入流中读取到Frame对象,主要提供给FrameHandler.readFrame()方法调用:

  1. public static Frame readFrom(DataInputStream is) throws IOException {
  2. int type;
  3. int channel;
  4. try {
  5. type = is.readUnsignedByte();
  6. } catch (SocketTimeoutException ste) {
  7. return null; // failed
  8. }
  9. if (type == 'A') {
  10. protocolVersionMismatch(is);
  11. }
  12. channel = is.readUnsignedShort();
  13. int payloadSize = is.readInt();
  14. byte[] payload = new byte[payloadSize];
  15. is.readFully(payload);
  16. int frameEndMarker = is.readUnsignedByte();
  17. if (frameEndMarker != AMQP.FRAME_END) {
  18. throw new MalformedFrameException("Bad frame end marker: " + frameEndMarker);
  19. }
  20. return new Frame(type, channel, payload);
  21. }

 分类: RabbitMQ

 标签: RabbitMQ

来源:[RabbitMQ]Java客户端:源码概览 - Xianhuii - 博客园

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/384391
推荐阅读
相关标签
  

闽ICP备14008679号