当前位置:   article > 正文

kafka安装及java消费kafka

java消费kafka

安装环境

linux:Linux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
kafka:kafka_2.11-2.3.0.tgz
zookeeper:apache-zookeeper-3.5.5-bin.tar.gz

zookeeper安装

  1. 创建安装目录:mkdir /usr/local/zookeeper
  2. 解压zookeeper:tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz
  3. cd /usr/local/zookeeper/apache-zookeeper-3.5.5-bin/config
    cp zoo_sample.cfg zoo.cfg
  4. vim zoo.cfg
    # The number of milliseconds of each tick
    tickTime=2000
     # The number of ticks that the initial
     # synchronization phase can take
     initLimit=10
     # The number of ticks that can pass between
     # sending a request and getting an acknowledgement
     syncLimit=5
     # the directory where the snapshot is stored.
     # do not use /tmp for storage, /tmp here is just
     # example sakes.
     dataDir=/tmp/zookeeper/data
     dataLogDir=/tmp/zookeeper/log
     # the port at which the clients will connect
     clientPort=2181
     server.1=206.206.127.191:2888:3888
     admin.serverPort=8099
     # the maximum number of client connections.
     # increase this if you need to handle more clients
     # maxClientCnxns=60
     # Be sure to read the maintenance section of the
     # administrator guide before turning on autopurge.
     # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
     # The number of snapshots to retain in dataDir
     # autopurge.snapRetainCount=3
     # Purge task interval in hours
     # Set to "0" to disable auto purge feature
     # autopurge.purgeInterval=1
    You have mail in /var/spool/mail/root
    
    
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
  • tickTime:CS通信心跳时间(tickTime=2000)
    Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,
    ​ 也就是每个tickTime时间就会发送一个心跳。tickTime以毫秒为单位

  • initLimit:LF初始通信时限(initLimit=10)
    集群中的follower服务器(F)与leader服务器(L)之间初始化连接时能容忍的最多心跳数 (tickTime的数量)

  • syncLimit:LF同步通信时限(syncLimit=5)
    集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)

  • dataDir:数据文件目录
    Zookeeper保存数据的目录,默认情况下,Zookeeper将写数据的日志文件也保存在这个目录里
    例:dataDir=/tmp/zookeeper/data
    dataLogDir=/tmp/zookeeper/log

  • clientPort:客户端连接端口
    客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口,接受客户端的访问请求。
    例:clientPort=2181

  • 服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)
    例:server.N=YYY:A:B
    server.1=206.206.127.191:2888:3888

  1. 创建tmp
    mkdir /tmp/zookeeper/data
    mkfir /tmp/zookeeper/log
  2. 创建myid文件
    1)cd /tmp/zookeeper
    2)touch myid
    3)vim myid
    例:集群环境zoo.cfg中:
    server.0=127.0.0.1:5555:6666
    server.1=127.0.0.1:5556:6667
    server.2=127.0.0.1:5557:6668
    分别在zk、zk2、zk3、的datadir中新建myid文件, 写入一个数字, 该数字表示这是第几号server. 该数字必须和zoo.cfg文件中的server.x中的x一一对应.
    /opt/zk/zk/data/myid文件中写入0
    /opt/zk/zk2/data/myid文件中写入1
    /opt/zk/zk3/data/myid文件中写入2
  3. 配置环境变量,
    1)vim /etc/profile
    添加内容:
    ​ export ZOOKEEPER_INSTALL=/usr/local/zookeeper/apache-zookeeper-3.5.5-bin
    ​ export PATH= P A T H : PATH: PATH:ZOOKEEPER_INSTALL/bin
    2)source /etc/profile
  4. zookeeper操作命令:
    启动:./usr/local/zookeeper/apache-zookeeper-3.5.5-bin/bin/zkServer.sh start
    查看状态:./usr/local/zookeeper/apache-zookeeper-3.5.5-bin/bin/zkServer.sh status
    状态为:Starting zookeeper … STARTED表示启动成功

kafka安装

1.创建kafka安装目录
mkdir /usr/local/kafka
2.将安装包:kafka_2.11-2.3.0.tgz,放置在/usr/local/kafka目录下
解压安装包:tar -zxvf kafka_2.11-2.3.0.tgz
3.配置server.properties
./usr/local/kafka/kafka_2.11-2.3.0/config/server.properties

# broker的全局唯一编号,不能重复,只能是数字
broker.id=0

# 集群中必须写ip地址,不然客户端访问kafka链接不上
listeners=PLAINTEXT://206.206.127.191:9092

# 处理网络请求的线程数量
num.network.threads=3

# 用来处理磁盘IO的线程数量
num.io.threads=8

# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

# 接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600

# kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分割
log.dirs=/tmp/kafka/log

