当前位置:   article > 正文

Windows下安装使用RabbitMQ_amqp-client 5.14.0 服务端的版本号

amqp-client 5.14.0 服务端的版本号

官方文档

https://www.rabbitmq.com/#getstarted

下载和安装

RabbitMQ需要安装Windows 支持64Erlang版本

http://erlang.org/download/otp_win64_23.0.exe

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.4/rabbitmq-server-3.8.4.exe

配置可视化RabbitMQ管理界面

rabbitmq-plugins enable rabbitmq_management

http://localhost:15672/

默认账号guest\guest

创建用户

Set Permission

你好,世界

Recv.java

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import java.nio.charset.StandardCharsets;
  5. public class Send {
  6. private final static String QUEUE_NAME = "hello";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("localhost");
  10. try (Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel()) {
  12. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  13. String message = "Hello World!";
  14. channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
  15. System.out.println(" [x] Sent '" + message + "'");
  16. }
  17. }
  18. }

Recv.java

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.DeliverCallback;
  5. public class Recv {
  6. private final static String QUEUE_NAME = "hello";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("localhost");
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. channel.queueDeclare(QUEUE_NAME, false, false, false, null);
  13. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  14. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  15. String message = new String(delivery.getBody(), "UTF-8");
  16. System.out.println(" [x] Received '" + message + "'");
  17. };
  18. channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
  19. }
  20. }
  1. 编译
  2. javac -cp amqp-client-5.7.1.jar Send.java Recv.java
  3. 启动Recv
  4. java -cp .;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar Recv
  5. 启动Send
  6. java -cp .;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar Send

工作队列

NewTask.java

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.MessageProperties;
  5. public class NewTask {
  6. private static final String TASK_QUEUE_NAME = "task_queue";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("localhost");
  10. try (Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel()) {
  12. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
  13. String message = String.join(" ", argv);
  14. channel.basicPublish("", TASK_QUEUE_NAME,
  15. MessageProperties.PERSISTENT_TEXT_PLAIN,
  16. message.getBytes("UTF-8"));
  17. System.out.println(" [x] Sent '" + message + "'");
  18. }
  19. }
  20. }

Worker.java

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.DeliverCallback;
  5. public class Worker {
  6. private static final String TASK_QUEUE_NAME = "task_queue";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("localhost");
  10. final Connection connection = factory.newConnection();
  11. final Channel channel = connection.createChannel();
  12. channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
  13. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  14. channel.basicQos(1);
  15. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  16. String message = new String(delivery.getBody(), "UTF-8");
  17. System.out.println(" [x] Received '" + message + "'");
  18. try {
  19. doWork(message);
  20. } finally {
  21. System.out.println(" [x] Done");
  22. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  23. }
  24. };
  25. channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
  26. }
  27. private static void doWork(String task) {
  28. for (char ch : task.toCharArray()) {
  29. if (ch == '.') {
  30. try {
  31. Thread.sleep(1000);
  32. } catch (InterruptedException _ignored) {
  33. Thread.currentThread().interrupt();
  34. }
  35. }
  36. }
  37. }
  38. }

set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
javac -cp %CP% NewTask.java Worker.java

  1. set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
  2. java -cp %CP% Worker

  1. set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
  2. java -cp %CP% NewTask First message.
  3. java -cp %CP% NewTask Second message..
  4. java -cp %CP% NewTask Third message...
  5. java -cp %CP% NewTask Fourth message....
  6. java -cp %CP% NewTask Fifth message.....

发布/订阅

EmitLog

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class EmitLog {
  5. private static final String EXCHANGE_NAME = "logs";
  6. public static void main(String[] argv) throws Exception {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("localhost");
  9. try (Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel()) {
  11. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  12. String message = argv.length < 1 ? "info: Hello World!" :
  13. String.join(" ", argv);
  14. channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
  15. System.out.println(" [x] Sent '" + message + "'");
  16. }
  17. }
  18. }

ReceiveLogs

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.DeliverCallback;
  5. public class ReceiveLogs {
  6. private static final String EXCHANGE_NAME = "logs";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("localhost");
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
  13. String queueName = channel.queueDeclare().getQueue();
  14. channel.queueBind(queueName, EXCHANGE_NAME, "");
  15. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  16. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  17. String message = new String(delivery.getBody(), "UTF-8");
  18. System.out.println(" [x] Received '" + message + "'");
  19. };
  20. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  21. }
  22. }
  1. javac -cp %CP% EmitLog.java ReceiveLogs.java
  2. java -cp %CP% ReceiveLogs> logs_from_rabbit.log
  3. java -cp %CP% ReceiveLogs
  4. java -cp %CP% EmitLog

