赞
踩
- server:
- port: 8021
- spring:
- #给项目来个名字
- application:
- name: rabbitmq-test
- #配置rabbitMq 服务器
- rabbitmq:
- host: 127.0.0.1
- port: 5672
- username: need
- password: 123456
- #虚拟host 可以不设置,使用server默认host
- virtual-host: /testhost
-
- #ack 确认方式
- listener:
- simple:
- acknowledge-mode: manual
- direct:
- acknowledge-mode: manual
- import com.rabbitmq.client.Channel;
- import org.springframework.amqp.core.Message;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.stereotype.Component;
- import java.io.IOException;
-
- @Component
- public class DirectReceiver_1 {
-
- @RabbitListener(queues = "TestDirectQueue")//监听的队列名称 TestDirectQueue
- public void process(Message message, Channel channel) throws IOException {
-
- long deliveryTag = message.getMessageProperties().getDeliveryTag();
- //或
- //@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag
-
- try {
- String msgbody = new String(message.getBody());
- //1.接收转换消息
- System.out.println("DirectReceiver消费者 1 收到消息 : " +msgbody+" 编号: "+deliveryTag);
-
- //2. 处理业务逻辑
- System.out.println("处理业务逻辑...");
- //模拟出现错误
- System.out.println(500/Double.valueOf(msgbody));
- //3. 手动签收
- channel.basicAck(deliveryTag,true);
- } catch (Exception e) {
- try {
- Thread.sleep(3000);
- } catch (InterruptedException interruptedException) {
- interruptedException.printStackTrace();
- }
- //e.printStackTrace();
-
- //4.拒绝签收
- /*
- 第三个参数:requeue:重回队列。如果设置为true,则消息重新回到queue,broker会重新发送该消息给消费端
- */
- channel.basicNack(deliveryTag,true,true);
- //channel.basicReject(deliveryTag,true);
- }
- }
- }
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。