# topic在当前broker上的分区个数
num.partitions=1

# 用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

# 每个topic创建时副本数,默认时1个副本
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

# segment文件保留的最长时间,超时将被删除
log.retention.hours=168

# 每个segment文件的大小。默认最大1G
log.segment.bytes=1073741824

# 检查过期数据的时间,默认5分钟检查一次是否数据过期
log.retention.check.interval.ms=300000

# 配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)
zookeeper.connect=206.206.127.191:2181(单机)
#zookeeper.connect=192.168.3.51:2181,192.168.3.52:2181,192.168.3.53:2181/kafka(集群)

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

group.initial.rebalance.delay.ms=0
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

4.kafka启动命令:
./bin/kafka-server-start.sh -daemon ./config/server.properties
5.查看kafka状态:jps
6.创建topic:

./bin/kafka-topics.sh --create --zookeeper 206.206.127.191:2181 
--replication-factor 1 --partitions 1 --topic test1
  • 1
  • 2

7.查看topic

./bin/kafka-topics.sh --list --zookeeper 206.206.127.191:2181
  • 1

8.生产者,消费者

./kafka-console-producer.sh --broker-list 192.168.20.91:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.20.91:9092 --topic test --from-beginning
  • 1
  • 2

java消费kafka

生产者:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
public class KafkaProducerTest implements Runnable {
    private final KafkaProducer<String, String> producer;
    private final String topic;
	public KafkaProducerTest(String topicName) {
        Properties props = new Properties();
        //bootstrap.servers:kafka的地址
        props.put("bootstrap.servers", "206.206.127.191:9092");
        /*acks:消息的确认机制,默认值是0
        * acks=0:如果设置为0,生产者不会等待kafka的响应
        * acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应
        * acks=all:这个配置意味着leader会等待所有的follower同步完成,这个确保消息不会丢失,除非kafka集群中
        *  *           所有机器挂掉。这是最强的可用性保证
        * */
        props.put("acks", "all");
        //retries:配置为大于0的值的话,客户端会在消息发送失败时重新发送
        props.put("retries", 0);
        //batch.size:当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率
        props.put("batch.size", 16384);
        //key.serializer:键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //value.serializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        this.producer = new KafkaProducer<String, String>(props);
        this.topic = topicName;
    }
@Override
    public void run() {
        int messageNo = 1;
        try {
            for(;;) {
             String messageStr="aaaa";
                producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr));
                //生产了100条就打印
                if(messageNo%100==0){
                    System.out.println("发送的信息:" + messageStr);
                }
                //生产1000条就退出
                if(messageNo%1000==0){
                    System.out.println("成功发送了"+messageNo+"条");
                    break;
                }
                messageNo++;
                 }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
public static void main(String args[]) {
        KafkaProducerTest test = new KafkaProducerTest("test");
        Thread thread = new Thread(test);
        thread.start();
         }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60

消费方式一:手动创建消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumerTest implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private ConsumerRecords<String, String> msgList;
    private final String topic;
    private static final String GROUPID = "groupA";

    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        //bootstrap.servers:kafka的地址
        props.put("bootstrap.servers", "206.206.127.191:9092");
       //group.id:组名不同可以重复消费。例如先使用了组名A消费了kafka的1000条数据,但是你还想再次
        //进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了
        props.put("group.id", GROUPID);
       //enable.auto.commit:是否自动提交,默认为true
        props.put("enable.auto.commit", "true");
        //auto.commit.interval.ms:从poll(拉)的回话处理时长
        props.put("auto.commit.interval.ms", "1000");
       //session.timeout.ms:超时时间
        props.put("session.timeout.ms", "30000");
        /*auto.offset.reset:消费规则,默认earliest
         * earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
         * latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
         * none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
         * */
         props.put("auto.offset.reset", "earliest");
        //key.deserializer:键序列化,默认org.apache.kafka.common.serialization.StringDeserializer
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }
 @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        try {
            for (; ; ) {
                msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        //消费100条就打印 ,但打印的数据不一定是这个规律的
                        if (messageNo % 100 == 0) {
                           System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
                        }
                        //当消费了1000条就退出
                        if (messageNo % 1000 == 0) {
                            break;
                        }
                        messageNo++;
                    }
                } else {
                    Thread.sleep(1000);
                }
                }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
 public static void main(String args[]) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("test");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76

依赖

<!--kafka依赖-->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.7.2</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

消费方式二:@KafkaListener方式消费kafka数据

@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
    public void message1(ConsumerRecord<?, ?> record) {
    	// 消费的哪个topic、partition的消息,打印出消息内容
		log.info("kafka消费的topic: {},分区数量:{},消费到的数据:{}", record.topic(), record.partition(), record.value());
}
  • 1
  • 2
  • 3
  • 4
  • 5

