赞
踩
basicQos预取方法参数解析
参数:
实际中prefetchSize和global几乎不使用,rabbitmq也没有去实现其操作,不考虑。
basicConsumer消费方法参数解析
参数:
Consumer.java 消费者类
package com.lmc.mq.nospring; import com.rabbitmq.client.*; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; /** * @author lmc * @Description: TODO * @Create 2021-09-07 22:06 * @version: 1.0 */ public class Consumer { private final static String QUEUE_NAME = "lmc-test"; //队列名称 public static void main(String[] args) { initModule(); } public static void initModule() { //创建连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("xx.xx.xx.xx"); //设置rabbitmq-server的地址 connectionFactory.setPort(5672); //使用的端口号 connectionFactory.setVirtualHost("/"); //使用的虚拟主机 connectionFactory.setUsername("guest"); connectionFactory.setPassword("guest"); //由连接工厂创建连接 Connection connection = null; try { connection = connectionFactory.newConnection(); //通过连接创建信道 final Channel channel = connection.createChannel(); channel.basicQos(0, 3, true); //创建消费者,指定要使用的channel。QueueingConsume类已经弃用,使用DefaultConsumer代替 DefaultConsumer consumer = new DefaultConsumer(channel) { //监听的queue中有消息进来时,会自动调用此方法来处理消息。但此方法默认是空的,需要重写 @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { MqMessageDispatcher.doDispatch(new String(body, "UTF-8"), channel, envelope); } }; //监听指定的queue。会一直监听。 //参数:要监听的queue、是否自动确认消息、使用的Consumer channel.basicConsume(QUEUE_NAME, false, consumer); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
MqMessageDispatcher.java 多线程类:同时并发处理多个消息
package com.lmc.mq.nospring; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Envelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author lmc * @Description: TODO * @Create 2021-09-07 22:45 * @version: 1.0 */ public class MqMessageDispatcher { public static Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class); public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5); static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { msgHandleService.shutdown(); } }); } public static void doDispatch(String message, Channel channel, Envelope envelope) { msgHandleService.execute(new MessageHandleTask(message, channel, envelope)); } private static class MessageHandleTask implements Runnable { String message; Channel channel; Envelope envelope; public MessageHandleTask(String message, Channel channel, Envelope envelope) { this.message = message; this.channel = channel; this.envelope = envelope; } @Override public void run() { long start = System.currentTimeMillis(); logger.info("Received message: " + message); try { Thread.sleep(5000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { // 手动确认消息,若自动确认则不需要写以下该行 channel.basicAck(envelope.getDeliveryTag(), false); } catch (IOException e) { System.err.println("fail to confirm message:" + message); } } } }
MqMessageDispatcher.java
package com.lmc.mq.nospring; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Envelope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /** * @author lmc * @Description: TODO * @Create 2021-09-07 22:45 * @version: 1.0 */ public class MqMessageDispatcher { public static final Logger logger = LoggerFactory.getLogger(MqMessageDispatcher.class); public static ExecutorService msgHandleService = Executors.newFixedThreadPool(5); public static Map<String, Integer> cacheMap = new HashMap(5); static { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { msgHandleService.shutdown(); } }); } public static void doDispatch(String message, Channel channel, Envelope envelope) { msgHandleService.execute(new MessageHandleTask(message, channel, envelope)); } private static class MessageHandleTask implements Runnable { String message; Channel channel; Envelope envelope; public MessageHandleTask(String message, Channel channel, Envelope envelope) { this.message = message; this.channel = channel; this.envelope = envelope; } @Override public void run() { int currentTimes = 0; // 当前重试次数 boolean isSuccess = false; // 消息是否处理成功 // 获取当前消息重试次数,(这种情况适合每条消息内容不一样,最好每条消息都有唯一标识) if (cacheMap.containsKey(message)) { currentTimes = cacheMap.get(message); }else { cacheMap.put(message, 0); } long start = System.currentTimeMillis(); logger.info("Received message: " + message); try { Thread.sleep(5000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } try { if (isSuccess) { // 手动确认消息 logger.info("message[" + message + "] consumer success.(Ack)"); cacheMap.put(message, 0); channel.basicAck(envelope.getDeliveryTag(), false); }else { if (currentTimes >= 5) { // 手动确认消息,若自动确认则不需要写以下该行 logger.warn("message[" + message + "] consumer fail,have retry 5 times.(Ack)"); cacheMap.put(message, 0); channel.basicAck(envelope.getDeliveryTag(), false); }else { // 处理失败,重试未5次,重新处理 cacheMap.put(message, ++currentTimes); logger.warn("message[" + message + "] consumer fail,prepare to retry " + currentTimes + " times...(Nack)"); channel.basicNack(envelope.getDeliveryTag(), false, true); } } } catch (IOException e) { System.err.println("fail to confirm message:" + message); } } } }
使用springboot同时处理多个消息,只需要在配置文件中,添加以下配置:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 开启手动确认
concurrency: 1 #消费者最小数量
max-concurrency: 3 #消费之最大数量
prefetch: 3 #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
监听类 LmcTestConsumer
:
package com.lmc.mq.spring.consumer; import com.rabbitmq.client.Channel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.stereotype.Component; import java.io.IOException; /** * @author lmc * @Description: TODO * @Create 2021-09-18 19:32 * @version: 1.0 */ @Component public class LmcTestConsumer { public static final Logger logger = LoggerFactory.getLogger(LmcTestConsumer.class); @RabbitHandler @RabbitListener(queues = "lmc-test") public void handler(@Payload Message message, Channel channel) { try { String msg = new String(message.getBody(), "UTF-8"); MqMessageDispatcher.doDispatch(msg, channel, message.getMessageProperties().getDeliveryTag()); } catch (IOException e) { logger.error(e.getMessage()); } catch (NullPointerException e1) { logger.error(e1.getMessage()); } catch (Exception e) { logger.error(e.getMessage()); } } }
具体代码可见Gitee仓库: https://gitee.com/lmchh/lmc-tools/tree/master/tools-message-queue
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。