当前位置:   article > 正文

Kafka(一):初始Kafka_kafka分区重新初始化

kafka分区重新初始化

前言

Kafka作为一个分布式消息中间件,在互联网公司广泛运用,所以Kafka是我们必须要掌握的一门技术,今天我们先了解一下Kafka中的一些核心概念和安装,为后边的学习打下基础。

Kafka简介

Kafka是由LinkedIn公司采用Scala语言开发的一个多分区、多副本且基于Zookeeper协调的分布式消息系统,现已捐献给Apache基金会。目前Kafka定位是一个分布式流式处理平台,他以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。

基本概念

典型的Kafka体系架构包括若干Producer、若干Broker、若干Consumer,以及一个Zookeeper集群,如下图所示:
在这里插入图片描述

  1. Producer:生产者,发送消息一方。生产者负责创建消息,然后将消息投递到Kafka中
  2. Consumer:消费者,也就是接收消息一方。消费者连接到Kafka上并接受消息,进而进行相应的消息处理;
  3. Broker:服务代理节点,对于kafka而言,Broker可以看做一个独立的Kafka服务节点或者服务实例。大多数情况下也可以将Broker看做一台Kafka服务器,前提是这台服务器只部署了一个kafka实例。一个或多个Broker实例组成了一个Kafka集群。

主题和分区

在Kafka中还有两个特别重要的概念——主题(Topic)和分区(partition)。Kafka中的消息以主题为单位进行归类,生产者负责将消息发送发到特定的主图,而消费者负责订阅主题并进行消费。

主题是一个逻辑上的概念,可以细分为多个分区,一个分区只能属于单个主题,大多数情况下会把分区称为主题分区(Topic-partition)。同一主题下不同分区包含的消息是不同的,分区在存储层面可以看做一个可以进行追加的日志文件,消息被追加到分区日志文件的时候都会分配一个特定的偏移量offsetoffset是消息分区中唯一的标识,Kafka通过它来保证消息在分区内的顺序行,需要注意的是offset并不会跨域分区,即offset是保证的分区有序性而不是主题有序性。

如下图所示,主题中有三个分区,消息被顺序追加到每个分区日志文件尾部。
在这里插入图片描述
每条消息被发送到broker之前,会根据分区规则选择存储到哪个具体的分区。如果分区规则设计合理,所有的消息都可以均匀地分配到不同的分区中。如果一个主题文件只对应一个文件,那么这个文件所在的机器I/O将会成为这个主题性能的瓶颈。

副本机制

Kafka为分区引入了多副本概念(Replica)机制,通过增加副本分区数量可以提升容灾能力,同一分区的不同副本保存相同的消息,但是同一时刻,副本之间并非完全一样,副本之间是“一主多从”的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本重新选举新的leader副本提供对外服务。

如下图,Kafka集群中有4个broker,某个主题中有3个分区,副本因子(副本数)是3,所以一个分区有一个leader副本两个follower副本。生产者和消费者只与leader副本进行交互,而follower副本只负责消息的同步,但是很多时候follower副本中的消息相对leader副本而言会有一定的滞后。
在这里插入图片描述
分区中所有的副本统称为AR(Assigned Replicas)。
所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISR(In-Sync-Replicas),ISR是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。滞后范围我们可以通过参数进行配置,与leader副本滞后过多的副本(不包含leader副本)组成OSR(Out-Of-sync-Replicas)。由此可以可得出AR=ISR+OR。

ISR与HW和LEO也有紧密的关系。HW(High Watermark缩写),标志着一个特定的消息偏移量(offset),消费者只能拉取到这个offset之前的消息。如下图所示,它代表一个日志文件,这个日志文件中有8条消息,第一条消息的offset为0,最后一条消息的offset为7,offset为8的消息用虚线表示,代表下一条待写入的消息。日志文件的HW为6,表示消费者只能拉取到0-5之间的消息,即offser为6的消息对消费者而言是不可见的。
在这里插入图片描述

安装与配置

安装环境:

Linux lpl-PC 4.4.0-18362-Microsoft #1-Microsoft Mon Mar 18 12:02:00 PST 2019 x86_64 x86_64 x86_64 GNU/Linux
  • 1
  • jdk1.8.0_181
  • zookeeper-3.4.14.tar.gz
  • kafka_2.12-2.2.0.tgz

安装JDK

kafka从2.0版本开始不再支持jdk1.7,所以适用1.8.0_181版本

  • 下载

jdk8

  • 安装
    vim ~/.bashrc

在文件中增加如下配置:

export JAVA_HOME=/usr/java/jdk1.8.0_181                                                    
export JRE_HOME=${JAVA_HOME}/jre                               
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib            
export PATH=${JAVA_HOME}/bin:$PATH 
  • 1
  • 2
  • 3
  • 4
  • 验证
root@lpl-PC:/usr/local/kafka# java -version                  
java version "1.8.0_181"
Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
  • 1
  • 2
  • 3
  • 4

source ~/.bashrc

安装zookeeper

zookeeper下载地址

  • 解压
tar zxvf zookeeper-3.4.14.tar.gz
  • 1
  • 在bashrc文件中添加如下内容:

