当前位置:   article > 正文

docker安装kafka【手把手系列】_docker 安装kafka

docker 安装kafka

Kafka概述

Kafka是由Apache基金会开发的分布式流处理平台,采用发布-订阅模式,支持高吞吐量、低延迟的数据传输。主要用于处理实时数据管道、数据存储和数据分析等大数据应用场景。Kafka采用高效的数据压缩算法,可以在集群中存储大量的数据,并通过分区机制来实现数据的高可靠性和可扩展性。

  • 数据管道:在数据采集和分发过程中构建可扩展的流式数据管道,用于实时数据处理和分析。例如,数据收集、日志聚合、网络追踪、用户活动跟踪等。
  • 数据存储:将Kafka作为持久化存储来存储大量的数据,以便用于后续的批量处理和离线分析,例如数据挖掘、机器学习等应用场景。
  • 实时流处理:通过将Kafka与追求低延迟的流式处理平台,例如Apache Storm、Apache Samza和Apache Flink等相结合,可以实现实时数据处理和分析。这是许多实时数据分析和日志处理需求的主要场景。
  • 系统日志跟踪:通过Kafka将来自不同系统的日志数据统一收集和存储,便于进行统一的日志分析和事件跟踪,在软件开发过程中可以快速定位和解决问题。

Kafka基础

  • Producer 生产者
    生产者用于创建消息。生产者在默认情况下把消息均衡地分布到主题的所有分区上,而并不关心特定消息会被写到哪个分区。不过,在某些情况下,生产者会把消息直接写到指定的分区

  • 拦截器
    生产者拦截器既可以用来在消息发送前做一些准备工作,比如按照某个规则过滤不符合要求的消息、修改消息的内容等,也可以用来在发送回调逻辑前做一些定制化的需求,比如统计类工作。通过自定义实现 ProducerInterceptor 接口来使用。

  • 序列化
    生产者需要用序列化器把对象转换成字节数组才能通过网络发送给 Kafka。消费者需要用反序列化把从 Kafka 中收到的字节数组转换成相应的对象。自带的有StringSerializer,ByteArray、ByteBuffer、Bytes、Double、Integer、Long等,还可以自定义序列化器。

  • 分区器
    如果消息中没有指定 partition 字段,那么就需要依赖分区器,根据 key 这个字段来计算 partition 的值。也可以自定义分区器。

  • 消息累加器
    消息累加器主要用来缓存消息以便 Sender线程可以批量发送进而减少网络传输的资源消耗以提升性能。消息累加器的缓存大小可以通过buffer.memory配置。在消息累加器的内部为每个分区都维护了一个双端队列,主线程发送过来的消息都会被追加到某个双端队列中,队列中的内容就是 ProducerBatch,即Dqueue< ProducerBatch >。

  • Consumer 消费者
    消费者,消息的订阅者,可以订阅一个或多个主题,并且依据消息生产的顺序读取他们,消费者通过检查消息的偏移量来区分已经读取过的消息。消费者一定属于某一个特定的消费组。

    通过 subscribe 方法订阅主题具有消费者自动再均衡的功能,在多个消费者的情况下可以根据分区分配政策来自动分配各个消费者与分区的关系,以实现消费者负载均衡和故障自动转移。而通过 assign 方法则没有。

    Kafka 中的消息是基于推拉模式的。Kafka 中的消息消费是一个不断轮询的过程,消费者所要做的就是重复地调用poll 方法,而 poll 方法返回的是所订阅的主题(分区)上的一组消息。如果没有消息则返回空。
    消费确认

  1. Kafka 中的每条消息都有唯一的 offset,用来标识消息在分区中对应的位置。Kafka 默认的消费唯一的提交方式是自动提交,由enable.auto.commit配置,默认为true。自动提交不是每一条消息提交一次,而是定期提交,周期由auto.commit.interval.ms配置,默认为5秒。自动提交可能发生消息重复或者丢失的情况,Kafka 还提供了手动提交的方式。enable.auto.commit配置为false开启手动提交。
  2. 在 Kafka 中每当消费者查找不到所记录的消费位移时,就会根据消费者客户端参数auto.offset.reset的配置来决定从何处开始进行消费。默认值为 lastest,表示从分区末尾开始消费消息;earliest 表示从起始开始消费;none为不进行消费,而是抛出异常。seek 可以从特定的位移处开始拉去消息,得以追前消费或回溯消费。
  • Consumer Group 消费者群组
    同一个消费者组中保证每个分区只能被一个消费者使用 ,不会出现多个消费者读取同一个分区的情况,通过这种方式,消费者可以消费包含大量消息的主题。而且如果某个消费者失效,群组里的其他消费者可以接管失效消费者的工作。

  • Topic 主题
    Kafka中 的消息是根据 Topic 进行分类的,Topic 是支持多订阅的,一个 Topic 可以有多个不同的订阅消息的消费者。Kafka 集群 Topic 的数量没有限制,同一个 Topic 的数据会被划分在同一个目录下,一个 Topic 可以包含 1 至多个分区,所有分区的消息加在一起就是一个 Topic 的所有消息。

  • Partition 分区
    在Kafka中,每个 Topic 至少有一个 Partition ,一个 topic 可以包含多个分区 partition,topic 消息保存在各个 partition 上,由于一个 topic 能被分到多个分区上,给 kafka 提供给了并行的处理能力,这也正是 kafka 高吞吐的原因之一。
    每一个分区都是一个顺序的、不可变的消息队列, 并且可以持续的添加。分区中的消息都被分了一个序列号,称之为偏移量(offset):消息在日志中的位置,可以理解是消息在 partition 上的偏移量,也是代表消息的唯一序号。

  • Log 日志存储
    一个分区对应一个日志文件(Log),为了防止Log过大,Kafka又引入了日志分段(LogSegment)的概念,将Log 切分为多个 LogSegment,便于消息的维护和清理。Log在物理上只以(命名为topic-partitiom)文件夹的形式存储,而每个LogSegment对应磁盘上的一个日志文件和两个索引文件,以及可能的其他文件。
    LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为segment的索引文件和数据文件

    1. partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值
    2. 数值大小为64位,20位数据字符长度,没有数字用0填充
    • 消息压缩
      一条消息通常不会太大,Kafka 是批量消息压缩,通过compression.type配置,默认为 producer,还可以配置为gzip、snappy、lz4,uncompressed表示不压缩。
    • 日志索引
      Kafka中的索引文件以稀疏索引的方式构造消息的索引,它并不保证每个消息在索引文件中都有对应的索引项。每当写入一定量(log.index.interval.bytes指定,默认4KB)的消息时,偏移量索引文件和时间戳索引文件分别增加一个偏移量索引文件项和时间戳索引文件项。稀疏索引通过MappedByteBuffer将索引文件映射到内存中,以加快索引的查询速度。
    • 日志清理
      日志删除:按照一定的保留策略(基于时间、日志大小或日志起始偏移量)直接删除不符合条件的日志分段。
      日志压缩:针对每个消息的key进行整合,对于有相同key的不同value值,只保留最后一个版本
    • 页缓存
      页缓存是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问,减少对磁盘IO的操作。
    • 零拷贝
      所谓的零拷贝是将数据直接从磁盘文件复制到网卡设备中,而不需要经由应用程序之手。减少了数据拷贝的次数和内核和用户模式之间的上下文切换。
  • Broker 服务器
    一个独立的 Kafka 服务器被称为 broker, broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存

  • Offset 偏移量
    消息的唯一标识,是连续的序列号,偏移量决定读取数据的位置,不会有线程安全的问题,消费者通过偏移量来决定下次读取的消息。消息被消费之后,并不被马上删除,这样多个业务就可以重复使用 kafkal 的消息,我们某一个业务也可以通过修改偏移量达到重新读取消息的目的,偏移量由用户控制。消息最终还是会被删除的,默认生命周期为1周(7*24小时)。

  • Replication 副本
    每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition。 producer 和 consumer 只跟 leader 交互。Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。如果 Leader 失效,则从 Follower 中选举出一个新的 Leader。

  • Zookeeper
    存储 Kafka 的元数据信息,帮助进行选举

