赞
踩
解耦. 冗余. 拓展性. 灵活性&峰值处理能力. 可恢复性. 顺序保证. 缓冲. 异步通信
(1)Producer: 消息生产者, 向kafka broker 发消息的客户端
(2)Consumer: 消息消费者, 向kafka broker区小溪的客户端
(3)Topic: 可以理解为一个队列;
(4)Consumer Group 是kafka实现消息广播和单播的手段, 一个topic 可以有多个GC, 一个GC中的多个成员不可以多次接受信息,
(5)Broker : 一台kafka服务器就是一个broker . 一个集群由多个broker组成, 一个broker可以容纳多个topic
(6)Partition : 一个topic也可以分布到多个broker上, 一个topic可以分为多个partition, 每个partition是一个有序的队列, partition中的每条消息都会被分配一个有序的id(offset), kafka只保证按一个partition中的顺序将消息发给consumer.
(7)Offset: kafka的存储文件是按照offset.kafka来命名
0)集群规划
hadoop102 hadoop103 hadoop104
zk zk zk
kafka kafka kafka
1)解压安装包
[atguigu@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2)修改解压后的文件名称
[atguigu@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
3)创建logs文件夹
在/opt/module/kafka目录下
[atguigu@hadoop102 kafka]$ mkdir logs
4)修改配置文件
[atguigu@hadoop102 kafka]$ cd config/
[atguigu@hadoop102 config]$ vi server.properties
输入以下内容:
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
5)配置环境变量
[atguigu@hadoop102 module]$ sudo vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=
P
A
T
H
:
PATH:
PATH:KAFKA_HOME/bin
[atguigu@hadoop102 module]$ source /etc/profile
6)分发安装包
[atguigu@hadoop102 module]$ xsync kafka/
注意:分发之后记得配置其他机器的环境变量
7)修改broker.id
分别在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
注:broker.id不得重复
8)启动集群
依次在hadoop102、hadoop103、hadoop104节点上启动kafka
[atguigu@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &
9)关闭集群
[atguigu@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
1)查看当前服务器中的所有topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
2)创建topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181
–create --replication-factor 3 --partitions 1 --topic first
选项说明:
–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数
3)删除topic
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181
–delete --topic first
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
4)发送消息
[atguigu@hadoop102 kafka]$ bin/kafka-console-producer.sh
–broker-list hadoop102:9092 --topic first
hello world
atguigu atguigu
5)消费消息
[atguigu@hadoop103 kafka]$ bin/kafka-console-consumer.sh
–zookeeper hadoop102:2181 --from-beginning --topic first
–from-beginning:会把first主题中以往所有的数据都读取出来。根据业务场景选择是否增加该配置。
6)查看某个Topic的详情4
[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181
–describe --topic first
[atguigu@hadoop102 kafka]$ bin/
写入方式: 顺序写磁盘
topic下有分区, 每个partition中的消息都是有序的, 生产的消息被不断追加到partititon log 上, 其中的每一个消息都被赋予了一个唯一的offset
方便扩展
提高并发
指定partition
根据key的value进行hash, 得出partition
轮询, 选出一个partition
1). produce 先从zookeeper的brokers/…/state 节点找到该partition的leader
2)Produce将消息发送给该leader
3)Leader将消息写入本地log
4)Followers 从leader pull消息, 写入本地log后向leader发送ack
5)Leader收到所有ISR中的replication的ack后增加(high watermark, 最后commit offset), 并向Producer发送ACK
存储方式: 多个partition
存储策略: 无论信息是否被消费, kafka都会保存所有信息, 只会基于时间和大小删除数据
每个分区只能由group中的一个消费者读取, 但是过个group的成员可以同时消费这个partition.
consumer采用pull的方式从broker中读取数据
方便扩展
负载均衡, 提高并发度
能够立刻看到写入的消息,就是你使用生产者 API 成功向分区写入消息后,马上使用消费者就能读取刚才写入的消息
能够实现消息的幂等性,啥意思呢?就是对于生产者产生的消息,在消费者进行消费的时候,它每次都会看到消息存在,并不会存在消息不存在的情况
Producer: 消息生产者, 向kafka broker 发消息的客户端
Consumer: 消息消费者, 向kafka broker区小溪的客户端
Topic: 可以理解为一个队列;
Consumer Group: 是kafka实现消息广播和单播的手段, 一个topic 可以有多个GC, 一个GC中的多个成员不可以多次接受信息,
Broker : 一台kafka服务器就是一个broker . 一个集群由多个broker组成, 一个broker可以容纳多个topic
Partition : 一个topic也可以分布到多个broker上, 一个topic可以分为多个partition, 每个partition是一个有序的队列, partition中的每条消息都会被分配一个有序的id(offset), kafka只保证按一个partition中的顺序将消息发给consumer.
Offset: kafka的存储文件是按照offset.kafka来命名
producer 先从zookeeper的brokers/…/state 节点找到该partition的leader
Producer将消息发送给该leader
Leader将消息写入本地log
Followers 从leader pull消息, 写入本地log后向leader发送ack
Leader收到所有ISR中的replication的ack后增加(high watermark, 最后commit offset), 并向Producer发送ACK
无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824
需要注意的是,因为Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,所以这里删除过期文件与提高 Kafka 性能无关。
1.页缓存技术: 你在写入磁盘文件的时候,可以直接写入这个 OS Cache 里,也就是仅仅写入内存中,接下来由操作系统自己决定什么时候把 OS Cache 里的数据真的刷入磁盘文件中
2.磁盘顺序写: 它是以磁盘顺序写的方式来写的。也就是说,仅仅将数据追加到文件的末尾,不是在文件的随机位置来修改数据。
3.零拷贝技术:
赞
踩
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。