赞
踩
消息队列是分布式系统中重要的中间件,在高性能、高可用、低耦合等系统架构中扮演着重要作用。分布式系统可以借助消息队列的能力,轻松实现以下功能:
下图便是消息队列的基本模型,向消息队列中存放数据的叫做生产者,从消息队列中获取数据的叫做消费者。
上图为整体架构,会涉及三类角色:
1)Producer 消息生产者:负责产生和发送消息到 Broker;
2)Broker 消息处理中心:负责消息存储、确认、重试等,一般其中会包含多个 queue;
3)Consumer 消息消费者:负责从 Broker 中获取消息,并进行相应处理;
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
公司/社区 | Rabbit | Apache | 阿里 | Apache |
开发语言 | Erlang | Java | Java | scala&Java |
协议支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定义协议 | 自定义协议 |
可用性 | 高 | 一般 | 高 | 高 |
单机吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延迟 | 微秒级 | 毫秒级 | 毫秒级 | 毫秒以内 |
消息可靠性 | 高 | 一般 | 高 | 一般 |
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、 安全。AMQP协议更多用在企业系统内,对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
rabbitmq在Windows下docker安装:
docker run -d --hostname my-rabbit --name my-rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -p 5672:5672 -p 15672:15672 rabbitmq:3-management
rabbitMQ参数说明:
docker参数说明:
访问地址
rabbitMQ中的几个概念:
常见的消息模型:
一、不使用exchange(一条消息只能被一个消费者消费)
基本消息队列(BasicQueue)
工作消息队列(WorkQueue)
二、使用exchange
发布订阅(Publish\Subscribe),又根据交换机类型不同分为三种:
Fanout Exchange:广播
Direct Exchange:路由
Topic Exchange:主题
消息从生产者发送到exchange,再到queue,再到消费者,导致消息丢失的可能性:
解决方案:
rabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ以后,会返回一个结果给发送者,表示消息是否处理成功。结果有两种请求:
确认机制发送消息时,需要给每个消息设置一个全局唯一id,以区分不同消息,避免ack冲突
2)消息持久化
在rabbitmq客户端创建交换机、队列中如果不设置Durability为Durable的情况都不是持久化,而消息需要设置Delivery mode:persistent,否则也不是持久化,在mq重启之后,交换机、队列、消息都会消失。
3)消费者消息确认
rabbitMQ支持消费者确认机制,即:消费者处理消息后可以向MQ发送ack回执,MQ收到ack回执后才会删除该消息。而SpringAMQP则允许配置三种确认模式:
4)失败重试机制
当消费者出现异常时,消息会不断requeue(重新入列)到队列,再重新发送给消费者,然后再次异常,再次requeue,无限循环,导致mq的消息处理飙升,带来不必要的压力
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)。
如何给队列绑定死信交换机
TTL,也就是Time-To-Live。如果一个队列中的消息TTL结束仍为消费,则会变为死信,ttl超时分为两种情况:
消息堆积问题
当生产者发送消息的速度超过了消费者处理消息的速度,就会导致嘟列中的消息堆积,直到队列存储消息达到上限。最早接收到的消息,可能就会成为死信,会被丢弃,这就是消息堆积问题。
解决消息堆积问题有三种思路:
惰性队列
从rabbitMQ的3.6.0版本开始,就增加了Lazy Queues的概念,也就是惰性队列。
惰性队列的特征如下:
镜像集群虽然支持中从,但主从同步并不是强一致的,某些情况下可能有数据丢失的风险。因此在rabbitMQ的3.8版本以后,退出了新的功能:仲裁队列来代替镜像集群,底层采用Raft协议确保主从的数据一致性。
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
kafka的诞生是为了解决linkedin的数据管道问题,起初linkedin采用了ActiveMQ来进行数据交换,大约是在2010年前后,那是的ActiveMQ还远远无法满足linkedin对数据传递系统的要求,经常由于各种缺陷而导致消息阻塞或者服务无法正常访问,为了能够解决这个问题,linkedin决定研发自己的消息传递系统,当时linkedin的首席架构师Jay kreps便开始组织团队进行消息传递系统的研发。
安装zookeeper
docker pull zookeeper
docker run --name zoo -p 2181:2181 -d zookeeper
安装kafka(方法一)
docker pull bitnami/kafka
docker run --name kafka -p 9092:9092 -e KAFKA_ZOOKEEPER_CONNECT=10.30.1.13:2181 -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 -d bitnami/kafka
docker容器部署必须指定以下环境变量:
使用docker-compose集群部署(方法二)
docker-compose.yml
version: '2' services: zoo1: image: zookeeper container_name: zoo ports: - 2181:2181 kafka1: image: 'bitnami/kafka:latest' ports: - '9092:9092' container_name: kafka1 environment: - KAFKA_ZOOKEEPER_CONNECT=zoo1:2181 - KAFKA_BROKER_ID=1 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT:///127.0.0.1:9092 depends_on: - zoo1 kafka2: image: 'bitnami/kafka:latest' ports: - '9093:9092' container_name: kafka2 environment: - KAFKA_ZOOKEEPER_CONNECT=zoo1:2181 - KAFKA_BROKER_ID=2 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT:///127.0.0.1:9093 depends_on: - zoo1 kafka3: image: 'bitnami/kafka:latest' ports: - '9094:9092' container_name: kafka3 environment: - KAFKA_ZOOKEEPER_CONNECT=zoo1:2181 - KAFKA_BROKER_ID=3 - ALLOW_PLAINTEXT_LISTENER=yes - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT:///127.0.0.1:9094 depends_on: - zoo1
kafka tool安装
安装完成后添加kafka信息,如下图操作界面
添加完成后如下图所示:
broker
zookeeper
producer(生产者)
consumer(消费者)
consumer group (消费者组)
个人理解总结:
通常来讲,消息模型可以分为两种, 队列和发布-订阅式。 队列的处理方式是 一组消费者从服务器读取消息,一条消息只有其中的一个消费者来处理。在发布-订阅模型中,消息被广播给所有的消费者,接收到消息的消费者都可以处理此消息。Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group)。 消费者用一个消费者组名标记自己。 一个发布在Topic上消息被分发给此消费者组中的一个消费者。 假如所有的消费者都在一个组中,那么这就变成了queue模型。 假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。
一个分区只能同时被一个消费者消费
replication(副本)
topic(主题)
offset(偏移量)
为了实现生产者的幂等性,kafka一如了Producer ID(PID)和Sequence Number的概念。
当kafka的生产者生产消息时,会增加一个pid和sequence number,发送消息会将pid和sn一块发送,kafka接收到消息会将消息和pid、sn一并保存下来,如果ack响应失败,生产者重试,再次发送消息时,kafka会根据pid/sn是否需要在保存一条消息(判断条件:生产者发送过来的sn是否小于等于partition中消息对应的sn)
生产者写入消息到topic,kafka依据每个不同的策略将数据分配到不同的分区
乱序问题
轮询策略,随机策略都会导致一个问题,生产到kafka中的数据都是乱序存储的,而按key分区可以 一定程度上实现数据有序存储也就是局部有序,但这又可能会导致数据倾斜,所以在实际生产环境中要结合实际情况来做取舍。
kafka中的rebalance称之为再均衡,是kafka中确保consumer group下所有的consumer如何达成一致,分配订阅的topic的每个分区的机制。
rebalance触发的时机有:
rebalance的不良影响:
public class Recv { private final static String QUEUE_NAME = "hello"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); // 建立连接 对应图中的Connetcions模块 Connection connection = factory.newConnection(); // 创建通道 对应图中的Channels Channel channel = connection.createChannel(); // 创建队列名 对应图中的Queues channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); } }
AMQP
是用于在应用程序或之间传递业务消息的标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
Spring AMQP
基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
使用maven导入相关依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置文件编写rabbitMQ连接信息:
spring:
rabbitmq:
host: 127.0.0.1 # 主机
port: 5672 # 端口
virtual-host: / # 虚拟空间
username: admin # 用户名
password: 123456 # 密码
发送消息到队列中:
@Service
public class RabbitMQServiceImpl implements IrabbitMQService {
@Autowired
private RabbitTemplate rabbitTemplate;
@Override
public void sendMsg(String msg) {
rabbitTemplate.convertAndSend("testQueue",msg);
}
}
作用:提高消息处理速度,避免消息堆积。(一个消息对应多个消费者,谁的能力强谁把消息消费掉)
生产者:
@Override
public void sengMsg2WorkQueue(String msg) {
for (int i = 0; i < 10 ; i++) {
System.out.println("工作消息队列,生产者发送消息:"+msg+i);
rabbitTemplate.convertAndSend("workQueue",msg+i);
}
}
消费者:
/**
* 模拟两个消费者去消费工作队列中的数据
* @param msg
*/
@RabbitListener(queues = "workQueue")
public void listenWorkQueue(String msg) throws InterruptedException {
System.out.println("工作消息队列,消费者1接收消息:"+msg);
Thread.sleep(200);
}
@RabbitListener(queues = "workQueue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.out.println("工作消息队列,消费者2接收消息:"+msg);
Thread.sleep(20);
}
结论:
消息平均分配到每一个消费者,在特定的环境下是不满足生产需求,理应谁的消费能力强谁消费更多的消息。
推进:
在配置文件里加入如下配置信息方可解决以上问题
listener:
simple:
prefetch: 1 # 表示每次只能占用消费一条消息
声明fanoutExchange和队列并绑定要广播的队列交给spring容器
package com.gdc.springboottest.config; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class FanoutConfig { // 1.声明交换机 @Bean public FanoutExchange fanoutExchange () { return new FanoutExchange("fanoutExchange"); } // 2.声明队列 @Bean public Queue queue1 () { return new Queue("fanoutQueue1"); } // 3.绑定 @Bean public Binding binding1 (FanoutExchange fanoutExchange,Queue queue1) { return BindingBuilder.bind(queue1).to(fanoutExchange); } // 2.声明队列2 @Bean public Queue queue2 () { return new Queue("fanoutQueue2"); } // 3.绑定 @Bean public Binding binding2 (FanoutExchange fanoutExchange,Queue queue2) { return BindingBuilder.bind(queue2).to(fanoutExchange); } }
发布消息(生产者)
@Override
public void sengMsg2FanoutExchange(String msg) {
rabbitTemplate.convertAndSend("fanoutExchange","",msg);
}
订阅消息(消费者)
/**
* 监听fanoutExchange
* @param msg
*/
@RabbitListener(queues = "fanoutQueue1")
public void listenFanoutQueue1(String msg) {
System.out.println("fanoutQueue1消息队列,消费者接收消息:"+msg);
}
@RabbitListener(queues = "fanoutQueue2")
public void listenFanoutQueue2(String msg) {
System.out.println("fanoutQueue2消息队列,消费者接收消息:"+msg);
}
发布消息(生产者)
@Override
public void sengMsg2DirectExchange(String msg, String routingKey) {
rabbitTemplate.convertAndSend("directExchange",routingKey,msg);
}
订阅消息(消费者)
/** * 监听directExchange * @param msg */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "directQueue1"), exchange = @Exchange(name = "directExchange",type = "direct"), key = {"red","orange"} )) public void listenDirectQueue1(String msg) { System.out.println("directQueue1消息队列,消费者接收消息:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "directQueue2"), exchange = @Exchange(name = "directExchange",type = "direct"), key = {"red","yellow"} )) public void listenDirectQueue2(String msg) { System.out.println("directQueue2消息队列,消费者接收消息:"+msg); }
TopicExchange与DirectExchange类似,区别在于routingKey必须是多个单词的列表,并且以.分割。
Queue与Exchange指定Bindingkey时可以使用通配符:
#:代指0个或多个单词
*:代指一个单词
发布消息(生产者)
@Override
public void sengMsg2TopicExchange(String msg, String routingKey) {
rabbitTemplate.convertAndSend("topicExchange",routingKey,msg);
}
订阅消息(消费者)
/** * topic * @param msg */ @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topicQueue1"), exchange = @Exchange(name = "topicExchange",type = ExchangeTypes.TOPIC), key = "shanghai.#" )) public void listenTopictQueue1(String msg) { System.out.println("topicQueue1消息队列,消费者接收消息:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "topicQueue2"), exchange = @Exchange(name = "topicExchange",type = ExchangeTypes.TOPIC), key = "#.songjiang" )) public void listenTopictQueue2(String msg) { System.out.println("topicQueue2消息队列,消费者接收消息:"+msg); }
1.SpringAMQP实现生产者确认
spring:
rabbitmq:
publisher-confirm-type: correlated
publisher-returns: true
template:
mandatory: true
配置说明:
publish-confirm-type:开启publisher-confirm,这里支持两种类型:
1)simple:同步等待confirm结果,直到超时
2)correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
template.mandatory:定义消息路由失败时的策略。true,则调用ReturnCallback,false:则直接丢弃消息。
@Configuration public class CommonConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { // 从容器中获取rabbitTemplate RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); /*rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() { @Override public void returnedMessage(Message message, int i, String s, String s1, String s2) { } });*/ // 设置ReturCallback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { System.out.println(message.toString()); System.out.println(replyCode); System.out.println(replyText); System.out.println(exchange); System.out.println(routingKey); }); } }
// 消息id,唯一 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); // 添加callback correlationData.getFuture().addCallback(new SuccessCallback<CorrelationData.Confirm>() { @Override public void onSuccess(CorrelationData.Confirm confirm) { if (confirm.isAck()) { System.out.println("生产者发送消息成功"); }else { System.out.println("nack"); System.out.println("消息发送失败"); System.out.println("原因:"+confirm.getReason()); } } }, new FailureCallback() { @Override public void onFailure(Throwable throwable) { System.out.println("消息发送异常"+throwable.getMessage()); } }); rabbitTemplate.convertAndSend(DIRECT_EXCHANGE,"red","hello red",correlationData);
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "directQueue1"), exchange = @Exchange(name = "directExchange",type = ExchangeTypes.DIRECT), key = {"red","orange"} )) public void directMsg1(String msg) { System.out.println("消费者接收消息:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue(name = "directQueue2"), exchange = @Exchange(name = "directExchange",type = ExchangeTypes.DIRECT), key = {"red","yellow"} )) public void directMsg2(String msg) { System.out.println("消费者接收消息:"+msg); }
消息持久化
消费者消息确认
listener:
simple:
acknowledge-mode: auto
消费失败重试机制
listener:
simple:
prefetch: 1
acknowledge-mode: auto
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000 # 初始的失败等待时长为1秒
multiplier: 1 # 下次失败的等待时长倍数,下次等待时长=multipler * last-interval
max-attempts: 3 #最大重试次数
stateless: true #true无状态,false有状态。如果业务中包含事务,这里改为false
在开启重试模式后,重试次数耗尽,如果消息依然失败,则需要有MessageRecoverer接口来处理,它包含三种不同的实现:
RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重入队列
RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机
@Component public class RabbitConfig { /** * 定义错误信息交换机 * @return */ @Bean public DirectExchange errMsgExchange() { return new DirectExchange("errorExchange"); } /** * 定义错误信息队列 * @return */ @Bean public Queue errQueue() { return new Queue("errQueue"); } /** * 将队列与交换机相互绑定 * @return */ @Bean public Binding errorBind() { return BindingBuilder.bind(errQueue()).to(errMsgExchange()).with("error"); } /** * 定义republishMessageRecoverer * */ @Bean public MessageRecoverer republishMessageRecoverer (RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate,"errorExchange","error"); } }
导入kafka依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置文件
spring: kafka: bootstrap-servers: 127.0.0.1:9092 # kafka集群信息 producer: # 生产者配置 retries: 3 # 设置大于0的值,则客户端会将发送失败的记录重新发送 batch-size: 16384 #16K buffer-memory: 33554432 #32M acks: 1 # 指定消息key和消息体的编解码方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: testGroup # 消费者组 enable-auto-commit: true # 自动提交 auto-offset-reset: earliest # 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
生产者
kafkaTemplate.send("testTopic", "key", msg);
消费者
package com.gdc.springboottest.config; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; @Component public class KafkaListeners { //kafka的监听器,topic为"testTopic",消费者组为"testGroup" @KafkaListener(topics = "testTopic", groupId = "testGroup") public void listenKafkaMsg(ConsumerRecord<String, String> record) { String value = record.value(); System.out.println(value); System.out.println(record); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。