赞
踩
系统之间的通信技术
通常情况下,实现系统间通信的方式有以下两种:
- 基于远程过程调用的方式(RPC)
- 基于消息队列的方式
基于RPC调用实现系统间通信:客户端不需要知道调用的具体实现细节,只需直接调用实际存在于远程计算机上的某个对象即可,但调用方式方式看起来和调用本地应用程序中的对象一样。 RPC:是一种通过网络从远程计算机上请求服务,并且不需要了解底层网络技术的协议(相当于协议)。典型的RPC实现包括Dubbo、Thrift、gRPC等
基于消息队列实现系统间通信 :“消息队列”(Message Queue,MQ)从字面意思来理解,是一个队列,拥有先进先出的特性。消息队列主要用于不同进程或线程之间的通信,用来处理一系列的输入请求。
消息队列采用异步通信机制,即消息在被消息发送者发送后可以立即返回,由消息队列来负责消息的传递和存储,消息发送者只管将消息发布到消息队列而不用管谁来取这条消息,消息调用者(消息消费者)只管从消息队列中取消息而不用管是谁发布的,这样发布者和调用者都不用知道对方的存在。从而实现异步通信。
比较有名的消息队列产品有:RocketMQ、RabbitMQ、ZeroMQ、Kafka等。
消息队列(Message Queue,简称MQ)是一种系统间相互协作的通信机制。采用的是异步通信机制。
实际应用中,消息队列有以下作用:
消息队列的功能特点
服务端(brokers)和客户端(producer、consumer)之间通信通过TCP协议来完成。
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是可以实时的处理大量数据以满足各种需求场景:比如基于hadoop的批处理系统、低延迟的实时系统、Storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
Kafka的使用场景:
Kafka的架构主要由四部分构成:Producer, Broker, Consumer和Zookeeper。
Kafka中,Producer把消息发送到Broker中的特定topic,消费者组Consumer Group中的各Consumer实例从broker的一个或者多个topic中拉取数据进行消费。Zookeeper用于协调和监控Producer,Broker,Consumer的交互。
此外,Kafka的Topic被划分为一个或多个Partition,每个Partition可以在Broker集群中的不同节点上,从而实现数据的分布式存储。每个Partition内的消息按照插入的顺序存储,并分配一个序列号称为Offset,用于唯一标识该消息。
Kafka的消息模型主要由以下几个部分组成:
名称 | 解释 |
---|---|
Broker | 消息中间件处理节点,一个Kafka节点就是一个Broker,一个或多个Broker可以组成一个Kafka集群 |
Topic | Kafka根据Topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个Topic |
Producer | 消息生产者,向Broker发送消息的客户端 |
Consumer | 消息消费者,从Broker读取消息的客户端 |
ConsumerGroup | 每个Consumer属于一个特定的ConsumerGroup,一个消息可以被多个不同的ConsumerGroup消费,但是一个Consumer Group中只能有一个Consumer能够消费该消息 |
Partition | 分区,是Kafka进行数据分割的单位。一个Topic可以被分割成多个Partition,每个Partition是一个有序的、不可变的消息序列 |
Offset | 偏移量,是Kafka中每条消息在Partition中的位置标识。每条消息在被写入Partition时,都会被赋予一个唯一的、递增的id,这个id就是Offset |
Kafka通过Offset(偏移量)来保证Partition内的消息有序。
在Kafka中,每个Topic被分为一个或多个Partition,每个Partition是一个有序的、不可变的消息序列。每条消息在被写入Partition时,都会被赋予一个唯一的、递增的id,这个id就是Offset。
Offset是一个长整型的数字,在每个Partition中,每条消息的Offset都是唯一的,并且是递增的。也就是说,第一条消息的Offset是0,第二条消息的Offset是1,以此类推。因此,通过Offset,我们可以知道每条消息在Partition中的位置,从而保证了消息的顺序。
需要注意的是,Kafka只保证同一个Partition内的消息是有序的,不同Partition之间的消息是无序的。这是因为不同Partition的消息可能会被存储在不同的Broker上,而不同的Broker之间并没有进行时间同步,因此无法保证不同Partition之间的消息顺序。
Kafka的Producer在发送消息时,可以指定一个Partitioner来决定将消息发送到哪个Partition。
默认情况下,Kafka使用的是RoundRobinPartitioner,即轮询的方式将消息均匀地发送到各个Partition。但是,用户可以自定义Partitioner,根据特定的规则将消息发送到特定的Partition。
例如,如果你希望同一个用户的所有消息都发送到同一个Partition,你可以自定义一个Partitioner,根据用户ID的hash值来决定将消息发送到哪个Partition。这样,同一个用户的所有消息就会被发送到同一个Partition。
这种方式的好处是,可以保证同一个用户的所有消息在Kafka中的顺序,因为Kafka只保证同一个Partition内的消息是有序的。同时,这也可以提高消费者消费消息的效率,因为消费者可以并行地从多个Partition中消费消息。
需要注意的是,自定义Partitioner需要考虑Partition的数量可能会变化的情况。例如,如果增加了Partition的数量,那么之前根据用户ID的hash值决定的Partition可能就会发生变化,这可能会影响到消息的顺序。因此,在设计Partitioner时,需要考虑到这种情况。
自定义Partitioner并将特定的消息发送到同一个Partition有以下几个好处:
总的来说,适当地使用自定义Partitioner,可以大大提高Kafka数据处理的效率和稳定性。
Kafka的持久化主要是通过以下几个步骤来实现的:
这种设计使得Kafka能够高效地处理大量的写入操作,因为追加操作通常比随机写入要快得多。同时,由于每个Partition都有一个独立的Log,这使得Kafka可以将不同Partition的数据存储在不同的磁盘上,从而进一步提高写入效率。
Kafka的索引具有以下特征:
- 紧凑和高效:Kafka的索引文件设计得非常紧凑,每个索引条目只包含8字节的偏移量和4字节的物理位置。这使得索引文件非常小,可以被高效地加载到内存中。
- 稀疏索引:Kafka的索引文件是稀疏的,也就是说,并不是每一条消息都有对应的索引,而是每隔一定数量的消息才会创建一个索引。这样可以进一步减小索引文件的大小,提高索引的效率。
- 不可变:一旦索引文件被创建,就不会再被修改。这使得索引文件可以被安全地缓存,并且可以被多个线程并发地读取。
- 映射到物理文件:Kafka的索引文件直接映射到物理文件,这使得操作系统可以利用页面缓存来提高索引的读取效率。
通过这种设计,Kafka的索引可以快速地定位到某个偏移量对应的消息,从而提高消息的读取效率。同时,由于索引文件的大小非常小,因此即使在处理大量数据的情况下,Kafka的索引也可以被高效地管理和维护。
通过以上步骤,Kafka实现了消息的持久化存储。这种持久化机制保证了Kafka的高可用性和数据的可靠性。除此之外,Kafka的消息存储还有以下特征:
Kafka的rebalance主要是为了保证消费者组中的消费者能够均匀地消费到分区中的消息。以下几种情况会触发Kafka的rebalance:
Kafka的rebalance过程主要包括以下步骤:
这个过程中,如果有消费者在规定的时间内没有发送心跳给协调者,协调者会认为这个消费者已经死亡,从消费者组中移除它,然后触发rebalance。
在Kafka中,rebalance是一种机制,用于在消费者组的成员发生变化时,重新分配topic的分区给各个消费者,以保证每个分区都有消费者消费,且每个消费者消费的分区数量大致相等。
具体来说,rebalance操作主要包括以下步骤:
这个过程会在消费者组的成员发生变化(如新消费者加入,旧消费者离开),或者订阅的topic的分区数发生变化时触发。虽然rebalance可以保证分区的均匀分配,但是它也会导致消费者在rebalance过程中无法消费消息,因此可能会影响到消费的延迟。
Kafka消费者频繁发生rebalance可能由以下几个原因导致:
解决这个问题的方法主要包括优化消费者的处理速度,增加消费者的心跳间隔,优化网络连接,以及优化Kafka服务器的性能等。
Kafka的lag是指消费者消费消息的速度落后于生产者生产消息的速度的情况。具体来说,对于每个topic的每个分区,Kafka都会维护一个名为"High Watermark"的标记,表示生产者生产的最新的消息的位置;同时,每个消费者也会维护一个名为"Offset"的标记,表示消费者消费到的最新的消息的位置。这两者之间的差值就是lag。
解决Kafka的lag问题,可以从以下几个方面考虑:
consumer自身消费能力不足,是常见的导致消费lag的原因,可以从以下几个方面排查消费能力的瓶颈。
consumer数据拉取瓶颈
consumer rebalance时会停止消费,若rebalance较频繁,也可能导致消费lag。可对 consumer 频繁rebalance 的问题进行排查处理。
producer发送partition不均匀,有单partition热点问题。建议从上游对热点数据进行优化,比如针对热点数据使用单独的topic。
第一步:引入kafka相关依赖
<!-- kafka相关依赖,与本地安装的kafka版本一致 -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
第二步:编写消息生产者
import java.util.HashMap; import java.util.Map; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class producerTest { public static void main(String[] args){ Map<String,Object> props=new HashMap<String,Object>(); //bootstrap.servers表示Kafka集群。如果集群中有多台物理服务器,则服务器地址之间用逗号分隔,9092是Kafka服务器默认监听的端口号 props.put("bootstrap.servers", "localhost:9092"); /* * key.serializer和value.serializer表示消息的序列化类型。 * Kafka的消息是以键值对的形式发送到Kafka服务器的,在消息被发送到服务器之前, * 消息生产者需要把不同类型的消息序列化为二进制类型 */ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /* * key.deserializer和value.deserializer表示消息的反序列化类型。 * 把来自Kafka集群的二进制消息反序列化为指定的类型,因为序列化使用的是StringSerializer * 所以用StringDeserializer来反序列化 */ props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); /* * zk.connect用于指定Kafka连接Zookeeper的URL,提供了基于Zookeeper的集群服务器自动感知功能, * 可以动态从Zookeeper中读取Kafka集群配置信息 */ props.put("zk.connect", "127.0.0.1:2181"); /* * 有了消息生产者之后,就可以调用send方法发送信息了。 * 该方法的参数是ProducerRecord类型对象 * ProducerRecord类提供了多种构造函数形参 * 1. ProducerRecord(topic,partition,key,value); * 2. ProducerRecord(topic,key,value) * 3. ProducerRecord(topic,value) * 其中,topic和value是必填的,partition和key是可选的 * 如果指定了partition,那么消息会被发送到指定的partition * 如果没指定partition但指定了key,那么消息会按照hash(key)发送至对应的partition * 如果既没指定partition也没指定key,那么消息会按照round-robin模式发送(即以轮询的方式依次发送)到每个partition。 */ /* * 向test-topic发送两条消息 */ String topic="test-topic"; Producer<String,String> producer=new KafkaProducer<String,String>(props); producer.send(new ProducerRecord<String,String>(topic,"name","xingze")); producer.send(new ProducerRecord<String,String>(topic,"age","21")); producer.close(); } }
第三步:编写消息消费者(消息调用者)
import java.util.Arrays; import java.util.Properties; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; public class ConsumerTest{ private static Consumer<String,String> consumer; public static void main(String[] args) { String topic="test-topic"; /* * bootstarp.servers和生产者一样,表示kafka集群 * group.id表示消费者的分组id * enable.auto.commit表示Consumer的offset是否自动提交 * auto.commit.interval.ms用于设置自动提交offset到ZooKeeper的时间间隔,单位是毫秒 * key.deserializer和value.deserializer表示用字符串来反序列化消息数据 */ Properties props=new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "testGroup1"); props.put("enable.auto.commit", "true"); props.put("suto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); while(true) { //@SuppressWarnings("deprecation") ConsumerRecords<String,String> records=consumer.poll(100); for(ConsumerRecord<String, String> record : records) { System.out.printf("partition= %d ,offset= %d,key= %s,value= %s%n", record.partition(),record.offset(),record.key(),record.value()); } } } }
第四步:启动ZooKeeper(Kafka依赖于ZooKeeper)
第五步:启动Kafka
第六步:运行Consumer
运行Consumer,这样当生产者发送消息时就能在消费者后端看到消费记录
第七步:运行Producer
运行Producer,发布几条消息,在Consumer的控制台就能看到接受到的消息
运行结果如下图所示,在控制台可看到输出如下:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。