赞
踩
MQ:Message Queue 消息队列,用于分布式系统中传送消息。
优势:
劣势:
使用MQ需要满足的条件:
常见的MQ产品:
除了下图的几款常用的MQ产品,还可以直接使用 redis 充当MQ。
AMQP(高级消息队列协议)是一种网络协议,基于此协议可以实现客户端与消息中间件间的消息传递,而与客户端与中间件本身产品类型无关。RabbitMQ 是基于 AMQP 协议使用 Erlang 语言开发的一款消息队列产品。
AMQP 中规定的角色类型:
Exchange:交换机,根据 Routes 处理消息的分发;
Queue:消息队列。
RabbitMQ 架构:
1. 安装 rabbitmq-server
开始费了好大的劲用 rpm 装,出了很多错,最后直接下面一句搞定:
sudo apt install rabbitmq-server
如果报错:dpkg: error processing package,可能是dpkg管理的包信息出现了损坏,通过sudo apt -f install进行修复:
sudo mv /var/lib/dpkg/info/ /var/lib/dpkg/info_old/
sudo mkdir /var/lib/dpkg/info/
sudo apt-get update
sudo apt-get -f install
sudo mv /var/lib/dpkg/info/* /var/lib/dpkg/info_old/
sudo rm -rf /var/lib/dpkg/info
sudo mv /var/lib/dpkg/info_old/ /var/lib/dpkg/info/
rabbitmq 启动命令:
rabbitmq-server -detached #后台启动(推荐)
rabbitmqctl stop # 停止服务
service rabbitmq-server start # 启动服务
service rabbitmq-server stop # 停止服务
service rabbitmq-server restart # 重启服务
2. 添加用户并赋权
在rabbitMQ中添加用户:
rabbitmqctl add_user zhangsan password
设置用户为管理员:
rabbitmqctl set_user_tags zhangsan administrator
为用户设置读写等权限:
rabbitmqctl set_permissions -p '/' zhangsan '.' '.' '.'
注意:添加完用户需要重启 rabbitmq.
3. 启动 rabbitmq 管理控制台
rabbitmq-plugins enable rabbitmq_management
启动控制台后,即可通过 web 页面访问:http://192.xxx.xxx.xxx:15672/,默认端口15672,用户即是刚才添加的用户。
1. 简单模式
一对一,单个生产者,单个消费者。
- 生产者代码
/** * 简单模式,无交换机 */ public class Producer { public static void main(String[] args) throws IOException, TimeoutException { //1. 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.xxx.xxx"); factory.setPort(5672); factory.setVirtualHost("/rabbitVirtualHost"); factory.setUsername("xxx"); factory.setPassword("xxx"); //3. 创建连接 Connection connection = factory.newConnection(); //4. 创建channel Channel channel = connection.createChannel(); //5. 创建队列 /** * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) * 参数: * 1. 队列名 * 2. 可持久化,mq宕机后会保存队列 * 3. 是否独占:只有一个消费者可以监听该队列 * 4. 是否自动删除:当没有消费者时自动删除队列 * 5. 参数 */ channel.queueDeclare("simpleRabbitmq", true, false, false, null); //6. 发送消息 /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 参数: * 1. 交换机名称,简单模式使用默认的交换机,名称为空字符串 * 2. 路由键,使用默认路由时路由键与队列名称保持一致 * 3. 参数 * 4. 消息内容 */ String msg = "hello rabbitmq"; channel.basicPublish("", "simpleRabbitmq", null, msg.getBytes()); channel.close(); connection.close(); } }
开始在 java 建立连接的时候一直连接不上,报 IOException、EOF 啥的错。我是在本地windows上安装了 Unbuntu 虚拟机,rabbitmq 安装在虚拟机上,程序在本地运行,相当于远程连接。百度都是说什么端口错了需要连5672、用户不能用默认的 guest、用户需要添加权限啥的…… 这些我都是正确的,还有人说需要开放 5672 端口,但是我 telnet 是通的,说明已经开放了。后来终于看到一个博客上说
除了要配置访问权限以外,还需要将运行环境配置修改下:
vi /etc/rabbitmq/rabbitmq-env.conf
添加:
NODE_IP_ADDRESS=192.168.xxx.xxx ## 注意这个地方写啥,java连接的ip就应该写啥
NODE_PORT=5672
配置完以后重启 rabbitmq,终于连接成功了,泪奔,卡了太久了。。。
- 消费者代码
public class SimpleConsumer { public static void main(String[] args) throws IOException, TimeoutException { //1. 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.xxx.xxx"); factory.setPort(5672); factory.setVirtualHost("/rabbitVirtualHost"); factory.setUsername("xxx"); factory.setPassword("xxx"); //3. 创建连接 Connection connection = factory.newConnection(); //4. 创建channel Channel channel = connection.createChannel(); //5. 声明队列 /** * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) * 参数: * 1. 队列名 * 2. 可持久化,mq宕机后会保存队列 * 3. 是否独占:只有一个消费者可以监听该队列 * 4. 是否自动删除:当没有消费者时自动删除队列 * 5. 参数 */ channel.queueDeclare("simpleRabbitmq", true, false, false, null); //6. 发送消息 Consumer consumer = new DefaultConsumer(channel) { /** * 收到消息后执行该方法 * @param consumerTag * @param envelope 获取某些信息 * @param properties 配置 * @param body * @throws IOException */ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); System.out.println("consumerTag: " + consumerTag); System.out.println("exchange: " + envelope.getExchange()); System.out.println("routingKey: " + envelope.getRoutingKey()); System.out.println("properties: " + properties); System.out.println("body: " + new String(body)); } }; /** * basicConsumer(String queue, boolean autoAck, Consumer callback) * 参数: * 1. 队列名 * 2. 是否自动发送回执 * 3. 回调对象 */ channel.basicConsume("simpleRabbitmq", true, consumer); } }
2. 工作队列
一对多,多个消费者消费同一个队列中的消息,每个消息只能被其中一个消费者消费。适用于消息过多,单个消费者处理压力过大的情况。
代码实现上与简单模式相同,只需要启动两个消费者监听同一个队列即可。
3. 发布订阅(广播模式)
简单模式和工作队列模式每个消息都只会被一个消费者消费。发布订阅模式可以实现同一个消息被多个消费者消费。增加 exchange 和 队列绑定。
BuiltinExchangeType.FANOUT
public class PubSubProducer { public static void main(String[] args) throws IOException, TimeoutException { //1. 创建连接工厂 ConnectionFactory factory = new ConnectionFactory(); //2. 设置参数 factory.setHost("192.168.xxx.xxx"); factory.setPort(5672); factory.setVirtualHost("/rabbitVirtualHost"); factory.setUsername("xxx"); factory.setPassword("xxx"); //3. 创建连接 Connection connection = factory.newConnection(); //4. 创建channel Channel channel = connection.createChannel(); //5. 创建队列 /** * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) * 参数: * 1. 队列名 * 2. 可持久化,mq宕机后会保存队列 * 3. 是否独占:只有一个消费者可以监听该队列 * 4. 是否自动删除:当没有消费者时自动删除队列 * 5. 参数 */ String queue1 = "fanoutQueue1"; String queue2 = "fanoutQueue2"; channel.queueDeclare(queue1, true, false, false, null); channel.queueDeclare(queue2, true, false, false, null); //6. 创建交换机 /** * DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) * 参数: * 1. 交换机名称 * 2. 交换机类型 * 3. 是否可持久化 * 4. 是否自动删除 * 5. 是否内部使用 * 6. 参数 */ String exchangeName = "fanoutExchange"; channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null); //7. 交换机绑定队列 /** * queueBind(String queue, String exchange, String routingKey) * 当为广播模式时,路由键为"" */ channel.queueBind(queue1, exchangeName, ""); channel.queueBind(queue2, exchangeName, ""); //8. 发送消息 /** * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) * 参数: * 1. 交换机名称,简单模式使用默认的交换机,名称为空字符串 * 2. 路由键,使用默认路由时路由键与队列名称保持一致 * 3. 参数 * 4. 消息内容 */ String msg = "rabbitmq test pubsub fanout"; channel.basicPublish(exchangeName, "", null, msg.getBytes()); channel.close(); connection.close(); } }
消费者代码与简单模式一样,不赘述。
4. 路由模式
路由模式相比广播模式的唯一区别是队列绑定交换机需要添加 routing key,发布消息时也需要指定 routing key,只有 routing key 一致的消息才能被监听者接收到。
BuiltinExchangeType.DIRECT
String queue1 = "directQueue1"; String queue2 = "directQueue2"; channel.queueDeclare(queue1, true, false, false, null); channel.queueDeclare(queue2, true, false, false, null); String exchangeName = "directExchange"; //采用direct类型 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null); //交换机绑定队列 //队列1绑定 channel.queueBind(queue1, exchangeName, "error"); //队列2绑定 channel.queueBind(queue2, exchangeName, "info"); channel.queueBind(queue2, exchangeName, "warning"); channel.queueBind(queue2, exchangeName, "error"); //发送消息 String msg = "rabbitmq test pubsub routing"; channel.basicPublish(exchangeName, "info", null, msg.getBytes()); channel.basicPublish(exchangeName, "error", null, msg.getBytes());
5. 通配符模式
与路由模式相比,通配符模式允许 routing key 带有通配符,使得路由规则更加灵活。
BuiltinExchangeType.TOPIC
producer:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.xxx.xxx
port: 5672
username: xxx
password: xxx
virtual-host: /rabbitVirtualHost
@Configuration public class MqConfig { public static final String EXCHANGE_NAME = "boot_direct_exchange"; public static final String QUEUE_NAME = "boot_direct_queue"; /** * 配置交换机 * @return */ @Bean public Exchange bootExchange() { return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build(); } /** * 配置队列 * @return */ @Bean public Queue bootQueue() { return QueueBuilder.durable(QUEUE_NAME).build(); } /** * 队列绑定交换机 * Qualifier 的作用是声明注入哪个对象,防止多个交换机队列不知道注入哪一个 * @param exchange * @param queue * @return */ @Bean public Binding bindQueueExchange(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue) { return BindingBuilder.bind(queue).to(exchange).with("errorLog").noargs(); } }
@SpringBootTest
class BootproducerApplicationTests {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void contextLoads() {
rabbitTemplate.convertAndSend(MqConfig.EXCHANGE_NAME,
"errorLog", "this is test for boot rabbitmq producer");
}
}
consumer:
依赖和配置文件同生产者;
监听队列
@Component
public class MqListener {
@RabbitListener(queues = "boot_direct_queue")
public void BootConsumerListener (Message message) {
System.out.println(message.getBody().toString());
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。