当前位置:   article > 正文

Hadoop(三)—— kafka_hadoop对接kafka的方式

hadoop对接kafka的方式
1、kafka是什么
类JMS消息队列,结合JMS中的两种模式,可以有多个消费者主动拉取数据,在JMS中只有点对点模式才有消费者主动拉取数据。
kafka是一个生产-消费模型。
Producer:生产者,只负责数据生产,生产者的代码可以集成到任务系统中。 
  数据的分发策略由producer决定,默认是defaultPartition  Utils.abs(key.hashCode) % numPartitions
Broker:当前服务器上的Kafka进程,俗称拉皮条。只管数据存储,不管是谁生产,不管是谁消费。
在集群中每个broker都有一个唯一brokerid,不得重复。
Topic:目标发送的目的地,这是一个逻辑上的概念,落到磁盘上是一个partition的目录。partition的目录中有多个segment组合(index,log)
一个Topic对应多个partition[0,1,2,3],一个partition对应多个segment组合。一个segment有默认的大小是1G。
每个partition可以设置多个副本(replication-factor 1),会从所有的副本中选取一个leader出来。所有读写操作都是通过leader来进行的。
特别强调,和mysql中主从有区别,mysql做主从是为了读写分离,在kafka中读写操作都是leader。
ConsumerGroup:数据消费者组,ConsumerGroup可以有多个,每个ConsumerGroup消费的数据都是一样的。
       可以把多个consumer线程划分为一个组,组里面所有成员共同消费一个topic的数据,组员之间不能重复消费。
2、kafka生产数据时的分组策略
默认是defaultPartition  Utils.abs(key.hashCode) % numPartitions
上文中的key是producer在发送数据时传入的,produer.send(KeyedMessage(topic,myPartitionKey,messageContent))
3、kafka如何保证数据的完全生产
ack机制:broker表示发来的数据已确认接收无误,表示数据已经保存到磁盘。
0:不等待broker返回确认消息
1:等待topic中某个partition leader保存成功的状态反馈
-1:等待topic中某个partition 所有副本都保存成功的状态反馈
4、生产者是怎么连接的
生成者有很多,连接的是broker list, 连接很多的Broker,写的时候也是给某个broker的partition写,在创建topic的时候,可以指定有几个分区,
副本数量现在我们创建了一个topic,有两个分区,数据的存储会进行冗余存储,在kafka里面数据也是存放多分,并且可以指定副本的数量
5、broker如何保存数据
在理论环境下,broker按照顺序读写的机制,可以每秒保存600M的数据。主要通过pagecache机制,尽可能的利用当前物理机器上的空闲内存来做缓存。当前topic所属的broker,必定有一个该topic的partition,partition是一个磁盘目录。partition的目录中有多个segment组合(index,log)
6、partition如何分布在不同的broker上
 --(5个partition 分布在3个broker )


int i = 0
list{kafka01,kafka02,kafka03}

for(int i=0;i<5;i++){
brIndex = i%broker;
hostName = list.get(brIndex)
}
根据hash值与节点的个数取模
7、Kafka的负载均衡问题
算法:
假如topic1,具有如下partitions: P0,P1,P2,P3
加入group中,有如下consumer: C1,C2
首先根据partition索引号对partitions排序: P0,P1,P2,P3
根据consumer.id排序: C0,C1
计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
然后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
8、consumerGroup的组员和partition之间如何做负载均衡
最好是一一对应,一个partition对应一个consumer。
如果consumer的数量过多,必然有空闲的consumer。
9、Kafka消息的分发
kafka producer 负责数据分发策略:kafka.Producer.defaultPartition是否压缩,减少网络传输,序列化,数据重发机制
kafka集群中的任何一个broker都可以向producer提供metadata信息,这些metadata中包含”集群中存活的servers列表”/”partitions leader列表”等信息;
 当producer获取到metadata信息之后, producer将会和Topic下所有partition leader保持socket连接; 
 消息由producer直接通过socket发送到broker,中间不会经过任何”路由层”,事实上,消息被路由到哪个partition上由producer客户端决定; 