Docker安装Kafka

安装Zookeeper

docker pull wurstmeister/zookeeper
  • 1

Alt

运行Zookeeper容器

docker run -d --name zookeeper -p 2181:2181 wurstmeister/zookeeper
  • 1

安装Kafka并运行Kafka

安装

docker pull wurstmeister/kafka
  • 1

运行

docker run -d --name kafka -p 9092:9092 --link zookeeper:zookeeper 
-v /usr/kafka/config/server.properties:/config/server.properties
--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 
--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 wurstmeister/kafka
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
--name kafka: 设置容器的名字为“kafka”。

-p 9092:9092: 将容器的9092端口映射到宿主机的9092端口。

--link zookeeper:zookeeper: 连接到名为“zookeeper”的另一个Docker容器,并且在当前的容器中可以通过zookeeper这个别名来访问它。

--env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181: 设置环境变量,指定ZooKeeper的连接字符串。

--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092: 设置环境变量,指定Kafka的advertised listeners。

--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092: 设置环境变量,指定Kafka的listeners。

--env KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1: 设置环境变量,指定offsets topic的副本因子。

wurstmeister/kafka: 使用的Docker镜像名字。
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

测试

docker exec -it kafka /bin/bash
  • 1

Java 工具类


public class AdminClient {

    public final static String TOPIC_NAME = "test";