依赖:

		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.3.5.RELEASE</version>
        </dependency>

		<dependency>
            <groupId>org.apache.zookeeper</groupId>
            <artifactId>zookeeper</artifactId>
            <version>3.6.3</version>
        </dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

配置文件:

#zookeeper服务地址
zookeeper.address=127.0.0.1:2181
#zookeeper服务超时时间
zookeeper.timeout=4000
#消费kafka数据开始位置
spring.kafka.consumer.auto-offset-reset=earliest
#kafka服务ip
spring.kafka.bootstrap-servers=127.0.0.1:9092
#kafka服务消费者Consumer
spring.kafka.consumer.group-id=consumer808205
#kafka服务的topic
kafka.topics=frms

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

kafka与Zookeeper

Kafka和Zookeeper是两个独立的开源项目,他们之间有着密切的联系,在Kafka集群中,Zookeeper用于存储集群的元数据和消费者的偏移量信息等,同时还用于协调Kafka的分区分配和Leader选举等操作

Kafka和Zookeeper的版本之间需要保持兼容性,否则可能会导致不兼容或错误。
具体以下有些常见的Kafka和Zookeeper版本之间的配套关系:

  • 在Kafka0.8x版本中,Zookeeper依然是必需的,并且需要安装Zookeeper3.3x或3.4x版本
  • 在Kafka0.9.0.0及更高版本中,引入了内置的_consumer_offsets主题来存储消费者的偏移量信息,因此不再强制要求使用Zookeeper存储偏移量信息,但是,仍然需要与Kafka集群中运行的Zookeeper版本保持兼容性,建议使用Zookeeper3.4.6或更高版本
  • 在Kafka2.0.0及更高版本中,Zookeeper的最低版本要求为3.4.9,因为该版本修复了某些安全漏洞

总之,kafka与Zookeeper之间需要保持兼容性,这意味应该使用与kafka集群中运行的zookeeper版本相兼容的版本。建议查阅相关文档以及了解具体版本之间的兼容性情况,并进行必要的测试和验证。

注:
在kafka集群中,Zookeeper用于存储集群的元数据和消费者的偏移量等重要数据

  • 元数据是描述Kafka集群中各broker,分区,副本以及消费者等组件的信息,包括但不限于:每个主题(topic)的分区数,每个分区(partation)的副本数,每个分区的ISR(IN-syncreplicas),Leader副本所在的broker等等。kafka生产者和消费者需要访问这些元数据来进行读写操作,进行负载均衡和故障恢复等等
  • 消费者的偏移量是指已经被消费者处理的消息的位置信息。为了避免重复读取消息,kafka消费者需要记录下已经消费过的消息的位置信息,这个位置被称为偏移量(offset),消费者定期提交偏移量信息到zookeeper中,以便在下次启动时从上一次停止的位置继续消费消息。

由于这些数据都是非常重要的,它们需要被可靠的存储和保护。zookeeper提供了一个分布式协调服务,它可以帮助Kafka存储这些数据,并确保它们在整个集群中是一致的和可靠的。因此,zookeeper在kafka集群中扮演着至关重要的角色,它的稳定性和高可用性直接影响着kafka集群的稳定性和可靠性。

Zookeeper协调kafka的分区分配和Leader选举等操作

  1. Broker注册:每个Kafka broker启动时会在zookeeper上注册一个节点。这样,kafka集群中的所有broker可以从同一个地方获取到其他broker的信息,例如:其ID,主机名,端口号等。
  2. 分区分配:当新的主题(topic)被创建,增加或删除分区时,zookeeper会协调进行分区分配。此时会维护一个/brokers/topics节点。其中包含了所有主题,分区和副本的信息。当有新的broker加入集群后,zookeeper会通知所有brokers进行重新分配,确保所有的分区和副本得到了合理的分配。
  3. Leader选举:当某个分区的Leader副本失效后,zookeeper会协调进行Leader选举。此时,zookeeper会维护一个/controller节点,用于存储当前负责Leader选举的Controller节点的ID。每个kafka broker都可以成为Controller节点。但只有一个Controller节点是活跃的。当一个Broker检测到某个分区的Leader副本失效时,它会想Controller节点发送一个请求参与Leader选举。Controller节点收到请求后,负责指定新的Leader.
  4. 消费者偏移量管理:zookeeper还用于协调消费者端的偏移量(offset)信息。kafka消费者需要记录下已经消费过的消费的位置信息以避免重复读取消息。消费者在处理完一些消息后,将偏移量提交到zookeeper上以持久化保存。zookeeper可以帮助kafka维护消费者偏移量并确保在不同的消费者客户端之间共享
声明:本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号