赞
踩
/** 创建connection,根据connection创建channel,这样就形成了代码与RabbitMq的会话通道 */ ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(主机Ip); /** 创建连接后,要将连接关闭,因为Connection,Channel都实现了java.lang.AutoCloseable接口,所以我们可以使用 以下的写法,当我们使用完以后,连接会自动关闭 */ try (Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel()) { /** 创建交换机 第一个参数 交换机的名称 第二个参数 交换机的工作模式 第三个参数 交换机是否被持久化 第四个参数 交换机在没有绑定队列后是否自动删除 第五个参数 一些参数 */ channel.exchangeDeclare( "exchange.order.restaurant", BuiltinExchangeType.DIRECT, true, false, null ); /** 创建队列 第一个参数 队列的名称 第二个参数 交换机是否被持久化 第三个参数 ... 第四个参数 交换机在没有绑定队列后是否自动删除 第五个参数 一些参数 */ channel.queueDeclare( "queue.order", true, false, false, null ); /** 将交换机和队列进行绑定 第一个参数 队列的名称 第二个参数 交换机的名称 第三个参数 绑定key的名称 */ channel.queueBind( "queue.order", "exchange.order.restaurant", "key.order" ); }
发送消息
/**
发送消息
第一个参数 要发送给交换机的名称
第二个参数 交换机和队列的绑定key
第三个参数 某些参数
第四个参数 要发送的消息,需要将消息转换为字节数组进行发送
*/
channel.basicPublish("exchange.order.deliveryman", "key.deliveryman", null, messageToSend.getBytes());
接收消息
/**
接收消息
第一个参数 要接收的队列的名称
第二个参数 是否开启自动确认消息接收
第三个参数 对接收到的参数进行处理
第四个参数 .....
*/
channel.basicConsume("queue.order", true, deliverCallback, consumerTag -> {
});
DeliverCallback deliverCallback = (consumerTag, message) -> {
//消费数据
String messageBody = new String(message.getBody());
//进行一些业务的处理
}
在使用RabbitMQ时,我们要保证RabbitMQ这个线程要一直存在,并且在程序运行时就要开始调用他,所以我们可以使用异步线程,以及@Autowired直接调用RabbitMQ的相关方法
//使用@Async开启异步线程
@Async
public void handleMessage() throws IOException, TimeoutException, InterruptedException {
//RabbitMQ的处理逻辑
//为保持一直在运行,我们可以直接进行休眠
while (true) {
Thread.sleep(100000);
}
}
在使用异步线程后,我们需要构造一个异步线程的线程池
@Configuration @EnableAsync public class AsyncTaskConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); //设置核心线程数 threadPoolTaskExecutor.setCorePoolSize(10); //设置最大线程数、 threadPoolTaskExecutor.setMaxPoolSize(100); //线程池所使用的的缓冲队列 threadPoolTaskExecutor.setQueueCapacity(10); //等待任务在关机时完成 -- 表明等待所有线程执行完 threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true); //等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止 threadPoolTaskExecutor.setAwaitTerminationSeconds(60); //线程名称前缀 threadPoolTaskExecutor.setThreadNamePrefix("Rabbit-Async-"); //初始化线程 threadPoolTaskExecutor.initialize(); return threadPoolTaskExecutor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
然后在项目启动时,就将有关RabbitMQ的方法直接启动
@Configuration
public class RabbitConfig {
@Autowired
private OrderMessageService orderMessageService;
@Autowired
public void startListenMessage() throws IOException, InterruptedException, TimeoutException {
orderMessageService.handleMessage();
}
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。