当前位置:   article > 正文

学习笔记——rabbitMq_rabbittemplate.convertandsend什么时候执行一次

rabbittemplate.convertandsend什么时候执行一次

MQ相关概念

什么是mq

message queue,从字面看,本质是队列。先入先出,只是存放的内容是消息。还是一种跨进程的通信机制,用于上游和下游传递消息。在互联网架构中,mq是一种常见的上下游“逻辑解耦+物理解耦”的消息通信服务。“消息队列”是在消息的传输过程中保存消息的容器。它是典型的:生产者、消费者模型。生产者不断向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接收,没有业务逻辑的侵入,这样就实现了生产者和消费者的解耦。

应用场景

1、任务异步处理:

高并发环境下,由于来不及同步处理,请求往往会发生堵塞,比如说,大量的insert,update之类的请求同时到达MySQL,直接导致无数的行锁表锁,甚至最后请求会堆积过多,从而触发too many connections错误。通过使用消息队列,我们可以异步处理请求,从而缓解系统的压力。将不需要同步处理的并且耗时长的操作由消息队列通知消息接收方进行异步处理。减少了应用程序的响应时间。(同步:一个请求结束才能进行下一个,异步:请求进行中可以进行其他操作)例子:传统:下单请求库存操作->将订单信息保存到订单表->下单成功的响应,处理信息耗时长用户等待久。下单库存操作释放锁后直接返回下单成功,将订单信息放在队列中分发给保存订单信息模块,主流程越过了耗时长的,让其独自一边进行。

2、应用程序解耦合:

MQ相当于一个中介,生产方通过MQ与消费方交互,它将应用程序进行解耦合。防止一个模块出问题系统崩溃。不同模块对应不同的队列,比如一个message要进行短信,email,weixin多种方式分发,不需要将这些方法写在一起共用message了。而是作为消费者分别对应各自的队列,message发放给这些队列。

3.流量消峰

当并发过高时,超出处理上限。消息队列作为缓存将消息放在队列里,过段时间再处理

常见MQ产品
  • ActiveMQ:基于JMS
  • RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好
  • RocketMQ:基于JMS,阿里巴巴产品,目前交由Apache基金会
  • Kafka:分布式消息系统,高吞吐量
基本概念

生产者->mq(交换机,队列)->消费者

生产者(Producer):发送消息的应用。

消费者(Consumer):接收消息的应用。

队列(Queue):存储消息的缓存。

消息(Message):由生产者通过RabbitMQ发送给消费者的信息。

连接(Connection):连接RabbitMQ和应用服务器的TCP连接。

通道(Channel):连接里的一个虚拟通道。当你通过消息队列发送或者接收消息时,这个操作都是通过通道进行的。

交换机(Exchange):交换机负责从生产者那里接收消息,并根据交换类型分发到对应的消息列队里。要实现消息的接收,一个队列必须到绑定一个交换机。

绑定(Binding):绑定是队列和交换机的一个关联连接。

路由键(Routing Key):路由键是供交换机查看并根据键来决定如何分发消息到列队的一个键。路由键可以说是消息的目的地址。

RabbitMQ消息发送模式

RabbitMQ包括五种队列模式,简单队列、工作队列、发布/订阅、路由、主题、rpc等。

安装

centos系统

(90条消息) RabbitMQ入门 (一) linux下安装启动_zl_momomo的博客-CSDN博客_linux启动rabbitmq

systemctl start rabbitmq-server
#或者
rabbitmq-server -detached #以后台守护进程方式启动
其他相关操作

#启动服务
systemctl start rabbitmq-server.service

#设置开机启动
systemctl enable rabbitmq-server.service

#停止开机启动
systemctl disable rabbitmq-server.service

#重新启动服务
systemctl restart rabbitmq-server.service

#查看服务当前状态
systemctl status rabbitmq-server.service

#查看所有已启动服务
systemctl list-units --type=service

systemctl stop rabbitmq-server.service

#关闭服务

/位置/service rabbitmq-server stop

#安装插件

rabbitmq-plugins enable rabbitmq_management

进入web插件

http://服务器ip:15672

rabbitmqctl add_user admin 123456 #添加用户名和密码
rabbitmqctl set_user_tags admin administrator #添加用户角色

rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*" #修改权限
  • 1

原理

img

组成部分:

Broker:消息队列服务进程,此进程包括两个部分:Exchange和Queue

Exchange:消息队列交换机,按一定的规则将消息路由转发到某个队列,对消息进行过虑。
Queue:消息队列,存储消息的队列,消息到达队列并转发给指定的
Producer:消息生产者,即生产方客户端,生产方客户端将消息发送
Consumer:消息消费者,即消费方客户端,接收MQ转发的消息。

AMQP协议

生产者发送消息流程:

1、生产者和Broker建立TCP连接。

2、生产者和Broker建立通道。

3、生产者通过通道消息发送给Broker,由Exchange将消息进行转发。

4、Exchange将消息根据模式转发到指定的Queue(队列)

消费者接收消息流程:

1、消费者和Broker建立TCP连接

2、消费者和Broker建立通道

3、消费者监听指定的Queue(队列)

4、当有消息到达Queue时Broker默认将消息推送给消费者。

5、消费者接收到消息。

6、ack回复(确认是可靠消息)

简单模式

(1)生产者将消息发送到队列,消费者从队列获取消息。

(2)一个队列对应一个消费者。

img

简单和工作模式有默认交换机,交换机将生产者的消息推入队列里

工作

(1)一个生产者,多个消费者。

(2)一个消息发送到队列时,只能被一个消费者获取。

(3)多个消费者并行处理消息,提升消息处理速度。

img

