
Kafka Installation and Usage

1. Download the installation package

1.1 Choose the version you need and download it

[root@ywxtdb opt]# wget https://archive.apache.org/dist/kafka/2.1.0/kafka_2.12-2.1.0.tgz

1.2 Extract the archive

[root@ywxtdb opt]# tar -xvf kafka_2.12-2.1.0.tgz 

2. Edit the configuration

[root@ywxtdb config]# vi /opt/kafka_2.12-2.1.0/config/server.properties 

The main settings to change:

broker.id=0

listeners=PLAINTEXT://192.168.1.128:9092

log.dirs=/var/kafka-logs

zookeeper.connect=localhost:2181

For a cluster:

zookeeper.connect=192.168.91.128:2181,192.168.91.129:2181,192.168.91.130:2181/kafka
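
For a three-node cluster, every broker uses the same zookeeper.connect string but needs its own broker.id and listener address. A minimal sketch, assuming the example hosts above (192.168.91.128/129/130):

# server.properties on 192.168.91.128
broker.id=0
listeners=PLAINTEXT://192.168.91.128:9092
log.dirs=/var/kafka-logs
zookeeper.connect=192.168.91.128:2181,192.168.91.129:2181,192.168.91.130:2181/kafka

# server.properties on 192.168.91.129 (only these two lines differ)
broker.id=1
listeners=PLAINTEXT://192.168.91.129:9092

# server.properties on 192.168.91.130
broker.id=2
listeners=PLAINTEXT://192.168.91.130:9092

The /kafka suffix is a ZooKeeper chroot: it keeps all of Kafka's znodes under one path, and the same chroot must then be used in every command that takes a --zookeeper argument.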

3. Configure environment variables

[root@ywxtdb config]# vi /etc/profile

export KAFKA_HOME=/opt/kafka_2.12-2.1.0
export PATH=$PATH:$KAFKA_HOME/bin
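
After saving the file, reload it so the new PATH takes effect in the current shell:

[root@ywxtdb config]# source /etc/profile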

4. Start Kafka

Before starting Kafka, remember to start ZooKeeper first.
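
If you are using the ZooKeeper that ships inside the Kafka distribution rather than a separate install, a minimal way to bring it up (assuming the default zookeeper.properties) is:

[root@ywxtdb config]# zookeeper-server-start.sh -daemon /opt/kafka_2.12-2.1.0/config/zookeeper.properties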

[root@ywxtdb config]# kafka-server-start.sh ./server.properties 

Once the broker has started normally, you can see that a kafka node has been created under ZooKeeper:

[zk: localhost:2181(CONNECTED) 16] ls /
[kafka, zookeeper]
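
The start command above runs the broker in the foreground. To keep it running after you close the terminal, pass the -daemon flag; the broker output then goes to the logs directory instead of the console:

[root@ywxtdb config]# kafka-server-start.sh -daemon /opt/kafka_2.12-2.1.0/config/server.properties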

5. Create topics

5.1 View the topic script's parameters

Parameters marked REQUIRED are mandatory.

[root@ywxtdb ~]# kafka-topics.sh
OpenJDK 64-Bit Server VM warning: If the number of processors is expected to increase from one, then you should configure the number of parallel GC threads appropriately using -XX:ParallelGCThreads=N
Create, delete, describe, or change a topic.
Option                                   Description
------                                   -----------
--alter                                  Alter the number of partitions, replica assignment, and/or configuration for the topic.
--config <String: name=value>            A topic configuration override for the topic being created or altered. The following is a list of valid configurations: cleanup.policy, compression.type, delete.retention.ms, file.delete.delay.ms, flush.messages, flush.ms, follower.replication.throttled.replicas, index.interval.bytes, leader.replication.throttled.replicas, max.message.bytes, message.downconversion.enable, message.format.version, message.timestamp.difference.max.ms, message.timestamp.type, min.cleanable.dirty.ratio, min.compaction.lag.ms, min.insync.replicas, preallocate, retention.bytes, retention.ms, segment.bytes, segment.index.bytes, segment.jitter.ms, segment.ms, unclean.leader.election.enable. See the Kafka documentation for full details on the topic configs.
--create                                 Create a new topic.
--delete                                 Delete a topic.
--delete-config <String: name>           A topic configuration override to be removed for an existing topic (see the list of configurations under the --config option).
--describe                               List details for the given topics.
--disable-rack-aware                     Disable rack aware replica assignment.
--exclude-internal                       Exclude internal topics when running list or describe command. The internal topics will be listed by default.
--force                                  Suppress console prompts.
--help                                   Print usage information.
--if-exists                              If set when altering or deleting topics, the action will only execute if the topic exists.
--if-not-exists                          If set when creating topics, the action will only execute if the topic does not already exist.
--list                                   List all available topics.
--partitions <Integer: # of partitions>  The number of partitions for the topic being created or altered (WARNING: if partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected).
--replica-assignment <String: broker_id_for_part1_replica1 : broker_id_for_part1_replica2 , broker_id_for_part2_replica1 : broker_id_for_part2_replica2 , ...>   A list of manual partition-to-broker assignments for the topic being created or altered.
--replication-factor <Integer: replication factor>   The replication factor for each partition in the topic being created.
--topic <String: topic>                  The topic to be created, altered or described. Can also accept a regular expression, except for the --create option.
--topics-with-overrides                  If set when describing topics, only show topics that have overridden configs.
--unavailable-partitions                 If set when describing topics, only show partitions whose leader is not available.
--under-replicated-partitions            If set when describing topics, only show under-replicated partitions.
--zookeeper <String: hosts>              REQUIRED: The connection string for the zookeeper connection in the form host:port. Multiple hosts can be given to allow fail-over.

5.2 Run the create command

kafka-topics.sh is the script for managing topics. In this version it depends on ZooKeeper to create topics, so the ZooKeeper connection string (including the /kafka chroot) must be supplied.

--create indicates that a topic is being created

--topic the name of the topic to create

--partitions the number of partitions, usually chosen according to the number of broker nodes

--replication-factor the number of replicas for each partition

[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --create --topic bobby --partitions 2 --replication-factor 2
Created topic "bobby".

5.3 List the created topic

[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --list
bobby

5.4 View the topic description

[root@node01 bin]# kafka-topics.sh --zookeeper node1:2181,node2:2181,node3:2181/kafka --describe --topic bobby
Topic:bobby  PartitionCount:2  ReplicationFactor:2  Configs:
    Topic: bobby  Partition: 0  Leader: 2  Replicas: 2,0  Isr: 2,0
    Topic: bobby  Partition: 1  Leader: 0  Replicas: 0,1  Isr: 0,1
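
In this output, Leader is the id of the broker currently serving reads and writes for that partition, Replicas lists all brokers holding a copy of it, and Isr (in-sync replicas) is the subset of replicas currently caught up with the leader.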

6. Start a consumer

6.1 View the consumer script's parameters

Parameters marked REQUIRED are mandatory.

[root@ywxtdb ~]# kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.
Option                                               Description
------                                               -----------
--bootstrap-server <String: server to connect to>    REQUIRED: The server(s) to connect to.
--consumer-property <String: consumer_prop>          A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <String: config file>              Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--enable-systest-events                              Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <String: class>                          The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning                                     If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--group <String: consumer group id>                  The consumer group id of the consumer.
--isolation-level <String>                           Set to read_committed in order to filter out transactional messages which are not committed. Set to read_uncommitted to read all messages. (default: read_uncommitted)
--key-deserializer <String: deserializer for key>
--max-messages <Integer: num_messages>               The maximum number of messages to consume before exiting. If not set, consumption is continual.
--offset <String: consume offset>                    The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest)
--partition <Integer: partition>                     The partition to consume from. Consumption starts from the end of the partition unless '--offset' is specified.
--property <String: prop>                            The properties to initialize the message formatter. Default properties include: print.timestamp=true|false, print.key=true|false, print.value=true|false, key.separator=<key.separator>, line.separator=<line.separator>, key.deserializer=<key.deserializer>, value.deserializer=<value.deserializer>. Users can also pass in customized properties for their formatter; more specifically, users can pass in properties keyed with 'key.deserializer.' and 'value.deserializer.' prefixes to configure their deserializers.
--skip-message-on-error                              If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>                   If specified, exit if no message is available for consumption for the specified interval.
--topic <String: topic>                              The topic id to consume on.
--value-deserializer <String: deserializer for values>
--whitelist <String: whitelist>                      Whitelist of topics to include for consumption.

6.2 Run the start command

The consumer client does not need ZooKeeper; it connects through --bootstrap-server.

--topic the topic name

--group the consumer group to join (the group is created if it does not exist yet)

[root@node01 bin]#  kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bobby --group demo
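
If the group has no committed offset yet, adding --from-beginning makes the consumer start from the earliest message in the log instead of the latest. For example, with a hypothetical fresh group demo2:

[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic bobby --group demo2 --from-beginning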

6.3 Check the consumer groups

[root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --list
demo
demo1

View the details of a specific group:

[root@node01 kafka-logs]# kafka-consumer-groups.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --describe --group demo
TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST             CLIENT-ID
bobby  0          16              16              0    consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22  /192.168.91.128  consumer-1
bobby  1          25              25              0    consumer-1-74554efe-e529-4c25-ab7f-c0128e3c7f22  /192.168.91.128  consumer-1
[root@node01 kafka-logs]#
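
Here CURRENT-OFFSET is the group's last committed offset for the partition, LOG-END-OFFSET is the newest offset in that partition, and LAG is the difference between the two; a LAG of 0 means the group is fully caught up.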

7. Start a producer

7.1 View the producer script's parameters

Parameters marked REQUIRED are mandatory.

[root@ywxtdb ~]# kafka-console-producer.sh
Read data from standard input and publish it to Kafka.
Option                                   Description
------                                   -----------
--batch-size <Integer: size>             Number of messages to send in a single batch if they are not being sent synchronously. (default: 200)
--broker-list <String: broker-list>      REQUIRED: The broker list string in the form HOST1:PORT1,HOST2:PORT2.
--compression-codec [String: compression-codec]   The compression codec: either 'none', 'gzip', 'snappy', 'lz4', or 'zstd'. If specified without value, then it defaults to 'gzip'
--line-reader <String: reader_class>     The class name of the class to use for reading lines from standard in. By default each line is read as a separate message. (default: kafka.tools.ConsoleProducer$LineMessageReader)
--max-block-ms <Long: max block on send> The max time that the producer will block for during a send request (default: 60000)
--max-memory-bytes <Long: total memory in bytes>   The total memory used by the producer to buffer records waiting to be sent to the server. (default: 33554432)
--max-partition-memory-bytes <Long: memory in bytes per partition>   The buffer size allocated for a partition. When records are received which are smaller than this size the producer will attempt to optimistically group them together until this size is reached. (default: 16384)
--message-send-max-retries <Integer>     Brokers can fail receiving the message for multiple reasons, and being unavailable transiently is just one of them. This property specifies the number of retries before the producer gives up and drops this message. (default: 3)
--metadata-expiry-ms <Long: metadata expiration interval>   The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any leadership changes. (default: 300000)
--producer-property <String: producer_prop>   A mechanism to pass user-defined properties in the form key=value to the producer.
--producer.config <String: config file>  Producer config properties file. Note that [producer-property] takes precedence over this config.
--property <String: prop>                A mechanism to pass user-defined properties in the form key=value to the message reader. This allows custom configuration for a user-defined message reader.
--request-required-acks <String: request required acks>   The required acks of the producer requests (default: 1)
--request-timeout-ms <Integer: request timeout ms>   The ack timeout of the producer requests. Value must be non-negative and non-zero (default: 1500)
--retry-backoff-ms <Integer>             Before each retry, the producer refreshes the metadata of relevant topics. Since leader election takes a bit of time, this property specifies the amount of time that the producer waits before refreshing the metadata. (default: 100)
--socket-buffer-size <Integer: size>     The size of the tcp RECV size. (default: 102400)
--sync                                   If set message send requests to the brokers are synchronously, one at a time as they arrive.
--timeout <Integer: timeout_ms>          If set and the producer is running in asynchronous mode, this gives the maximum amount of time a message will queue awaiting sufficient batch size. The value is given in ms. (default: 1000)
--topic <String: topic>                  REQUIRED: The topic id to produce messages to.

7.2 Run the command

--topic the topic name

--broker-list the Kafka broker(s) to connect to

After typing a message (111 in this example) at the producer prompt:

[root@node01 bin]# kafka-console-producer.sh --topic bobby --broker-list node3:9092
>111

You can see that the consumer has already consumed 111:

[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic bobby --group demo
111
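
As a side note, the console tools can also attach a key to each message; Kafka uses the key to choose the partition, which is what gives per-key ordering. A sketch using the same topic:

# producer: treat everything before ':' as the key, the rest as the value
[root@node01 bin]# kafka-console-producer.sh --broker-list node3:9092 --topic bobby --property parse.key=true --property key.separator=:
>user1:hello

# consumer: print the key alongside the value
[root@node01 bin]# kafka-console-consumer.sh --bootstrap-server node1:9092 --topic bobby --property print.key=true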

[Screenshot: one producer, one consumer]

[Screenshots: one producer and two consumer groups - group demo with two consumers, group demo1 with one consumer]

[Screenshot: the producer]

[Screenshot: consumer 1 in group demo]

[Screenshot: consumer 2 in group demo]

[Screenshot: the consumer in group demo1]

Conclusions:

1. Different consumer groups can consume the same data, but within a single group the same message is not consumed twice.

2. Different partitions do not hold duplicate data. A partition cannot be consumed by more than one consumer in the same group at the same time, while one consumer can be assigned several partitions.
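
A quick way to reproduce the two-group experiment yourself, using this article's topic and group names (run each command in its own terminal):

# terminals 1 and 2: two consumers in the same group "demo" - each is assigned one of the two partitions
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic bobby --group demo
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic bobby --group demo

# terminal 3: one consumer in a different group "demo1" - it receives every message regardless of partition
kafka-console-consumer.sh --bootstrap-server node1:9092 --topic bobby --group demo1

# terminal 4: the producer - type messages and watch how they are distributed across the consumers
kafka-console-producer.sh --broker-list node3:9092 --topic bobby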
