[root@ywxtdb opt]# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz
[root@ywxtdb opt]# tar -xvf kafka_2.12-2.1.0.tgz
[root@ywxtdb config]# vi /opt/kafka_2.12-2.1.0/config/server.properties
Main settings to change:
broker.id=0
listeners=PLAINTEXT://192.168.1.128:9092
log.dirs=/var/kafka-logs
zookeeper.connect=localhost:2181
For a cluster:
zookeeper.connect=192.168.91.128:2181,192.168.91.129:2181,192.168.91.130:2181/kafka
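In the cluster form, every ZooKeeper host is listed with its port, and the chroot path (/kafka here) is appended only once, after the last host. A minimal sketch that builds such a line; zk_connect is a hypothetical helper for illustration, not part of Kafka, and it assumes every host uses port 2181:

```shell
# zk_connect CHROOT HOST... : hypothetical helper, not part of Kafka.
# Joins the hosts with commas, adds port 2181 to each one,
# and appends the chroot path once at the very end.
zk_connect() (
  chroot=$1
  shift
  list=""
  for host in "$@"; do
    list="${list:+$list,}$host:2181"
  done
  printf 'zookeeper.connect=%s%s\n' "$list" "$chroot"
)

zk_connect /kafka 192.168.91.128 192.168.91.129 192.168.91.130
```

The printed line can be pasted into server.properties on each broker; all brokers of one cluster must use the same chroot.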
[root@ywxtdb config]# vi /etc/profile
export KAFKA_HOME=/opt/kafka_2.12-2.1.0
export PATH=$PATH:$KAFKA_HOME/bin
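Changes to /etc/profile only reach shells that read the file afterwards, so the current shell has to reload it before the Kafka scripts can be called by name. A small sketch of how the result could be verified; on_path is a hypothetical helper, and the fallback path is the install directory used above:

```shell
# on_path DIR : hypothetical helper; succeeds if DIR is an entry of $PATH.
on_path() {
  case ":$PATH:" in
    *":$1:"*) return 0 ;;
    *) return 1 ;;
  esac
}

# Run this once in the current shell after editing the file:
#   . /etc/profile
if on_path "${KAFKA_HOME:-/opt/kafka_2.12-2.1.0}/bin"; then
  echo "Kafka scripts are on PATH"
else
  echo "Kafka bin directory is not on PATH yet"
fi
```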
Before starting Kafka, make sure ZooKeeper is already running.
[root@ywxtdb config]# kafka-server-start.sh ./server.properties
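After a successful start, a kafka node appears under the ZooKeeper root (this is the /kafka chroot given in the cluster zookeeper.connect setting):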
- [zk: localhost:2181(CONNECTED) 16] ls /
- [kafka, zookeeper]
Options marked REQUIRED are mandatory.
- [root@ywxtdb ~]# kafka-topics.sh
- OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase
- from one, then you should configure the number of parallel GC threads appropriately using
- -XX:ParallelGCThreads=N
- Create, delete, describe, or change a topic.
- Option Description
- ------ -----------
- --alter Alter the number of partitions,
- replica assignment, and/or
- configuration for the topic.
- --config <String: name=value> A topic configuration override for the
- topic being created or altered.The
- following is a list of valid
- configurations:
- cleanup.policy
- compression.type
- delete.retention.ms
- file.delete.delay.ms
- flush.messages
- flush.ms
- follower.replication.throttled.replicas
- index.interval.bytes
- leader.replication.throttled.replicas
- max.message.bytes
- message.downconversion.enable
- message.format.version
- message.timestamp.difference.max.ms
- message.timestamp.type
- min.cleanable.dirty.ratio
- min.compaction.lag.ms
- min.insync.replicas
- preallocate
- retention.bytes
- retention.ms
- segment.bytes
- segment.index.bytes
- segment.jitter.ms
- segment.ms
- unclean.leader.election.enable
- See the Kafka documentation for full
- details on the topic configs.
- --create Create a new topic.
- --delete Delete a topic
- --delete-config <String: name> A topic configuration override to be
- removed for an existing topic (see
- the list of configurations under the
- --config option).
- --describe List details for the given topics.
- --disable-rack-aware Disable rack aware replica assignment
- --exclude-internal exclude internal topics when running
- list or describe command. The
- internal topics will be listed by
- default
- --force Suppress console prompts
- --help Print usage information.
- --if-exists if set when altering or deleting
- topics, the action will only execute
- if the topic exists
- --if-not-exists if set when creating topics, the
- action will only execute if the
- topic does not already exist
- --list List all available topics.
- --partitions <Integer: # of partitions> The number of partitions for the topic
- being created or altered (WARNING:
- If partitions are increased for a
- topic that has a key, the partition
- logic or ordering of the messages
- will be affected
- --replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...> A list of manual partition-to-broker
- assignments for the topic being
- created or altered.
- --replication-factor <Integer: replication factor> The replication factor for each
- partition in the topic being created.
- --topic <String: topic> The topic to be create, alter or
- describe. Can also accept a regular
- expression except for --create option
- --topics-with-overrides if set when describing topics, only
- show topics that have overridden
- configs
- --unavailable-partitions if set when describing topics, only
- show partitions whose leader is not
- available
- --under-replicated-partitions if set when describing topics, only
- show under replicated partitions
- --zookeeper <String: hosts> REQUIRED: The connection string for
- the zookeeper connection in the form
- host:port. Multiple hosts can be
- given to allow fail-over.
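kafka-topics.sh is the script for managing topics. In this version it creates topics through ZooKeeper, so the ZooKeeper connection string must be passed with --zookeeper.
--create: create a new topic
--topic: the name of the topic to create
--partitions: the number of partitions to create, chosen mainly according to the number of broker nodes
--replication-factor: the number of replicas for each partition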
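Kafka rejects a topic whose replication factor is larger than the number of available brokers, so it can help to check the numbers before running --create. A minimal sketch of such a pre-flight check; check_topic_args is a hypothetical helper, and the broker count has to be supplied by hand:

```shell
# check_topic_args PARTITIONS REPLICATION_FACTOR BROKERS
# Hypothetical pre-flight check, not part of Kafka.
check_topic_args() (
  partitions=$1 rf=$2 brokers=$3
  if [ "$partitions" -lt 1 ] || [ "$rf" -lt 1 ]; then
    echo "partitions and replication factor must be at least 1"
    exit 1
  fi
  if [ "$rf" -gt "$brokers" ]; then
    echo "replication factor $rf exceeds the $brokers available brokers"
    exit 1
  fi
  echo "ok: $partitions partitions x $rf replicas on $brokers brokers"
)

check_topic_args 2 2 3          # the values used for topic bobby on 3 brokers
check_topic_args 2 4 3 || true  # rejected: only 3 brokers for 4 replicas
```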
- [root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic bobby --partitions 2 --replication-factor 2
- Created topic "bobby".
- [root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --list bobby
- bobby
- [root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --describe --topic bobby
- Topic:bobby PartitionCount:2 ReplicationFactor:2 Configs:
- Topic: bobby Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
- Topic: bobby Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
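In the --describe output, Leader, Replicas and Isr are broker ids (the broker.id values from server.properties). A partition whose Isr list is shorter than its Replicas list is under-replicated. A rough sketch that condenses the output for that comparison; summarize_partitions is a hypothetical helper, and the sample input is the output shown above:

```shell
# Reads `kafka-topics.sh --describe` output on stdin and prints
# "<partition> <leader> <replica count> <isr count>" per partition line.
# The summary line (Topic:bobby PartitionCount:...) is skipped because
# its first field is not exactly "Topic:".
summarize_partitions() {
  awk '$1 == "Topic:" && $3 == "Partition:" {
    replicas = split($8, r, ",")
    isr = split($10, s, ",")
    print $4, $6, replicas, isr
  }'
}

summarize_partitions <<'EOF'
Topic:bobby PartitionCount:2 ReplicationFactor:2 Configs:
Topic: bobby Partition: 0 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: bobby Partition: 1 Leader: 0 Replicas: 0,1 Isr: 0,1
EOF
```

Against a live cluster the function would be fed by piping the real --describe command into it.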
Options marked REQUIRED are mandatory.
- [root@ywxtdb ~]# kafka-console-consumer.sh
- The console consumer is a tool that reads data from Kafka and outputs it to standard output.
- Option Description
- ------ -----------
- --bootstrap-server <String: server to connect to> REQUIRED: The server(s) to connect to.
- --consumer-property <String: consumer_prop> A mechanism to pass user-defined
- properties in the form key=value to
- the consumer.
- --consumer.config <String: config file> Consumer config properties file. Note
- that [consumer-property] takes
- precedence over this config.
- --enable-systest-events Log lifecycle events of the consumer
- in addition to logging consumed
- messages. (This is specific for
- system tests.)
- --formatter <String: class> The name of a class to use for
- formatting kafka messages for
- display. (default: kafka.tools.
- DefaultMessageFormatter)
- --from-beginning If the consumer does not already have
- an established offset to consume
- from, start with the earliest
- message present in the log rather
- than the latest message.
- --group <String: consumer group id> The consumer group id of the consumer.
- --isolation-level <String> Set to read_committed in order to
- filter out transactional messages
- which are not committed. Set to
- read_uncommittedto read all
- messages. (default: read_uncommitted)
- --key-deserializer <String: deserializer for key>
- --max-messages <Integer: num_messages> The maximum number of messages to
- consume before exiting. If not set,
- consumption is continual.
- --offset <String: consume offset> The offset id to consume from (a non-
- negative number), or 'earliest'
- which means from beginning, or
- 'latest' which means from end
- (default: latest)
- --partition <Integer: partition> The partition to consume from.
- Consumption starts from the end of
- the partition unless '--offset' is
- specified.
- --property <String: prop> The properties to initialize the
- message formatter. Default
- properties include:
- print.timestamp=true|false
- print.key=true|false
- print.value=true|false
- key.separator=<key.separator>
- line.separator=<line.separator>
- key.deserializer=<key.deserializer>
- value.deserializer=<value.
- deserializer>
- Users can also pass in customized
- properties for their formatter; more
- specifically, users can pass in
- properties keyed with 'key.
- deserializer.' and 'value.
- deserializer.' prefixes to configure
- their deserializers.
- --skip-message-on-error If there is an error when processing a
- message, skip it instead of halt.
- --timeout-ms <Integer: timeout_ms> If specified, exit if no message is
- available for consumption for the
- specified interval.
- --topic <String: topic> The topic id to consume on.
- --value-deserializer <String: deserializer for values>
- --whitelist <String: whitelist> Whitelist of topics to include for
- consumption.
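The console consumer does not depend on ZooKeeper; it connects through --bootstrap-server.
--topic: the topic name
--group: the consumer group to join, given by name (the group is created if it does not exist yet)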
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bobby --group demo
3. Check the consumer groups
- [root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
- demo
- demo1
View the status of a specific group
- [root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group demo
-
- TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
- bobby 0 16 16 0 consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 /192.168.91.128 consumer-1
- bobby 1 25 25 0 consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 /192.168.91.128 consumer-1
- [root@node01 kafka-logs]#
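In this table LAG is LOG-END-OFFSET minus CURRENT-OFFSET, i.e. how many messages the group has not consumed yet on that partition. A short sketch that adds up the lag per topic; lag_by_topic is a hypothetical helper, and the sample rows are taken from the output above:

```shell
# Reads `kafka-consumer-groups.sh --describe` output on stdin and prints
# "<topic> <total lag>" for each topic. Header and blank lines are skipped
# because their fifth field is not a number.
lag_by_topic() {
  awk '$5 ~ /^[0-9]+$/ { lag[$1] += $5 }
       END { for (t in lag) print t, lag[t] }'
}

lag_by_topic <<'EOF'

TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
bobby 0 16 16 0 consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 /192.168.91.128 consumer-1
bobby 1 25 25 0 consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22 /192.168.91.128 consumer-1
EOF
```

A total of 0 means the group has caught up with everything written to the topic.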
Options marked REQUIRED are mandatory.
- [root@ywxtdb ~]# kafka-console-producer.sh
- Read data from standard input and publish it to Kafka.
- Option Description
- ------ -----------
- --batch-size <Integer: size> Number of messages to send in a single
- batch if they are not being sent
- synchronously. (default: 200)
- --broker-list <String: broker-list> REQUIRED: The broker list string in
- the form HOST1:PORT1,HOST2:PORT2.
- --compression-codec [String: compression-codec] The compression codec: either 'none',
- 'gzip', 'snappy', 'lz4', or 'zstd'.
- If specified without value, then it
- defaults to 'gzip'
- --line-reader <String: reader_class> The class name of the class to use for
- reading lines from standard in. By
- default each line is read as a
- separate message. (default: kafka.
- tools.
- ConsoleProducer$LineMessageReader)
- --max-block-ms <Long: max block on send> The max time that the producer will
- block for during a send request
- (default: 60000)
- --max-memory-bytes <Long: total memory in bytes> The total memory used by the producer
- to buffer records waiting to be sent
- to the server. (default: 33554432)
- --max-partition-memory-bytes <Long: memory in bytes per partition> The buffer size allocated for a
- partition. When records are received
- which are smaller than this size the
- producer will attempt to
- optimistically group them together
- until this size is reached.
- (default: 16384)
- --message-send-max-retries <Integer> Brokers can fail receiving the message
- for multiple reasons, and being
- unavailable transiently is just one
- of them. This property specifies the
- number of retires before the
- producer give up and drop this
- message. (default: 3)
- --metadata-expiry-ms <Long: metadata expiration interval> The period of time in milliseconds
- after which we force a refresh of
- metadata even if we haven't seen any
- leadership changes. (default: 300000)
- --producer-property <String: producer_prop> A mechanism to pass user-defined
- properties in the form key=value to
- the producer.
- --producer.config <String: config file> Producer config properties file. Note
- that [producer-property] takes
- precedence over this config.
- --property <String: prop> A mechanism to pass user-defined
- properties in the form key=value to
- the message reader. This allows
- custom configuration for a user-
- defined message reader.
- --request-required-acks <String: request required acks> The required acks of the producer
- requests (default: 1)
- --request-timeout-ms <Integer: request timeout ms> The ack timeout of the producer
- requests. Value must be non-negative
- and non-zero (default: 1500)
- --retry-backoff-ms <Integer> Before each retry, the producer
- refreshes the metadata of relevant
- topics. Since leader election takes
- a bit of time, this property
- specifies the amount of time that
- the producer waits before refreshing
- the metadata. (default: 100)
- --socket-buffer-size <Integer: size> The size of the tcp RECV size.
- (default: 102400)
- --sync If set message send requests to the
- brokers are synchronously, one at a
- time as they arrive.
- --timeout <Integer: timeout_ms> If set and the producer is running in
- asynchronous mode, this gives the
- maximum amount of time a message
- will queue awaiting sufficient batch
- size. The value is given in ms.
- (default: 1000)
- --topic <String: topic> REQUIRED: The topic id to produce
- messages to.
--topic: the topic name
--broker-list: the Kafka broker(s) to connect to
After the producer sends 111:
- [root@node01 bin]# kafka-console-producer.sh --topic bobby --broker-list node3:9092
- >111
the consumer side shows that 111 has been consumed:
- [root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic bobby --group demo
- 111
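One producer, one consumer
One producer, two consumer groups: group demo has two consumers and group demo1 has one
Producer
Consumer 1 of group demo
Consumer 2 of group demo
Consumer of group demo1
Conclusions:
1. Different groups each consume the same data; within one group, a message is not consumed more than once.
2. Partitions are not consumed twice within a group: a partition cannot be consumed by several consumers of the same group at the same time,
but one consumer can consume several partitions.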
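These rules can be illustrated with a simplified model of how the partitions of one topic are spread over the consumers of one group. The sketch below hands out contiguous ranges, in the spirit of Kafka's range assignment for a single topic; assign_partitions and the consumer-N labels are hypothetical, and real assignments depend on the configured assignor and on member ids:

```shell
# assign_partitions PARTITIONS CONSUMERS
# Simplified model: each partition goes to exactly one consumer of the group,
# earlier consumers get one extra partition when the split is uneven,
# and consumers beyond the partition count stay idle.
assign_partitions() (
  partitions=$1 consumers=$2
  per=$(( partitions / consumers ))
  extra=$(( partitions % consumers ))
  next=0
  i=0
  while [ "$i" -lt "$consumers" ]; do
    count=$per
    if [ "$i" -lt "$extra" ]; then count=$(( per + 1 )); fi
    list=""
    n=0
    while [ "$n" -lt "$count" ]; do
      list="${list:+$list,}$next"
      next=$(( next + 1 ))
      n=$(( n + 1 ))
    done
    echo "consumer-$i: ${list:-idle}"
    i=$(( i + 1 ))
  done
)

echo "group demo (2 consumers):";  assign_partitions 2 2
echo "group demo1 (1 consumer):";  assign_partitions 2 1
echo "a group with 3 consumers:";  assign_partitions 2 3
```

Each group is assigned independently, which is why demo and demo1 both receive every message of topic bobby, while inside demo the two partitions are split between the two consumers.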