当前位置:   article > 正文

RocketMQ—Producer(三)发送方式和消息类型_rocketmq异步发送消息

rocketmq异步发送消息

一:发送方式讲解

RocketMQ版提供三种方式来发送消息:同步(Sync)发送、异步(Async)发送和单向(Oneway)发送。 我们会介绍每种发送方式的原理、应用场景、代码差异,以及三种发送方式的对比。

1.1 同步发送

CommunicationMode#SYNC

原理:

同步发送是指发送者向MQ执行发送消息API时,同步等待,直到消息服务器返回发送结果 。

应用场景:

此种方式应用场景非常广泛,例如重要通知邮件、报名短信通知、营销短信系统等。

同步发送接口介绍:

  1. MQProducer#send
  2. // 同步-发送消息
  3. SendResult send(final Message msg)
  4. throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

备注:

同步发送是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式。

1.2 异步发送

CommunicationMode#ASYNC

原理:

发送者向MQ执行发送消息API时,指定消息发送成功后的回掉函数,然后调用消息发送API后,立即返回,消息发送者线程不阻塞,直到运行结束,消息发送成功或失败的回调任务在一个新的线程中执行 。

应用场景:

异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景,例如,您视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

1.2.1 异步发送接口介绍

  1. MQProducer#send
  2. //异步 发送消息, sendCallback参数是消息发送成功后的回调方法 。
  3. void send(final Message msg, final SendCallback sendCallback)
  4. throws MQClientException, RemotingException, InterruptedException;

1.2.2 异步相关核心属性构造器介绍

DefaultMQProducerImpl:异步相关核心属性构造器介绍

  1. //异步发送队列,默认长度:5w
  2. private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;
  3. //默认异步发送线程池
  4. private final ExecutorService defaultAsyncSenderExecutor;
  5. //可以自定义的-异步发送消息线程池
  6. private ExecutorService asyncSenderExecutor;
  7. //构造器
  8. public DefaultMQProducerImpl(final DefaultMQProducer defaultMQProducer, RPCHook rpcHook) {
  9. this.defaultMQProducer = defaultMQProducer;
  10. this.rpcHook = rpcHook;
  11. //有界队列,长度:5w
  12. this.asyncSenderThreadPoolQueue = new LinkedBlockingQueue<Runnable>(50000);
  13. this.defaultAsyncSenderExecutor = new ThreadPoolExecutor(
  14. Runtime.getRuntime().availableProcessors(),
  15. Runtime.getRuntime().availableProcessors(),
  16. 1000 * 60,
  17. TimeUnit.MILLISECONDS,
  18. this.asyncSenderThreadPoolQueue,
  19. new ThreadFactory() {
  20. private AtomicInteger threadIndex = new AtomicInteger(0);
  21. @Override
  22. public Thread newThread(Runnable r) {
  23. return new Thread(r, "AsyncSenderExecutor_" + this.threadIndex.incrementAndGet());
  24. }
  25. });
  26. }

谨记:

如未设置自定义线程池:asyncSenderExecutor

将会使用默认线程池:defaultAsyncSenderExecutor

默认线程池任务队列默认:5w.如队列任务超出5w,线程池拒绝策略默认为:拒绝策略,可能会有丢失消息发送的风险。

扩展:

异步发送netty网络发送模块使用了Semaphore,如遇性能调优或问题排查,别忘了!!

备注:

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。消息队列RocketMQ版的异步发送,需要您实现异步发送回调的以下接口:SendCallback消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息。发送方通过回调接口接收服务端响应,并处理响应结果。

1.3 单向发送

CommunicationMode#ONEWAY

原理:

消息发送者向 MQ 执行发送消息 API 时,直接返回,不等待消息服务器的结果, 也不注册回调函数,简单地说,就是只管发,不在乎消息是否成功存储在消息服务器上 。

应用场景:

适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。单向发送接口介绍:

  1. MQProducer#sendOneway
  2. //单向消息 发送,就是不在乎发送结果,消息发送出去后该方法立 即返回 。
  3. void sendOneway(final Message msg)
  4. throws MQClientException, RemotingException, InterruptedException;

备注:

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。

三种发送方式的对比

下表概括了三者的特点和主要区别:

二:消息类型讲解

目前开源版本RocketMq生产端支持发送的消息类型为:普通消息、批量消息、延时消息、事物消息(开源版本定时消息和顺序消息目前不支持,顺序消息可变向实现) 。

我们将简单介绍消息和使用这些消息类型的注意事项;我们先分析消息相关的类图关系:

分析:

  • Message作为消息的顶层对象,在生产端可表示各种消息;
  • MessageBatch表示批量消息;
  • MessageExt以及它的子类其实都是在Broker端存储或查询使用,后续可仔细分析哈;

