赞
踩
linux:Linux localhost.localdomain 3.10.0-862.el7.x86_64 #1 SMP Fri Apr 20 16:44:24 UTC 2018 x86_64 x86_64 x86_64 GNU/Linux
kafka:kafka_2.11-2.3.0.tgz
zookeeper:apache-zookeeper-3.5.5-bin.tar.gz
# The number of milliseconds of each tick tickTime=2000 # The number of ticks that the initial # synchronization phase can take initLimit=10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/tmp/zookeeper/data dataLogDir=/tmp/zookeeper/log # the port at which the clients will connect clientPort=2181 server.1=206.206.127.191:2888:3888 admin.serverPort=8099 # the maximum number of client connections. # increase this if you need to handle more clients # maxClientCnxns=60 # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # The number of snapshots to retain in dataDir # autopurge.snapRetainCount=3 # Purge task interval in hours # Set to "0" to disable auto purge feature # autopurge.purgeInterval=1 You have mail in /var/spool/mail/root
tickTime:CS通信心跳时间(tickTime=2000)
Zookeeper服务器之间或客户端与服务器之间维持心跳的时间间隔,
也就是每个tickTime时间就会发送一个心跳。tickTime以毫秒为单位
initLimit:LF初始通信时限(initLimit=10)
集群中的follower服务器(F)与leader服务器(L)之间初始化连接时能容忍的最多心跳数 (tickTime的数量)
syncLimit:LF同步通信时限(syncLimit=5)
集群中的follower服务器与leader服务器之间请求和应答之间能容忍的最多心跳数(tickTime的数量)
dataDir:数据文件目录
Zookeeper保存数据的目录,默认情况下,Zookeeper将写数据的日志文件也保存在这个目录里
例:dataDir=/tmp/zookeeper/data
dataLogDir=/tmp/zookeeper/log
clientPort:客户端连接端口
客户端连接Zookeeper服务器的端口,Zookeeper会监听这个端口,接受客户端的访问请求。
例:clientPort=2181
服务器名称与地址:集群信息(服务器编号,服务器地址,LF通信端口,选举端口)
例:server.N=YYY:A:B
server.1=206.206.127.191:2888:3888
1.创建kafka安装目录
mkdir /usr/local/kafka
2.将安装包:kafka_2.11-2.3.0.tgz,放置在/usr/local/kafka目录下
解压安装包:tar -zxvf kafka_2.11-2.3.0.tgz
3.配置server.properties
./usr/local/kafka/kafka_2.11-2.3.0/config/server.properties
# broker的全局唯一编号,不能重复,只能是数字 broker.id=0 # 集群中必须写ip地址,不然客户端访问kafka链接不上 listeners=PLAINTEXT://206.206.127.191:9092 # 处理网络请求的线程数量 num.network.threads=3 # 用来处理磁盘IO的线程数量 num.io.threads=8 # 发送套接字的缓冲区大小 socket.send.buffer.bytes=102400 # 接收套接字的缓冲区大小 socket.receive.buffer.bytes=102400 # 请求套接字的缓冲区大小 socket.request.max.bytes=104857600 # kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用","分割 log.dirs=/tmp/kafka/log # topic在当前broker上的分区个数 num.partitions=1 # 用来恢复和清理data下数据的线程数量 num.recovery.threads.per.data.dir=1 # 每个topic创建时副本数,默认时1个副本 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 # segment文件保留的最长时间,超时将被删除 log.retention.hours=168 # 每个segment文件的大小。默认最大1G log.segment.bytes=1073741824 # 检查过期数据的时间,默认5分钟检查一次是否数据过期 log.retention.check.interval.ms=300000 # 配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理) zookeeper.connect=206.206.127.191:2181(单机) #zookeeper.connect=192.168.3.51:2181,192.168.3.52:2181,192.168.3.53:2181/kafka(集群) # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0
4.kafka启动命令:
./bin/kafka-server-start.sh -daemon ./config/server.properties
5.查看kafka状态:jps
6.创建topic:
./bin/kafka-topics.sh --create --zookeeper 206.206.127.191:2181
--replication-factor 1 --partitions 1 --topic test1
7.查看topic
./bin/kafka-topics.sh --list --zookeeper 206.206.127.191:2181
8.生产者,消费者
./kafka-console-producer.sh --broker-list 192.168.20.91:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.20.91:9092 --topic test --from-beginning
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class KafkaProducerTest implements Runnable { private final KafkaProducer<String, String> producer; private final String topic; public KafkaProducerTest(String topicName) { Properties props = new Properties(); //bootstrap.servers:kafka的地址 props.put("bootstrap.servers", "206.206.127.191:9092"); /*acks:消息的确认机制,默认值是0 * acks=0:如果设置为0,生产者不会等待kafka的响应 * acks=1:这个配置意味着kafka会把这条消息写到本地日志文件中,但是不会等待集群中其他机器的成功响应 * acks=all:这个配置意味着leader会等待所有的follower同步完成,这个确保消息不会丢失,除非kafka集群中 * * 所有机器挂掉。这是最强的可用性保证 * */ props.put("acks", "all"); //retries:配置为大于0的值的话,客户端会在消息发送失败时重新发送 props.put("retries", 0); //batch.size:当多条消息需要发送到同一个分区时,生产者会尝试合并网络请求。这会提高client和生产者的效率 props.put("batch.size", 16384); //key.serializer:键序列化,默认org.apache.kafka.common.serialization.StringDeserializer props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value.serializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer。 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); this.producer = new KafkaProducer<String, String>(props); this.topic = topicName; } @Override public void run() { int messageNo = 1; try { for(;;) { String messageStr="aaaa"; producer.send(new ProducerRecord<String, String>(topic, "Message", messageStr)); //生产了100条就打印 if(messageNo%100==0){ System.out.println("发送的信息:" + messageStr); } //生产1000条就退出 if(messageNo%1000==0){ System.out.println("成功发送了"+messageNo+"条"); break; } messageNo++; } } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } public static void main(String args[]) { KafkaProducerTest test = new KafkaProducerTest("test"); Thread thread = new Thread(test); thread.start(); } }
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class KafkaConsumerTest implements Runnable { private final KafkaConsumer<String, String> consumer; private ConsumerRecords<String, String> msgList; private final String topic; private static final String GROUPID = "groupA"; public KafkaConsumerTest(String topicName) { Properties props = new Properties(); //bootstrap.servers:kafka的地址 props.put("bootstrap.servers", "206.206.127.191:9092"); //group.id:组名不同可以重复消费。例如先使用了组名A消费了kafka的1000条数据,但是你还想再次 //进行消费这1000条数据,并且不想重新去产生,那么这里你只需要更改组名就可以重复消费了 props.put("group.id", GROUPID); //enable.auto.commit:是否自动提交,默认为true props.put("enable.auto.commit", "true"); //auto.commit.interval.ms:从poll(拉)的回话处理时长 props.put("auto.commit.interval.ms", "1000"); //session.timeout.ms:超时时间 props.put("session.timeout.ms", "30000"); /*auto.offset.reset:消费规则,默认earliest * earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费 * latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据 * none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常 * */ props.put("auto.offset.reset", "earliest"); //key.deserializer:键序列化,默认org.apache.kafka.common.serialization.StringDeserializer props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); //value.deserializer:值序列化,默认org.apache.kafka.common.serialization.StringDeserializer props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); this.consumer = new KafkaConsumer<String, String>(props); this.topic = topicName; this.consumer.subscribe(Arrays.asList(topic)); } @Override public void run() { int messageNo = 1; System.out.println("---------开始消费---------"); try { for (; ; ) { msgList = consumer.poll(1000); if (null != msgList && msgList.count() > 0) { for (ConsumerRecord<String, String> record : msgList) { //消费100条就打印 ,但打印的数据不一定是这个规律的 if (messageNo % 100 == 0) { System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset()); } //当消费了1000条就退出 if (messageNo % 1000 == 0) { break; } messageNo++; } } else { Thread.sleep(1000); } } } catch (InterruptedException e) { e.printStackTrace(); } finally { consumer.close(); } } public static void main(String args[]) { KafkaConsumerTest test1 = new KafkaConsumerTest("test"); Thread thread1 = new Thread(test1); thread1.start(); } }
依赖
<!--kafka依赖-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.7.2</version>
</dependency>
@KafkaListener(topics = "#{'${kafka.topics}'.split(',')}")
public void message1(ConsumerRecord<?, ?> record) {
// 消费的哪个topic、partition的消息,打印出消息内容
log.info("kafka消费的topic: {},分区数量:{},消费到的数据:{}", record.topic(), record.partition(), record.value());
}
依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.6.3</version>
</dependency>
配置文件:
#zookeeper服务地址
zookeeper.address=127.0.0.1:2181
#zookeeper服务超时时间
zookeeper.timeout=4000
#消费kafka数据开始位置
spring.kafka.consumer.auto-offset-reset=earliest
#kafka服务ip
spring.kafka.bootstrap-servers=127.0.0.1:9092
#kafka服务消费者Consumer
spring.kafka.consumer.group-id=consumer808205
#kafka服务的topic
kafka.topics=frms
Kafka和Zookeeper是两个独立的开源项目,他们之间有着密切的联系,在Kafka集群中,Zookeeper用于存储集群的元数据和消费者的偏移量信息等,同时还用于协调Kafka的分区分配和Leader选举等操作
Kafka和Zookeeper的版本之间需要保持兼容性,否则可能会导致不兼容或错误。
具体以下有些常见的Kafka和Zookeeper版本之间的配套关系:
总之,kafka与Zookeeper之间需要保持兼容性,这意味应该使用与kafka集群中运行的zookeeper版本相兼容的版本。建议查阅相关文档以及了解具体版本之间的兼容性情况,并进行必要的测试和验证。
注:
在kafka集群中,Zookeeper用于存储集群的元数据和消费者的偏移量等重要数据
由于这些数据都是非常重要的,它们需要被可靠的存储和保护。zookeeper提供了一个分布式协调服务,它可以帮助Kafka存储这些数据,并确保它们在整个集群中是一致的和可靠的。因此,zookeeper在kafka集群中扮演着至关重要的角色,它的稳定性和高可用性直接影响着kafka集群的稳定性和可靠性。
Zookeeper协调kafka的分区分配和Leader选举等操作
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。