赞
踩
官方网站:kafka.apache.org
流式数据安全的存储在容器,用来构建实时数据管道,具有水平扩展的容错性
flume:一个进程 sorce —>channel—>sink
kafka:三个进程
1.broker:进程
2.producer:生产者
3.consumer:消费者
通常流程:flume–>kafka—>sparkStreaming/flink/结构化流
kafka的编程语言:scala
主题:topic,kafka创建主题一般根据业务系统划分,最终落在磁盘上就是创建文件夹(linux系统文件) mkdir XXtopic
日志存储在linux系统文件
主题分区存储: omstopic_0 100w/s 自身分区有序
omstopic_1 100w/s
omstopic_1 100w/s
问题:kafka是单分区有序,如何做到全局有序?
分布式安装
1.zookeeper
2.kafka
首先启动zookeeper
[root@yws85 zookeeper-3.4.6]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/software/zookeeper-3.4.6/bin/../conf/zoo.cfg
Mode: follower
[root@yws86 zookeeper-3.4.6]# bin/zkServer.sh status
下载kafka压缩包:kafka_2.11 - 0.10.0.1.tgz (2.11是scala版本 0.10是kafka版本)
kafka和spark-Streaming版本对应
安装前检查zookeeper是否有kafka信息残留
1.进入到zookeeper的bin目录,进入命令行终端
./zkCli.sh --> ls /kafka -->rmr /kafka
2.JDK部署
3. scala部署
scala-2.11.8.tgz
tar -xzvf scala-2.11.8.tgz 解压
chown -R scala-2.11.8 给用户用户组授权
ln -s scala-2.11.8 scala 创建软连接
配置环境变量
cd /etc/profile
export SCALA_HOME=/opt/software/scala-2.11.8
export PATH=
S
C
A
L
A
H
O
M
E
/
b
i
n
:
SCALA_HOME/bin:
SCALAHOME/bin:JAVA_HOME/bin:
H
A
D
O
O
P
H
O
M
E
/
b
i
n
:
HADOOP_HOME/bin:
HADOOPHOME/bin:FLINK_HOME/bin:$PATH
4. Kafka部署
到kafka的根目录创建存储目录
[root@yws87 kafka]# mkdir logs
cd /kafka/conf
[root@yws87 config]# vi server.properties
broker.id=3 # 每台机器的brokerid
port=9092 #端口号
host.name=192.168.0.87
log.dirs=/opt/software/kafka/logs #存储目录
zookeeper.connect=192.168.0.85:2181,192.168.0.86:2181,192.168.0.87:2181/kafka #存储在zookeeper里,方便后期维护
后台启动kafka:
[root@yws87 kafka]# nohup bin/kafka-server-start.sh config/server.properties & #后台启动
[1] 19455
[root@yws87 kafka]# tail -F nohup.out # 后台追踪
topic相关的操作
1.创建topic,如能成功创建topic则表示集群安装完成,
也可以用jps命令查看kafka进程是否存在。
bin/kafka-topics.sh
–create
–zookeeper 192.168.0.85:2181,192.168.0.86:2181,192.168.0.87:2181/kafka
–replication-factor 3
–partitions 3
–topic test
bin/kafka-topics.sh
–create
–zookeeper 192.168.0.85:2181,192.168.0.86:2181,192.168.0.87:2181/kafka
–replication-factor 1
–partitions 1
–topic g5
2.通过list命令查看创建的topic:
bin/kafka-topics.sh
–list
–zookeeper 192.168.0.85:2181,192.168.0.86:2181,192.168.0.87:2181/kafka
3.查看创建的Topic
bin/kafka-topics.sh
–describe
–zookeeper 192.168.0.85:2181,192.168.0.86:2181,192.168.0.87:2181/kafka
–topic test
Topic:test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader(读写): brokerid 3 Replicas: 3,1,2 Isr: 3,1,2
Topic: test Partition: 1 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: test Partition: 2 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
第一行列出了这个topic的总体情况,如topic名称,分区数量,副本数量等。
第二行开始,每一行列出了一个分区的信息,如它是第几个分区,这个分区的leader是哪个broker,副本位于哪些broker,有哪些副本处理同步状态。
Partition: 分区
Leader : 负责读写指定分区的节点
Replicas : 复制该分区log的节点列表
Isr : “in-sync” replicas,当前活跃的副本列表(是一个子集),并且可能成为Leader
我们可以通过Kafka自带的bin/kafka-console-producer.sh和bin/kafka-console-consumer.sh脚本,来验证演示如果发布消息、消费消息。
4.删除topic
bin/kafka-topics.sh --delete
–zookeeper 192.168.0.85:2181,192.168.0.86:2181,192.168.0.87:2181/kafka
–topic test
正常删除topic不会删除,只会打上标记,要想真正删除须在
conf/server.properties文件里进行设置:
添加:delete.topic.enable=true
重启kafka
假如删除不干净:1.linux磁盘文件夹 2.zk的/kafka的
ls /kafka/brokers/topics
ls /kafka/config/topics
bin/kafka-topics.sh --delete
–zookeeper 192.168.0.85:2181,192.168.0.86:2181,192.168.0.87:2181/kafka
–topic g5
整理:
1.生产上命名不要有标点符号的字符,就英文 可以带数字 默认小写
2.假如已经在运行的kafka 只有1个topic 你可以删除 风险0
3. 多个topic 忍
5.修改topic
使用—-alert原则上可以修改任何配置,以下列出了一些常用的修改选项:
(1)改变分区数量
bin/kafka-topics.sh
–alter
–zookeeper 192.168.0.85:2181,192.168.0.86:2181,192.168.0.87:2181/kafka
–topic g5 --partitions 3
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。