赞
踩
# 以守护进程的方式启动kafka服务端,去掉 -daemon 参数则关闭当前窗口服务端自动退出
$ bin/kafka-server-start.sh -daemon config/server.properties
分区数量为3
分区影响因子为1
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 3 --topic op_log
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
$ bin/kafka-topics.sh --zookeeper localhost:2181 --describe op_log
Error while executing topic command : The number of partitions for a topic can only be increased
$ bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic op_log1 --partition 4
$ bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic op_log
$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --list
$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list
$ bin/kafka-consumer-groups.sh --zookeeper localhost:2181 --group console-consumer-1291 --describe
$ bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --group mygroup --describe
$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291
$ bin/kafka-consumer-offset-checker.sh --zookeeper localhost:2181 --topic mytopic --group console-consumer-1291 --broker-info
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --add-config 'max.message.bytes=50000000,flush.message=5'
$ bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name mytopic --delete-config 'flush.message'
$ bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name mytopic --describe
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --broker-list "0" --topics-to-move-json-file ~/json/op_log_topic-to-move.json --generate
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --execute
$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file ~/json/op_log_reassignment.json --verify
连接 zookeeper 集群的地址,用于将 kafka 集群相关的元数据信息存储到指定的 zookeeper 集群中
注册到 zookeeper 中的地址端口信息,在
IaaS
环境中,默认注册到 zookeeper 中的是内网地址,通过该配置指定外网访问的地址及端口,advertised.host.name 和 advertised.port 作用和 advertised.port 差不多,在 0.10.x 之后,直接配置 advertised.port 即可,前两个参数被废弃掉了。
自动创建 topic,默认为 true。其作用是当向一个还没有创建的 topic 发送消息时,此时会自动创建一个 topic,并同时设置 -num.partition 1 (partition 个数) 和 default.replication.factor (副本个数,默认为 1) 参数。一般该参数需要手动关闭,因为自动创建会影响 topic 的管理,我们可以通过 kafka-topic.sh 脚本手动创建 topic,通常也是建议使用这种方式创建 topic。在 0.10.x 之后提供了 kafka-admin 包,可以使用其来创建 topic。
自动 rebalance,默认为 true。其作用是通过后台线程定期扫描检查,在一定条件下触发重新 leader 选举;在生产环境中,不建议开启,因为替换 leader 在性能上没有什么提升。
后台线程数,默认为 10。用于后台操作的线程,可以不用改动。
Broker 的唯一标识,用于区分不同的 Broker。kafka 的检查就是基于此 id 是否在 zookeeper 中/brokers/ids 目录下是否有相应的 id 目录来判定 Broker 是否健康。
压缩类型。此配置可以接受的压缩类型有 gzip、snappy、lz4。另外可以不设置,即保持和生产端相同的压缩格式。
启用删除 topic。如果关闭,则无法使用 admin 工具进行 topic 的删除操作。
检查重新 rebalance 的周期时间
标识每个 Broker 失去平衡的比率,如果超过改比率,则执行重新选举 Broker 的 leader
保存 kafka 日志数据的位置。如果 log.dirs 没有设置,则使用 log.dir 指定的目录进行日志数据存储。
分区的数据量达到指定大小时,对数据进行一次刷盘操作。比如设置了 1024k 大小,当 partition 积累的数据量达到这个数值时则将数据写入到磁盘上。
数据写入磁盘时间间隔,即内存中的数据保留多久就持久化一次,如果没有设置,则使用 log.flush.scheduler.interval.ms 参数指定的值。
表示 topic 的容量达到指定大小时,则对其数据进行清除操作,默认为-1,永远不删除。
标示 topic 的数据最长保留多久,单位是小时
表示 topic 的数据最长保留多久,单位是分钟,如果没有设置该参数,则使用 log.retention.hours 参数
表示 topic 的数据最长保留多久,单位是毫秒,如果没有设置该参数,则使用 log.retention.minutes 参数
新的 segment 创建周期,单位小时。kafka 数据是以 segment 存储的,当周期时间到达时,就创建一个新的 segment 来存储数据。
segment 的大小。当 segment 大小达到指定值时,就新创建一个 segment。
能够接收的最大文件大小。需要注意的是 producer 和 consumer 端设置的大小需要一致。
最小副本同步个数。当 producer 设置了 request.required.acks 为-1 时,则 topic 的副本数要同步至该参数指定的个数,如果达不到,则 producer 端会产生异常。
指定 io 操作的线程数
执行网络操作的线程数
每个数据目录用于恢复数据的线程数
从 leader 备份数据的线程数
允许消费者端保存 offset 的最大个数
offset 提交的延迟时间
topic 的 offset 的备份数量。该参数建议设置更高保证系统更高的可用性
端口号,Broker 对外提供访问的端口号。
Broker 接收到请求后的最长等待时间,如果超过设定时间,则会给客户端发送错误信息
客户端和 zookeeper 建立连接的超时时间,如果没有设置该参数,则使用 zookeeper.session.timeout.ms 值
空连接的超时时间。即空闲连接超过该时间时则自动销毁连接。
服务端列表。即接收生产消息的服务端列表
消息键的序列化方式。指定 key 的序列化类型
消息内容的序列化方式。指定 value 的序列化类型
消息写入 Partition 的个数。通常可以设置为 0,1,all;当设置为 0 时,只需要将消息发送完成后就完成消息发送功能;当设置为 1 时,即 Leader Partition 接收到数据并完成落盘;当设置为 all 时,即主从 Partition 都接收到数据并落盘。
客户端缓存大小。即 Producer 生产的数据量缓存达到指定值时,将缓存数据一次发送的 Broker 上。
压缩类型。指定消息发送前的压缩类型,通常有 none, gzip, snappy, or, lz4 四种。不指定时消息默认不压缩。
消息发送失败时重试次数。当该值设置大于 0 时,消息因为网络异常等因素导致消息发送失败进行重新发送的次数。
bootstrap.servers
服务端列表。即消费端从指定的服务端列表中拉取消息进行消费。
key.deserializer
消息键的反序列化方式。指定 key 的反序列化类型,与序列化时指定的类型相对应。
消息内容的反序列化方式。指定 value 的反序列化类型,与序列化时指定的类型相对应。
抓取消息的最小内容。指定每次向服务端拉取的最小消息量。
消费组中每个消费者的唯一表示。
心跳检查周期。即在周期性的向 group coordinator 发送心跳,当服务端发生 rebalance 时,会将消息发送给各个消费者。该参数值需要小于 session.timeout.ms,通常为后者的 1/3。
Partition 每次返回的最大数据量大小。
consumer 失效的时间。即 consumer 在指定的时间后,还没有响应则认为该 consumer 已经发生故障了。
当 kafka 中没有初始偏移量或服务器上不存在偏移量时,指定从哪个位置开始消息消息。earliest:指定从头开始;latest:从最新的数据开始消费
group.id
消费者在消费组中的唯一标识
internal.key.converter
内部 key 的转换类型。
内部 value 的转换类型。
服务端接收到 key 时指定的转换类型。
服务端接收到 value 时指定的转换类型。
服务端列表。
心跳检测,与 consumer 中指定的配置含义相同。
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see kafka.server.KafkaConfig for additional details and defaults ############################# Server Basics ############################# # The id of the broker. This must be set to a unique integer for each broker. //1.每个broker的id需要不同 broker.id=0 ############################# Socket Server Settings ############################# # The address the socket server listens on. It will get the value returned from # java.net.InetAddress.getCanonicalHostName() if not configured. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 //2.brokers对外提供的服务入口地址 #listeners=PLAINTEXT://:9092 # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3//网络IO的线程默认为3 # The number of threads that the server uses for processing requests, which may include disk I/O num.io.threads=8 # The send buffer (SO_SNDBUF) used by the socket server socket.send.buffer.bytes=102400 # The receive buffer (SO_RCVBUF) used by the socket server socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 ############################# Log Basics ############################# # A comma separated list of directories under which to store log files //存放消息日志文件的地址 log.dirs=/tmp/kafka-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=1 # The number of threads per data directory to be used for log recovery at startup and flushing at shutdown. # This value is recommended to be increased for installations with data dirs located in RAID array. num.recovery.threads.per.data.dir=1 ############################# Internal Topic Settings ############################# # The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state" # For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3. offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 ############################# Log Flush Policy ############################# # Messages are immediately written to the filesystem but by default we only fsync() to sync # the OS cache lazily. The following configurations control the flush of data to disk. # There are a few important trade-offs here: # 1. Durability: Unflushed data may be lost if you are not using replication. # 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush. # 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks. # The settings below allow one to configure the flush policy to flush data after a period of time or # every N messages (or both). This can be done globally and overridden on a per-topic basis. # The number of messages to accept before forcing a flush of data to disk #log.flush.interval.messages=10000 # The maximum amount of time a message can sit in a log before we force a flush #log.flush.interval.ms=1000 ############################# Log Retention Policy ############################# # The following configurations control the disposal of log segments. The policy can # be set to delete segments after a period of time, or after a given size has accumulated. # A segment will be deleted whenever *either* of these criteria are met. Deletion always happens # from the end of the log. # The minimum age of a log file to be eligible for deletion due to age log.retention.hours=168 # A size-based retention policy for logs. Segments are pruned from the log unless the remaining # segments drop below log.retention.bytes. Functions independently of log.retention.hours. #log.retention.bytes=1073741824 # The maximum size of a log segment file. When this size is reached a new log segment will be created. log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 ############################# Zookeeper ############################# # Zookeeper connection string (see zookeeper docs for details). # This is a comma separated host:port pairs, each corresponding to a zk # server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". # You can also append an optional chroot string to the urls to specify the # root directory for all kafka znodes. zookeeper.connect=localhost:2181 # Timeout in ms for connecting to zookeeper zookeeper.connection.timeout.ms=6000 ############################# Group Coordinator Settings ############################# # The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. # The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. # The default value for this is 3 seconds. # We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. # However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. group.initial.rebalance.delay.ms=0
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see org.apache.kafka.clients.consumer.ConsumerConfig for more details # list of brokers used for bootstrapping knowledge about the rest of the cluster # format: host1:port1,host2:port2 ... bootstrap.servers=localhost:9092 # consumer group id group.id=test-consumer-group # What to do when there is no initial offset in Kafka or if the current # offset does not exist any more on the server: latest, earliest, none #auto.offset.reset=
# Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright ownership. # The ASF licenses this file to You under the Apache License, Version 2.0 # (the "License"); you may not use this file except in compliance with # the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # see org.apache.kafka.clients.producer.ProducerConfig for more details ############################# Producer Basics ############################# # list of brokers used for bootstrapping knowledge about the rest of the cluster # format: host1:port1,host2:port2 ... bootstrap.servers=localhost:9092 # specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd compression.type=none # name of the partitioner class for partitioning events; default partition spreads data randomly #partitioner.class= # the maximum amount of time the client will wait for the response of a request #request.timeout.ms= # how long `KafkaProducer.send` and `KafkaProducer.partitionsFor` will block for #max.block.ms= # the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together #linger.ms= # the maximum size of a request in bytes #max.request.size= # the default batch size in bytes when batching multiple records sent to a partition #batch.size= # the total bytes of memory the producer can use to buffer records waiting to be sent to the server #buffer.memory=
(1)前台方式启动:
JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties
(2)后台方式启动:
export JMX_PORT=9997 bin/kafka-server-start.sh -daemon config/server.properties
xsync bin/kafka-server-start.sh
tar -zxvf XXXX
cd XXXX
cd bin/
chmod 777 ke.sh
1.根据需要修改kafka集群
2.数组不存储在zk,只存储在kafka
3.metrics.charts=true 表示显示图表数据
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。