vim ~/.bashrc

# zookeeper
export ZOOKEEPER_HOME=/usr/local/zookeeper/zookeeper-3.4.14                     
export PATH=$PATH:$ZOOKEEPER_HOME/bin 
  • 1
  • 2
  • 3

source ~/.bashrc

  • 修改zookeeper配置文件
root@lpl-PC:/usr/local/zookeeper/zookeeper-3.4.14# cd conf/                                                             
root@lpl-PC:/usr/local/zookeeper/zookeeper-3.4.14/conf# cp zoo_sample.cfg zoo.cfg                                     
  • 1
  • 2

修改zoo.cfg配置文件:

# zookeeper服务器心跳时间,单位ms
tickTime=2000                                               

#投票选举新leader的初始化时间
initLimit=10                                                

#leader与follower心跳检测最大容忍时间
syncLimit=5                                                 

# 数据目录
dataDir=/tmp/zookeeper/data                                   

#日志目录      
dataLogDir=/tmp/zookeeper/log

# zookeeper对外暴露端口
clientPort=2181  
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

需要创建data和log目录

mkdir -p /tmp/zookeeper/log 
mkdir -p /tmp/zookeeper/data
  • 1
  • 2
  • 在${dataDir}目录下创建一个myid文件,并写入一个数值,比如0。myid文件里存放的是服务器编号;
  • 启动zookeeper服务:
Using config: /usr/local/zookeeper/zookeeper-3.4.14/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
root@lpl-PC:/usr/local/zookeeper/zookeeper-3.4.14/bin# ./zkServer.sh status                                          
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/zookeeper-3.4.14/bin/../conf/zoo.cfg
Mode: standalone
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

安装kafka

kafka下载地址

安装参考

  • 解压
# 解压
tar zxvf  kafka_2.12-2.2.0.tgz
  • 1
  • 2
  • 配置文件
brokerid=0
# broker对外提供服务地址
listener=PLAINTEXT://localhost:9092
# 存放日志文件目录
log.dir=/tmp/kafka-logs
# kafka所需的Zookeeper集群地址
zookeeper.connect=localhost:2181/kafka
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 启动
# 后台启动
bin/kafka-server-start.sh config/server.properties &
# 查看运行状态
jps -l
  • 1
  • 2
  • 3
  • 4

生产与消费

kafka自带工具

Kafka提供了许多实用的脚本工具,存在在bin目录下,其中与主题相关的就是kafka-topic.sh脚本,我们通过这个脚本创建一个分区为4,副本因子为3的主题topic-1

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --create --topic topic-1 --replication-factor 3 --partitions 4
  • 1

可以通过--describe展示主题信息

bin/kafka-topics.sh --zookeeper localhost:2181/kafka --describe --topic topic-1
  • 1
Topic:topic-1   PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: topic-1  Partition: 0    Leader: 2       Replicas: 2,1,0     Isr: 2,1,0
        Topic: topic-1  Partition: 1    Leader: 0       Replicas: 0,2,1     Isr: 0,2,1
        Topic: topic-1  Partition: 2    Leader: 1       Replicas: 1,0,2     Isr: 1,0,2
        Topic: topic-1  Partition: 3    Leader: 2       Replicas: 2,0,1     Isr: 2,0,1
  • 1
  • 2
  • 3
  • 4
  • 5

使用kafka-console-consumer.sh来订阅主题方便接收消息:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-1
  • 1

使用kafka-console-producer.sh发消息:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-1

>hello kafka!
>
  • 1
  • 2
  • 3
  • 4

这样在消费者就会接收到消息;

使用代码实现

maven依赖
<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.2.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
生产者
package org.lpl.kafkademo.producer;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

/**
 * 生产者
 *
 * @author lpl
 * @version 1.0
 * @date 2019/7/21
 **/
public class Producer {

  public static final String brokerList = "127.0.0.1:9092";
  public static final String topic = "topic-1";

  public static void main(String[] args) {
    Properties properties = new Properties();
    properties.put("key.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("value.serializer",
        "org.apache.kafka.common.serialization.StringSerializer");
    properties.put("bootstrap.servers", brokerList);

    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello word");

    try {
      producer.send(record);
    } catch (Exception e) {
      e.printStackTrace();
    }
    producer.close();
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
消费者
package org.lpl.kafkademo.producer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/**
 * 消费者
 *
 * @author lpl
 * @version 1.0
 * @date 2019/7/21
 **/
public class Consumer {

  private static final String brokerList = "localhost:9092";
  private static final String topic = "topic-1";
  private static final String groupId = "group-topic-1";

  public static void main(String[] args) {

    Properties properties = new Properties();
    properties.put("key.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer",
        "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("bootstrap.servers", brokerList);
    //设置消费者组
    properties.put("group.id", groupId);

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

    consumer.subscribe(Collections.singleton(topic));

    while (true) {
      ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(1000));
      for (ConsumerRecord<String, String> records : poll) {
        System.out.println(records.value());
      }
    }
  }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46

参考

《深入理解kafka核心设计与实践原理》

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/Guff_9hys/article/detail/1006789
推荐阅读
相关标签
  

闽ICP备14008679号