赞
踩
https://www.rabbitmq.com/#getstarted
RabbitMQ需要安装Windows 支持的64位Erlang版本
http://erlang.org/download/otp_win64_23.0.exe
https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.8.4/rabbitmq-server-3.8.4.exe
rabbitmq-plugins enable rabbitmq_management
默认账号guest\guest
Recv.java
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.nio.charset.StandardCharsets;
-
- public class Send {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- String message = "Hello World!";
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
- }

Recv.java
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DeliverCallback;
-
- public class Recv {
-
- private final static String QUEUE_NAME = "hello";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- };
- channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
- }
- }

- 编译
- javac -cp amqp-client-5.7.1.jar Send.java Recv.java
- 启动Recv
- java -cp .;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar Recv
-
- 启动Send
- java -cp .;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar Send
NewTask.java
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
-
- public class NewTask {
-
- private static final String TASK_QUEUE_NAME = "task_queue";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
-
- String message = String.join(" ", argv);
-
- channel.basicPublish("", TASK_QUEUE_NAME,
- MessageProperties.PERSISTENT_TEXT_PLAIN,
- message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
-
- }

Worker.java
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DeliverCallback;
-
- public class Worker {
-
- private static final String TASK_QUEUE_NAME = "task_queue";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- final Connection connection = factory.newConnection();
- final Channel channel = connection.createChannel();
-
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- channel.basicQos(1);
-
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
-
- System.out.println(" [x] Received '" + message + "'");
- try {
- doWork(message);
- } finally {
- System.out.println(" [x] Done");
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- }
- };
- channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });
- }
-
- private static void doWork(String task) {
- for (char ch : task.toCharArray()) {
- if (ch == '.') {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException _ignored) {
- Thread.currentThread().interrupt();
- }
- }
- }
- }
- }
-

set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
javac -cp %CP% NewTask.java Worker.java
- set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
- java -cp %CP% Worker
- set CP=.;amqp-client-5.7.1.jar;slf4j-api-1.7.26.jar;slf4j-simple-1.7.26.jar
- java -cp %CP% NewTask First message.
- java -cp %CP% NewTask Second message..
- java -cp %CP% NewTask Third message...
- java -cp %CP% NewTask Fourth message....
- java -cp %CP% NewTask Fifth message.....
EmitLog
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class EmitLog {
-
- private static final String EXCHANGE_NAME = "logs";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
-
- String message = argv.length < 1 ? "info: Hello World!" :
- String.join(" ", argv);
-
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + message + "'");
- }
- }
-
- }
-

ReceiveLogs
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DeliverCallback;
-
- public class ReceiveLogs {
- private static final String EXCHANGE_NAME = "logs";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- String queueName = channel.queueDeclare().getQueue();
- channel.queueBind(queueName, EXCHANGE_NAME, "");
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
- }
- }
-

- javac -cp %CP% EmitLog.java ReceiveLogs.java
- java -cp %CP% ReceiveLogs> logs_from_rabbit.log
- java -cp %CP% ReceiveLogs
- java -cp %CP% EmitLog
EmitLogDirect
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class EmitLogDirect {
-
- private static final String EXCHANGE_NAME = "direct_logs";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
-
- String severity = getSeverity(argv);
- String message = getMessage(argv);
-
- channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
- }
- }
-
- private static String getSeverity(String[] strings) {
- if (strings.length < 1)
- return "info";
- return strings[0];
- }
-
- private static String getMessage(String[] strings) {
- if (strings.length < 2)
- return "Hello World!";
- return joinStrings(strings, " ", 1);
- }
-
- private static String joinStrings(String[] strings, String delimiter, int startIndex) {
- int length = strings.length;
- if (length == 0) return "";
- if (length <= startIndex) return "";
- StringBuilder words = new StringBuilder(strings[startIndex]);
- for (int i = startIndex + 1; i < length; i++) {
- words.append(delimiter).append(strings[i]);
- }
- return words.toString();
- }
- }
-

ReceiveLogsDirect
- import com.rabbitmq.client.*;
-
- public class ReceiveLogsDirect {
-
- private static final String EXCHANGE_NAME = "direct_logs";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- String queueName = channel.queueDeclare().getQueue();
-
- if (argv.length < 1) {
- System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
- System.exit(1);
- }
-
- for (String severity : argv) {
- channel.queueBind(queueName, EXCHANGE_NAME, severity);
- }
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
- });
- }
- }
-