路由

EmitLogDirect

  1. import com.rabbitmq.client.BuiltinExchangeType;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. public class EmitLogDirect {
  6. private static final String EXCHANGE_NAME = "direct_logs";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("localhost");
  10. try (Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel()) {
  12. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  13. String severity = getSeverity(argv);
  14. String message = getMessage(argv);
  15. channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
  16. System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
  17. }
  18. }
  19. private static String getSeverity(String[] strings) {
  20. if (strings.length < 1)
  21. return "info";
  22. return strings[0];
  23. }
  24. private static String getMessage(String[] strings) {
  25. if (strings.length < 2)
  26. return "Hello World!";
  27. return joinStrings(strings, " ", 1);
  28. }
  29. private static String joinStrings(String[] strings, String delimiter, int startIndex) {
  30. int length = strings.length;
  31. if (length == 0) return "";
  32. if (length <= startIndex) return "";
  33. StringBuilder words = new StringBuilder(strings[startIndex]);
  34. for (int i = startIndex + 1; i < length; i++) {
  35. words.append(delimiter).append(strings[i]);
  36. }
  37. return words.toString();
  38. }
  39. }

ReceiveLogsDirect

  1. import com.rabbitmq.client.*;
  2. public class ReceiveLogsDirect {
  3. private static final String EXCHANGE_NAME = "direct_logs";
  4. public static void main(String[] argv) throws Exception {
  5. ConnectionFactory factory = new ConnectionFactory();
  6. factory.setHost("localhost");
  7. Connection connection = factory.newConnection();
  8. Channel channel = connection.createChannel();
  9. channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
  10. String queueName = channel.queueDeclare().getQueue();
  11. if (argv.length < 1) {
  12. System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
  13. System.exit(1);
  14. }
  15. for (String severity : argv) {
  16. channel.queueBind(queueName, EXCHANGE_NAME, severity);
  17. }
  18. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  19. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  20. String message = new String(delivery.getBody(), "UTF-8");
  21. System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  22. };
  23. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
  24. });
  25. }
  26. }
  1. javac -cp %CP% ReceiveLogsDirect.java EmitLogDirect.java
  2. java -cp %CP% ReceiveLogsDirect warning error > logs_from_rabbit.log
  3. java -cp %CP% ReceiveLogsDirect info warning error
  4. java -cp %CP% EmitLogDirect error "Run. Run. Or it will explode."

主题

EmitLogTopic

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. public class EmitLogTopic {
  5. private static final String EXCHANGE_NAME = "topic_logs";
  6. public static void main(String[] argv) throws Exception {
  7. ConnectionFactory factory = new ConnectionFactory();
  8. factory.setHost("localhost");
  9. try (Connection connection = factory.newConnection();
  10. Channel channel = connection.createChannel()) {
  11. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  12. String routingKey = getRouting(argv);
  13. String message = getMessage(argv);
  14. channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
  15. System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
  16. }
  17. }
  18. private static String getRouting(String[] strings) {
  19. if (strings.length < 1)
  20. return "anonymous.info";
  21. return strings[0];
  22. }
  23. private static String getMessage(String[] strings) {
  24. if (strings.length < 2)
  25. return "Hello World!";
  26. return joinStrings(strings, " ", 1);
  27. }
  28. private static String joinStrings(String[] strings, String delimiter, int startIndex) {
  29. int length = strings.length;
  30. if (length == 0) return "";
  31. if (length < startIndex) return "";
  32. StringBuilder words = new StringBuilder(strings[startIndex]);
  33. for (int i = startIndex + 1; i < length; i++) {
  34. words.append(delimiter).append(strings[i]);
  35. }
  36. return words.toString();
  37. }
  38. }

ReceiveLogsTopic

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.Connection;
  3. import com.rabbitmq.client.ConnectionFactory;
  4. import com.rabbitmq.client.DeliverCallback;
  5. public class ReceiveLogsTopic {
  6. private static final String EXCHANGE_NAME = "topic_logs";
  7. public static void main(String[] argv) throws Exception {
  8. ConnectionFactory factory = new ConnectionFactory();
  9. factory.setHost("localhost");
  10. Connection connection = factory.newConnection();
  11. Channel channel = connection.createChannel();
  12. channel.exchangeDeclare(EXCHANGE_NAME, "topic");
  13. String queueName = channel.queueDeclare().getQueue();
  14. if (argv.length < 1) {
  15. System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
  16. System.exit(1);
  17. }
  18. for (String bindingKey : argv) {
  19. channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
  20. }
  21. System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
  22. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  23. String message = new String(delivery.getBody(), "UTF-8");
  24. System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
  25. };
  26. channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
  27. }
  28. }
  1. javac -cp %CP% ReceiveLogsTopic.java EmitLogTopic.java
  2. java -cp %CP% ReceiveLogsTopic "#"
  3. java -cp %CP% ReceiveLogsTopic "kern.*"
  4. java -cp %CP% ReceiveLogsTopic "*.critical"
  5. java -cp %CP% ReceiveLogsTopic "kern.*" "*.critical"
  6. java -cp %CP% EmitLogTopic "kern.critical" "A critical kernel error"

