当前位置:   article > 正文

阿里rocketMq发送消息队列_com.aliyun.openservices maven

com.aliyun.openservices maven

1、maven依赖:

  1. <dependency>
  2. <groupId>com.aliyun.openservices</groupId>
  3. <artifactId>ons-client</artifactId>
  4. <version>1.7.9.Final</version>
  5. </dependency>

2、发送消息util:

  1. import com.aliyun.openservices.ons.api.*;
  2. import org.slf4j.Logger;
  3. import org.slf4j.LoggerFactory;
  4. import org.springframework.beans.factory.annotation.Autowired;
  5. import org.springframework.core.env.Environment;
  6. import org.springframework.stereotype.Component;
  7. import javax.annotation.PostConstruct;
  8. import java.text.SimpleDateFormat;
  9. import java.util.Date;
  10. import java.util.Properties;
  11. /**
  12. * 发送阿里云队列消息 --- RabbitMq
  13. * @author zhangxb
  14. */
  15. @Component
  16. public class AliRabbitMqUtil {
  17. public static Logger logger = LoggerFactory.getLogger(AliRabbitMqUtil.class);
  18. public Properties properties = new Properties();
  19. @Autowired
  20. private Environment env;
  21. public static String AL_RABBITMQ_GROUP_ID;
  22. public static String AL_RABBITMQ_ACCESSKEY;
  23. public static String AL_RABBITMQ_SECRETKEY;
  24. public static String AL_RABBITMQ_SENDMSGTIMEOUTMILLIS;
  25. public static String AL_RABBITMQ_NAMESRV_ADDR;
  26. public static String AL_RABBITMQ_TOPIC;
  27. public static String AL_RABBITMQ_TAGS;
  28. @PostConstruct
  29. public void readConfig() {
  30. AL_RABBITMQ_GROUP_ID = env.getProperty("AL_RABBITMQ_GROUP_ID");
  31. AL_RABBITMQ_ACCESSKEY = env.getProperty("AL_RABBITMQ_ACCESSKEY");
  32. AL_RABBITMQ_SECRETKEY = env.getProperty("AL_RABBITMQ_SECRETKEY");
  33. AL_RABBITMQ_SENDMSGTIMEOUTMILLIS = env.getProperty("AL_RABBITMQ_SENDMSGTIMEOUTMILLIS");
  34. AL_RABBITMQ_NAMESRV_ADDR = env.getProperty("AL_RABBITMQ_NAMESRV_ADDR");
  35. AL_RABBITMQ_TOPIC = env.getProperty("AL_RABBITMQ_TOPIC");
  36. AL_RABBITMQ_TAGS = env.getProperty("AL_RABBITMQ_TAGS");
  37. }
  38. private AliRabbitMqUtil(){}
  39. private static AliRabbitMqUtil aliRabbitMqUtil=null;
  40. public static AliRabbitMqUtil getInstance(){
  41. if(aliRabbitMqUtil == null){
  42. aliRabbitMqUtil = new AliRabbitMqUtil();
  43. }
  44. return aliRabbitMqUtil;
  45. }
  46. public void sendRabbitMQMessage(String message,String setKey){
  47. properties.setProperty(PropertyKeyConst.GROUP_ID, AL_RABBITMQ_GROUP_ID);
  48. // AccessKey 阿里云身份验证,在阿里云服务器管理控制台创建
  49. properties.put(PropertyKeyConst.AccessKey, AL_RABBITMQ_ACCESSKEY);
  50. // SecretKey 阿里云身份验证,在阿里云服务器管理控制台创建
  51. properties.put(PropertyKeyConst.SecretKey, AL_RABBITMQ_SECRETKEY);
  52. //设置发送超时时间,单位毫秒
  53. properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, AL_RABBITMQ_SENDMSGTIMEOUTMILLIS);
  54. // 设置 TCP 接入域名,到控制台的实例基本信息中查看
  55. properties.put(PropertyKeyConst.NAMESRV_ADDR,AL_RABBITMQ_NAMESRV_ADDR);
  56. Producer producer = ONSFactory.createProducer(properties);
  57. // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
  58. producer.start();
  59. Message msg = new Message(
  60. // Message 所属的 Topic
  61. AL_RABBITMQ_TOPIC,
  62. // Message Tag 可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在 MQ 服务器过滤
  63. AL_RABBITMQ_TAGS,
  64. // Message Body 可以是任何二进制形式的数据, MQ 不做任何干预,
  65. // 需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
  66. message.getBytes());
  67. // 设置代表消息的业务关键属性,请尽可能全局唯一。
  68. // 以方便您在无法正常收到消息情况下,可通过阿里云服务器管理控制台查询消息并补发
  69. // 注意:不设置也不会影响消息正常收发
  70. msg.setKey("ORDERID_" + setKey);
  71. String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
  72. try {
  73. SendResult sendResult = producer.send(msg);
  74. // 同步发送消息,只要不抛异常就是成功
  75. if (sendResult != null) {
  76. logger.info(date + " Send mq message success. Topic is:" + msg.getTopic() + " OrderID is: " + setKey);
  77. }
  78. } catch (Exception e) {
  79. // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
  80. logger.error(new Date() + " Send mq message failed. Topic is:" + msg.getTopic());
  81. DingDingMessageUtil.sendTextMessage("异常时间:"+date + "\n 服务名:provider"+"\n MQ-阿里订单信息发送失败. \n Topic is:" + msg.getTopic() + "\n 订单号: " + setKey);
  82. e.printStackTrace();
  83. }
  84. // 在应用退出前,销毁 Producer 对象
  85. // 注意:如果不销毁也没有问题
  86. producer.shutdown();
  87. }
  88. }

