当前位置:   article > 正文

Kafka简单总结_kafka-console-producer.sh --broker-list

kafka-console-producer.sh --broker-list

1.Kafka 概述

为什么需要消息队列:

解耦. 冗余. 拓展性. 灵活性&峰值处理能力. 可恢复性. 顺序保证. 缓冲. 异步通信

Kafka架构

(1)Producer: 消息生产者, 向kafka broker 发消息的客户端
(2)Consumer: 消息消费者, 向kafka broker区小溪的客户端
(3)Topic: 可以理解为一个队列;
(4)Consumer Group 是kafka实现消息广播和单播的手段, 一个topic 可以有多个GC, 一个GC中的多个成员不可以多次接受信息,
(5)Broker : 一台kafka服务器就是一个broker . 一个集群由多个broker组成, 一个broker可以容纳多个topic
(6)Partition : 一个topic也可以分布到多个broker上, 一个topic可以分为多个partition, 每个partition是一个有序的队列, partition中的每条消息都会被分配一个有序的id(offset), kafka只保证按一个partition中的顺序将消息发给consumer.
(7)Offset: kafka的存储文件是按照offset.kafka来命名

2.Kafka集群部署

Kafka集群安装

0)集群规划
hadoop102 hadoop103 hadoop104
zk zk zk
kafka kafka kafka
1)解压安装包
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2)修改解压后的文件名称
[atguigu@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
3)创建logs文件夹
在/opt/module/kafka目录下
[atguigu@hadoop102 kafka]$ mkdir logs
4)修改配置文件
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vi server.properties
输入以下内容:
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
5)配置环境变量
[atguigu@hadoop102 module]$ sudo vi /etc/profile

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH= P A T H : PATH: PATH:KAFKA_HOME/bin

[atguigu@hadoop102 module]$ source /etc/profile
6)分发安装包
[atguigu@hadoop102 module]$ xsync kafka/
注意:分发之后记得配置其他机器的环境变量
7)修改broker.id
分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
注:broker.id不得重复
8)启动集群
依次在hadoop102、hadoop103、hadoop104节点上启动kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &
9)关闭集群
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop

Kafka命令行操作

1)查看当前服务器中的所有topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
2)创建topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181
–create --replication-factor 3 --partitions 1 --topic first
选项说明:
–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数
3)删除topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181
–delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
4)发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh
–broker-list hadoop102:9092 --topic first

hello world
atguigu atguigu
5)消费消息
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh
–zookeeper hadoop102:2181 --from-beginning --topic first
–from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
6)查看某个Topic的详情4
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181
–describe --topic first
[atguigu@hadoop102 kafka]$ bin/

3.Kafka 工作流程分析

3.1Kafka生产

写入方式: 顺序写磁盘

分区:

topic下有分区, 每个partition中的消息都是有序的, 生产的消息被不断追加到partititon log 上, 其中的每一个消息都被赋予了一个唯一的offset

分区原因:

方便扩展
提高并发

分区原则;

指定partition
根据key的value进行hash, 得出partition
轮询, 选出一个partition

写入流程:

1). produce 先从zookeeper的brokers/…/state 节点找到该partition的leader
2)Produce将消息发送给该leader
3)Leader将消息写入本地log
4)Followers 从leader pull消息, 写入本地log后向leader发送ack
5)Leader收到所有ISR中的replication的ack后增加(high watermark, 最后commit offset), 并向Producer发送ACK

3.2Broker保存信息

存储方式: 多个partition
存储策略: 无论信息是否被消费, kafka都会保存所有信息, 只会基于时间和大小删除数据

3.3消费过程分析

3.3.1 消费者组

每个分区只能由group中的一个消费者读取, 但是过个group的成员可以同时消费这个partition.

3.3.1 消费方式

consumer采用pull的方式从broker中读取数据

4.常见问题:

6.1kafka分区的原因:

方便扩展
负载均衡, 提高并发度

6.2kafka副本机制的原因:

能够立刻看到写入的消息,就是你使用生产者 API 成功向分区写入消息后,马上使用消费者就能读取刚才写入的消息
能够实现消息的幂等性,啥意思呢?就是对于生产者产生的消息,在消费者进行消费的时候,它每次都会看到消息存在,并不会存在消息不存在的情况

6.3kafka的架构

Producer: 消息生产者, 向kafka broker 发消息的客户端
Consumer: 消息消费者, 向kafka broker区小溪的客户端
Topic: 可以理解为一个队列;
Consumer Group: 是kafka实现消息广播和单播的手段, 一个topic 可以有多个GC, 一个GC中的多个成员不可以多次接受信息,
Broker : 一台kafka服务器就是一个broker . 一个集群由多个broker组成, 一个broker可以容纳多个topic
Partition : 一个topic也可以分布到多个broker上, 一个topic可以分为多个partition, 每个partition是一个有序的队列, partition中的每条消息都会被分配一个有序的id(offset), kafka只保证按一个partition中的顺序将消息发给consumer.
Offset: kafka的存储文件是按照offset.kafka来命名

6.4Kafka的消息写入过程

producer 先从zookeeper的brokers/…/state 节点找到该partition的leader
Producer将消息发送给该leader
Leader将消息写入本地log
Followers 从leader pull消息, 写入本地log后向leader发送ack
Leader收到所有ISR中的replication的ack后增加(high watermark, 最后commit offset), 并向Producer发送ACK

6.5kafka的消息存储策略

无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。

6.6kafka是如何做到如此高的吞吐量和性能的呢?

1.页缓存技术: 你在写入磁盘文件的时候,可以直接写入这个 OS Cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 OS Cache 里的数据真的刷入磁盘文件中
2.磁盘顺序写: 它是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据。
3.零拷贝技术:

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/IT小白/article/detail/78569
推荐阅读
相关标签
  

闽ICP备14008679号