RPC

RPCClient

  1. import com.rabbitmq.client.AMQP;
  2. import com.rabbitmq.client.Channel;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.io.IOException;
  6. import java.util.UUID;
  7. import java.util.concurrent.ArrayBlockingQueue;
  8. import java.util.concurrent.BlockingQueue;
  9. import java.util.concurrent.TimeoutException;
  10. public class RPCClient implements AutoCloseable {
  11. private Connection connection;
  12. private Channel channel;
  13. private String requestQueueName = "rpc_queue";
  14. public RPCClient() throws IOException, TimeoutException {
  15. ConnectionFactory factory = new ConnectionFactory();
  16. factory.setHost("localhost");
  17. connection = factory.newConnection();
  18. channel = connection.createChannel();
  19. }
  20. public static void main(String[] argv) {
  21. try (RPCClient fibonacciRpc = new RPCClient()) {
  22. for (int i = 0; i < 32; i++) {
  23. String i_str = Integer.toString(i);
  24. System.out.println(" [x] Requesting fib(" + i_str + ")");
  25. String response = fibonacciRpc.call(i_str);
  26. System.out.println(" [.] Got '" + response + "'");
  27. }
  28. } catch (IOException | TimeoutException | InterruptedException e) {
  29. e.printStackTrace();
  30. }
  31. }
  32. public String call(String message) throws IOException, InterruptedException {
  33. final String corrId = UUID.randomUUID().toString();
  34. String replyQueueName = channel.queueDeclare().getQueue();
  35. AMQP.BasicProperties props = new AMQP.BasicProperties
  36. .Builder()
  37. .correlationId(corrId)
  38. .replyTo(replyQueueName)
  39. .build();
  40. channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
  41. final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
  42. String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
  43. if (delivery.getProperties().getCorrelationId().equals(corrId)) {
  44. response.offer(new String(delivery.getBody(), "UTF-8"));
  45. }
  46. }, consumerTag -> {
  47. });
  48. String result = response.take();
  49. channel.basicCancel(ctag);
  50. return result;
  51. }
  52. public void close() throws IOException {
  53. connection.close();
  54. }
  55. }

RPCServer

  1. import com.rabbitmq.client.*;
  2. public class RPCServer {
  3. private static final String RPC_QUEUE_NAME = "rpc_queue";
  4. private static int fib(int n) {
  5. if (n == 0) return 0;
  6. if (n == 1) return 1;
  7. return fib(n - 1) + fib(n - 2);
  8. }
  9. public static void main(String[] argv) throws Exception {
  10. ConnectionFactory factory = new ConnectionFactory();
  11. factory.setHost("localhost");
  12. try (Connection connection = factory.newConnection();
  13. Channel channel = connection.createChannel()) {
  14. channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
  15. channel.queuePurge(RPC_QUEUE_NAME);
  16. channel.basicQos(1);
  17. System.out.println(" [x] Awaiting RPC requests");
  18. Object monitor = new Object();
  19. DeliverCallback deliverCallback = (consumerTag, delivery) -> {
  20. AMQP.BasicProperties replyProps = new AMQP.BasicProperties
  21. .Builder()
  22. .correlationId(delivery.getProperties().getCorrelationId())
  23. .build();
  24. String response = "";
  25. try {
  26. String message = new String(delivery.getBody(), "UTF-8");
  27. int n = Integer.parseInt(message);
  28. System.out.println(" [.] fib(" + message + ")");
  29. response += fib(n);
  30. } catch (RuntimeException e) {
  31. System.out.println(" [.] " + e.toString());
  32. } finally {
  33. channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
  34. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
  35. // RabbitMq consumer worker thread notifies the RPC server owner thread
  36. synchronized (monitor) {
  37. monitor.notify();
  38. }
  39. }
  40. };
  41. channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
  42. // Wait and be prepared to consume the message from RPC client.
  43. while (true) {
  44. synchronized (monitor) {
  45. try {
  46. monitor.wait();
  47. } catch (InterruptedException e) {
  48. e.printStackTrace();
  49. }
  50. }
  51. }
  52. }
  53. }
  54. }
  1. javac -cp %CP% RPCClient.java RPCServer.java
  2. java -cp %CP% RPCServer
  3. java -cp %CP% RPCClient

