当前位置:   article > 正文

rocketmq顺序消息、延时消息、事务消息等类型_mq延时消息 事务消息

mq延时消息 事务消息

maven依赖

  1. <dependency>
  2. <groupId>org.apache.rocketmq</groupId>
  3. <artifactId>rocketmq-client</artifactId>
  4. <version>4.9.4</version>
  5. </dependency>

一、普通消息

1、生产者:

  1. // Initialize Consumer and set Consumer Goup Name
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("kingTestGroup");
  3. // Set the address of NameServer
  4. consumer.setNamesrvAddr("192.168.124.16:9876");
  5. // Subscribe One or more of topics,and specify the tag filtering conditions, here specify * means receive all tag messages
  6. consumer.subscribe("kingTopic", "*");
  7. // Register a callback interface to handle messages received from the Broker
  8. consumer.registerMessageListener(new MessageListenerConcurrently() {
  9. @Override
  10. public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
  11. for (MessageExt msg : msgs) {
  12. System.out.println(new String(msg.getBody()));
  13. }
  14. // System.out.printf("%s Receive New Messages: %n", msgs);
  15. // Return to the message consumption status, ConsumeConcurrentlyStatus.CONSUME_SUCCESS for successful consumption
  16. return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  17. }
  18. });
  19. // Start Consumer
  20. consumer.start();
  21. System.out.printf("Consumer Started.%n");

消费者:

  1. // Initialize a producer and set the Producer group name
  2. DefaultMQProducer producer = new DefaultMQProducer("kingTestGroup"); //(1)
  3. // Set the address of NameServer
  4. producer.setNamesrvAddr("192.168.124.16:9876"); //(2)
  5. // Start Producer
  6. producer.start();
  7. for (int i = 0; i < 10; i++) {
  8. // Create a message and set the topic, tag, body and so on. The tag can be understood as a label to categorize the message, and RocketMQ can filter the tag on the consumer side.
  9. Message msg = new Message("kingTopic" /* Topic */,
  10. "TagA" /* Tag */,
  11. ("Hello king" + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
  12. ); //(3)
  13. // Use the producer to send and wait for the result of sending synchronously
  14. SendResult sendResult = producer.send(msg); //(4)
  15. System.out.printf("%s%n", sendResult);
  16. }
  17. // Close the producer once it is no longer in use
  18. producer.shutdown();

二、顺序消息

通常会有这样的业务场景,一个订单的新建、扣钱、完成这三个消息必须要保证顺序消费,所以rocketmq提供了一种保证部分顺序的消息机制

生产者:

  1. DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
  2. producer.start();
  3. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  4. for (int i = 0; i < 100; i++) {
  5. int orderId = i % 10;
  6. Message msg =
  7. new Message("TopicTest", tags[i % tags.length], "KEY" + i,
  8. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  9. SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
  10. @Override
  11. public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
  12. Integer id = (Integer) arg;
  13. int index = id % mqs.size();
  14. return mqs.get(index);
  15. }
  16. }, orderId);
  17. System.out.printf("%s%n", sendResult);
  18. }
  19. producer.shutdown();
  20. } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
  21. e.printStackTrace();
  22. }

消费者:

  1. // Initialize Consumer and set Consumer Goup Name
  2. DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("kingTestGroup");
  3. // Set the address of NameServer
  4. consumer.setNamesrvAddr("192.168.124.16:9876");
  5. // Subscribe One or more of topics,and specify the tag filtering conditions, here specify * means receive all tag messages
  6. consumer.subscribe("kingTopicOrder", "*");
  7. // Register a callback interface to handle messages received from the Broker
  8. consumer.registerMessageListener(new MessageListenerOrderly() {
  9. @Override
  10. public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
  11. for (MessageExt msg : list) {
  12. System.out.println(new String(msg.getBody()));
  13. }
  14. return ConsumeOrderlyStatus.SUCCESS;
  15. }
  16. });
  17. // Start Consumer
  18. consumer.start();
  19. System.out.printf("Consumer Started.%n");

三、延时消息

