赞
踩
SAC(single active consumer,单一活跃消费者):当同一个队列上注册了多个消费者实例时,只允许其中一个实例消费消息,其余实例处于空闲(备用)状态;当活跃实例断开后,由其他实例接替消费。
实现消息顺序消费,操作:
/**
 * Message payload that carries a request identifier together with the
 * message's position inside that request's sequence.
 */
@Data
@AllArgsConstructor
public class SeqMessage implements Serializable {
// Message id.
private String requestNo;
// Order of the message within its sequence, e.g. 1, 2, 3, 4.
private int order;
}
@Configuration public class OrderQueueConfiguration { public static final String EXCHANGE = "order-ex"; public static final String RK_PREFIX = "rk-"; public static final String ONE_QUEUE = "one-queue"; public static final String TWO_QUEUE = "two-queue"; public static final String THREE_QUEUE = "three-queue"; public static final String FOUR_QUEUE = "four-queue"; @Bean public DirectExchange exchange() { // 使用直连的模式 return new DirectExchange(EXCHANGE, true, false); } @Bean public Binding oneBinding() { return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + 1); } @Bean public Binding twoBinding() { return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + 2); } @Bean public Binding threeBinding() { return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + 3); } @Bean public Binding fourBinding() { return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + 3); } @Bean public Queue oneQueue() { return createSacQueue(ONE_QUEUE); } @Bean public Queue twoQueue() { return createSacQueue(TWO_QUEUE); } @Bean public Queue threeQueue() { return createSacQueue(THREE_QUEUE); } @Bean public Queue fourQueue() { return createSacQueue(FOUR_QUEUE); } private static Queue createSacQueue(String queueName) { Map<String, Object> arguments = new HashMap<>(2); arguments.put("x-single-active-consumer", true); return new Queue(queueName, true, false, false, arguments); } }
重要的是 x-single-active-consumer 这个队列参数:设置为 true 后,同一个队列在同一时刻只有一个消费者实例生效。
为每个队列创建一个监听消费者
/**
 * Registers one listener per queue. Each listener decodes the message body as
 * UTF-8 and logs it together with the name of the queue it arrived on.
 */
@Slf4j
@Component
public class OrderListener {

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE, declare = "false"),
            value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"),
            key = RK_PREFIX + 1))
    public void onMessage1(Message message, Channel channel) {
        logReceived(ONE_QUEUE, message);
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE, declare = "false"),
            value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"),
            key = RK_PREFIX + 2))
    public void onMessage2(Message message, Channel channel) {
        logReceived(TWO_QUEUE, message);
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE, declare = "false"),
            value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"),
            key = RK_PREFIX + 3))
    public void onMessage3(Message message, Channel channel) {
        logReceived(THREE_QUEUE, message);
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE, declare = "false"),
            value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"),
            key = RK_PREFIX + 4))
    public void onMessage4(Message message, Channel channel) {
        logReceived(FOUR_QUEUE, message);
    }

    /**
     * Decodes the message body as UTF-8 and logs it with the queue name.
     * Any exception is logged rather than propagated, matching the original
     * best-effort handlers.
     *
     * @param queueName name of the queue the message was received from
     * @param message   the received AMQP message
     */
    private void logReceived(String queueName, Message message) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", queueName, messageStr);
        } catch (Exception e) {
            // One placeholder only: the trailing throwable is logged with its
            // stack trace and must not be counted as a format argument.
            log.error("######### OrderListener.onMessage: {}", messageStr, e);
        }
    }
}
/**
 * Publishes 100 sequences of 6 messages each to {@code EXCHANGE}.
 * Sequence {@code i} uses routing key {@code RK_PREFIX + (i % 4 + 1)}, so
 * every message of one sequence is sent with the same routing key.
 *
 * NOTE(review): the path segment "messqge" looks like a typo for "message";
 * it is kept unchanged here because existing callers may depend on it.
 *
 * @return the literal string {@code "success"} once all messages are sent
 * @throws JsonProcessingException if a message cannot be serialized to JSON
 */
@GetMapping("/send/seq/messqge")
public String sendSeqMessage() throws JsonProcessingException {
    final int cnt = 100;
    final int mod = 4;
    final int seqSize = 6;
    for (int i = 0; i < cnt; i++) {
        // Routing key and request number depend only on the sequence index,
        // so compute them once per sequence instead of once per message.
        final int rk = i % mod + 1;
        final String routingKey = RK_PREFIX + rk;
        final String requestNo = "seq-" + i;
        for (int j = 0; j < seqSize; j++) {
            SeqMessage seqMessage = new SeqMessage(requestNo, j);
            String s = objectMapper.writeValueAsString(seqMessage);
            log.info("routeKey: {}, send msg: {}", rk, s);
            rabbitTemplate.convertAndSend(EXCHANGE, routingKey, s);
        }
    }
    return "success";
}
运行结果:
two-queue recv: {"requestNo":"seq-1","order":0} two-queue recv: {"requestNo":"seq-1","order":1} two-queue recv: {"requestNo":"seq-1","order":2} two-queue recv: {"requestNo":"seq-1","order":3} two-queue recv: {"requestNo":"seq-1","order":4} two-queue recv: {"requestNo":"seq-1","order":5} two-queue recv: {"requestNo":"seq-5","order":0} two-queue recv: {"requestNo":"seq-5","order":1} two-queue recv: {"requestNo":"seq-5","order":2} two-queue recv: {"requestNo":"seq-5","order":3} two-queue recv: {"requestNo":"seq-5","order":4} two-queue recv: {"requestNo":"seq-5","order":5} three-queue recv: {"requestNo":"seq-2","order":0} three-queue recv: {"requestNo":"seq-2","order":1} three-queue recv: {"requestNo":"seq-2","order":2} three-queue recv: {"requestNo":"seq-2","order":3} three-queue recv: {"requestNo":"seq-2","order":4} three-queue recv: {"requestNo":"seq-2","order":5} three-queue recv: {"requestNo":"seq-6","order":0} three-queue recv: {"requestNo":"seq-6","order":1} three-queue recv: {"requestNo":"seq-6","order":2} three-queue recv: {"requestNo":"seq-6","order":3} three-queue recv: {"requestNo":"seq-6","order":4} three-queue recv: {"requestNo":"seq-6","order":5}
可以发现,同一个 requestNo 的消息都进入了同一个队列,并且按照 order 的先后顺序被消费。
good luck!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。