    public static AdminClient adminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9999");
        AdminClient adminClient = AdminClient.create(properties);
        return adminClient;
    }

    public static void main(String[] args) throws Exception {
        AdminClient adminClient = AdminClientYang.adminClient();
        System.out.println("adminClient : " + adminClient);

        createTopic();
    }

    /**
     * 创建Topic实例
     */
    public static void createTopic() {
        AdminClient adminClient = AdminClientYang.adminClient();
        // 副本因子
        Short re = 1;
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, re);
        CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
        System.out.println("CreateTopicsResult : " + createTopicsResult);
        adminClient.close();
    }

    /**
     * 获取Topic列表
     */
    public static void topicList() throws Exception {
        AdminClient adminClient = adminClient();
        // 是否查看internal选项
        ListTopicsOptions options = new ListTopicsOptions();
        // 设置我们是否应该列出内部topic
        options.listInternal(true);

        // 列出集群中可用的topic
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        // 返回一个topic名称集合的future(这里是KafkaFuture)
        Set<String> names = listTopicsResult.names().get();
        // 返回一个KafkaFuture,它产生一个 TopicListing 对象的集合
        Collection<TopicListing> topicListings = listTopicsResult.listings().get();
        // 返回一个KafkaFuture,它产生一个topic名称到 TopicListing 对象的映射。
        KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
        // 打印names
        names.stream().forEach(System.out::println);
        System.out.println("---------------------------topic列表-------------------------");
        // 打印topicListings
        topicListings.stream().forEach((topicList) -> {
            System.out.println(topicList);
        });
        System.out.println("---------------------------topic列表-------------------------");
    }

    /**
     * 删除topic
     */
    public static void delTopics() throws Exception {
        AdminClient adminClient = adminClient();
        // 删除一批topic。
        // 此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
        // DeleteTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已消失。 在此期间,
        // listTopics()和describeTopics(Collection)可能会继续返回有关已删除主题的信息。
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
        deleteTopicsResult.all().get();
    }

    /**
     * 描述topic
     * name: yibo_topic
     * desc: (name=yibo_topic,
     *      internal=false,
     *      partitions=
     *          (partition=0,
     *          leader=192.168.174.128:9092 (id: 0 rack: null),
     *          replicas=192.168.174.128:9092 (id: 0 rack: null),
     *          isr=192.168.174.128:9092 (id: 0 rack: null)),
     *          authorizedOperations=null)
     * @throws Exception
     */
    public static void describeTopic() throws Exception {
        AdminClient adminClient = adminClient();
        // 描述集群中的一些topic。
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));

        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();

        Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        System.out.println("----------------------------topic信息-----------------------------");
        entries.stream().forEach((entry) -> {
            System.out.println("name :" + entry.getKey() + " , desc: " + entry.getValue());
        });
        System.out.println("----------------------------topic信息-----------------------------");
    }

    /**
     * 查询配置信息
     */
    public static void describeConfig() throws Exception {
        AdminClient adminClient = adminClient();

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        // 获取指定资源的配置
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
        Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
        System.out.println("----------------------------配置信息-----------------------------");
        configResourceConfigMap.entrySet().stream().forEach((entry) -> {
            System.out.println("configResource : " + entry.getKey() + " , Config : " + entry.getValue());
        });
        System.out.println("----------------------------配置信息-----------------------------");
    }


    /**
     * 修改配置信息 老版API
     */
    public static void alterConfig() throws Exception {
        AdminClient adminClient = adminClient();

        Map<ConfigResource, Collection<AlterConfigOp>> configMaps = new HashMap<>();
        // 具有配置的资源的类,需要提供type和名称 Type是他内部维护的枚举类,共有四种类型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2),
        // UNKNOWN((byte) 0)
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        // 包含名称、值和操作类型的更改配置条目的类 ,需要注入ConfigEntry,和操作类型,同样OpType是个枚举类
        AlterConfigOp alterConfigOp =
                new AlterConfigOp(new ConfigEntry("preallocate", "false"), AlterConfigOp.OpType.SET);
        configMaps.put(configResource, Arrays.asList(alterConfigOp));
        // 逐步更新指定资源的配置
        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
        alterConfigsResult.all().get();
    }


    /**
     * 增加partitions数量
     */
    public static void incrPartitions(int partitions) throws Exception {
        AdminClient adminClient = adminClient();
        Map<String, NewPartitions> partitionsMap = new HashMap<>();

        NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
        partitionsMap.put(TOPIC_NAME, newPartitions);
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
        createPartitionsResult.all().get();

    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145
  • 146
  • 147
  • 148
  • 149
  • 150
  • 151
  • 152
  • 153
  • 154
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/818833
推荐阅读
相关标签
  

闽ICP备14008679号