3、调用工具类发送消息:

AliRabbitMqUtil.getInstance().sendRabbitMQMessage(messageo.toString(),orderid);

 

简单发送消息队列写法:

  1. import com.aliyun.openservices.ons.api.*;
  2. import com.ycb.socket.zk.AppSecretConfig;
  3. import java.util.Properties;
  4. public class RocketMqProducer {
  5. public static void sendQueueMsg(String groupId,String topicName,byte[] bytes,String tag){
  6. Properties properties = new Properties();
  7. // 您在控制台创建的 Group ID
  8. properties.put(PropertyKeyConst.GROUP_ID, groupId);
  9. // 鉴权用的 RAM 子账号的 AccessKeyId,由主账号创建,请向主账号获取
  10. properties.put(PropertyKeyConst.AccessKey,AppSecretConfig.get().ACCESSKEY);
  11. // 鉴权用的 RAM 子账号的 AccessKeySecret,由主账号创建,请向主账号获取
  12. properties.put(PropertyKeyConst.SecretKey, AppSecretConfig.get().SECRETKEY);
  13. // 设置 TCP 接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
  14. properties.put(PropertyKeyConst.NAMESRV_ADDR,AppSecretConfig.get().NAMESRV_ADDR);
  15. Producer producer = ONSFactory.createProducer(properties);
  16. // 在发送消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
  17. producer.start();
  18. // 在控制台创建的 Topic,即该消息所属的 Topic 名称
  19. // Message Tag,可理解为 Gmail 中的标签,对消息进行再归类,方便 Consumer 指定过滤条件在消息队列 RocketMQ 服务器过滤
  20. // Message Body任何二进制形式的数据, 消息队列 RocketMQ 不做任何干预,
  21. Message msg = new Message(topicName,tag,bytes);
  22. SendResult sendResult = producer.send(msg);
  23. System.out.println("Send Message success. Message ID is: " + sendResult.getMessageId());
  24. producer.shutdown();
  25. }
  26. }

简单消费写法:

  1. package com.thinkgem.jeesite.modules.rocketmq;
  2. import java.util.Properties;
  3. import com.aliyun.openservices.ons.api.Action;
  4. import com.aliyun.openservices.ons.api.ConsumeContext;
  5. import com.aliyun.openservices.ons.api.Consumer;
  6. import com.aliyun.openservices.ons.api.Message;
  7. import com.aliyun.openservices.ons.api.MessageListener;
  8. import com.aliyun.openservices.ons.api.ONSFactory;
  9. import com.aliyun.openservices.ons.api.PropertyKeyConst;
  10. import com.thinkgem.jeesite.common.config.Global;
  11. public class RocketMqConsumer {
  12. public static void getQueueMsg(String groupId,String topicName,String tag){
  13. Properties properties = new Properties();
  14. // 您在控制台创建的 Group ID
  15. properties.put(PropertyKeyConst.GROUP_ID, groupId);
  16. // 鉴权用的 RAM 子账号的 AccessKeyId,由主账号创建,请向主账号获取
  17. properties.put(PropertyKeyConst.AccessKey,Global.getConfig("AccessKey"));
  18. // 鉴权用的 RAM 子账号的 AccessKeySecret,由主账号创建,请向主账号获取
  19. properties.put(PropertyKeyConst.SecretKey, Global.getConfig("SecretKey"));
  20. // 设置 TCP 接入域名,进入控制台的实例管理页面,在页面上方选择实例后,在实例信息中的“获取接入点信息”区域查看
  21. properties.put(PropertyKeyConst.NAMESRV_ADDR,Global.getConfig("NAMESRV_ADDR"));
  22. Consumer consumer = ONSFactory.createConsumer(properties);
  23. consumer.subscribe(topicName, tag, new MessageListener() {
  24. public Action consume(Message message, ConsumeContext context) {
  25. System.out.println("Receive: " + new String(message.getBody()));
  26. // Cat cat = (Cat) ByteArrayUtils.bytesToObject(message.getBody()).get();
  27. //
  28. // System.out.println("消费者===》"+cat.getName()+"---"+cat.getColor()+"---"+cat.getHight()+"---"+cat.getWeight());
  29. return Action.CommitMessage;
  30. }
  31. });
  32. consumer.start();
  33. System.out.println("Consumer Started");
  34. }
  35. }

 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/386625
推荐阅读
相关标签
  

闽ICP备14008679号