当前位置:   article > 正文

RabbitMQ(高性能的异步通讯组件)

RabbitMQ(高性能的异步通讯组件)

视频通话是同步通讯,只能跟一个人。同步调用,失效性强,但是拓展性差。

微信发消息是异步通讯,可以多线操作。异步调用,

安装部署

  1. docker run \
  2. -e RABBITMQ_DEFAULT_USER=itheima \控制台行账户密码
  3. -e RABBITMQ_DEFAULT_PASS=123321 \
  4. -v mq-plugins:/plugins \数据卷挂载
  5. --name mq \
  6. --hostname mq \
  7. -p 15672:15672 \控制台端口
  8. -p 5672:5672 \收发消息端口
  9. --network hm-net\网络
  10. -d \
  11. rabbitmq:3.8-management

快速入门

当登陆上MQ,我们创建两个队列,然后使用一个交换机对其进行绑定,这样我们从交换机发送消息,两个队列就可以收到。

数据隔离

新建一个用户hmall

为hmall用户创建一个virtual host

测试不同virtual host之间的数据隔离现象

在这个页面我们可以设置虚拟主机还有用户,需要注意的是,当我们不同的用户登录,对之前其他用户发送的消息,不能进行取出操作。

在这个页面进行添加虚拟主机即可。

JAVA客户端

我们使用Spring AMQP

第一步:首先在控制台里面创建一个队列,simple.queue

第二步

引入依赖

在父工程中引入spring-amqp依赖,这样两个服务器都可以使用。

配置MQ地址

发送消息

当我们引入依赖之后,自动就注入了RabbitTemplate

接收消息

WorkQueues

创建一个队列名字是work.querue

在publisher服务中定义测试方法,发送50条消息到work.queue

在consumer服务中定义两个消息监听器

还是上面代码,然后结果是,五十条消息,被均匀分配到两个消费者,就算,两个消费者运行速率不同,也是会被均匀分配。

所以我们有另一个解决方案

Fanout交换机:大喇叭广播

创建fanout.queue1还有fanout.queue2

创建交换机hmall.fanout

利用java实现

发送者

接收者

  1. @RabbitListener(queues = "fanout.queue1")
  2. public void listenFanoutQueue1(String mes) throws InterruptedException {
  3. System.err.println("消费者一接收到消息"+mes+ LocalTime.now());
  4. }
  5. @RabbitListener(queues = "fanout.queue2")
  6. public void listenFanoutQueue2(String mes) throws InterruptedException {
  7. System.err.println("消费者二接收到消息"+mes+ LocalTime.now());
  8. }

Direct交换机

首先我们创建一个direct交换机,绑定两个队列,然后在程序中编写代码。

在绑定队列的时候,需要添加上指定字段。比如blue,red,

发送端

  1. @Test
  2. public void testDirectQueue(){
  3. //1.交换机名字
  4. String exchangName = "hmall.direct";
  5. //2.消息
  6. String message = "hello,blue";
  7. //3.发送消息
  8. rabbitTemplate.convertAndSend(exchangName,"blue", message);
  9. }

接收端

  1. @RabbitListener(queues = "direct.queue1")
  2. public void listenDirectQueue1(String mes) throws InterruptedException {
  3. System.err.println("消费者一接收到消息"+mes+ LocalTime.now());
  4. }
  5. @RabbitListener(queues = "direct.queue2")
  6. public void listenDirectQueue2(String mes) throws InterruptedException {
  7. System.err.println("消费者一接收到消息"+mes+ LocalTime.now());
  8. }

topic交换机

绑定的时候,加上#.什么    或者是什么.#就是起到一个通配符的作用。

基于Bean声明队列交换机

  1. @Configuration
  2. public class FanoutConfiguration {
  3. @Bean
  4. public FanoutExchange fanoutExchange(){
  5. //return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
  6. return new FanoutExchange("hmall.fanout");
  7. }
  8. @Bean
  9. public Queue fanoutQ1(){
  10. //return QueueBuilder.durable("fanout.queue1").build();
  11. return new Queue("fanout.queue1");
  12. }
  13. @Bean
  14. public Binding fanoutQueueBindind(Queue Fanoutqueue1, FanoutExchange fanoutExchange){
  15. return BindingBuilder.bind(Fanoutqueue1).to(fanoutExchange);
  16. }
  17. }

只要一启动就能自动创建。

基于注解声明交换机队列

就是假如我们声明的是一个direct交换机,但是我们需要绑定key的时候,是在binding.().to().with(key),但是我们每次只能绑定一个key,所以非常繁琐。

自动创建还有声明交换机,然后添加key。

@QueueBinding

@Queue

@Exchange

消息转换器

rabbitTemplate.convertAndSend(exchangName,"blue", message);

在我们之前发送消息的时候,我们传入的是String类型的对象,但是源码是Object对象,也就意味着任何对象我们都可以传输,但是我们传输对象首先传给Rabbit服务器,通过网络是字节流,也就是说,进行了一个转字节的转换,所以方法名字是converAndSend。

当我们发送消息到队列的时候,我们打开控制台看到是到的是一堆乱码,使用的是JDK自带的序列化。

但是安全,空间问题是个缺点。

  1. @Bean
  2. public MessageConverter messageConverter(){
  3. return new Jackson2JsonMessageConverter();
  4. }

这个可以添加在配置类或者启动类里面,只要编写好就可以自动进行装配。

可以自动转化。

业务改造

因为需要统一发送请求,所以使用交换机。进行按需请求,使用direct交换机。

声明交换机一般在消费者里面进行。

所以pay作为发送者,trade作为消费者。

前提准备

在父工程引入ampq的依赖

在appllication中配置Rabbit的地址与端口,用户密码

在common中,配置消息转换器。

消费者代码

发布者代码

  1. private final RabbitTemplate rabbitTemplate;
  2. @Override
  3. @Transactional
  4. public void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {
  5. // 1.查询支付单
  6. PayOrder po = getById(payOrderFormDTO.getId());
  7. // 2.判断状态
  8. if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){
  9. // 订单不是未支付,状态异常
  10. throw new BizIllegalException("交易已支付或关闭!");
  11. }
  12. // 3.尝试扣减余额
  13. userClient.deductMoney(payOrderFormDTO.getPw(), po.getAmount());
  14. // 4.修改支付单状态
  15. boolean success = markPayOrderSuccess(payOrderFormDTO.getId(), LocalDateTime.now());
  16. if (!success) {
  17. throw new BizIllegalException("交易已支付或关闭!");
  18. }
  19. // TODO 5.修改订单状态
  20. //tradeClient.markOrderPaySuccess(po.getBizUserId());
  21. try {
  22. rabbitTemplate.convertAndSend("pay.direct", "pay.success",po.getBizOrderNo());
  23. }catch (Exception e){
  24. log.error("发送消息失败异常通知:{}",po.getBizOrderNo(),e);
  25. }
  26. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/天景科技苑/article/detail/746990
推荐阅读
相关标签
  

闽ICP备14008679号