赞
踩
1、先安装rocketmq,配置环境变量,这里就不写怎么安装了,cmd命令行进入bin目录,运行name-server和broker,分别用如下两个命令行
start mqnamesrv.cmd
start mqbroker.cmd -n localhost:9876
不要关闭命令行窗口,当然也可以用后台运行的方式运行这两个文件
2、创建springboot项目,下面是依赖包
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>com.alibaba.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>3.2.6</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <version>RELEASE</version> </dependency>
3、配置文件 application.properties
###producer #该应用是否启用生产者 rocketmq.producer.isOnOff=on #发送同一类消息的设置为同一个group,保证唯一,默认不需要设置,rocketmq会使用ip@pid(pid代表jvm名字)作为唯一标示 rocketmq.producer.groupName=hpGroup #mq的nameServer地址 rocketmq.producer.namesrvAddr=127.0.0.1:9876 #消息最大长度 默认1024*4(4M) rocketmq.producer.maxMessageSize=4096 #发送消息超时时间,默认3000 rocketmq.producer.sendMsgTimeout=3000 #发送消息失败重试次数,默认2 rocketmq.producer.retryTimesWhenSendFailed=2 ###consumer ##该应用是否启用消费者 rocketmq.consumer.isOnOff=on rocketmq.consumer.groupName=hpGroup #mq的nameServer地址 rocketmq.consumer.namesrvAddr=127.0.0.1:9876 #该消费者订阅的主题和tags("*"号表示订阅该主题下所有的tags),格式:topic~tag1||tag2||tag3;topic2~*; rocketmq.consumer.topics=rocketTopic~* rocketmq.consumer.consumeThreadMin=20 rocketmq.consumer.consumeThreadMax=64 #设置一次消费消息的条数,默认为1条 rocketmq.consumer.consumeMessageBatchMaxSize=1 rocket.group=rocketGroup rocket.topic=rocketTopic rocket.tag=rocketTag
注:要保证rocketmq.consumer.topics去除 ~ 之后的值和rocket.group的值一致,~只是分隔符,也可以选择其他分隔符。
4、
生产者配置类代码:
package com.hp.rocket.rocket; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class ProducerConfig { private static final Logger LOG = LoggerFactory.getLogger(ProducerConfig.class) ; @Value("${rocketmq.producer.groupName}") private String groupName; @Value("${rocketmq.producer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.producer.maxMessageSize}") private Integer maxMessageSize ; @Value("${rocketmq.producer.sendMsgTimeout}") private Integer sendMsgTimeout; @Value("${rocketmq.producer.retryTimesWhenSendFailed}") private Integer retryTimesWhenSendFailed; @Bean public DefaultMQProducer getRocketMQProducer() { DefaultMQProducer producer; producer = new DefaultMQProducer(this.groupName); producer.setNamesrvAddr(this.namesrvAddr); //如果需要同一个jvm中不同的producer往不同的mq集群发送消息,需要设置不同的instanceName if(this.maxMessageSize!=null){ producer.setMaxMessageSize(this.maxMessageSize); } if(this.sendMsgTimeout!=null){ producer.setSendMsgTimeout(this.sendMsgTimeout); } //如果发送消息失败,设置重试次数,默认为2次 if(this.retryTimesWhenSendFailed!=null){ producer.setRetryTimesWhenSendFailed(this.retryTimesWhenSendFailed); } try { producer.start(); } catch (MQClientException e) { e.printStackTrace(); } return producer; } }
5、
消费者配置类代码
package com.hp.rocket.rocket; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.hp.rocket.common.CodeMsg; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.util.StringUtils; @Slf4j @SpringBootConfiguration public class ConsumerConfig { @Value("${rocketmq.consumer.namesrvAddr}") private String namesrvAddr; @Value("${rocketmq.consumer.groupName}") private String groupName; @Value("${rocketmq.consumer.consumeThreadMin}") private int consumeThreadMin; @Value("${rocketmq.consumer.consumeThreadMax}") private int consumeThreadMax; @Value("${rocketmq.consumer.topics}") private String topics; @Value("${rocketmq.consumer.consumeMessageBatchMaxSize}") private int consumeMessageBatchMaxSize; @Autowired private MQConsumeMsgListenerProcessor mqMessageListenerProcessor; @Bean public DefaultMQPushConsumer getRocketMQConsumer() throws Exception { if (StringUtils.isEmpty(groupName)){ throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg()); } if (StringUtils.isEmpty(namesrvAddr)){ throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg()); } if(StringUtils.isEmpty(topics)){ throw new Exception(CodeMsg.SYSTEM_ERROR.getMsg()); } DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(groupName); consumer.setNamesrvAddr(namesrvAddr); consumer.setConsumeThreadMin(consumeThreadMin); consumer.setConsumeThreadMax(consumeThreadMax); consumer.registerMessageListener(mqMessageListenerProcessor); /** * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 * 如果非第一次启动,那么按照上次消费的位置继续消费 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); /** * 设置消费模型,集群还是广播,默认为集群 */ //consumer.setMessageModel(MessageModel.CLUSTERING); /** * 设置一次消费消息的条数,默认为1条 */ consumer.setConsumeMessageBatchMaxSize(consumeMessageBatchMaxSize); try { /** * 设置该消费者订阅的主题和tag,如果是订阅该主题下的所有tag,则tag使用*;如果需要指定订阅该主题下的某些tag,则使用||分割,例如tag1||tag2||tag3 */ String[] topicTagsArr = topics.split(";"); for (String topicTags : topicTagsArr) { String[] topicTag = topicTags.split("~"); consumer.subscribe(topicTag[0],topicTag[1]); } consumer.start(); log.info("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr); }catch (MQClientException e){ log.error("consumer is start !!! groupName:{},topics:{},namesrvAddr:{}",groupName,topics,namesrvAddr,e); throw new Exception(e); } return consumer; } }
7、消费者的监听器代码
package com.hp.rocket.rocket; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.common.message.MessageExt; import com.hp.rocket.entity.MessageBack; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.BeanUtils; import org.springframework.stereotype.Component; import org.springframework.util.CollectionUtils; import java.util.List; @Slf4j @Component public class MQConsumeMsgListenerProcessor implements MessageListenerConcurrently { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { if(CollectionUtils.isEmpty(list)){ log.info("接受到的消息为空,不处理,直接返回成功"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } MessageExt messageExt = list.get(0); log.info("接受到的消息为:"+new String(messageExt.getBody())); log.info("接受到的消息为:"+messageExt.toString()); // MessageBack messageBack = new MessageBack(); // messageBack.setMsg(new String(messageExt.getBody())); // messageBack.setId(messageExt.getMsgId()); if(messageExt.getTopic().equals("你的Topic")){ if(messageExt.getTags().equals("你的Tag")){ //判断该消息是否重复消费 int reconsume = messageExt.getReconsumeTimes(); if(reconsume ==3){//消息已经重试了3次,如果不需要再次消费,则返回成功 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } //TODO 处理对应的业务逻辑 } } // 如果没有return success ,consumer会重新消费该消息,直到return success return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }
8、通用配置参数
package com.hp.rocket.service;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
@Service
public class ParamConfigService {
@Value("${rocket.group}")
public String rocketGroup ;
@Value("${rocket.topic}")
public String rocketTopic ;
@Value("${rocket.tag}")
public String rocketTag ;
}
9、service层
package com.hp.rocket.service;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.hp.rocket.entity.MessageBack;
import java.util.List;
public interface RocketMqService {
SendResult openAccountMsg(String msgInfo);
}
impl实现类
package com.hp.rocket.service; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import javax.annotation.Resource; @Service @Slf4j public class RocketMqServiceImpl implements RocketMqService{ @Resource private DefaultMQProducer defaultMQProducer; @Resource private ParamConfigService paramConfigService ; @Override public SendResult openAccountMsg(String msgInfo) { // 可以不使用Config中的Group defaultMQProducer.setProducerGroup(paramConfigService.rocketGroup); log.info("开始发送消息:"+msgInfo); SendResult sendResult = null; try { Message sendMsg = new Message(paramConfigService.rocketTopic, paramConfigService.rocketTag, "open_account_key", msgInfo.getBytes()); sendResult = defaultMQProducer.send(sendMsg); log.info("消息发送响应信息:"+sendResult.toString()); } catch (Exception e) { e.printStackTrace(); } return sendResult ; } }
10、controller层
方便浏览器测试,用的get方法
package com.hp.rocket.controller; import com.alibaba.fastjson.JSONObject; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.SendResult; import com.hp.rocket.common.HPResponse; import com.hp.rocket.service.RocketMqService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.*; @RequestMapping("rocket") @RestController public class RocketController { @Autowired RocketMqService rocketMqService; @GetMapping(value = "getResult/{msg}") public HPResponse<SendResult> getResult(@PathVariable("msg") String msg){ if(StringUtils.isEmpty(msg)){ return HPResponse.error(CodeMsg.SYSTEM_ERROR.fillArgs("参数不能为空")); } SendResult result = rocketMqService.openAccountMsg(msg); return HPResponse.success(result); } }
11、
response返回类也贴上来,方便新手直接拿来用
package com.hp.rocket.common; public class HPResponse<T> { private int code; private String msg; private T data; public static <T> HPResponse<T> success(T data){ return new HPResponse<T>(data); } public static <T> HPResponse<T> success(){ return new HPResponse<T>(); } public static <T> HPResponse<T> error(CodeMsg codeMsg){ return new HPResponse<T>(codeMsg); } private HPResponse(T data) { this.code = 200; this.msg = "success"; this.data = data; } private HPResponse() { this.code = 200; this.msg = "success"; } private HPResponse(CodeMsg codeMsg) { if(codeMsg == null) { return; } this.code = codeMsg.getCode(); this.msg = codeMsg.getMsg(); } public int getCode() { return code; } public String getMsg() { return msg; } public T getData() { return data; } }
12、自定义返回错误码工具类
package com.hp.rocket.common; import lombok.Getter; @Getter public class CodeMsg { private int code; private String msg; public static CodeMsg SERVER_ERROR = new CodeMsg(50000, "服务端异常"); public static CodeMsg SYSTEM_ERROR = new CodeMsg(40000, "%s"); private CodeMsg(int code, String msg) { this.code = code; this.msg = msg; } public CodeMsg fillArgs(Object ... args){ String message = String.format(this.msg,args); return new CodeMsg(this.code,message); } }
启动类我就不贴了,springboot项目自带
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。