赞
踩
报上面的错误,是因为producer.start();这个没有启动或是启动的地方报错。一般情况下是这样的。
我的坑
我是将rocketmq作为一个工具类注解,以达到通用性,可以在别的项目上拿到有用的信息,然后通过rocketmq发送到消费者。
我的工具类在本地项目上是可以使用的。但是在测试环境测试,第一次发送信息也是可以的,连着多次就是不行的。报上面 的错。
或是等很长时间后再发送信息也是可以的,连着也是不行的。
经过分析,发现是我的生产者是同步发送信息的,需要返回信息,如果拿不到发送第二次就会报错。
因此我就使用异步发送的方法来发送信息。
再次测试成功。
下面我将三种生产者发送消息的方式写出来。
异步发送
上面的还有,省略 producer.start(); for(int i = 0 ; i <5; i ++) { // 1. 创建消息 Message message = new Message("test_quick_topic", // 主题 "TagA", // 标签 "key" + i, // 用户自定义的key ,唯一的标识 ("Hello RocketMQ" + i).getBytes()); // 消息内容实体(byte[]) // 2.2 异步发送消息 producer.send(message, new SendCallback() { //rabbitmq急速入门的实战: 可靠性消息投递 @Override public void onSuccess(SendResult sendResult) { System.err.println("msgId: " + sendResult.getMsgId() + ", status: " + sendResult.getSendStatus()); } @Override public void onException(Throwable e) { e.printStackTrace(); System.err.println("------发送失败"); } });
1. 可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。
public class SyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("example_group_name"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" , "TagA" , ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. SendResult sendResult = producer.send(msg); System.out.printf("%s%n", sendResult); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
2. 可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。 MQ 的异步发送,需要用户实现异步发送回调接口(SendCallback)。消息发送方在发送了一条消息后,不需要等待服务器响应即可返回,进行第二条消息发送。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
public class AsyncProducer { public static void main(String[] args) throws Exception { //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("example_group_name"); //Launch the instance. producer.start(); producer.setRetryTimesWhenSendAsyncFailed(0); for (int i = 0; i < 100; i++) { final int index = i; //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest", "TagA", "OrderID188", "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET)); producer.send(msg, new SendCallback() { public void onSuccess(SendResult sendResult) { System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId()); } public void onException(Throwable e) { System.out.printf("%-10d Exception %s %n", index, e); e.printStackTrace(); } }); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
3. 单向(Oneway)发送
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。 此方式发送消息的过程耗时非常短,一般在微秒级别。
public class OnewayProducer { public static void main(String[] args) throws Exception{ //Instantiate with a producer group name. DefaultMQProducer producer = new DefaultMQProducer("example_group_name"); //Launch the instance. producer.start(); for (int i = 0; i < 100; i++) { //Create a message instance, specifying topic, tag and message body. Message msg = new Message("TopicTest" /* Topic */, "TagA" /* Tag */, ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); //Call send message to deliver message to one of brokers. producer.sendOneway(msg); } //Shut down once the producer instance is not longer in use. producer.shutdown(); } }
4.比较特点
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4Ibqy4ob-1574753021007)
参考:https://www.jianshu.com/p/42330afbe53a
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。