赞
踩
- [root@localhost ~]# uname -r
- 3.10.0-1160.el7.x86_64
[root@localhost ~]# yum -y update
[root@localhost ~]# yum remove docker docker-common docker-selinux docker-engine
[root@localhost ~]# yum install -y yum-utils device-mapper-persistent-data lvm2
- [root@localhost ~]# yum-config-manager --add-repo http://download.docker.com/linux/centos/docker-ce.repo(中央仓库)
- [root@localhost ~]# yum-config-manager --add-repo http://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo(阿里仓库)
[root@localhost ~]# yum list docker-ce --showduplicates | sort -r
[root@localhost ~]# yum -y install docker-ce-18.03.1.ce
- [root@localhost ~]# systemctl start docker
- [root@localhost ~]# systemctl enable docker 设置开机自启
[root@localhost ~]# docker version
[root@localhost ~]# docker search rabbitmq:management
[root@localhost ~]# docker pull macintoshplus/rabbitmq-management
[root@localhost ~]# docker images
- [root@localhost ~]# systemctl start docker
- [root@localhost ~]# systemctl enable docker 设置开机自启
[root@localhost ~]# docker ps -a
- <dependency>
- <groupId>com.rabbitmq</groupId>
- <artifactId>amqp-client</artifactId>
- <version>5.7.1</version>
- </dependency>
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class ConnectionUtil {
-
- public static Connection getConnection() throws IOException, TimeoutException {
- // 定义连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- // 设置服务器地址
- factory.setHost("192.168.111.133");
- // 设置端口
- factory.setPort(5672);
- /**
- * 设置账号信息,用户名、密码、vhost
- * 设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
- */
- /*factory.setVirtualHost("/ly");
- factory.setUsername("ly");
- factory.setPassword("123456")*/;
- // 通过工厂获取连接
- Connection connection = factory.newConnection();
- return connection;
- }
-
- }

- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Send {
-
- private final static String QUEUE_NAME = "simple_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 2. 从连接中创建通道,使用通道才能完成消息相关的操作
- Channel channel = connection.createChannel();
- // 3. 声明(创建)队列
- // 参数:String queue,boolean durable,boolean exclusive,boolean autoDelete, Map<String, Object> arguments
- /**
- * 参数明细:
- * 1.queue,队列名称
- * 2.durable,是否持久化,如果持久化,mq重启后队列还在
- * 3.exclusive,是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,
- * 如果将此参数设置为true可用于临时队列的创建
- * 4.autoDelete,自动删除,队列不再使用时是否自动删除此队列,
- * 如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
- * 5.arguments,参数,可以设置一个队列的扩展参数,比如:可设置存活时间
- */
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 消息内容
- String message = "Hello World!";
- // 向指定的队列中发送消息
- //参数:String exchange, String routingKey, BasicProperties props, byte[] body
- /**
- * 参数明细:
- * 1.exchange,交换机,如果不指定将使用mq的欧仁交换机(设置为"")
- * 2.routingKey,路由Key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
- * 3.props,消息的属性
- * 4.body,消息的内容
- */
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println("[x] Send '" + message + "'");
-
- // 关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
- channel.close();
- connection.close();
- }
-
- }

- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Recv {
-
- private final static String QUEUE_NAME = "simple_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 2. 从连接中创建通道,使用通道才能完成消息相关的操作
- Channel channel = connection.createChannel();
- // 3. 声明(创建)队列
- // 参数:String queue,boolean durable,boolean exclusive,boolean autoDelete, Map<String, Object> arguments
- /**
- * 参数明细:
- * 1.queue,队列名称
- * 2.durable,是否持久化,如果持久化,mq重启后队列还在
- * 3.exclusive,是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,
- * 如果将此参数设置为true可用于临时队列的创建
- * 4.autoDelete,自动删除,队列不再使用时是否自动删除此队列,
- * 如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
- * 5.arguments,参数,可以设置一个队列的扩展参数,比如:可设置存活时间
- */
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- // 实现消费方法
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- // 获取消息,并处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- /**
- * 当接收到消息后此方法将被调用
- * @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsum
- * @param envelope 信封,通过envelope
- * @param properties 消息属性
- * @param body 消息内容
- * @throws IOException
- */
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // 交换机
- String exchange = envelope.getExchange();
- // 消息id,mq再channel中用来标识消息的id,可用于确认消息已接受
- long deliveryTag = envelope.getDeliveryTag();
- // body 即消息体
- String msg = new String(body, "utf-8");
- System.out.println(" [x] received : " + msg + "!");
- }
- };
- // 监听队列,第二个参数:是否自动进行消息确认。
- // 参数: String queue,boolean autoAck,Consumer callback
- /**
- * 参数明细:
- * 1.queue 队列名称
- * 2.autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为true表示会自动回复mq,
- * 如果设置为false要通过编程实现回复。
- * 3.callback 消费方法,当消费者接收到消息要执行的方法。
- */
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
-
- }

- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Recv2 {
-
- private final static String QUEUE_NAME = "simple_queue";
-
- public static void main(String[] args) throws IOException, TimeoutException {
- // 1. 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 2. 从连接中创建通道,使用通道才能完成消息相关的操作
- final Channel channel = connection.createChannel();
- // 3. 声明(创建)队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- // 定义队列的消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- // 获取消息,并处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // body 即消息体
- String msg = new String(body, "utf-8");
- System.out.println(" [x] received : " + msg + "!");
- // 手动进行ACK
- /**
- * void basicAck(long var1, boolean var3) throws IOException;
- * var1 用来标识消息的id
- * var3 是否批量 true:将一次性ack所有小于var1的消息
- */
- channel.basicAck(envelope.getDeliveryTag(), false);
- }
- };
- // 监听队列,第二个参数false,手动进行ACK
- channel.basicConsume(QUEUE_NAME, false, consumer);
- }
-
- }

- import com.ly.ConnectionUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
-
- public class Send {
-
- private final static String QUEUE_NAME = "test_work_queue";
-
- public static void main(String[] argv) throws Exception {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- // 循环发布任务
- for (int i = 0; i < 50; i++) {
- // 消息内容
- String message = "task .. " + i;
- channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
-
- Thread.sleep(i * 2);
- }
- // 关闭通道和连接
- channel.close();
- connection.close();
- }
- }

- import com.ly.ConnectionUtil;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeUnit;
-
- public class Recv {
-
- private final static String QUEUE_NAME = "test_work_queue";
-
- public static void main(String[] argv) throws Exception {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
- Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //实现消费方法
- DefaultConsumer consumer = new DefaultConsumer(channel){
- // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // body 即消息体
- String msg = new String(body,"utf-8");
- System.out.println(" [消费者1] received : " + msg + "!");
- //模拟任务耗时1s
- try { TimeUnit.SECONDS.sleep(1); } catch (Exception e) { e.printStackTrace(); }
- }
- };
- // 监听队列,第二个参数:是否自动进行消息确认。
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }

- import com.ly.ConnectionUtil;
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Recv2 {
-
- private final static String QUEUE_NAME = "test_work_queue";
-
- public static void main(String[] argv) throws Exception {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- //创建会话通道,生产者和mq服务所有通信都在channel通道中完成
- Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
- //实现消费方法
- DefaultConsumer consumer = new DefaultConsumer(channel){
- // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
- // body 即消息体
- String msg = new String(body,"utf-8");
- System.out.println(" [消费者2] received : " + msg + "!");
- }
- };
- // 监听队列,第二个参数:是否自动进行消息确认。
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }

- import com.ly.rabbitmq.ConnectionUtil;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Send {
-
- private final static String EXCHANGE_NAME = "test_fanout_exchange";
-
- public static void main(String[] argv) throws IOException, TimeoutException {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明exchange,指定类型为fanout
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
-
- // 消息内容
- String message = "注册成功!!";
- // 发布消息到Exchange
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
- System.out.println(" [生产者] Sent '" + message + "'");
-
- channel.close();
- connection.close();
- }
- }

- import com.ly.rabbitmq.ConnectionUtil;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Recv {
-
- private final static String QUEUE_NAME = "fanout_exchange_queue_sms";//短信队列
-
- private final static String EXCHANGE_NAME = "test_fanout_exchange";
-
- public static void main(String[] argv) throws IOException, TimeoutException {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- // 绑定队列到交换机
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
-
- // 定义队列的消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- // body 即消息体
- String msg = new String(body);
- System.out.println(" [短信服务] received : " + msg + "!");
- }
- };
- // 监听队列,自动返回完成
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }

- import com.ly.rabbitmq.ConnectionUtil;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Recv2 {
-
- private final static String QUEUE_NAME = "fanout_exchange_queue_email";//邮件队列
-
- private final static String EXCHANGE_NAME = "test_fanout_exchange";
-
- public static void main(String[] argv) throws IOException, TimeoutException {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- // 绑定队列到交换机
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
-
- // 定义队列的消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- // body 即消息体
- String msg = new String(body);
- System.out.println(" [邮件服务] received : " + msg + "!");
- }
- };
- // 监听队列,自动返回完成
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }

