赞
踩
利用配置文件配置的参数去构造框架对象注入容器,后续将对象注入到属性中去使用。
在导入依赖,配置完yml文件之后,系统启动就会自动注入一个rabbitmqTemplate。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.6.4</version>
</dependency>
<!--rabbitmq消息队列-->
<dependency>
<groupId>org.jeecgframework.boot</groupId>
<artifactId>sd-business-starter-rabbitmq</artifactId>
<version>1.0.0</version>
</dependency>
# 消息队列配置 rabbitmq: host: xx.xx.xx.xx #rabbitmq的连接地址 port: 5672 #rabbitmq的连接amqp端口号 virtual-host: / #rabbitmq的虚拟host username: xxxx #rabbitmq的用户名 password: xxxx #rabbitmq的密码 listener: simple: acknowledge-mode: manual #确认消息-手动确认 prefetch: 10 #消费者每次拉取数量 retry: enabled: true # 开启重试 max-attempts: 3 # 重试次数,默认为3次 initial-interval: 5000 #重试间隔时间(单位毫秒) max-interval: 1200000 #重试最大时间间隔(单位毫秒)、 multiplier: 2 #应用于前一重试间隔的乘法器 publisher-confirm-type: correlated #发布确认属性配置;新版本已弃用publisher-confirms template: mandatory: true #配合发布确认
@Configuration public class RabbitMqConfig { /** * 短信群发立即推送队列、延迟队列所绑定的交换机 **/ @Bean CustomExchange messageSendDelayPluginDirectExchange() { HashMap<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange("messageSend.updatePushRecordAndTask.direct", "x-delayed-message", true, false, args); // durable=true交换机持久化 } /** * 声明短信群发立即推送队列、延迟队列 **/ @Bean public Queue messageSendDelayPluginQueue() { // durable=true队列持久化 return new Queue("school.messageSend.updatePushRecordAndTask", true, false, false); } /** * 短信群发立即推送队列、延迟队列绑定交换机,并指定 routingKey,routingKey最好和queue一样 **/ @Bean public Binding messageSendDelayPluginBinding(Queue messageSendDelayPluginQueue, CustomExchange messageSendDelayPluginDirectExchange) { return BindingBuilder.bind(messageSendDelayPluginQueue) .to(messageSendDelayPluginDirectExchange) .with("school.messageSend.updatePushRecordAndTask") .noargs(); } }
rabbitTemplate.convertAndSend(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor, @Nullable CorrelationData correlationData,Envelope envelope)
参数名 | 解释 |
---|---|
exchange | 指定消息发送到的交换器的名称。根据配置的routingKey和交换器规则,消息可以被路由到一个或多个队列中 |
routingKey | 指定消息的路由键,路由键用于将消息从交换器路由到一个或多个队列。 |
message | 消息内容 |
messagePostProcessor | 一个用于在发送消息之前对消息进行进一步处理的回调接口。您可以使用消息后处理器来设置消息的一些属性,如消息的过期时间、消息的优先级等。(一般使用匿名方法来实现) |
correlationData | 可选参数,关联的数据对象,通常用于关联发送的消息和接收到的消息的回调。(不影响业务逻辑) |
Envelope | 可选参数,可以访问有关消息的元数据,例如交付标签(delivery tag,手动确认和拒绝消息使用)、交换器名称和路由键等) |
import org.jeecg.modules.admin.massSend.bo.UpdatePushRecordAndTaskBO; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MessageSendUpdatePushRecordAndTaskProducer { @Autowired RabbitTemplate rabbitTemplate; public void sendMessage(UpdatePushRecordAndTaskBO updatePushRecordAndTaskBO) { CorrelationData correlationData = new CorrelationData(); correlationData.setId(updatePushRecordAndTaskBO.getMassSendNo()); //给延时插件队列发送消息 rabbitTemplate.convertAndSend("messageSend.updatePushRecordAndTask.direct", "school.messageSend.updatePushRecordAndTask", updatePushRecordAndTaskBO, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //给消息设置延迟毫秒值,通过给消息设置x-delay头来设置消息从交换机发送到队列的延迟时间,值为0代表不延迟 message.getMessageProperties().setHeader("x-delay", updatePushRecordAndTaskBO.getDelayTime()); message.getMessageProperties().setCorrelationId(updatePushRecordAndTaskBO.getSendNo()); return message; } }, correlationData); } }
//必须要实现序列号接口
@Data
public class UpdatePushRecordAndTaskBO implements Serializable {
@ApiModelProperty(value = "群发编号")
private java.lang.String sendNo;
@ApiModelProperty(value = "延迟时间-定时发送使用")
private Long delayTime;
}
消费者handle方法的参数是自定义的,参数存在就会进行注入。
@RabbitListener(queues = “school.imBatchPush.push”, containerFactory = “rabbitListenerContainerFactory”)
参数 | 解释 |
---|---|
queues | 监听的队列 |
containerFactory | 指定用于创建消息监听容器的工厂 bean 的名称,这个可以直接使用默认的rabbitListenerContainerFactory |
handle(ImBatchPushBO imBatchPushBO, Channel channel, @Headers Map<String, Object> map,CorrelationData correlationData)
参数 | 解释 |
---|---|
ImBatchPushBO | 自定义消息体,用来接收生产者传递过来的参数 |
Channel | 可选参数,可以用来执行一些高级的RabbitMQ操作,如手动确认消息或拒绝消息。 |
@Headers Map<String, Object> | 可选参数,前面的@Headers表明要将消息标头注入到这个Map对象 |
CorrelationData | 可选参数,用于接收生产者存入的关联数据对象 |
@Header(“customHeader”) String customHeaderValue | 可选参数,用于从标头获取指定的参数;左边效果是取名为customHeader的参数注入customHeaderValue中 |
操作 | 解释 |
---|---|
channel.basicConsume(queueName, false, consumer); | 设置手动确认模式,第二个参数设置为false代表手动确认模式,开启手动模式最好在配置文件里面去配置,就不需要每个消费者都手动开启 |
channel.basicAck(deliveryTag, false); | 确认消息,第二个参数设置为false表示确认单个消息 |
channel.basicReject(deliveryTag, false); | 拒绝消息,第二个参数设置为false表示不重新入队列 |
channel.basicNack(deliveryTag, false, true); | 拒绝消息并重新入队列,第二个参数设置为false表示不批量拒绝 |
import cn.hutool.core.util.ObjectUtil; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.rabbitmq.client.Channel; import lombok.extern.slf4j.Slf4j; import org.jeecg.common.constant.CommonConstant; import org.jeecg.common.util.DateUtils; import org.jeecg.modules.admin.massSend.bo.UpdatePushRecordAndTaskBO; import org.jeecg.modules.admin.massSend.entity.SdSchoolMessageSendTask; import org.jeecg.modules.admin.massSend.entity.SdSchoolMessagePushRecord; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.Map; @Slf4j @Component @RabbitListener(queues = "school.messageSend.updatePushRecordAndTask", containerFactory = "rabbitListenerContainerFactory") public class MessageSendUpdatePushRecordAndTaskConsumer { @RabbitHandler public void handle(UpdatePushRecordAndTaskBO updatePushRecordAndTaskBO, Channel channel, @Headers Map<String, Object> map) throws Exception { log.info("短信群发更新状态开始: updateTaskBO={}", updatePushRecordAndTaskBO.toString()); if (map.get("error") != null) { System.out.println("错误的消息"); try { //否认消息,消息会被重新放回消息队列 channel.basicNack((Long) map.get(AmqpHeaders.DELIVERY_TAG), false, true); return; } catch (IOException e) { e.printStackTrace(); } } try { //具体的消费者业务逻辑 log.info("具体的消费业务逻辑执行了"); channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false); //确认消息 } catch (IOException e) { log.error("短信群发更新状态否认消息异常"); } } }
直接将生产者像其他类作为一个属性注入到类中,然后像普通的对象调用方法那样调用即可。
@Autowired
private MessageMassSendUpdatePushRecordAndTaskProducer messageMassSendUpdatePushRecordAndTaskProducer;
public void test(){
UpdatePushRecordAndTaskBO updatePushRecordAndTaskBO = new UpdatePushRecordAndTaskBO();
updatePushRecordAndTaskBO.setMassSendNo(massTask.getSendNo());
updatePushRecordAndTaskBO.setDelayTime(0L);
messageSendUpdatePushRecordAndTaskProducer.sendMessage(updatePushRecordAndTaskBO);
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。