Go to the Kafka website at http://kafka.apache.org/ and find the download option.
wget http://labfile.oss.aliyuncs.com/courses/859/kafka_2.10-
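The bin directory contains a startup script named zookeeper-server-start.sh. Running it without arguments does not start the server; it prints a usage hint instead. The hint asks for a configuration file, so the next step is to look at that file.
The zookeeper.properties configuration file contains three parameters: the directory where data is stored, the port number, and the maximum number of client connections.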
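To see only those settings without the license header and comments, a small helper like the one below may help. This is a sketch, not part of the Kafka distribution: `show_settings` is a hypothetical name, and the keys and values it prints depend on the file shipped with your download.

```shell
# Print only the active settings of a properties file,
# skipping blank lines and lines that start with '#'.
show_settings() {
  grep -Ev '^[[:space:]]*(#|$)' "$1"
}
```

From the bin directory it could be called as `show_settings ../config/zookeeper.properties`.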
root@nihui-PC:~/kafka_2.10- ./zookeeper-server-start.sh ../config/zookeeper.properties
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=gasp
[2019-08-10 11:45:18,810] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2019-08-10 11:45:18,821] INFO autopurge.snapRetainCount set to 3 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-08-10 11:45:18,821] INFO autopurge.purgeInterval set to 0 (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-08-10 11:45:18,821] INFO Purge task is not scheduled. (org.apache.zookeeper.server.DatadirCleanupManager)
[2019-08-10 11:45:18,821] WARN Either no config or no quorum defined in config, running in standalone mode (org.apache.zookeeper.server.quorum.QuorumPeerMain)
[2019-08-10 11:45:18,841] INFO Reading configuration from: ../config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
[2019-08-10 11:45:18,842] INFO Starting server (org.apache.zookeeper.server.ZooKeeperServerMain)
[2019-08-10 11:45:18,853] INFO Server environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GMT (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:host.name=nihui-PC (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:java.version=1.8.0_161 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:java.vendor=Oracle Corporation (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:java.home=/root/jdk1.8.0_161/jre (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:java.class.path=.:/root/jdk1.8.0_161/lib/dt.jar:/root/jdk1.8.0_161/lib/tools.jar:/root/kafka_2.10- (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:java.io.tmpdir=/tmp (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:java.compiler=<NA> (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:os.name=Linux (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:os.arch=amd64 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:os.version=4.15.0-29deepin-generic (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:user.name=root (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:user.home=/root (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,853] INFO Server environment:user.dir=/root/kafka_2.10- (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,860] INFO tickTime set to 3000 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,860] INFO minSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,860] INFO maxSessionTimeout set to -1 (org.apache.zookeeper.server.ZooKeeperServer)
[2019-08-10 11:45:18,911] INFO binding to port (org.apache.zookeeper.server.NIOServerCnxnFactory)
Next, start the Kafka server with the kafka-server-start.sh startup script in the bin directory, again without passing any arguments.
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
#delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

############################# Log Basics #############################

# A comma seperated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=1

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
# 1. Durability: Unflushed data may be lost if you are not using replication.
# 2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
# 3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to exceessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. ",,".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000
socket.send.buffer.bytes=102400 size of the send buffer
socket.receive.buffer.bytes=102400 size of the receive buffer
zookeeper.connect=localhost:2181 ZooKeeper address
zookeeper.connection.timeout.ms=6000 ZooKeeper connection timeout, in milliseconds
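When scripting against this configuration it can be handy to read a single value out of the file. The helper below is a minimal sketch: `get_prop` is a hypothetical name, and it assumes plain `key=value` lines with no spaces around `=`, as in the file shown above.

```shell
# get_prop FILE KEY
# Print the value of KEY from a properties file. Commented-out lines such
# as "#listeners=..." are skipped because their key starts with '#'.
# If the key appears more than once, the last occurrence wins.
get_prop() {
  awk -F= -v key="$2" '
    $1 == key { sub(/^[^=]*=/, ""); value = $0 }
    END { print value }
  ' "$1"
}
```

For the file above, `get_prop ../config/server.properties zookeeper.connect` would print `localhost:2181`.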
root@nihui-PC:~/kafka_2.10- ./kafka-server-start.sh ../config/server.properties
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=gasp
[2019-08-10 11:55:38,544] INFO KafkaConfig values:
    advertised.host.name = null
    advertised.listeners = null
    advertised.port = null
    authorizer.class.name =
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
    background.threads = 10
    broker.id = 0
    broker.id.generation.enable = true
    broker.rack = null
    compression.type = producer
    connections.max.idle.ms = 600000
    controlled.shutdown.enable = true
    controlled.shutdown.max.retries = 3
    controlled.shutdown.retry.backoff.ms = 5000
    controller.socket.timeout.ms = 30000
    create.topic.policy.class.name = null
    default.replication.factor = 1
    delete.topic.enable = false
    fetch.purgatory.purge.interval.requests = 1000
    group.max.session.timeout.ms = 300000
    group.min.session.timeout.ms = 6000
    host.name =
    inter.broker.listener.name = null
    inter.broker.protocol.version = 0.10.2-IV0
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listener.security.protocol.map = SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,TRACE:TRACE,SASL_SSL:SASL_SSL,PLAINTEXT:PLAINTEXT
    listeners = null
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
    log.dirs = /tmp/kafka-logs
    log.flush.interval.messages = 9223372036854775807
    log.flush.interval.ms = null
    log.flush.offset.checkpoint.interval.ms = 60000
    log.flush.scheduler.interval.ms = 9223372036854775807
    log.index.interval.bytes = 4096
    log.index.size.max.bytes = 10485760
    log.message.format.version = 0.10.2-IV0
    log.message.timestamp.difference.max.ms = 9223372036854775807
    log.message.timestamp.type = CreateTime
    log.preallocate = false
    log.retention.bytes = -1
    log.retention.check.interval.ms = 300000
    log.retention.hours = 168
    log.retention.minutes = null
    log.retention.ms = null
    log.roll.hours = 168
    log.roll.jitter.hours = 0
    log.roll.jitter.ms = null
    log.roll.ms = null
    log.segment.bytes = 1073741824
    log.segment.delete.delay.ms = 60000
    max.connections.per.ip = 2147483647
    max.connections.per.ip.overrides =
    message.max.bytes = 1000012
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    min.insync.replicas = 1
    num.io.threads = 8
    num.network.threads = 3
    num.partitions = 1
    num.recovery.threads.per.data.dir = 1
    num.replica.fetchers = 1
    offset.metadata.max.bytes = 4096
    offsets.commit.required.acks = -1
    offsets.commit.timeout.ms = 5000
    offsets.load.buffer.size = 5242880
    offsets.retention.check.interval.ms = 600000
    offsets.retention.minutes = 1440
    offsets.topic.compression.codec = 0
    offsets.topic.num.partitions = 50
    offsets.topic.replication.factor = 3
    offsets.topic.segment.bytes = 104857600
    port = 9092
    principal.builder.class = class org.apache.kafka.common.security.auth.DefaultPrincipalBuilder
    producer.purgatory.purge.interval.requests = 1000
    queued.max.requests = 500
    quota.consumer.default = 9223372036854775807
    quota.producer.default = 9223372036854775807
    quota.window.num = 11
    quota.window.size.seconds = 1
    replica.fetch.backoff.ms = 1000
    replica.fetch.max.bytes = 1048576
    replica.fetch.min.bytes = 1
    replica.fetch.response.max.bytes = 10485760
    replica.fetch.wait.max.ms = 500
    replica.high.watermark.checkpoint.interval.ms = 5000
    replica.lag.time.max.ms = 10000
    replica.socket.receive.buffer.bytes = 65536
    replica.socket.timeout.ms = 30000
    replication.quota.window.num = 11
    replication.quota.window.size.seconds = 1
    request.timeout.ms = 30000
    reserved.broker.max.id = 1000
    sasl.enabled.mechanisms = [GSSAPI]
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.principal.to.local.rules = [DEFAULT]
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism.inter.broker.protocol = GSSAPI
    security.inter.broker.protocol = PLAINTEXT
    socket.receive.buffer.bytes = 102400
    socket.request.max.bytes = 104857600
    socket.send.buffer.bytes = 102400
    ssl.cipher.suites = null
    ssl.client.auth = none
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    unclean.leader.election.enable = true
    zookeeper.connect = localhost:2181
    zookeeper.connection.timeout.ms = 6000
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2019-08-10 11:55:38,576] INFO starting (kafka.server.KafkaServer)
[2019-08-10 11:55:38,578] INFO Connecting to zookeeper on localhost:2181 (kafka.server.KafkaServer)
[2019-08-10 11:55:38,589] INFO Starting ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread)
[2019-08-10 11:55:38,593] INFO Client environment:zookeeper.version=3.4.9-1757313, built on 08/23/2016 06:50 GMT (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:host.name=nihui-PC (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:java.version=1.8.0_161 (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:java.vendor=Oracle Corporation (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:java.home=/root/jdk1.8.0_161/jre (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:java.class.path=.:/root/jdk1.8.0_161/lib/dt.jar:/root/jdk1.8.0_161/lib/tools.jar:/root/kafka_2.10- (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:java.library.path=/usr/java/packages/lib/amd64:/usr/lib64:/lib64:/lib:/usr/lib (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:os.version=4.15.0-29deepin-generic (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,593] INFO Client environment:user.dir=/root/kafka_2.10- (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,594] INFO Initiating client connection, connectString=localhost:2181 sessionTimeout=6000 watcher=org.I0Itec.zkclient.ZkClient@4567f35d (org.apache.zookeeper.ZooKeeper)
[2019-08-10 11:55:38,605] INFO Waiting for keeper state SyncConnected (org.I0Itec.zkclient.ZkClient)
[2019-08-10 11:55:38,609] INFO Opening socket connection to server localhost/ Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-08-10 11:55:38,619] INFO Socket connection established to localhost/, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-08-10 11:55:38,686] INFO Session establishment complete on server localhost/, sessionid = 0x16c79a1704c0000, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-08-10 11:55:38,687] INFO zookeeper state changed (SyncConnected) (org.I0Itec.zkclient.ZkClient)
[2019-08-10 11:55:38,911] INFO Cluster ID = VogbcQUaRR2E3y6tLPRDTg (kafka.server.KafkaServer)
[2019-08-10 11:55:38,917] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-08-10 11:55:38,939] INFO [ThrottledRequestReaper-Fetch], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2019-08-10 11:55:38,940] INFO [ThrottledRequestReaper-Produce], Starting (kafka.server.ClientQuotaManager$ThrottledRequestReaper)
[2019-08-10 11:55:38,965] INFO Log directory '/tmp/kafka-logs' not found, creating it. (kafka.log.LogManager)
[2019-08-10 11:55:38,976] INFO Loading logs. (kafka.log.LogManager)
[2019-08-10 11:55:38,980] INFO Logs loading complete in 4 ms. (kafka.log.LogManager)
[2019-08-10 11:55:39,029] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2019-08-10 11:55:39,030] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2019-08-10 11:55:39,067] INFO Awaiting socket connections on (kafka.network.Acceptor)
[2019-08-10 11:55:39,069] INFO [Socket Server on Broker 0], Started 1 acceptor threads (kafka.network.SocketServer)
[2019-08-10 11:55:39,081] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-08-10 11:55:39,082] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-08-10 11:55:39,104] INFO Creating /controller (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2019-08-10 11:55:39,121] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2019-08-10 11:55:39,122] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2019-08-10 11:55:39,198] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-08-10 11:55:39,200] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-08-10 11:55:39,201] INFO [ExpirationReaper-0], Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-08-10 11:55:39,208] INFO [GroupCoordinator 0]: Starting up. (kafka.coordinator.GroupCoordinator)
[2019-08-10 11:55:39,209] INFO [GroupCoordinator 0]: Startup complete. (kafka.coordinator.GroupCoordinator)
[2019-08-10 11:55:39,210] INFO [Group Metadata Manager on Broker 0]: Removed 0 expired offsets in 0 milliseconds. (kafka.coordinator.GroupMetadataManager)
[2019-08-10 11:55:39,229] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2019-08-10 11:55:39,244] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2019-08-10 11:55:39,247] INFO Creating /brokers/ids/0 (is it secure? false) (kafka.utils.ZKCheckedEphemeral)
[2019-08-10 11:55:39,253] INFO Result of znode creation is: OK (kafka.utils.ZKCheckedEphemeral)
[2019-08-10 11:55:39,254] INFO Registered broker 0 at path /brokers/ids/0 with addresses: EndPoint(nihui-PC,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.utils.ZkUtils)
[2019-08-10 11:55:39,255] WARN No meta.properties file under dir /tmp/kafka-logs/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-08-10 11:55:39,369] INFO Kafka version : (org.apache.kafka.common.utils.AppInfoParser)
[2019-08-10 11:55:39,369] INFO Kafka commitId : e89bffd6b2eff799 (org.apache.kafka.common.utils.AppInfoParser)
[2019-08-10 11:55:39,370] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
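If the server output is redirected to a file, the final `started` line in the log above is a convenient readiness marker. The following is a minimal sketch, assuming the output was captured in a file of your choosing (the `kafka.out` name in the usage note is hypothetical) and that your Kafka version logs the same message as the run shown above; `broker_started` is also a hypothetical helper name.

```shell
# broker_started LOGFILE
# Succeeds (exit status 0) once the captured server output contains the
# "started" line printed at the end of a successful broker startup.
broker_started() {
  # -F treats the pattern as a fixed string; -q suppresses output.
  grep -Fq 'started (kafka.server.KafkaServer)' "$1"
}
```

One possible use: start the broker with `./kafka-server-start.sh ../config/server.properties > kafka.out 2>&1 &`, then run `broker_started kafka.out && echo "broker is up"`.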
root@nihui-PC:~/kafka_2.10- ./kafka-topics.sh
Picked up _JAVA_OPTIONS: -Dawt.useSystemAAFontSettings=gasp
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions,
                                           replica assignment, and/or
                                           configuration for the topic.
--config <String: name=value>            A topic configuration override for the
                                           topic being created or altered.The
                                           following is a list of valid
                                           configurations:
                                             cleanup.policy
                                             compression.type
                                             delete.retention.ms
                                             file.delete.delay.ms
                                             flush.messages
                                             flush.ms
                                             follower.replication.throttled.replicas
                                             index.interval.bytes
                                             leader.replication.throttled.replicas
                                             max.message.bytes
                                             message.format.version
                                             message.timestamp.difference.max.ms
                                             message.timestamp.type
                                             min.cleanable.dirty.ratio
                                             min.compaction.lag.ms
                                             min.insync.replicas
                                             preallocate
                                             retention.bytes
                                             retention.ms
                                             segment.bytes
                                             segment.index.bytes
                                             segment.jitter.ms
                                             segment.ms
                                             unclean.leader.election.enable
                                           See the Kafka documentation for full
                                           details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic
--delete-config <String: name>           A topic configuration override to be
                                           removed for an existing topic (see
                                           the list of configurations under the
                                           --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment
--force                                  Suppress console prompts
--help                                   Print usage information.
--if-exists                              if set when altering or deleting
                                           topics, the action will only execute
                                           if the topic exists
--if-not-exists                          if set when creating topics, the
                                           action will only execute if the
                                           topic does not already exist
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic
                                           being created or altered (WARNING:
                                           If partitions are increased for a
                                           topic that has a key, the partition
                                           logic or ordering of the messages
                                           will be affected
--replica-assignment <String:            A list of manual partition-to-broker
  broker_id_for_part1_replica1 :           assignments for the topic being
  broker_id_for_part1_replica2 ,           created or altered.
  broker_id_for_part2_replica1 :
  broker_id_for_part2_replica2 , ...>
--replication-factor <Integer:           The replication factor for each
  replication factor>                      partition in the topic being created.
--topic <String: topic>                  The topic to be create, alter or
                                           describe. Can also accept a regular
                                           expression except for --create option
--topics-with-overrides                  if set when describing topics, only
                                           show topics that have overridden
                                           configs
--unavailable-partitions                 if set when describing topics, only
                                           show partitions whose leader is not
                                           available
--under-replicated-partitions            if set when describing topics, only
                                           show under replicated partitions
--zookeeper <String: urls>               REQUIRED: The connection string for
                                           the zookeeper connection in the form
                                           host:port. Multiple URLS can be
                                           given to allow fail-over.
./kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 1
./kafka-topics.sh --zookeeper localhost:2181 --describe --topic test
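Running the create command a second time against an existing topic typically fails with an "already exists" error. The `--if-not-exists` flag listed in the help output above avoids that, which is useful in setup scripts that may run more than once. The sketch below only prints the command so it can be reviewed first; `create_topic_cmd` is a hypothetical helper name, and the ZooKeeper address is the one used in this walkthrough.

```shell
# create_topic_cmd TOPIC PARTITIONS REPLICATION_FACTOR
# Print a kafka-topics.sh invocation that creates the topic only if it
# does not exist yet. Nothing is executed here.
create_topic_cmd() {
  printf './kafka-topics.sh --zookeeper localhost:2181 --create --if-not-exists --topic %s --partitions %s --replication-factor %s\n' \
    "$1" "$2" "$3"
}
```

To run the printed command from the bin directory, pipe it to a shell: `create_topic_cmd test 3 1 | sh`.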
./kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
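The console producer reads standard input and sends each line as one message, so messages can also be piped in rather than typed. A minimal sketch under that assumption: `send_lines` and the `PRODUCER` variable are hypothetical names, and the default command is the producer invocation used in this walkthrough.

```shell
# PRODUCER holds the producer command line. It defaults to the console
# producer command from this walkthrough and can be overridden.
PRODUCER=${PRODUCER:-"./kafka-console-producer.sh --broker-list localhost:9092 --topic test"}

# send_lines MESSAGE...
# Write each argument as one line on the producer's standard input.
send_lines() {
  # $PRODUCER is left unquoted on purpose so that sh/bash split it into
  # the command and its arguments.
  printf '%s\n' "$@" | $PRODUCER
}
```

With the broker running, `send_lines "hello" "world"` would publish two messages to the test topic. The consumer command above, started in another terminal, prints them; adding `--from-beginning` to it also replays messages that were sent before the consumer started.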