当前位置:   article > 正文

Java代码操作RabbitMq_java 初始化rabbitmq

java 初始化rabbitmq
Java代码操作RabbitMq
使用Java代码创建交换机,队列以及绑定关系
/**
创建connection,根据connection创建channel,这样就形成了代码与RabbitMq的会话通道
*/
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost(主机Ip);

/**
创建连接后,要将连接关闭,因为Connection,Channel都实现了java.lang.AutoCloseable接口,所以我们可以使用
以下的写法,当我们使用完以后,连接会自动关闭
*/
try (Connection connection = connectionFactory.newConnection();
     Channel channel = connection.createChannel()) {
    
    /**
    创建交换机
    第一个参数  交换机的名称
    第二个参数  交换机的工作模式
    第三个参数  交换机是否被持久化
    第四个参数  交换机在没有绑定队列后是否自动删除
    第五个参数  一些参数
    */
    channel.exchangeDeclare(
            "exchange.order.restaurant",
            BuiltinExchangeType.DIRECT,
            true,
            false,
            null
    );

    /**
    创建队列
    第一个参数  队列的名称
    第二个参数  交换机是否被持久化
    第三个参数  ...
    第四个参数  交换机在没有绑定队列后是否自动删除
    第五个参数  一些参数
    */
    channel.queueDeclare(
            "queue.order",
            true,
            false,
            false,
            null
    );

    /**
    将交换机和队列进行绑定
    第一个参数  队列的名称
    第二个参数  交换机的名称
    第三个参数  绑定key的名称
    */
    channel.queueBind(
            "queue.order",
            "exchange.order.restaurant",
            "key.order"
    );
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
接收和发送消息

发送消息

/**
发送消息
第一个参数  要发送给交换机的名称
第二个参数  交换机和队列的绑定key
第三个参数  某些参数
第四个参数  要发送的消息,需要将消息转换为字节数组进行发送
*/
channel.basicPublish("exchange.order.deliveryman", "key.deliveryman", null, messageToSend.getBytes());
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

接收消息

/**
接收消息
第一个参数  要接收的队列的名称
第二个参数  是否开启自动确认消息接收
第三个参数  对接收到的参数进行处理
第四个参数  .....
*/
channel.basicConsume("queue.order", true, deliverCallback, consumerTag -> {
});

DeliverCallback deliverCallback = (consumerTag, message) -> {
        //消费数据
        String messageBody = new String(message.getBody());
    	//进行一些业务的处理
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在使用RabbitMQ时,我们要保证RabbitMQ这个线程要一直存在,并且在程序运行时就要开始调用他,所以我们可以使用异步线程,以及@Autowired直接调用RabbitMQ的相关方法

//使用@Async开启异步线程
@Async
public void handleMessage() throws IOException, TimeoutException, InterruptedException {
	//RabbitMQ的处理逻辑
    
    //为保持一直在运行,我们可以直接进行休眠
    while (true) {
        Thread.sleep(100000);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

在使用异步线程后,我们需要构造一个异步线程的线程池

@Configuration
@EnableAsync
public class AsyncTaskConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        //设置核心线程数
        threadPoolTaskExecutor.setCorePoolSize(10);
        //设置最大线程数、
        threadPoolTaskExecutor.setMaxPoolSize(100);
        //线程池所使用的的缓冲队列
        threadPoolTaskExecutor.setQueueCapacity(10);
        //等待任务在关机时完成 -- 表明等待所有线程执行完
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        //等待时间 (默认为0,此时立即停止),并没等待xx秒后强制停止
        threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
        //线程名称前缀
        threadPoolTaskExecutor.setThreadNamePrefix("Rabbit-Async-");
        //初始化线程
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

然后在项目启动时,就将有关RabbitMQ的方法直接启动

@Configuration
public class RabbitConfig {

    @Autowired
    private OrderMessageService orderMessageService;

    @Autowired
    public void  startListenMessage() throws IOException, InterruptedException, TimeoutException {
        orderMessageService.handleMessage();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/384397
推荐阅读
相关标签
  

闽ICP备14008679号