Message 核心属性和方法分析:

  1. public class Message implements Serializable {
  2. private static final long serialVersionUID = 8445773977080406428L;
  3. //主题 topic
  4. private String topic;
  5. //消息-Flag一些特殊的消息标记,int类型。标记的含义定义在 MessageSysFlag 中
  6. private int flag;
  7. /**
  8. * 扩展属性
  9. * TAGS: 消息TAG,用于消息过滤
  10. * KEYS: Message 索引键, 多个用空格隔开, RocketMQ 可以根据这些 key 快速检索到消息 。
  11. * WAIT: 消息发送时是否等消息存储完成后再返回
  12. * DELAY: 消息延迟级别,用于定时消息或消息重试 。
  13. */
  14. private Map<String, String> properties;
  15. //消息体
  16. private byte[] body;
  17. // 事务Id
  18. private String transactionId;
  19. ...省略...
  20. // 消息延迟级别,用于定时消息或消息重试 。
  21. public void setDelayTimeLevel(int level) {
  22. this.putProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL, String.valueOf(level));
  23. }
  24. }

备注:延迟级别对应时间的是下面的常量:

MessageConst.PROPERTY_DELAY_TIME_LEVEL

  1. //private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  2. //DELAY: 消息延迟级别,用于定时消息或消息重试 。--properties属性
  3. public static final String PROPERTY_DELAY_TIME_LEVEL = "DELAY";

MessageBath 核心属性和方法分析:

  1. /**
  2. * 批量消息,实现了 Iterable 迭代接口
  3. * 批量消息发送是将 同一主题 的多条消息一起 打包发送到消息服务端,减少网络调用次数,提高网络传输效率 。
  4. * 当然,并不是在同一批次中发送的消息数量越多性能就越好,其判断依据是单条消息的长度,如果单条消息内容比较长, 则打包多条消息发送会影响其他 线程发送消息的响应时间 ,
  5. * 并且单批次消息发送总长度不能超过 DefaultMQProducer#maxMessageSize。批量消息 发送要解决的是 如何将这些消息 编码以便服务端能够正确解码出每条 消息的消息内容 。
  6. */
  7. public class MessageBatch extends Message implements Iterable<Message> {
  8. private static final long serialVersionUID = 621335151046335557L;
  9. private final List<Message> messages;
  10. private MessageBatch(List<Message> messages) {
  11. this.messages = messages;
  12. }
  13. /**
  14. * 消息编码
  15. * @return
  16. */
  17. public byte[] encode() {
  18. return MessageDecoder.encodeMessages(messages);
  19. }
  20. public Iterator<Message> iterator() {
  21. return messages.iterator();
  22. }
  23. /**
  24. * 消息转换, messages -> MessageBatch
  25. * 1>批量消息 不支持 延时消息
  26. * 2>消息主题 必须一致
  27. * 3>消息WAIT 必须一致
  28. * @param messages
  29. * @return
  30. */
  31. public static MessageBatch generateFromList(Collection<Message> messages) {
  32. assert messages != null;
  33. assert messages.size() > 0;
  34. List<Message> messageList = new ArrayList<Message>(messages.size());
  35. Message first = null;
  36. for (Message message : messages) {
  37. if (message.getDelayTimeLevel() > 0) { //批量消息 不支持 延时消息
  38. throw new UnsupportedOperationException("TimeDelayLevel in not supported for batching");
  39. }
  40. if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { //不支持重试topic(%RETRY%)
  41. throw new UnsupportedOperationException("Retry Group is not supported for batching");
  42. }
  43. if (first == null) {
  44. first = message;
  45. } else {
  46. if (!first.getTopic().equals(message.getTopic())) {
  47. throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
  48. }
  49. if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
  50. throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
  51. }
  52. }
  53. messageList.add(message);
  54. }
  55. MessageBatch messageBatch = new MessageBatch(messageList);
  56. messageBatch.setTopic(first.getTopic());
  57. messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
  58. return messageBatch;
  59. }
  60. }

小结:

  1. 普通消息: 消息队列RocketMQ版中无特性的消息,没有其他特性属性,就是普通的Message对象, 区别于有特性的定时和延时消息、顺序消息和事务消息;
  2. 延时消息: 生产者对指定消息进行延时投递,如果客户端发送延时消息Message中的properties属性必须包含DELAY属性key;
  3. 批量消息: 其实就是Message的集合,多了一些验证;
  4. 事务消息: 后续单独讲解。

三:结论

本文简单讲解了生产端消息发送方式的区别,开源版本消息类型的区别,知识点小但很重要,建议就是源码都懂了,那些高大上理论概念是不是很简单了?


程序员的核心竞争力其实还是技术,因此对技术还是要不断的学习,关注 “IT巅峰技术” 公众号 ,该公众号内容定位:中高级开发、架构师、中层管理人员等中高端岗位服务的,除了技术交流外还有很多架构思想和实战案例。

作者是 《 消息中间件 RocketMQ 技术内幕》 一书作者,同时也是 “RocketMQ上海社区”联合创始人,曾就职于拼多多、德邦等公司,现任上市快递公司架构负责人,主要负责开发框架的搭建、中间件相关技术的二次开发和运维管理、混合云及基础服务平台的建设。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/386591
推荐阅读
相关标签
  

闽ICP备14008679号