比如可以采用”random”“key-hash”“轮询”等,如果一个topic中有多个partitions,那么在producer端实现”消息均衡分发”是必要的。 
 在producer端的配置文件中,开发者可以指定partition路由的方式。
10、如何保证kafka消费者消费数据是全局有序的(伪命题)
11、kafka常用操作命令
查看当前服务器中的所有topic
./bin/kafka-topics.sh --list --zookeeper hadoop02:2181


创建topic  [第一个1是设置副本数为1,第二个1是设置分区为1个分区]
./bin/kafka-topics.sh --create --zookeeper m1:2181 --replication-factor 1 --partitions 1 --topic test


删除topic
./bin/kafka-topics.sh --delete --zookeeper m1:2181 --topic test
需要server.properties中设置delete.topic.enable=true否则只是标记删除或者直接重启。
通过shell命令发送消息  开启 
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test


通过shell消费消息     消费
./bin/kafka-console-consumer.sh --zookeeper hadoop02:2181 --from-beginning --topic test
查看某个Topic的详情
./bin/kafka-topics.sh --topic test --describe --zookeeper hadoop02:2181


新增配置
./bin/kafka-topics.sh --zookeeper hadoop02:2181 --create --topic test --partitions 1 --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1


修改配置
./bin/kafka-topics.sh --zookeeper hadoop02:2181 --alter --topic test --config max.message.bytes=128000 


删除配置:
./bin/kafka-topics.sh --zookeeper hadoop02:2181 --alter --topic test --delete-config max.message.bytes


12、Kafka集群部署
一. 准备工作:
   1. 准备3台机器,IP地址分别为:192.168.3.230(233,234)
   2. 下载kafka稳定版本,我的版本为:Scala 2.11  - kafka_2.11-0.9.0.0.tgz
        http://kafka.apache.org/downloads.html
    3. 分别解压放入到想安装的目录下,我的目录为:/home/rasa 解压命令为:
        tar -xzf *.tgz
二、搭建zookeeper集群
    1.进入解压后的目录/home/rasa/kafka_2.11-0.9.0.0
    2. 进入config目录下,修改zookeeper.properties文件(所有机器相同)
        tickTime =2000
        dataDir =/data/zookeeper/
        clientPort =2181
        initLimit =5
        syncLimit =2
        server.1 =192.168.3.230:2888:3888
        server.2 =192.168.3.233:2888:3888
        server.3 =192.168.3.234:2888:3888
    3.在dataDir目录/data/zookeeper/下写一个myid文件,命令如下:
        echo 1 >myid
        注意:这个id是zookeeper的主机标示,每个主机id不同第二台是2 第三台是3
    4.逐次启动3台机器的zookeeper 构成一个集群:
        > bin/zookeeper-server-start.sh config/zookeeper.properties &
        由于ZooKeeper集群启动的时候,每个结点都试图去连接集群中的其它结点,先启动的肯定连不上后面还没启动的,所以打印出来的部分的异常是可以忽略的。集群在选出一个Leader后,最后稳定了。其他结点可能也出现类似问题,属于正常。
三、搭建kafka集群
    1.进入config目录,修改server.properties(逐个机器修改)
        broker.id =1(其他机器为 2/3)
        port =9092
        zookeeper.connect= 192.168.3.230:2181,192.168.3.233:2181,192.168.3.234:2181
        log.dirs =/home/rasa/kafka-logs
        host.name =192.168.3.230
        advertised.host.name =192.168.3.230
    2.启动每台服务器的kafka
        > bin/kafka-server-start.sh config/server.properties &
