赞
踩
什么场景下会用到RabbitMQ 数据库与ES搜索库 同步时
RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同
(发送者,上游)producer------------->queue----------------->|多consumer|(消费,下游)
手动ack:
channel.basicConsume(, false, );//不自动akc
try{
.......
channel.basicAck(envelope.getDeliveryTag(),false);//手动akc
}catch(){
}
好处:保证消费者把消息成功消费(理论)
producer------------->queue----------------->|多consumer|
能者多劳:
channel.basicQos(1);
如何防止消息堆积:多consumer + 能者多劳
producer--------->exchange-----(绑定)------>|多queue|----------------->|多consumer|
注意:exchange只负责分发消息,若没有queue绑定则会把消息丢弃
routingkey
producer--------->exchange------(绑定)----->|多queue|----------------->|多consumer|
routingkey:灵活分发消息
*.routingkey.#
producer--------->exchange------(绑定)----->|多queue|----------------->|多consumer|
*:匹配一个字符
#:匹配多个字符
1、队列
channel.queueDeclare(, true, , , );
2、交换器
channel.exchangeDeclare(, , true);
3、消息
channel.basicPublish(, , MessageProperties.PERSISTENT_TEXT_PLAIN, );
如何保证消息成功消费?
1、rabbitmq会宕机?
持久化
2、consumer业务会处理失败?
手动akc:只有ack后消息才会删除
1、pom.xml
spring-boot-starter-amqp
2、application.yml
spring:
rabbitmq:
host: 192.168.145.136 (rebbitmq 虚拟机地址)
port: 5672
username: admin
password: 1111
virtual-host: / (库名)
3、接收者
@Component
public class Recver{
@RabbitListener(bindings = {@QueueBinding(
value = @Queue(name = "springboot_queue",durable = "true"),
exchange = @Exchange(name = "spring_exchange", type = ExchangeTypes.TOPIC),
key={"*.*"}
)})
public void listen(String msg){
}
}
4、发送者
@Autowired
private AmqpTemplate amqpTemplate;
amqpTemplate.convertAndSend(EXCHANGE_NAME, ROUTINGKEY, msg);
抽取一个建立RabbitMQ连接的工具类,方便其他程序获取连接
- package com.bjpowernode.util;
-
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.Connection;
-
- public class ConnectionUtil {
- /**
- * 建立与RabbitMQ的连接
- * @return
- * @throws Exception
- */
- public static Connection getConnection() throws Exception {
- //定义连接工厂
- ConnectionFactory factory = new ConnectionFactory();
- //设置服务地址
- factory.setHost("192.168.145.136 ");
- //端口
- factory.setPort(5672);
- //设置账号信息,用户名、密码、vhost
- factory.setUsername("admin");
- factory.setPassword("1111");
- factory.setVirtualHost("/");
- // 通过工程获取连接
- Connection connection = factory.newConnection();
- return connection;
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。