- javac -cp %CP% ReceiveLogsDirect.java EmitLogDirect.java
- java -cp %CP% ReceiveLogsDirect warning error > logs_from_rabbit.log
-
- java -cp %CP% ReceiveLogsDirect info warning error
- java -cp %CP% EmitLogDirect error "Run. Run. Or it will explode."
EmitLogTopic
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- public class EmitLogTopic {
-
- private static final String EXCHANGE_NAME = "topic_logs";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
-
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
-
- String routingKey = getRouting(argv);
- String message = getMessage(argv);
-
- channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
- System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
- }
- }
-
- private static String getRouting(String[] strings) {
- if (strings.length < 1)
- return "anonymous.info";
- return strings[0];
- }
-
- private static String getMessage(String[] strings) {
- if (strings.length < 2)
- return "Hello World!";
- return joinStrings(strings, " ", 1);
- }
-
- private static String joinStrings(String[] strings, String delimiter, int startIndex) {
- int length = strings.length;
- if (length == 0) return "";
- if (length < startIndex) return "";
- StringBuilder words = new StringBuilder(strings[startIndex]);
- for (int i = startIndex + 1; i < length; i++) {
- words.append(delimiter).append(strings[i]);
- }
- return words.toString();
- }
- }
-

ReceiveLogsTopic
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.DeliverCallback;
-
- public class ReceiveLogsTopic {
-
- private static final String EXCHANGE_NAME = "topic_logs";
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
-
- channel.exchangeDeclare(EXCHANGE_NAME, "topic");
- String queueName = channel.queueDeclare().getQueue();
-
- if (argv.length < 1) {
- System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
- System.exit(1);
- }
-
- for (String bindingKey : argv) {
- channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
- }
-
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
-
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- String message = new String(delivery.getBody(), "UTF-8");
- System.out.println(" [x] Received '" + delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
- };
- channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
- }
- }
-

- javac -cp %CP% ReceiveLogsTopic.java EmitLogTopic.java
- java -cp %CP% ReceiveLogsTopic "#"
- java -cp %CP% ReceiveLogsTopic "kern.*"
- java -cp %CP% ReceiveLogsTopic "*.critical"
- java -cp %CP% ReceiveLogsTopic "kern.*" "*.critical"
- java -cp %CP% EmitLogTopic "kern.critical" "A critical kernel error"
RPCClient
- import com.rabbitmq.client.AMQP;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.io.IOException;
- import java.util.UUID;
- import java.util.concurrent.ArrayBlockingQueue;
- import java.util.concurrent.BlockingQueue;
- import java.util.concurrent.TimeoutException;
-
- public class RPCClient implements AutoCloseable {
-
- private Connection connection;
- private Channel channel;
- private String requestQueueName = "rpc_queue";
-
- public RPCClient() throws IOException, TimeoutException {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
-
- connection = factory.newConnection();
- channel = connection.createChannel();
- }
-
- public static void main(String[] argv) {
- try (RPCClient fibonacciRpc = new RPCClient()) {
- for (int i = 0; i < 32; i++) {
- String i_str = Integer.toString(i);
- System.out.println(" [x] Requesting fib(" + i_str + ")");
- String response = fibonacciRpc.call(i_str);
- System.out.println(" [.] Got '" + response + "'");
- }
- } catch (IOException | TimeoutException | InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- public String call(String message) throws IOException, InterruptedException {
- final String corrId = UUID.randomUUID().toString();
-
- String replyQueueName = channel.queueDeclare().getQueue();
- AMQP.BasicProperties props = new AMQP.BasicProperties
- .Builder()
- .correlationId(corrId)
- .replyTo(replyQueueName)
- .build();
-
- channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
-
- final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
-
- String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
- if (delivery.getProperties().getCorrelationId().equals(corrId)) {
- response.offer(new String(delivery.getBody(), "UTF-8"));
- }
- }, consumerTag -> {
- });
-
- String result = response.take();
- channel.basicCancel(ctag);
- return result;
- }
-
- public void close() throws IOException {
- connection.close();
- }
- }
-