四、测试集群
   1.创建一个topic
        > bin/kafka-topics.sh --create --zookeeper 192.168.3.230:2181 --replication-factor 3 --partitions 1
          --topic test-topic
    2.查看创建的topic
        > bin/kafka-topics.sh --describe --zookeeper 192.168.3.230:2181 --topic test-topic
            Topic:test-replicated-topic    PartitionCount:1    ReplicationFactor:3    Configs:
            Topic: test-replicated-topic    Partition: 0    Leader: 1    Replicas: 1,2,0    Isr: 1,2,
     3.查看topic列表
          > bin/kafka-topics.sh --list --zookeeper 192.168.3.230 :2181 
    test
    test-topic
    查看列表及具体信息
          > bin/kafka-topics.sh --zookeeper localhost --describe
    4.查看集群情况:
        >bin/kafka-topics.sh --describe --zookeeper 192.168.3.233:2181 --topic test-topic
        >bin/kafka-topics.sh --describe --zookeeper 192.168.3.234:2181 --topic test-topic
        发现都能看到test-topic。
     5.生产消息
          > bin/kafka-console-producer.sh --broker-list 192.168.3.234:9092 -topic test-topic
     6.消费消息
          > bin/kafka-console-consumer.sh --zookeeper 192.168.3.234:2181 --from-beginning --topic test-topic
13、Consumer与topic关系
 每个group中可以有多个consumer,每个consumer属于一个consumer group; 
通常情况下,一个group中会包含多个consumer,这样不仅可以提高topic中消息的并发消费能力,而且还能提高”故障容错”性,如果group中的某个consumer失效那么其消费的partitions将会有其他consumer自动接管。 
 对于Topic中的一条特定的消息,只会被订阅此Topic的每个group中的其中一个consumer消费,此消息不会发送给一个group的多个consumer; 
那么一个group中所有的consumer将会交错的消费整个Topic,每个group中consumer消息消费互相独立,我们可以认为一个group是一个”订阅”者。 
 在kafka中,一个partition中的消息只会被group中的一个consumer消费(同一时刻); 
一个Topic中的每个partions,只会被一个”订阅者”中的一个consumer消费,不过一个consumer可以同时消费多个partitions中的消息。 
 kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。 
kafka只能保证一个partition中的消息被某个consumer消费时是顺序的;事实上,从Topic角度来说,当有多个partitions时,消息仍不是全局有序的。
14、kafka receiver方式与Kafka Direct方式
Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复,但是效率会下降。
direct 这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
这种方式有如下优点:
1、简化并行读取:
2.2、高性能
3.一次且仅一次的事务机制:
15、Kafka配置文件详细说明(kafka producer.properties与kafka server.properties)
broker.id=0 (kafka节点ID,必须是唯一)
log.dirs=/home/hadoop/apps/kafka/logdir  (kafka日志存放目录)
log.retention.hours=168 (消息保存最长的时间)
log.segment.bytes=1073741824 (每一个segment文件的大小,默认是1G,可以更改)
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181 (kafka 连接zk的信息,必须配置)
delete.topic.enable=true (kafka 的topic是否物理删除,true-物理删除,false-逻辑删除)
host.name=hadoop02(kafka节点的主机名) 
16、消息队列与kafka
消费者
  消息一般有两种模式,其一是队列,另一种是发布订阅。
  队列:有很多消费者,每一个消费者从队列中获取其中一个数据,一个数据只被消费一次。
  发布订阅模式:将一个消费广播到所有的消费者,一个数据会被消费N次。
