赞
踩
简单模式 HelloWorld。一个生产者、一个消费者,不需要设置交换机使用默认的交换机。
public class Producer { //队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] args) { //建立连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置目标主机ip factory.setHost("192.168.47.128"); //设置账号名密码 factory.setUsername("yf"); factory.setPassword("123456"); // 修改端口的设置 // factory.setPort(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //通道和队列的连接 /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数说明: queue:队列名称 durable:是否持久化 exclusive:是否独占,是否一个消费者监听一个队列 autoDelete:是否自动删除。如果没有消费者consumer,自动删除掉队列 arguments:参数 */ channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 10; i++) { //需要发送的消息 String message = "Hello RabbitMQ!"+i; //通过最基础的发布 /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数说明: exchange:指定交换机,如果使用默认模式,就使用“” routingKey:路由名称 props:配置信息 body:发送的消息(要求字节数组) */ channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } } catch (Exception e) { e.printStackTrace(); } } }
/** * 服务端,接收信息 */ public class Consumer { //指定接收队列名称 private final static String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); //设置目标主机ip factory.setHost("192.168.47.128"); //设置用户密码 factory.setUsername("yf"); factory.setPassword("123456"); //建立连接 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println(" [x] Received '" + msg + "'"); } }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
public class Producer { //队列名称 private final static String QUEUE_NAME = "work_queues"; public static void main(String[] args) { //建立连接工厂 ConnectionFactory factory = new ConnectionFactory(); //设置目标主机ip factory.setHost("192.168.47.128"); //设置账号名密码 factory.setUsername("yf"); factory.setPassword("123456"); // 修改端口的设置 // factory.setPort(); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { //通道和队列的连接 /* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) 参数说明: queue:队列名称 durable:是否持久化 exclusive:是否独占,是否一个消费者监听一个队列 autoDelete:是否自动删除。如果没有消费者consumer,自动删除掉队列 arguments:参数 */ // channel.basicQos(1);// 如果你的消息还没有确认,那么我同一时间只能给你发送一条消息 channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0; i < 10; i++) { //需要发送的消息 String message = "Hello RabbitMQ!"+i; //通过最基础的发布 /* basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) 参数说明: exchange:指定交换机,如果使用默认模式,就使用“” routingKey:路由名称 props:配置信息 body:发送的消息(要求字节数组) */ channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } } catch (Exception e) { e.printStackTrace(); } } }
/** * 服务端,接收信息 */ public class Consumer1 { //指定接收队列名称 private final static String QUEUE_NAME = "work_queues"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); //设置目标主机ip factory.setHost("192.168.47.128"); //设置用户密码 factory.setUsername("yf"); factory.setPassword("123456"); //建立连接 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.basicQos(1); // 如果你的消息还没有确认,那么我同一时间只能给你发送一条消息 channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = new DeliverCallback() { @Override public void handle(String consumerTag, Delivery message) throws IOException { String msg = new String(message.getBody(), "UTF-8"); System.out.println(" [x] Received '" + msg + "'"); } }; /* 这种写法和上面是一样的 使用的是lambda表达式 DeliverCallback deliverCallback = (consumerTag, message) -> { String msg = new String(message.getBody(), "UTF-8"); System.out.println(" [x] Received '" + msg + "'"); }; */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
该文章还没写完,先发布出来,后面会持续更新!!!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。