顾名思义,消息会延时发送,下图是延时的级别丢应的秒数,rocketmq不能延时指定的秒数。

 生产者:

  1. DefaultMQProducer producer = new DefaultMQProducer("kingTestGroup"); //(1)
  2. // Set the address of NameServer
  3. producer.setNamesrvAddr("192.168.124.16:9876"); //(2)
  4. producer.start();
  5. int totalMessagesToSend = 100;
  6. for (int i = 0; i < totalMessagesToSend; i++) {
  7. Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
  8. // This message will be delivered to consumer 10 seconds later.
  9. message.setDelayTimeLevel(3);
  10. // Send the message
  11. producer.send(message);
  12. }
  13. // Shutdown producer after use.
  14. producer.shutdown();

四、批量发送消息

  1. DefaultMQProducer producer = new DefaultMQProducer("kingTestGroup"); //(1)
  2. // Set the address of NameServer
  3. producer.setNamesrvAddr("192.168.124.16:9876"); //(2)
  4. producer.start();
  5. //If you just send messages of no more than 1MiB at a time, it is easy to use batch
  6. //Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule support
  7. String topic = "BatchTest";
  8. List<Message> messages = new ArrayList<>();
  9. messages.add(new Message(topic, "Tag", "OrderID001", "Hello world 0".getBytes()));
  10. messages.add(new Message(topic, "Tag", "OrderID002", "Hello world 1".getBytes()));
  11. messages.add(new Message(topic, "Tag", "OrderID003", "Hello world 2".getBytes()));
  12. producer.send(messages);

五、事务消息

mq事务的作用就是保证本地事务与mq消息的一致性,换句话来说就是,保证本地事务执行完了,消息一定发送成功,本地事务失败了,发出去的消息要回滚,半消息的作用就是保证rocketmq是否正常。

  1. public static void main(String[] args) throws MQClientException, InterruptedException {
  2. TransactionListener transactionListener = new TransactionListenerImpl();
  3. TransactionMQProducer producer = new TransactionMQProducer("kingTestGroup");
  4. producer.setNamesrvAddr("192.168.124.16:9876"); //(2)
  5. ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
  6. @Override
  7. public Thread newThread(Runnable r) {
  8. Thread thread = new Thread(r);
  9. thread.setName("client-transaction-msg-check-thread");
  10. return thread;
  11. }
  12. });
  13. producer.setExecutorService(executorService);
  14. producer.setTransactionListener(transactionListener);
  15. producer.start();
  16. String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
  17. for (int i = 0; i < 10; i++) {
  18. try {
  19. Message msg =
  20. new Message("TopicTest", tags[i % tags.length], "KEY" + i,
  21. ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
  22. SendResult sendResult = producer.sendMessageInTransaction(msg, null);
  23. System.out.printf("%s%n", sendResult);
  24. Thread.sleep(10);
  25. } catch (MQClientException | UnsupportedEncodingException e) {
  26. e.printStackTrace();
  27. }
  28. }
  29. for (int i = 0; i < 100000; i++) {
  30. Thread.sleep(1000);
  31. }
  32. producer.shutdown();
  33. }
  34. static class TransactionListenerImpl implements TransactionListener {
  35. private AtomicInteger transactionIndex = new AtomicInteger(0);
  36. private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
  37. @Override
  38. public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
  39. int value = transactionIndex.getAndIncrement();
  40. int status = value % 3;
  41. localTrans.put(msg.getTransactionId(), status);
  42. return LocalTransactionState.UNKNOW;
  43. }
  44. @Override
  45. public LocalTransactionState checkLocalTransaction(MessageExt msg) {
  46. Integer status = localTrans.get(msg.getTransactionId());
  47. if (null != status) {
  48. switch (status) {
  49. case 0:
  50. return LocalTransactionState.UNKNOW;
  51. case 1:
  52. return LocalTransactionState.COMMIT_MESSAGE;
  53. case 2:
  54. return LocalTransactionState.ROLLBACK_MESSAGE;
  55. default:
  56. return LocalTransactionState.COMMIT_MESSAGE;
  57. }
  58. }
  59. return LocalTransactionState.COMMIT_MESSAGE;
  60. }
  61. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/神奇cpp/article/detail/1003121
推荐阅读
相关标签
  

闽ICP备14008679号