当前位置:   article > 正文

如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息_rabbitmq 多个消费者消费相同的消息

rabbitmq 多个消费者消费相同的消息

如何在rabbitmq中实现一个生产者,多个消费者,多个消费者都能收到同一条消息

场景:用户登录,邀请其它用户进行视频会议,收到邀请的用户进入会议

rabbitmq实现思路:

选型:发布订阅模式(Publish/Subscribe)

一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。

这种情况下,我们有四种交换机可供选择,分别是:

  • Direct
  • Fanout
  • Topic
  • Header

由于消费者的数量不固定,所以要动态生成临时队列,无法指定routingkey因此选fanout模式

FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用

代码实现:
1.pom文件引入rabbitmq依赖

 <!-- rabbitMQ -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

2.配置文件

server:
  port: 9091
spring:
  application:
    name: rabbitmq
  # rabbitmq配置
  rabbitmq:
    host: 192.168.8.142
    port: 5672
    username: admin
    password: admin
    virtual-host: my_vhost
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

3.constant类

package com.anychat.rabbitmqtest.constant;

/**
 * @author Liby
 * @date 2022-05-05 10:02
 * @description:
 * @version:
 */

public class RabbitmqConstant {
    public  static final String  MEETING_FANOUT_EXCHANGE = "meeting_exchange";
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

4.用户实体类

package com.anychat.rabbitmqtest.entity;

/**
 * @author Liby
 * @date 2022-05-06 09:39
 * @description:
 * @version:
 */

public class User {
    private Integer userId;
    private String username;

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public User(Integer userId, String username) {
        this.userId = userId;
        this.username = username;
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35

5.工具类

package com.anychat.rabbitmqtest.util;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * @author Liby
 * @date 2022-04-28 10:27
 * @description:
 * @version:
 */

public class RabbitmqUtil {
    @Autowired
    private static RabbitTemplate rabbitTemplate;

    public static Channel getChannel() {
        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);
        return channel;

    }


}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

6.消费者类

package com.anychat.rabbitmqtest.consumer;

import cn.hutool.core.util.StrUtil;
import com.anychat.rabbitmqtest.constant.RabbitmqConstant;
import com.anychat.rabbitmqtest.entity.User;
import com.rabbitmq.client.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author Liby
 * @date 2022-04-25 11:18
 * @description:消费者,动态创建临时队列
 * @version:
 */
@Slf4j
@Component
public class FanoutConsumer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void createQueue(User user) {

        //创建信道
        Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true);

        try {
            //声明一个交换机与生产者相同

            channel.exchangeDeclare(RabbitmqConstant.MEETING_FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);
            //获取一个随机的队列名称,使用默认方式,产生的队列为临时队列,在没有消费者时将会自动删除
            String queueName = channel.queueDeclare().getQueue();
            //用户Id与队列名绑定
            ConcurrentHashMap<String, Integer> userQueueMap = new ConcurrentHashMap<>();
            userQueueMap.putIfAbsent(queueName, user.getUserId());
            //关联 exchange 和 queue ,因为是广播无需指定routekey,routingKey设置为空字符串
            // channel.queueBind(queue, exchange, routingKey)
            channel.queueBind(queueName, RabbitmqConstant.MEETING_FANOUT_EXCHANGE, "");
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    super.handleDelivery(consumerTag, envelope, properties, body);
                    //对信息进行操作
                    String message = new String(body, "UTF-8");
                    if (StrUtil.isNotBlank(message)) {
                        String[] receiveIds = message.split(",");
                        Integer userId = userQueueMap.get(queueName);
                        for (String id : receiveIds) {
                            if (userId.equals(Integer.valueOf(id))) {
                                log.info("用户{}收到入会邀请", id);
                            }

                        }

                    }

                }
            };
            //true 自动回复ack
            channel.basicConsume(queueName, true, consumer);
        } catch (Exception ex) {
        }
    }
}


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71

7.controller类

package com.anychat.rabbitmqtest.controller;

import com.anychat.rabbitmqtest.constant.RabbitmqConstant;
import com.anychat.rabbitmqtest.consumer.FanoutConsumer;
import com.anychat.rabbitmqtest.entity.User;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author Liby
 * @date 2022-04-24 16:34
 * @description:生产者
 * @version:
 */
@RestController
@Slf4j
@RequestMapping("/producer")
public class ProducerController {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private FanoutConsumer fanoutConsumer;
    /**
    * 模拟用户登录后,创建一个临时队列,与该用户绑定
    */
    @PostMapping("/login")
    public String login(){
        //模拟三个用户登录
        int userNum=3;

        for (int i = 0; i < userNum; i++) {
            //用户绑定临时队列,并监听队列
            fanoutConsumer.createQueue(new User(i, "用户" + i));
            log.info("用户{}登录成功",i);
        }
        return "用户登录成功";

    }

    @PostMapping("/meeting")
    public String meeting(){
        String message="1,2";
        log.info("邀请用户{}进入会议",message);
        //发送消息,要求userId为2和3的用户进入会议
        rabbitTemplate.convertAndSend(RabbitmqConstant.MEETING_FANOUT_EXCHANGE,"",message);
        return "发送成功";

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54

postman分别调用login和meeting两个接口
可以看到日志打印在这里插入图片描述

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/621008
推荐阅读
相关标签
  

闽ICP备14008679号