赞
踩
Kafka是由Apache基金会开发的分布式流处理平台,采用发布-订阅模式,支持高吞吐量、低延迟的数据传输。主要用于处理实时数据管道、数据存储和数据分析等大数据应用场景。Kafka采用高效的数据压缩算法,可以在集群中存储大量的数据,并通过分区机制来实现数据的高可靠性和可扩展性。
Producer 生产者
生产者用于创建消息。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区
拦截器
生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。通过自定义实现 ProducerInterceptor 接口来使用。
序列化
生产者需要用序列化器把对象转换成字节数组才能通过网络发送给 Kafka。消费者需要用反序列化把从 Kafka 中收到的字节数组转换成相应的对象。自带的有StringSerializer,ByteArray、ByteBuffer、Bytes、Double、Integer、Long等,还可以自定义序列化器。
分区器
如果消息中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。也可以自定义分区器。
消息累加器
消息累加器主要用来缓存消息以便 Sender线程可以批量发送进而减少网络传输的资源消耗以提升性能。消息累加器的缓存大小可以通过buffer.memory配置。在消息累加器的内部为每个分区都维护了一个双端队列,主线程发送过来的消息都会被追加到某个双端队列中,队列中的内容就是 ProducerBatch,即Dqueue< ProducerBatch >。
Consumer 消费者
消费者,消息的订阅者,可以订阅一个或多个主题,并且依据消息生产的顺序读取他们,消费者通过检查消息的偏移量来区分已经读取过的消息。消费者一定属于某一个特定的消费组。
通过 subscribe 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配政策来自动分配各个消费者与分区的关系,以实现消费者负载均衡和故障自动转移。而通过 assign 方法则没有。
Kafka 中的消息是基于推拉模式的。Kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll 方法,而 poll 方法返回的是所订阅的主题(分区)上的一组消息。如果没有消息则返回空。
消费确认
Consumer Group 消费者群组
同一个消费者组中保证每个分区只能被一个消费者使用 ,不会出现多个消费者读取同一个分区的情况,通过这种方式,消费者可以消费包含大量消息的主题。而且如果某个消费者失效,群组里的其他消费者可以接管失效消费者的工作。
Topic 主题
Kafka中 的消息是根据 Topic 进行分类的,Topic 是支持多订阅的,一个 Topic 可以有多个不同的订阅消息的消费者。Kafka 集群 Topic 的数量没有限制,同一个 Topic 的数据会被划分在同一个目录下,一个 Topic 可以包含 1 至多个分区,所有分区的消息加在一起就是一个 Topic 的所有消息。
Partition 分区
在Kafka中,每个 Topic 至少有一个 Partition ,一个 topic 可以包含多个分区 partition,topic 消息保存在各个 partition 上,由于一个 topic 能被分到多个分区上,给 kafka 提供给了并行的处理能力,这也正是 kafka 高吞吐的原因之一。
每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset):消息在日志中的位置,可以理解是消息在 partition 上的偏移量,也是代表消息的唯一序号。
Log 日志存储
一个分区对应一个日志文件(Log),为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log 切分为多个 LogSegment,便于消息的维护和清理。Log在物理上只以(命名为topic-partitiom)文件夹的形式存储,而每个LogSegment对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。
LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment的索引文件和数据文件
Broker 服务器
一个独立的 Kafka 服务器被称为 broker, broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存
Offset 偏移量
消息的唯一标识,是连续的序列号,偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息。消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 kafkal 的消息,我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。消息最终还是会被删除的,默认生命周期为1周(7*24小时)。
Replication 副本
每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition。 producer 和 consumer 只跟 leader 交互。Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。如果 Leader 失效,则从 Follower 中选举出一个新的 Leader。
Zookeeper
存储 Kafka 的元数据信息,帮助进行选举
docker pull wurstmeister/zookeeper
docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
安装
docker pull wurstmeister/kafka
运行
docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper
-v /usr/kafka/config/server.properties:/config/server.properties
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
--name kafka: 设置容器的名字为“kafka”。
-p 9092:9092: 将容器的9092端口映射到宿主机的9092端口。
--link zookeeper:zookeeper: 连接到名为“zookeeper”的另一个Docker容器,并且在当前的容器中可以通过zookeeper这个别名来访问它。
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181: 设置环境变量,指定ZooKeeper的连接字符串。
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: 设置环境变量,指定Kafka的advertised listeners。
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 设置环境变量,指定Kafka的listeners。
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: 设置环境变量,指定offsets topic的副本因子。
wurstmeister/kafka: 使用的Docker镜像名字。
docker exec -it kafka /bin/bash
public class AdminClient { public final static String TOPIC_NAME = "test"; public static AdminClient adminClient() { Properties properties = new Properties(); properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9999"); AdminClient adminClient = AdminClient.create(properties); return adminClient; } public static void main(String[] args) throws Exception { AdminClient adminClient = AdminClientYang.adminClient(); System.out.println("adminClient : " + adminClient); createTopic(); } /** * 创建Topic实例 */ public static void createTopic() { AdminClient adminClient = AdminClientYang.adminClient(); // 副本因子 Short re = 1; NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, re); CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic)); System.out.println("CreateTopicsResult : " + createTopicsResult); adminClient.close(); } /** * 获取Topic列表 */ public static void topicList() throws Exception { AdminClient adminClient = adminClient(); // 是否查看internal选项 ListTopicsOptions options = new ListTopicsOptions(); // 设置我们是否应该列出内部topic options.listInternal(true); // 列出集群中可用的topic ListTopicsResult listTopicsResult = adminClient.listTopics(options); // 返回一个topic名称集合的future(这里是KafkaFuture) Set<String> names = listTopicsResult.names().get(); // 返回一个KafkaFuture,它产生一个 TopicListing 对象的集合 Collection<TopicListing> topicListings = listTopicsResult.listings().get(); // 返回一个KafkaFuture,它产生一个topic名称到 TopicListing 对象的映射。 KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings(); // 打印names names.stream().forEach(System.out::println); System.out.println("---------------------------topic列表-------------------------"); // 打印topicListings topicListings.stream().forEach((topicList) -> { System.out.println(topicList); }); System.out.println("---------------------------topic列表-------------------------"); } /** * 删除topic */ public static void delTopics() throws Exception { AdminClient adminClient = adminClient(); // 删除一批topic。 // 此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。 // DeleteTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已消失。 在此期间, // listTopics()和describeTopics(Collection)可能会继续返回有关已删除主题的信息。 DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME)); deleteTopicsResult.all().get(); } /** * 描述topic * name: yibo_topic * desc: (name=yibo_topic, * internal=false, * partitions= * (partition=0, * leader=192.168.174.128:9092 (id: 0 rack: null), * replicas=192.168.174.128:9092 (id: 0 rack: null), * isr=192.168.174.128:9092 (id: 0 rack: null)), * authorizedOperations=null) * @throws Exception */ public static void describeTopic() throws Exception { AdminClient adminClient = adminClient(); // 描述集群中的一些topic。 DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME)); Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get(); Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet(); System.out.println("----------------------------topic信息-----------------------------"); entries.stream().forEach((entry) -> { System.out.println("name :" + entry.getKey() + " , desc: " + entry.getValue()); }); System.out.println("----------------------------topic信息-----------------------------"); } /** * 查询配置信息 */ public static void describeConfig() throws Exception { AdminClient adminClient = adminClient(); ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME); // 获取指定资源的配置 DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource)); Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get(); System.out.println("----------------------------配置信息-----------------------------"); configResourceConfigMap.entrySet().stream().forEach((entry) -> { System.out.println("configResource : " + entry.getKey() + " , Config : " + entry.getValue()); }); System.out.println("----------------------------配置信息-----------------------------"); } /** * 修改配置信息 老版API */ public static void alterConfig() throws Exception { AdminClient adminClient = adminClient(); Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>(); // 具有配置的资源的类,需要提供type和名称 Type是他内部维护的枚举类,共有四种类型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), // UNKNOWN((byte) 0) ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME); // 包含名称、值和操作类型的更改配置条目的类 ,需要注入ConfigEntry,和操作类型,同样OpType是个枚举类 AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET); configMaps.put(configResource, Arrays.asList(alterConfigOp)); // 逐步更新指定资源的配置 AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps); alterConfigsResult.all().get(); } /** * 增加partitions数量 */ public static void incrPartitions(int partitions) throws Exception { AdminClient adminClient = adminClient(); Map<String, NewPartitions> partitionsMap = new HashMap<>(); NewPartitions newPartitions = NewPartitions.increaseTo(partitions); partitionsMap.put(TOPIC_NAME, newPartitions); CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap); createPartitionsResult.all().get(); } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。