kafka对于消费者提供了一种简单的抽象 — 消费组。每一个消费者都会有一个消费组的名称,一个消费会发布到一个topic上,同时递送到订阅了这个消息的消费组下的所有消费者。具体如下图所示:
若是所有的消费者都是相同的消费组,那么就是一个队列模式。
若是消费者有不同的组,那么就是发布订阅模式,那么消费就会递送到所有的消费者。
17、flume+kafka的整合
修改/usr/local/flume/conf/flume-conf.properties
agent.sinks.s1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.s1.topic = mytopic
agent.sinks.s1.brokerList = localhost:9092
agent.sinks.s1.requiredAcks = 1
agent.sinks.s1.batchSize = 20
agent.sinks.s1.channel = c1
启动kafka
启动zookeeper
/usr/local/kafka/bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
启动kafka
/usr/local/kafka/bin/kafka-server-start.sh config/server.properties
创建主题test
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
启动flume
/usr/local/flume/bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent&
18.#什么是kafka?
消息中间件,消息的缓存(转发)高吞吐的分布式的消息订阅系统
#flume 日志收集工具
19.#消息队列核心作用? 解耦 异步 并行
20.#kafka 应用场景?
  1、Messaging   
    对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)
    2、Websit activity tracking
    kafka可以作为"网站活性跟踪"的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等
    3、Log Aggregation
    kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.
21.JMS 消息传输模型
点对点模式,特点?一对一,消费者主动拉取数据
发布/订阅模式,特点?一对多,主动推取数据
22.#常见的类JMS消息服务器? MQ
2.4.1、JMS消息服务器 ActiveMQ
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的。 
 主要特点: 
 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP 
 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务) 
 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性 
 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上 
 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA 
 支持通过JDBC和journal提供高速的消息持久化 
 从设计上保证了高性能的集群,客户端-服务器,点对点 
 支持Ajax 
 支持与Axis的整合 
 可以很容易得调用内嵌JMS provider,进行测试
2.4.2、分布式消息中间件 Metamorphosis
Metamorphosis (MetaQ) 是一个高性能、高可用、可扩展的分布式消息中间件,类似于LinkedIn的Kafka,具有消息存储顺序写、吞吐量大和支持本地和XA事务等特性,适用于大吞吐量、顺序消息、广播和日志数据传输等场景,在淘宝和支付宝有着广泛的应用,现已开源。 
 主要特点: 
 生产者、服务器和消费者都可分布 
 消息存储顺序写 
 性能极高,吞吐量大 
 支持消息顺序 
 支持本地和XA事务 
 客户端pull,随机读,利用sendfile系统调用,zero-copy ,批量拉数据 
 支持消费端事务 
 支持消息广播模式 
 支持异步发送消息 
 支持http协议 
 支持消息重试和recover 
 数据迁移、扩容对用户透明 
 消费状态保存在客户端 
 支持同步和异步复制两种HA 
 支持group commit
2.4.3、分布式消息中间件 RocketMQ
RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点: 
 能够保证严格的消息顺序 
 提供丰富的消息拉取模式 
 高效的订阅者水平扩展能力 
 实时的消息订阅机制 
 亿级消息堆积能力 
 Metaq3.0 版本改名,产品名称改为RocketMQ
2.4.4、其他MQ
 .NET消息中间件 DotNetMQ 
 基于HBase的消息队列 HQueue 
 Go 的 MQ 框架 KiteQ 
 AMQP消息服务器 RabbitMQ 
 MemcacheQ 是一个基于 MemcacheDB 的消息队列服务器。
23.# 前台和后台启动的区别?
前台的是一个线程,关闭之后就直接直接关闭kafka
后台关闭不会关闭kafka
24.#kafka 消息保存最长时间?7天
25.#kafka 保存数据文件默认大小?1G
26.#kafka topic删除的模式?设置?
delete.topic.enable=true 物理删除,逻辑删除
27.# partitions 如何进行分区的?
hash值/partition的个数
28.# CG 有什么作用?
提高topic中消息的并发消费能力,而且还能提高"故障容错"性
CG 中的每一个消费者消费数据相互独立
29.#Segment file有级部分组成?
index,log
30.#kafka 有什么独特的特点?解耦 异步 并行
31.#消费者如何标记消费状态?偏移量


声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/605461
推荐阅读
相关标签
  

闽ICP备14008679号