赞
踩
Kaika 主要特点是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务。大型公司建议可以选用,如果有日志采集功能,肯定是首选kafka了。尚硅谷官网kafka视频连接http://www.gulixueyuan.com/course/ 330/tasks
RocketMQ天生为金融互联网领域而生,对于可靠性要求很高的场景,尤其是电商里面的订单扣款,以及业务削 峰,在大量交易涌入时,后端可能无法及时处理的情况。RoketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
RabbitMQ结合 erlang语言本身的并发优势,性能好时效性微秒级,社区活跃度也比较高,管理界面用起来十分方便,如果你的数据量没有那么大,中小型公司优先选择功能比较完备的 RabbitMQ。
生产者,交换机,队列,消费者
消息接收后,为手动确认,该条消息会回到队列中
队列持久化和消息持久化,虽然持久化,也会丢失,需要在发布确认中优化
channel.basicQos(0); 或者不设置,即为默认为轮询分发
channel.basicQos(1);
channel.basicQos(5); 预取5条,只能保证一开始的预取
- /**
- * 创建rabbitmq连接
- * @return
- */
- public static Channel getChannel() {
- ConnectionFactory factory = new ConnectionFactory();
- // factory.setHost("147.23.23.33");
- factory.setHost("172.16.75.388");
- factory.setPort(5672);
- factory.setUsername("admin");
- // factory.setPassword("123456");
- factory.setPassword("admin");
- Channel channel = null;
- try{
- Connection connection = factory.newConnection();
- channel = connection.createChannel();
- }catch (Exception e){
- System.out.println("e.getMessage() = " + e.getMessage());
- }
-
- return channel;
- }
- // 1、单个确认
- public static void publishSingleConfirm() throws Exception {
- Channel channel = RabbitMqUtil.getChannel();
- channel.queueDeclare("hello",true,false,false,null);
- //开启发布确认
- channel.confirmSelect();
- //开始时间
- long start = System.currentTimeMillis();
- for (Integer i = 0; i < MESSAGE_COUNT; i++) {
- String msg = i+"";
- channel.basicPublish("","hello",null,msg.getBytes());
- //消息发布确认
- boolean flag = channel.waitForConfirms();
- if(flag){
- System.out.println( i + "--消息确认成功");
- }
- }
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布200条,单条发布确认用时:" + (end - start)+"毫秒");
- }
- //2、批量发布确认
- public static void publishBatchConfirm() throws Exception {
- Channel channel = RabbitMqUtil.getChannel();
- channel.queueDeclare("hello",true,false,false,null);
- //开启发布确认
- channel.confirmSelect();
- //开始时间
- long start = System.currentTimeMillis();
- for (Integer i = 0; i < MESSAGE_COUNT; i++) {
- String msg = i+"";
- channel.basicPublish("","hello",null,msg.getBytes());
- //消息批量发布确认
- if(MESSAGE_COUNT==i){
- channel.waitForConfirms();
- System.out.println( i + "--消息确认成功");
- }
- }
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布200条,批量发布确认用时:" + (end - start)+"毫秒");
- }
- // 3、异步确认
- public static void publishAsyncConfirm() throws Exception {
- Channel channel = RabbitMqUtil.getChannel();
- channel.queueDeclare("hello",true,false,false,null);
- //开启发布确认
- channel.confirmSelect();
- /**
- * 声明一个线程安全有序的哈希表,适用于高并发的情况下
- * 1、可以轻松的将序号和数据进行关联
- * 2、轻松批量删除
- * 3、支持高并发(多线程)
- */
- ConcurrentSkipListMap<Long, String> concurrentSkipListMap = new ConcurrentSkipListMap<>();
-
- //发布成功的消息的回调
- ConfirmCallback ackConfirm = ( deliveryTag, multiple)->{
- //拿到发布成功的消息
- ConcurrentNavigableMap<Long, String> concurrentNavigableMap = concurrentSkipListMap.headMap(deliveryTag);
- //从队列中删除发布成功的,剩下的即为发布不成功的
- concurrentSkipListMap.remove(deliveryTag);
- System.out.println("确认消息的标记-->" + deliveryTag);
- };
- //发送失败的消息的回调
- ConfirmCallback nackConfirm = ( deliveryTag, multiple)->{
- //拿到为发布成功的消息
- String msg = concurrentSkipListMap.get(deliveryTag);
- System.out.println("未发布成功的消息:" + msg);
-
- };
-
- //监听broker
- channel.addConfirmListener(ackConfirm,nackConfirm);
-
- //开始时间
- long start = System.currentTimeMillis();
- for (Integer i = 0; i < MESSAGE_COUNT; i++) {
- String msg = i+"";
- channel.basicPublish("","hello",null,msg.getBytes());
- /**
- * 将发送的消息放到 concurrentSkipListMap
- * concurrentSkipListMap的 key 是从channel中拿到的序列号
- * value 是需要发送的 msg
- */
- concurrentSkipListMap.put(channel.getNextPublishSeqNo(),msg);
- }
- //结束时间
- long end = System.currentTimeMillis();
- System.out.println("发布200条,异步发布确认用时:" + (end - start)+"毫秒");
- }
1、队列持久化
2、消息持久化
3、发布确认(单条发布确认,批量发布确认,异步发布确认)
广播模式,队列通过路由key将其和交换机绑定关系,生产者只需将消息发送到交换机,路由key便会将消息路由到对应的队列中,从而供消费者进行消费
根据routingkey与队列的绑定关系,交换机将消息路由到对应的队列
该模式的routingkey不可以乱写,必须是一个单词列表,用点号隔开,例如:stu.test.author,该种单词列表最多不能超过255个字节,在该模式中,有两种替换符:
*:可以代替一个单词
#:可以代替零个或者多个单词
1)消息TTL过期,即为消息存放时间过期
2)队列达到最大长度(队列存满了,无法再添加信息到mq)
3)消息被拒绝(basic.reject或basic.nack),并且reque=false
1、使用死信队列
- /**
- * 死信队列的延迟队列 配置类
- */
- @Configuration
- public class TtlQueueConfig {
- //普通交换机
- public static final String normal_exchange = "normal_exchange";
- //死信交换机
- public static final String death_exchange = "death_exchange";
-
- //普通队列A
- public static final String a_queue = "a_queue";
- //普通队列B
- public static final String b_queue = "b_queue";
- public static final String c_queue = "c_queue";
- //死信队列
- public static final String death_queue = "death_queue";
-
- @Bean("normal_exchange")
- public DirectExchange normalExchange(){
- return new DirectExchange(normal_exchange);
- }
- @Bean("death_exchange")
- public DirectExchange deathExchange(){
- return new DirectExchange(death_exchange);
- }
- //声明普通队列 ttl为10s
- @Bean("a_queue")
- public Queue aQueue(){
- Map<String,Object> arguments = new HashMap<>(3);
- //设置消息过期时间 ,消息发布者也可以设置消息过期时间,单位为ms
- arguments.put("x-message-ttl",10000);
- //设置正常队列的死信交换机
- arguments.put("x-dead-letter-exchange",death_exchange);
- //设置routing-key
- arguments.put("x-dead-letter-routing-key","Y");
-
- return QueueBuilder.durable(a_queue).withArguments(arguments).build();
- }
- //声明普通队列 ttl为40s
- @Bean("b_queue")
- public Queue bQueue(){
- Map<String,Object> arguments = new HashMap<>(3);
- //设置消息过期时间 ,消息发布者也可以设置消息过期时间,单位为ms
- arguments.put("x-message-ttl",40000);
- //设置正常队列的死信交换机
- arguments.put("x-dead-letter-exchange",death_exchange);
- //设置routing-key
- arguments.put("x-dead-letter-routing-key","Y");
- return QueueBuilder.durable(b_queue).withArguments(arguments).build();
- }
- //声明普通队列 消息发布者设置过期时间
- @Bean("c_queue")
- public Queue cQueue(){
- Map<String,Object> arguments = new HashMap<>(2);
- //设置正常队列的死信交换机
- arguments.put("x-dead-letter-exchange",death_exchange);
- //设置routing-key
- arguments.put("x-dead-letter-routing-key","Y");
-
- return QueueBuilder.durable(c_queue).withArguments(arguments).build();
- }
- //声明死信队列
- @Bean("death_queue")
- public Queue deathQueue(){
- return QueueBuilder.durable(death_queue).build();
- }
- //普通交换机与队列进行绑定
- @Bean
- public Binding aQueueBingDingNormalExchange(@Qualifier("a_queue")Queue queueA,
- @Qualifier("normal_exchange") DirectExchange exchange){
- return BindingBuilder.bind(queueA).to(exchange).with("XA");
- }
- @Bean
- public Binding bQueueBingDingNormalExchange(@Qualifier("b_queue")Queue queueB,
- @Qualifier("normal_exchange") DirectExchange exchange){
- return BindingBuilder.bind(queueB).to(exchange).with("XB");
- }
- @Bean
- public Binding cQueueBingDingNormalExchange(@Qualifier("c_queue")Queue queueB,
- @Qualifier("normal_exchange") DirectExchange exchange){
- return BindingBuilder.bind(queueB).to(exchange).with("XC");
- }
- //死信队列与交换机绑定
- @Bean
- public Binding deathQueueBingDingNormalExchange(@Qualifier("death_queue")Queue deathQueue,
- @Qualifier("death_exchange") DirectExchange exchange){
- return BindingBuilder.bind(deathQueue).to(exchange).with("Y");
- }
- }
- //发布消息:
- /**
- * 发送消息到延迟队列
- * @param message
- */
- @GetMapping("send/test")
- public void sendMsg(String message){
- rabbitTemplate.convertAndSend("normal_exchange","XA",message);
- rabbitTemplate.convertAndSend("normal_exchange","XB",message);
- System.out.println("发送成功");
- }
2、使用rabbitmq延迟中间件
- /**
- * 基于rabbitmq-delayed-message-exchange 插件实现的延迟队列
- */
- @Configuration
- public class DelayQueueConfig {
- //延迟交换机
- public static final String delayed_exchange = "delayed_exchange";
- //延迟队列
- public static final String delayed_queue = "delayed_queue";
- //延迟routing_key
- public static final String delayed_routing_key = "delayed_routing_key";
-
- //声明交换机 自定义x-delayed-exchange
- @Bean
- public CustomExchange customExchange(){
- /**
- * 交换机名称
- * 交换机类型
- * 是否持久化
- * 是否自动删除
- * 交换机参数
- */
- Map<String,Object> map = new HashMap<>();
- map.put("x-delayed-type","direct");
- return new CustomExchange(delayed_exchange,"x-delayed-message",true,false,map);
- }
- //声明队列
- @Bean
- public Queue delayedQueue(){
- return new Queue(delayed_queue);
- }
-
- //绑定交换机和队列
- @Bean
- public Binding delayedBingDingDelayedQueue(@Qualifier("delayedQueue") Queue queue ,
- @Qualifier("customExchange") CustomExchange exchange){
- return BindingBuilder.bind(queue).to(exchange).with(delayed_routing_key).noargs();
- }
- }
即为死信队列,生产者发送消息时设定过期时间的话,如果第一条消息的过期时间很长,第二条消息的过期时间很短,rabbitmq会等第一条消息过期后丢入到死信队列,然后再检查第二条消息的过期时间,从而导致过期时间短的消息晚被丢入到死信队列
rabbitmq-delayed-message-exchange
Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如果交换机type有x-delayed-message表示安装成功
- /**
- * 交换机确认消息的回调
- * 交换机确认回调时 ,需要在配置文件中添加
- * spring.rabbitmq.publisher-confirm-type: correlated 默认为 none 不开启
- * 消息回退回调
- * spring.rabbitmq.publisher-returns: true
- */
- @Slf4j
- @Component
- public class ConfirmConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnCallback {
- @Autowired
- private RabbitTemplate rabbitTemplate;
- /**
- * 由于RabbitTemplate.ConfirmCallback 是rabbitmqTemplate的内部方法,所以需要初始化
- */
- @PostConstruct //该注解是在其他注解加载完成后才使用的
- public void init(){
- rabbitTemplate.setConfirmCallback(this);
- rabbitTemplate.setReturnCallback(this);
- }
- /**
- * 参数说明
- *
- * @param correlationData 发送的消息 这个是从消息发布者那里发布消息到交换机时添加的
- * @param b 交换机是否接收到消息
- * @param s 失败的原因
- */
- @Override
- public void confirm(CorrelationData correlationData, boolean b, String s) {
- String id = correlationData != null ? correlationData.getId() : "";
-
- if (b) {
- log.info("交换机已接收到id为:{}的消息", id);
- }else{
- log.info("交换机未接收到id为:{}的消息",id);
- }
- }
- /**
- * 回退消息,只有消息没有到达目的地的时候会触发
- * @param
- */
- @Override
- public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
- log.error("回退消息:{}",message);
- }
- }
正常声明交换机时,指定备份交换机,当消息不能正常到达正常交换机时,会将消息发送到备份交换机
- @Bean("normal_exchange")
- public DirectExchange deathExchange(){
- return ExchangeBuilder.directExchange(normal_exchange).withArgument("alternate","bakup_exchange").build();
- }
原因:由于网络原因,导致用户重复消费,从而产生问题
解决方案:最优使用redis的原子性
- Map<String,Object> map = new HashMap<>();
- map.put("x-max-priority",10);//官方允许范围0-255
- channel.queueDeclare("test_queue", true, false, false, map);
- AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build();
- channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
消费者拿到消息后会先对消息进行优先级排队,然后在进行消费
正常情况下:保存在内存中
惰性队列:会将消息保存到磁盘上
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。