当前位置:   article > 正文

Rabbitmq同时处理多个消息(多线程)_rabbitmq多线程

rabbitmq多线程

Rabbitmq同时处理多个消息(多线程)

basicQos预取方法参数解析

  • basicQos(int prefetchCount)
  • basicQos(int prefetchCount, boolean global)
  • basicQos(int prefetchSize, int prefetchCount, boolean global)

参数:

  • prefetchSize:可接收消息的大小
  • prefetchCount:处理消息最大的数量。
  • global:是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的

实际中prefetchSize和global几乎不使用,rabbitmq也没有去实现其操作,不考虑。

basicConsumer消费方法参数解析

  • basicConsumer(String queue, Consumer consumer)
  • basicConsumer(String queue, boolean autoAck, Consumer consumer)

参数:

  • queue:监听的队列名称
  • autoAck:是否自动消费消息
  • consumer:使用的消费者类

一, 失败不重试,直接确认

Consumer.java 消费者类

package com.lmc.mq.nospring;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;

/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:06
 * @version: 1.0
 */
public class Consumer {

    private final static String QUEUE_NAME = "lmc-test"; //队列名称

    public static void main(String[] args) {
        initModule();
    }

    public static void initModule() {
        //创建连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("xx.xx.xx.xx"); //设置rabbitmq-server的地址
        connectionFactory.setPort(5672);  //使用的端口号
        connectionFactory.setVirtualHost("/");  //使用的虚拟主机
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");

        //由连接工厂创建连接
        Connection connection = null;

        try {
            connection = connectionFactory.newConnection();
            //通过连接创建信道
            final Channel channel = connection.createChannel();
            channel.basicQos(0, 3, true);
            //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写
                @Override
                public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    MqMessageDispatcher.doDispatch(new String(body, "UTF-8"), channel, envelope);
                }
            };

            //监听指定的queue。会一直监听。
            //参数:要监听的queue、是否自动确认消息、使用的Consumer
            channel.basicConsume(QUEUE_NAME, false, consumer);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }

    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62

MqMessageDispatcher.java 多线程类:同时并发处理多个消息

package com.lmc.mq.nospring;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:45
 * @version: 1.0
 */
public class MqMessageDispatcher {

    public static Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class);

    public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5);

    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                msgHandleService.shutdown();
            }
        });

    }

    public static void doDispatch(String message, Channel channel, Envelope envelope) {
        msgHandleService.execute(new MessageHandleTask(message, channel, envelope));
    }

    private static class MessageHandleTask implements Runnable {

        String message;
        Channel channel;
        Envelope envelope;

        public MessageHandleTask(String message, Channel channel, Envelope envelope) {
            this.message = message;
            this.channel = channel;
            this.envelope = envelope;
        }

        @Override
        public void run() {
            long start = System.currentTimeMillis();
            logger.info("Received message: " + message);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                // 手动确认消息,若自动确认则不需要写以下该行
                channel.basicAck(envelope.getDeliveryTag(), false);
            } catch (IOException e) {
                System.err.println("fail to confirm message:" + message);
            }
        }
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

二, 失败重试5次,再直接确认

MqMessageDispatcher.java

package com.lmc.mq.nospring;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-07 22:45
 * @version: 1.0
 */
public class MqMessageDispatcher {

    public static final Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class);

    public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5);

    public static Map<String, Integer> cacheMap = new HashMap(5);

    static {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                msgHandleService.shutdown();
            }
        });
    }

    public static void doDispatch(String message, Channel channel, Envelope envelope) {
        msgHandleService.execute(new MessageHandleTask(message, channel, envelope));
    }

    private static class MessageHandleTask implements Runnable {

        String message;
        Channel channel;
        Envelope envelope;

        public MessageHandleTask(String message, Channel channel, Envelope envelope) {
            this.message = message;
            this.channel = channel;
            this.envelope = envelope;
        }

        @Override
        public void run() {

            int currentTimes = 0; // 当前重试次数
            boolean isSuccess = false; // 消息是否处理成功
            // 获取当前消息重试次数,(这种情况适合每条消息内容不一样,最好每条消息都有唯一标识)
            if (cacheMap.containsKey(message)) {
                currentTimes = cacheMap.get(message);
            }else {
                cacheMap.put(message, 0);
            }

            long start = System.currentTimeMillis();
            logger.info("Received message: " + message);
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                if (isSuccess) {
                    // 手动确认消息
                    logger.info("message[" + message + "] consumer success.(Ack)");
                    cacheMap.put(message, 0);
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }else {
                    if (currentTimes >= 5) {
                        // 手动确认消息,若自动确认则不需要写以下该行
                        logger.warn("message[" + message + "] consumer fail,have retry 5 times.(Ack)");
                        cacheMap.put(message, 0);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }else {
                        // 处理失败,重试未5次,重新处理
                        cacheMap.put(message, ++currentTimes);
                        logger.warn("message[" + message + "] consumer fail,prepare to retry " + currentTimes + " times...(Nack)");
                        channel.basicNack(envelope.getDeliveryTag(), false, true);
                    }
                }

            } catch (IOException e) {
                System.err.println("fail to confirm message:" + message);
            }
        }
    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101

三,使用SpringBoot

使用springboot同时处理多个消息,只需要在配置文件中,添加以下配置:

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: manual # 开启手动确认
        concurrency: 1 #消费者最小数量
        max-concurrency: 3 #消费之最大数量
        prefetch: 3 #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

监听类 LmcTestConsumer

package com.lmc.mq.spring.consumer;

import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @author lmc
 * @Description: TODO
 * @Create 2021-09-18 19:32
 * @version: 1.0
 */
@Component
public class LmcTestConsumer {

    public static final Logger logger = LoggerFactory.getLogger(LmcTestConsumer.class);


    @RabbitHandler
    @RabbitListener(queues = "lmc-test")
    public void handler(@Payload Message message, Channel channel) {
        try {
            String msg = new String(message.getBody(), "UTF-8");
            MqMessageDispatcher.doDispatch(msg, channel, message.getMessageProperties().getDeliveryTag());
        } catch (IOException e) {
            logger.error(e.getMessage());
        } catch (NullPointerException e1) {
            logger.error(e1.getMessage());
        } catch (Exception e) {
            logger.error(e.getMessage());
        }
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

具体代码可见Gitee仓库: https://gitee.com/lmchh/lmc-tools/tree/master/tools-message-queue

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/515760
推荐阅读
相关标签
  

闽ICP备14008679号