- import com.ly.rabbitmq.ConnectionUtil;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Send {
-
- private final static String EXCHANGE_NAME = "test_direct_exchange";
-
- public static void main(String[] argv) throws IOException, TimeoutException {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明exchange,指定类型为direct
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
- // 消息内容,
- String message = "注册成功!请短信回复[T]退订";
- // 发送消息,并且指定routing key 为:sms,只有短信服务能接收到消息
- channel.basicPublish(EXCHANGE_NAME, "sms", null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
-
- channel.close();
- connection.close();
- }
- }

- import com.ly.rabbitmq.ConnectionUtil;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Recv {
-
- private final static String QUEUE_NAME = "direct_exchange_queue_sms";//短信队列
-
- private final static String EXCHANGE_NAME = "test_direct_exchange";
-
- public static void main(String[] argv) throws IOException, TimeoutException {
- /* 获取到连接 */
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- // 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms");//指定接收发送方指定routing key为sms的消息
- //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");
-
- // 定义队列的消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- // body 即消息体
- String msg = new String(body);
- System.out.println(" [短信服务] received : " + msg + "!");
- }
- };
- // 监听队列,自动ACK
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }

- import com.ly.rabbitmq.ConnectionUtil;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Recv2 {
-
- private final static String QUEUE_NAME = "direct_exchange_queue_email";//邮件队列
-
- private final static String EXCHANGE_NAME = "test_direct_exchange";
-
- public static void main(String[] argv) throws IOException, TimeoutException {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- // 绑定队列到交换机,同时指定需要订阅的routing key。可以指定多个
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email");//指定接收发送方指定routing key为email的消息
-
- // 定义队列的消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- // body 即消息体
- String msg = new String(body);
- System.out.println(" [邮件服务] received : " + msg + "!");
- }
- };
- // 监听队列,自动ACK
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }

- import com.ly.rabbitmq.ConnectionUtil;
- import com.rabbitmq.client.BuiltinExchangeType;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
-
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Send {
-
- private final static String EXCHANGE_NAME = "test_topic_exchange";
-
- public static void main(String[] argv) throws IOException, TimeoutException {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明exchange,指定类型为topic
- channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
- // 消息内容
- String message = "这是一只行动迅速的橙色的兔子";
- // 发送消息,并且指定routing key为:quick.orange.rabbit
- channel.basicPublish(EXCHANGE_NAME, "quick.orange.rabbit", null, message.getBytes());
- System.out.println(" [动物描述:] Sent '" + message + "'");
-
- channel.close();
- connection.close();
- }
- }

- import com.ly.rabbitmq.ConnectionUtil;
- import com.rabbitmq.client.*;
- import java.io.IOException;
- import java.util.concurrent.TimeoutException;
-
- public class Recv {
-
- private final static String QUEUE_NAME = "topic_exchange_queue_Q1";
-
- private final static String EXCHANGE_NAME = "test_topic_exchange";
-
- public static void main(String[] argv) throws IOException, TimeoutException {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- // 绑定队列到交换机,同时指定需要订阅的routing key。订阅所有的橙色动物
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.orange.*");
-
- // 定义队列的消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- // body 即消息体
- String msg = new String(body);
- System.out.println(" [消费者1] received : " + msg + "!");
- }
- };
- // 监听队列,自动ACK
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }

- import com.ly.rabbitmq.ConnectionUtil;
- import com.rabbitmq.client.*;
- import java.io.IOException;
-
- public class Recv2 {
-
- private final static String QUEUE_NAME = "topic_exchange_queue_Q2";
-
- private final static String EXCHANGE_NAME = "test_topic_exchange";
-
- public static void main(String[] argv) throws Exception {
- // 获取到连接
- Connection connection = ConnectionUtil.getConnection();
- // 获取通道
- Channel channel = connection.createChannel();
- // 声明队列
- channel.queueDeclare(QUEUE_NAME, false, false, false, null);
-
- // 绑定队列到交换机,同时指定需要订阅的routing key。订阅关于兔子以及懒惰动物的消息
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*.rabbit");
- channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lazy.#");
-
- // 定义队列的消费者
- DefaultConsumer consumer = new DefaultConsumer(channel) {
- // 获取消息,并且处理,这个方法类似事件监听,如果有消息的时候,会被自动调用
- @Override
- public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
- byte[] body) throws IOException {
- // body 即消息体
- String msg = new String(body);
- System.out.println(" [消费者2] received : " + msg + "!");
- }
- };
- // 监听队列,自动ACK
- channel.basicConsume(QUEUE_NAME, true, consumer);
- }
- }

- 在 RPC 请求中,客户端发送带有两个属性的消息:一个是设置回调队列的 reply_to 属性,另一个是设置唯一值的 correlation_id 属性。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。