发布者确认

PublisherConfirms

  1. import com.rabbitmq.client.Channel;
  2. import com.rabbitmq.client.ConfirmCallback;
  3. import com.rabbitmq.client.Connection;
  4. import com.rabbitmq.client.ConnectionFactory;
  5. import java.time.Duration;
  6. import java.util.UUID;
  7. import java.util.concurrent.ConcurrentNavigableMap;
  8. import java.util.concurrent.ConcurrentSkipListMap;
  9. import java.util.function.BooleanSupplier;
  10. public class PublisherConfirms {
  11. static final int MESSAGE_COUNT = 50_000;
  12. static Connection createConnection() throws Exception {
  13. ConnectionFactory cf = new ConnectionFactory();
  14. cf.setHost("localhost");
  15. cf.setUsername("guest");
  16. cf.setPassword("guest");
  17. return cf.newConnection();
  18. }
  19. public static void main(String[] args) throws Exception {
  20. publishMessagesIndividually();
  21. publishMessagesInBatch();
  22. handlePublishConfirmsAsynchronously();
  23. }
  24. static void publishMessagesIndividually() throws Exception {
  25. try (Connection connection = createConnection()) {
  26. Channel ch = connection.createChannel();
  27. String queue = UUID.randomUUID().toString();
  28. ch.queueDeclare(queue, false, false, true, null);
  29. ch.confirmSelect();
  30. long start = System.nanoTime();
  31. for (int i = 0; i < MESSAGE_COUNT; i++) {
  32. String body = String.valueOf(i);
  33. ch.basicPublish("", queue, null, body.getBytes());
  34. ch.waitForConfirmsOrDie(5_000);
  35. }
  36. long end = System.nanoTime();
  37. System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
  38. }
  39. }
  40. static void publishMessagesInBatch() throws Exception {
  41. try (Connection connection = createConnection()) {
  42. Channel ch = connection.createChannel();
  43. String queue = UUID.randomUUID().toString();
  44. ch.queueDeclare(queue, false, false, true, null);
  45. ch.confirmSelect();
  46. int batchSize = 100;
  47. int outstandingMessageCount = 0;
  48. long start = System.nanoTime();
  49. for (int i = 0; i < MESSAGE_COUNT; i++) {
  50. String body = String.valueOf(i);
  51. ch.basicPublish("", queue, null, body.getBytes());
  52. outstandingMessageCount++;
  53. if (outstandingMessageCount == batchSize) {
  54. ch.waitForConfirmsOrDie(5_000);
  55. outstandingMessageCount = 0;
  56. }
  57. }
  58. if (outstandingMessageCount > 0) {
  59. ch.waitForConfirmsOrDie(5_000);
  60. }
  61. long end = System.nanoTime();
  62. System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
  63. }
  64. }
  65. static void handlePublishConfirmsAsynchronously() throws Exception {
  66. try (Connection connection = createConnection()) {
  67. Channel ch = connection.createChannel();
  68. String queue = UUID.randomUUID().toString();
  69. ch.queueDeclare(queue, false, false, true, null);
  70. ch.confirmSelect();
  71. ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
  72. ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
  73. if (multiple) {
  74. ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
  75. sequenceNumber, true
  76. );
  77. confirmed.clear();
  78. } else {
  79. outstandingConfirms.remove(sequenceNumber);
  80. }
  81. };
  82. ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
  83. String body = outstandingConfirms.get(sequenceNumber);
  84. System.err.format(
  85. "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
  86. body, sequenceNumber, multiple
  87. );
  88. cleanOutstandingConfirms.handle(sequenceNumber, multiple);
  89. });
  90. long start = System.nanoTime();
  91. for (int i = 0; i < MESSAGE_COUNT; i++) {
  92. String body = String.valueOf(i);
  93. outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
  94. ch.basicPublish("", queue, null, body.getBytes());
  95. }
  96. if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
  97. throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
  98. }
  99. long end = System.nanoTime();
  100. System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
  101. }
  102. }
  103. static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
  104. int waited = 0;
  105. while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
  106. Thread.sleep(100L);
  107. waited = +100;
  108. }
  109. return condition.getAsBoolean();
  110. }
  111. }
  1. javac -cp %CP% PublisherConfirms.java
  2. java -cp %CP% PublisherConfirms

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/976000
推荐阅读
相关标签
  

闽ICP备14008679号