发布/订阅模式(Publish/Subcribe)

路由名为空的路由模式

交换机type: fanout

img

将消息发送到交换机,队列从交换机获取消息,队列需要绑定到交换机。

(1)一个生产者,多个消费者。

(2)每一个消费者都有自己的一个队列。

(3)生产者没有将消息直接发送到队列,而是发送到交换机。

(4)每一个队列都要绑定到交换机。

(5)生产者发送的消息,经过交换机到达队列,实现一个消息被多个消费者获取的目的。

(6)交换机类型为“fanout”。

例子

application.yml

server:
  port: 8081   #端口号
spring:
  rabbitmq:
    host: 121.40.242.138
    port: 5672
    virtual-host: /
    username: admin
    password: 123456
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

配置类(创建交换机,队列,绑定关系)

@Configuration
public class RabbitMqConfiguration {

    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanout_order_exchange",true,false);
    }

    @Bean
    public Queue smsQueue()
    {
        return new Queue("sms.fanout.queue",true);
    }

    @Bean
    public Queue duanxinQueue()
    {
        return new Queue("duanxin.fanout.queue",true);
    }

    @Bean
    public Queue emailQueue()
    {
        return new Queue("email.fanout.queue",true);
    }

    @Bean
    public Binding smsbinding()
    {
       return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding duanxinbinding()
    {
        return BindingBuilder.bind(duanxinQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailbinding()
    {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

生产者(通过交换机发送数据)

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrder(String userId,String productId,int num)
    {
        //1.查询商品的库存
        //2.保存订单
        //3.通过消息队列分发
        //参数1:交换机 参数2:路由名/队列名 参数3:内容名
        String orderId="001";
        String exchangeName="fanout_order_exchange";
        String routeKey="";
        rabbitTemplate.convertAndSend(exchangeName,routeKey,orderId );
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

消费者(监听队列)

@Service
@RabbitListener(queues = {"sms.fanout.queue"})
public class ListenerService {

    @RabbitHandler
    public void getMessage(String message) {
        System.out.println("Receiver消费者收到消息  : " + message);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

测试(controller)

@RestController
@RequestMapping("mq/fanout")
public class OrderController {

    @Autowired
    OrderService orderService;

    @RequestMapping("order")
    public void contextLoads()
    {
        orderService.makeOrder("1","1",12);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

主程序

@SpringBootApplication(exclude= {DataSourceAutoConfiguration.class})
public class FanoutApplication
{

    public static void main(String[] args) {
        SpringApplication.run(FanoutApplication.class, args);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

路由模式(Routing)

交换机type: direct

img

路由模式是发布/订阅模式的一种特殊情况。

(1)路由模式的交换机类型为“direct”。

(2)绑定队列到交换机时指定 key,即路由键,一个队列可以指定多个路由键。

(3)生产者发送消息时指定路由键,这时,消息只会发送到绑定的key的对应队列中

交换机创建时类型改为路由模式

在交换机与某队列绑定时设置路由with()

 @Bean
    public Binding emailbinding()
    {
        return BindingBuilder.bind(emailQueue()).to(directExchange()).with("routingKey");
    }

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

生产者中交换机分发消息,在convertAndSend中第二个参数填入路由名

 rabbitTemplate.convertAndSend(exchangeName,routeKey,orderId );
  • 1

主题模式(Topic)

img

将路由键和某模式进行匹配。此时,队列需要绑定到一个模式上。

符号“#”匹配一个或多个词,“*”匹配不多不少一个词。

绑定队列到交换机指定key时,进行通配符模式匹配。

过期时间TTL

ttl队列
 @Bean
    public Queue smsQueue()
    {
        Map<String,Object>args=new HashMap<>();
        args.put("x-message-ttl",5000);
        return new Queue("sms.fanout.queue",true,false,false,args);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

创建队列时设置过期时间,在过期时间内没有被消费者接收则自动消失

给消息设置过期时间

生产者处更改

        MessagePostProcessor messagePostProcessor=new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(exchangeName,routeKey,orderId,messagePostProcessor);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

死信队列

过期的消息,被拒绝的消息,超过长度的消息可以转入死信队列

    @Bean
    public FanoutExchange deadExchange(){
        return new FanoutExchange("dead_exchange",true,false);}

    @Bean
    public Queue deadQueue()
    {
        return new Queue("dead_queue",true);
    }

    @Bean
    public Binding deadBinding()
    {
        return BindingBuilder.bind(deadQueue()).to(deadExchange());
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

在ttl队列创建中加上 args.put(“x-dead-letter-exchange”,“dead_exchange”);

过期的消息分发给死信交换机

@Bean
public Queue smsQueue()
{
    Map<String,Object>args=new HashMap<>();
    args.put("x-message-ttl",5000);
    args.put("x-dead-letter-exchange","dead_exchange");
    //args.put("x-dead-letter-routing-key","key1");  死信交换机是路由模式时
    return new Queue("sms.fanout.queue",true,false,false,args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

队列达到最大长度转给死信队列

@Bean
public Queue smsQueue()
{
    Map<String,Object>args=new HashMap<>();
    //args.put("x-message-ttl",5000);
    args.put("x-max-length",6);//设置最大长度,超过长度转死信队列
    args.put("x-dead-letter-exchange","dead_exchange");
    //args.put("x-dead-letter-routing-key","key1");  死信交换机是路由模式时
    return new Queue("sms.fanout.queue",true,false,false,args);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

延迟队列

ttl队列+死信队列

ttl过期完成延迟的作用,死信队列接纳过期的信息。达成延迟的效果

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小小林熬夜学编程/article/detail/728280
推荐阅读
相关标签
  

闽ICP备14008679号