赞
踩
场景:用户登录,邀请其它用户进行视频会议,收到邀请的用户进入会议
rabbitmq实现思路:
选型:发布订阅模式(Publish/Subscribe)
一个生产者,多个消费者,每一个消费者都有自己的一个队列,生产者没有将消息直接发送到队列,而是发送到了交换机,每个队列绑定交换机,生产者发送的消息经过交换机,到达队列,实现一个消息被多个消费者获取的目的。
这种情况下,我们有四种交换机可供选择,分别是:
- Direct
- Fanout
- Topic
- Header
由于消费者的数量不固定,所以要动态生成临时队列,无法指定routingkey因此选fanout模式
FanoutExchange 的数据交换策略是把所有到达 FanoutExchange 的消息转发给所有与它绑定的 Queue 上,在这种策略中,routingkey 将不起任何作用
代码实现:
1.pom文件引入rabbitmq依赖
<!-- rabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.配置文件
server:
port: 9091
spring:
application:
name: rabbitmq
# rabbitmq配置
rabbitmq:
host: 192.168.8.142
port: 5672
username: admin
password: admin
virtual-host: my_vhost
3.constant类
package com.anychat.rabbitmqtest.constant;
/**
* @author Liby
* @date 2022-05-05 10:02
* @description:
* @version:
*/
public class RabbitmqConstant {
public static final String MEETING_FANOUT_EXCHANGE = "meeting_exchange";
}
4.用户实体类
package com.anychat.rabbitmqtest.entity; /** * @author Liby * @date 2022-05-06 09:39 * @description: * @version: */ public class User { private Integer userId; private String username; public Integer getUserId() { return userId; } public void setUserId(Integer userId) { this.userId = userId; } public String getUsername() { return username; } public void setUsername(String username) { this.username = username; } public User(Integer userId, String username) { this.userId = userId; this.username = username; } }
5.工具类
package com.anychat.rabbitmqtest.util; import com.rabbitmq.client.Channel; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; /** * @author Liby * @date 2022-04-28 10:27 * @description: * @version: */ public class RabbitmqUtil { @Autowired private static RabbitTemplate rabbitTemplate; public static Channel getChannel() { Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true); return channel; } }
6.消费者类
package com.anychat.rabbitmqtest.consumer; import cn.hutool.core.util.StrUtil; import com.anychat.rabbitmqtest.constant.RabbitmqConstant; import com.anychat.rabbitmqtest.entity.User; import com.rabbitmq.client.*; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; import java.util.concurrent.ConcurrentHashMap; /** * @author Liby * @date 2022-04-25 11:18 * @description:消费者,动态创建临时队列 * @version: */ @Slf4j @Component public class FanoutConsumer { @Autowired private RabbitTemplate rabbitTemplate; public void createQueue(User user) { //创建信道 Channel channel = rabbitTemplate.getConnectionFactory().createConnection().createChannel(true); try { //声明一个交换机与生产者相同 channel.exchangeDeclare(RabbitmqConstant.MEETING_FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true); //获取一个随机的队列名称,使用默认方式,产生的队列为临时队列,在没有消费者时将会自动删除 String queueName = channel.queueDeclare().getQueue(); //用户Id与队列名绑定 ConcurrentHashMap<String, Integer> userQueueMap = new ConcurrentHashMap<>(); userQueueMap.putIfAbsent(queueName, user.getUserId()); //关联 exchange 和 queue ,因为是广播无需指定routekey,routingKey设置为空字符串 // channel.queueBind(queue, exchange, routingKey) channel.queueBind(queueName, RabbitmqConstant.MEETING_FANOUT_EXCHANGE, ""); DefaultConsumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); //对信息进行操作 String message = new String(body, "UTF-8"); if (StrUtil.isNotBlank(message)) { String[] receiveIds = message.split(","); Integer userId = userQueueMap.get(queueName); for (String id : receiveIds) { if (userId.equals(Integer.valueOf(id))) { log.info("用户{}收到入会邀请", id); } } } } }; //true 自动回复ack channel.basicConsume(queueName, true, consumer); } catch (Exception ex) { } } }
7.controller类
package com.anychat.rabbitmqtest.controller; import com.anychat.rabbitmqtest.constant.RabbitmqConstant; import com.anychat.rabbitmqtest.consumer.FanoutConsumer; import com.anychat.rabbitmqtest.entity.User; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author Liby * @date 2022-04-24 16:34 * @description:生产者 * @version: */ @RestController @Slf4j @RequestMapping("/producer") public class ProducerController { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private FanoutConsumer fanoutConsumer; /** * 模拟用户登录后,创建一个临时队列,与该用户绑定 */ @PostMapping("/login") public String login(){ //模拟三个用户登录 int userNum=3; for (int i = 0; i < userNum; i++) { //用户绑定临时队列,并监听队列 fanoutConsumer.createQueue(new User(i, "用户" + i)); log.info("用户{}登录成功",i); } return "用户登录成功"; } @PostMapping("/meeting") public String meeting(){ String message="1,2"; log.info("邀请用户{}进入会议",message); //发送消息,要求userId为2和3的用户进入会议 rabbitTemplate.convertAndSend(RabbitmqConstant.MEETING_FANOUT_EXCHANGE,"",message); return "发送成功"; } }
postman分别调用login和meeting两个接口
可以看到日志打印
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。