赞
踩
前言:本文为原创 若有错误欢迎评论!
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring.rabbitmq.addresses=192.168.11.76:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#配置生产者发送消息默认使用的交换机(一般一个微服务使用一个交换机)
spring.rabbitmq.template.exchange=yzx.test.exchange
#开启confirm机制的回调
spring.rabbitmq.publisher-confirms=true
#开启return机制的回调
spring.rabbitmq.publisher-returns=true
#开启发送消息的mandatory委托 可以接收return回调
spring.rabbitmq.template.mandatory=true
@RestController public class producer { @Autowired private RabbitTemplate rabbitTemplate; ConfirmCallback confirmCallback= new ConfirmCallback(){ @Override public void confirm(CorrelationData correlationData, boolean b, String s) { if(!b){ System.out.println("发送到交换机失败 原因:"+s); }else { System.out.println("发送到交换机成功 corr"+correlationData); } } }; ReturnCallback returnCallback= new ReturnCallback() { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { System.out.println("从交换机通过routingkey路由到队列失败 原因:"+replyText); } }; /* * 为了给消息增加属性所以用message包装 * 必须要设置correlationData 保证ack的唯一 */ @GetMapping("test_Message") public void sendMessage() throws JsonProcessingException { MessageProperties messageProperties=new MessageProperties(); messageProperties.setContentType("application/json"); messageProperties.setExpiration("10000");//设置过期时间(单位毫秒) 在队列中没处理过期会加入死信队列 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);//设置消息持久化 User user=new User(123,"test"); ObjectMapper objectMapper=new ObjectMapper(); byte[] jsonData=objectMapper.writeValueAsBytes(user); Message message=new Message(jsonData,messageProperties); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //消息的唯一标识:采用id + 时间戳 保证全局唯一 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()+new Date()); rabbitTemplate.convertAndSend("commen_exchange001","spring",message,correlationData); } /* * 推荐直接使用rabbitTemplate的converandSend 直接发送对象 */ @GetMapping("test_Object") public void sendObject() throws JsonProcessingException { User user=new User(123,"test"); rabbitTemplate.setConfirmCallback(confirmCallback); rabbitTemplate.setReturnCallback(returnCallback); //全局唯一 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()+new Date()); //配置文件指定了默认交换机 这里可以省略交换机 //直接发送对象 rabbitTemplate.convertAndSend("spring",user,correlationData); } }
依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件
spring.rabbitmq.addresses=192.168.11.76:5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/ spring.rabbitmq.connection-timeout=15000 #设置消费端手动 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.direct.acknowledge-mode=manual #消费者最小数量(即如果不够那一个消费端将充当多个消费者) spring.rabbitmq.listener.simple.concurrency=1 #消费之最大数量 spring.rabbitmq.listener.simple.max-concurrency=10 #在单个请求中处理的消息个数,(basicQos限流在springboot中不再生效) #单个消费者限流的数=(concurrency*prefetch)/消费者端总数 因为有最小消费者数量 spring.rabbitmq.listener.simple.prefetch=1
消费者绑定交换机和队列并接收消息(两种方式)
方式一(推荐):通过注解创建和绑定
在消费者的配置文件中写入信息 然后读取这些交换机、队列、路由的名字进行绑定
#队列基本信息 spring.rabbitmq.listener.order.queue.name=conmmen_quene spring.rabbitmq.listener.order.queue.durable=true #死信重定向交换机 spring.rabbitmq.listener.direct.exchange.name=direct_exchange spring.rabbitmq.listener.direct.routingkey.name=direct_routingkey #交换机基本信息 spring.rabbitmq.listener.order.exchange.name=conmmen_exchange spring.rabbitmq.listener.order.exchange.durable=true spring.rabbitmq.listener.order.exchange.type=topic #忽略重复声明的异常 spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true #队列在交换机路由的路径 spring.rabbitmq.listener.order.key=springboot.rabbit.*
使用:
@RabbitListner():配置绑定信息(可以注解类或者方法)
@RabbitHandler:注解方法 表示接收该队列消息消息(如果@RabbitListner注解在方法上 可省略该注解)
//要注入到spring才可以生效 @Component public class Listener{ @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}", durable="${spring.rabbitmq.listener.order.queue.durable}", arguments = { @Argument(name = "x-dead-letter-exchange",value = "${spring.rabbitmq.listener.direct.exchange.name}"), @Argument(name = "x-dead-letter-routing-key",value = "${spring.rabbitmq.listener.direct.routingkey.name}") }), exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}", durable="${spring.rabbitmq.listener.order.exchange.durable}", type= "${spring.rabbitmq.listener.order.exchange.type}", ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"), key = "${spring.rabbitmq.listener.order.key}" ) ) public void onOrderMessage(Message message, Channel channel){ /** * message.getBody():获得消息体的byte[]数组 ,然后再去解析 */ //手动ack channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } @RabbitListener...//省略 /** * 直接在监听的方法参数里获得消息体 */ public void onOrderMessage(User user,Message message, Channel channel){ /** * 取到User之后 把user的唯一标识id 保存下来 用作避免消息重复消费 */ channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); } }
方式二:用@Bean注入
先建一个enum类 放交换机名、队列名、路由名(要注意对应 且不要一个单词太简单)
public enum QueueEnum { /** * 接收死信消息的重定向 */ MESSAGE_DIRECT_QUEUE("message.center.direct", "message.center.create", "message.center.create"), /** * 一般消息通知队列 */ MESSAGE_TTL_QUEUE("message.center.topic.test_ttl", "message.center.create.tset_ttl", "message.center.create.test_ttl"); /** * 交换名称 */ private String exchange; /** * 队列名称 */ private String queneName; /** * 路由键 */ private String routeKey; QueueEnum(String exchange, String queneName, String routeKey) { this.exchange = exchange; this.queneName = queneName; this.routeKey = routeKey; } }
在启动类的同级新建RabbitMqConfig
@Configuration public class RabbitMqConfiguration { /** * 创建死信重定向交换机 * * @return */ @Bean DirectExchange messageDirect() { return new DirectExchange(QueueEnum.MESSAGE_DIRECT_QUEUE.getExchange(),true,false); } /** * 创建普通消息交换机(测试ttl过期) * * @return */ @Bean DirectExchange messageTtl() { return new DirectExchange(QueueEnum.MESSAGE_TTL_QUEUE.getExchange(),true,false); } /** * 创建死信重定向的消费队列 * * @return */ @Bean public Queue messageDirectQueue() { return new Queue(QueueEnum.MESSAGE_DIRECT_QUEUE.getName()); } /** * 创建普通消息队列(测试ttl过期) * * @return */ @Bean Queue messageTtlQueue() { Map<String,Object> args=new HashMap<>(); // 设置该Queue的死信的重定向交换机 args.put("x-dead-letter-exchange", QueueEnum.MESSAGE_DIRECT_QUEUE.getExchange()); // 设置死信routingKey 必须设置routingkey args.put("x-dead-letter-routing-key", QueueEnum.MESSAGE_DIRECT_QUEUE.getRouteKey()); return new Queue(QueueEnum.MESSAGE_TTL_QUEUE.getName(),true,false,false,args); } /** * 交换机与队列绑定 * * @param messageDirect 通过bean注入上面配置的死信重定向交换机 * @param messageDirectQueue 通过bean注入上面配置的死信重定向队列 * @return */ @Bean Binding messageBinding(DirectExchange messageDirect, Queue messageDirectQueue) { return BindingBuilder .bind(messageDirectQueue) .to(messageDirect) .with(QueueEnum.MESSAGE_DIRECT_QUEUE.getRouteKey()); } /** * 交换机与队列绑定 * * @param messageTtlQueue 通过bean注入上面配置的普通交换机 * @param messageTtlDirect 通过bean注入上面配置的普通交换机 * @return */ @Bean public Binding messageTtlBinding(Queue messageTtlQueue, DirectExchange messageTtlDirect) { return BindingBuilder .bind(messageTtlQueue) .to(messageTtlDirect) .with(QueueEnum.MESSAGE_TTL_QUEUE.getRouteKey()); } }
使用:
先用【@RabbitListener(quene="(queneName)】注解类或者方法
再用【@RabbitHandler】注解方法 表示接收该队列消息消息
@RabbitListener(queues = QueueEnum.MESSAGE_DIRECT_QUEUE.getName())
@RabbitHandler
public void consumer(Message message, Channel channel){
//在消息的请求头获得deliveryTag
Long deliveryTag = message.getMessageProperties().getDeliveryTag();
//手工ACK
channel.basicAck(deliveryTag, false);
}
注意
Message:org.springframework.amqp.core.Message
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。