赞
踩
视频通话是同步通讯,只能跟一个人。同步调用,失效性强,但是拓展性差。
微信发消息是异步通讯,可以多线操作。异步调用,
- docker run \
- -e RABBITMQ_DEFAULT_USER=itheima \控制台行账户密码
- -e RABBITMQ_DEFAULT_PASS=123321 \
- -v mq-plugins:/plugins \数据卷挂载
- --name mq \
- --hostname mq \
- -p 15672:15672 \控制台端口
- -p 5672:5672 \收发消息端口
- --network hm-net\网络
- -d \
- rabbitmq:3.8-management
当登陆上MQ,我们创建两个队列,然后使用一个交换机对其进行绑定,这样我们从交换机发送消息,两个队列就可以收到。
新建一个用户hmall
为hmall用户创建一个virtual host
测试不同virtual host之间的数据隔离现象
在这个页面我们可以设置虚拟主机还有用户,需要注意的是,当我们不同的用户登录,对之前其他用户发送的消息,不能进行取出操作。
在这个页面进行添加虚拟主机即可。
我们使用Spring AMQP
在父工程中引入spring-amqp依赖,这样两个服务器都可以使用。
当我们引入依赖之后,自动就注入了RabbitTemplate
创建一个队列名字是work.querue
在publisher服务中定义测试方法,发送50条消息到work.queue
在consumer服务中定义两个消息监听器
还是上面代码,然后结果是,五十条消息,被均匀分配到两个消费者,就算,两个消费者运行速率不同,也是会被均匀分配。
所以我们有另一个解决方案
创建fanout.queue1还有fanout.queue2
创建交换机hmall.fanout
利用java实现
发送者
接收者
- @RabbitListener(queues = "fanout.queue1")
- public void listenFanoutQueue1(String mes) throws InterruptedException {
- System.err.println("消费者一接收到消息"+mes+ LocalTime.now());
- }
- @RabbitListener(queues = "fanout.queue2")
- public void listenFanoutQueue2(String mes) throws InterruptedException {
- System.err.println("消费者二接收到消息"+mes+ LocalTime.now());
- }
首先我们创建一个direct交换机,绑定两个队列,然后在程序中编写代码。
在绑定队列的时候,需要添加上指定字段。比如blue,red,
- @Test
- public void testDirectQueue(){
- //1.交换机名字
- String exchangName = "hmall.direct";
-
- //2.消息
- String message = "hello,blue";
- //3.发送消息
- rabbitTemplate.convertAndSend(exchangName,"blue", message);
-
- }
- @RabbitListener(queues = "direct.queue1")
- public void listenDirectQueue1(String mes) throws InterruptedException {
- System.err.println("消费者一接收到消息"+mes+ LocalTime.now());
- }
-
- @RabbitListener(queues = "direct.queue2")
- public void listenDirectQueue2(String mes) throws InterruptedException {
- System.err.println("消费者一接收到消息"+mes+ LocalTime.now());
- }
绑定的时候,加上#.什么 或者是什么.#就是起到一个通配符的作用。
- @Configuration
- public class FanoutConfiguration {
- @Bean
- public FanoutExchange fanoutExchange(){
- //return ExchangeBuilder.fanoutExchange("hmall.fanout").build();
- return new FanoutExchange("hmall.fanout");
- }
-
-
- @Bean
- public Queue fanoutQ1(){
- //return QueueBuilder.durable("fanout.queue1").build();
- return new Queue("fanout.queue1");
- }
-
- @Bean
- public Binding fanoutQueueBindind(Queue Fanoutqueue1, FanoutExchange fanoutExchange){
- return BindingBuilder.bind(Fanoutqueue1).to(fanoutExchange);
- }
-
- }
只要一启动就能自动创建。
就是假如我们声明的是一个direct交换机,但是我们需要绑定key的时候,是在binding.().to().with(key),但是我们每次只能绑定一个key,所以非常繁琐。
自动创建还有声明交换机,然后添加key。
@QueueBinding
@Queue
@Exchange
rabbitTemplate.convertAndSend(exchangName,"blue", message);
在我们之前发送消息的时候,我们传入的是String类型的对象,但是源码是Object对象,也就意味着任何对象我们都可以传输,但是我们传输对象首先传给Rabbit服务器,通过网络是字节流,也就是说,进行了一个转字节的转换,所以方法名字是converAndSend。
当我们发送消息到队列的时候,我们打开控制台看到是到的是一堆乱码,使用的是JDK自带的序列化。
但是安全,空间问题是个缺点。
- @Bean
- public MessageConverter messageConverter(){
- return new Jackson2JsonMessageConverter();
- }
这个可以添加在配置类或者启动类里面,只要编写好就可以自动进行装配。
可以自动转化。
因为需要统一发送请求,所以使用交换机。进行按需请求,使用direct交换机。
声明交换机一般在消费者里面进行。
所以pay作为发送者,trade作为消费者。
在父工程引入ampq的依赖
在appllication中配置Rabbit的地址与端口,用户密码
在common中,配置消息转换器。
- private final RabbitTemplate rabbitTemplate;
-
- @Override
- @Transactional
- public void tryPayOrderByBalance(PayOrderFormDTO payOrderFormDTO) {
- // 1.查询支付单
- PayOrder po = getById(payOrderFormDTO.getId());
- // 2.判断状态
- if(!PayStatus.WAIT_BUYER_PAY.equalsValue(po.getStatus())){
- // 订单不是未支付,状态异常
- throw new BizIllegalException("交易已支付或关闭!");
- }
- // 3.尝试扣减余额
- userClient.deductMoney(payOrderFormDTO.getPw(), po.getAmount());
- // 4.修改支付单状态
- boolean success = markPayOrderSuccess(payOrderFormDTO.getId(), LocalDateTime.now());
- if (!success) {
- throw new BizIllegalException("交易已支付或关闭!");
- }
- // TODO 5.修改订单状态
- //tradeClient.markOrderPaySuccess(po.getBizUserId());
- try {
- rabbitTemplate.convertAndSend("pay.direct", "pay.success",po.getBizOrderNo());
- }catch (Exception e){
- log.error("发送消息失败异常通知:{}",po.getBizOrderNo(),e);
- }
- }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。