RPCServer
- import com.rabbitmq.client.*;
-
- public class RPCServer {
-
- private static final String RPC_QUEUE_NAME = "rpc_queue";
-
- private static int fib(int n) {
- if (n == 0) return 0;
- if (n == 1) return 1;
- return fib(n - 1) + fib(n - 2);
- }
-
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
-
- try (Connection connection = factory.newConnection();
- Channel channel = connection.createChannel()) {
- channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
- channel.queuePurge(RPC_QUEUE_NAME);
-
- channel.basicQos(1);
-
- System.out.println(" [x] Awaiting RPC requests");
-
- Object monitor = new Object();
- DeliverCallback deliverCallback = (consumerTag, delivery) -> {
- AMQP.BasicProperties replyProps = new AMQP.BasicProperties
- .Builder()
- .correlationId(delivery.getProperties().getCorrelationId())
- .build();
-
- String response = "";
-
- try {
- String message = new String(delivery.getBody(), "UTF-8");
- int n = Integer.parseInt(message);
-
- System.out.println(" [.] fib(" + message + ")");
- response += fib(n);
- } catch (RuntimeException e) {
- System.out.println(" [.] " + e.toString());
- } finally {
- channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8"));
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
- // RabbitMq consumer worker thread notifies the RPC server owner thread
- synchronized (monitor) {
- monitor.notify();
- }
- }
- };
-
- channel.basicConsume(RPC_QUEUE_NAME, false, deliverCallback, (consumerTag -> { }));
- // Wait and be prepared to consume the message from RPC client.
- while (true) {
- synchronized (monitor) {
- try {
- monitor.wait();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }
- }
- }

- javac -cp %CP% RPCClient.java RPCServer.java
- java -cp %CP% RPCServer
- java -cp %CP% RPCClient
PublisherConfirms
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.ConfirmCallback;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
-
- import java.time.Duration;
- import java.util.UUID;
- import java.util.concurrent.ConcurrentNavigableMap;
- import java.util.concurrent.ConcurrentSkipListMap;
- import java.util.function.BooleanSupplier;
-
- public class PublisherConfirms {
- static final int MESSAGE_COUNT = 50_000;
-
- static Connection createConnection() throws Exception {
- ConnectionFactory cf = new ConnectionFactory();
- cf.setHost("localhost");
- cf.setUsername("guest");
- cf.setPassword("guest");
- return cf.newConnection();
- }
-
- public static void main(String[] args) throws Exception {
- publishMessagesIndividually();
- publishMessagesInBatch();
- handlePublishConfirmsAsynchronously();
- }
-
- static void publishMessagesIndividually() throws Exception {
- try (Connection connection = createConnection()) {
- Channel ch = connection.createChannel();
- String queue = UUID.randomUUID().toString();
- ch.queueDeclare(queue, false, false, true, null);
- ch.confirmSelect();
- long start = System.nanoTime();
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String body = String.valueOf(i);
- ch.basicPublish("", queue, null, body.getBytes());
- ch.waitForConfirmsOrDie(5_000);
- }
- long end = System.nanoTime();
- System.out.format("Published %,d messages individually in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
- }
- }
-
- static void publishMessagesInBatch() throws Exception {
- try (Connection connection = createConnection()) {
- Channel ch = connection.createChannel();
- String queue = UUID.randomUUID().toString();
- ch.queueDeclare(queue, false, false, true, null);
- ch.confirmSelect();
- int batchSize = 100;
- int outstandingMessageCount = 0;
- long start = System.nanoTime();
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String body = String.valueOf(i);
- ch.basicPublish("", queue, null, body.getBytes());
- outstandingMessageCount++;
- if (outstandingMessageCount == batchSize) {
- ch.waitForConfirmsOrDie(5_000);
- outstandingMessageCount = 0;
- }
- }
- if (outstandingMessageCount > 0) {
- ch.waitForConfirmsOrDie(5_000);
- }
- long end = System.nanoTime();
- System.out.format("Published %,d messages in batch in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
- }
- }
-
- static void handlePublishConfirmsAsynchronously() throws Exception {
- try (Connection connection = createConnection()) {
- Channel ch = connection.createChannel();
- String queue = UUID.randomUUID().toString();
- ch.queueDeclare(queue, false, false, true, null);
- ch.confirmSelect();
- ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>();
- ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
- if (multiple) {
- ConcurrentNavigableMap<Long, String> confirmed = outstandingConfirms.headMap(
- sequenceNumber, true
- );
- confirmed.clear();
- } else {
- outstandingConfirms.remove(sequenceNumber);
- }
- };
- ch.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> {
- String body = outstandingConfirms.get(sequenceNumber);
- System.err.format(
- "Message with body %s has been nack-ed. Sequence number: %d, multiple: %b%n",
- body, sequenceNumber, multiple
- );
- cleanOutstandingConfirms.handle(sequenceNumber, multiple);
- });
- long start = System.nanoTime();
- for (int i = 0; i < MESSAGE_COUNT; i++) {
- String body = String.valueOf(i);
- outstandingConfirms.put(ch.getNextPublishSeqNo(), body);
- ch.basicPublish("", queue, null, body.getBytes());
- }
- if (!waitUntil(Duration.ofSeconds(60), () -> outstandingConfirms.isEmpty())) {
- throw new IllegalStateException("All messages could not be confirmed in 60 seconds");
- }
- long end = System.nanoTime();
- System.out.format("Published %,d messages and handled confirms asynchronously in %,d ms%n", MESSAGE_COUNT, Duration.ofNanos(end - start).toMillis());
- }
- }
-
- static boolean waitUntil(Duration timeout, BooleanSupplier condition) throws InterruptedException {
- int waited = 0;
- while (!condition.getAsBoolean() && waited < timeout.toMillis()) {
- Thread.sleep(100L);
- waited = +100;
- }
- return condition.getAsBoolean();
- }
- }

- javac -cp %CP% PublisherConfirms.java
- java -cp %CP% PublisherConfirms
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。