赞
踩
在大数据中,使用了大量的数据。关于数据,我们有两个主要挑战。第一个挑战是如何收集大量数据,第二个挑战是分析收集的数据。为了克服这些挑战,您必须使用消息传递系统。
卡夫卡专为分布式高吞吐量系统而设计。卡夫卡倾向于非常好地取代传统的信息经纪人。与其他消息系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有容错功能,因此非常适合大型消息处理应用程序。
什么是消息系统?
消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,但不必担心如何共享数据。分布式消息传递基于可靠消息队列的概念。消息在客户端应用程序和消息传递系统之间异步排队。有两种类型的消息传递模式可用 - 一种是点对点的,另一种是发布 - 订阅(pub-sub)消息传递系统。大多数消息传递模式遵循pub-sub。
点对点消息系统
在点对点系统中,消息被保存在一个队列中。一个或多个消费者可以消费队列中的消息,但是特定的消息只能由最多一个消费者消费。一旦消费者在队列中读取消息,消息就从该队列中消失。这个系统的典型例子是一个订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。下图描述了结构。
发布 - 订阅消息系统
在发布 - 订阅系统中,消息被保存在一个主题中。与点对点系统不同,消费者可以订阅一个或多个主题并使用该主题中的所有消息。在发布 - 订阅系统中,消息生产者称为发布者,消息消费者称为订阅者。一个真实的例子是Dish TV,它发布体育,电影,音乐等不同的频道,任何人都可以订阅他们自己的一套频道,并在他们的订阅频道可用时获得。
Kafka是什么
Apache Kafka是一个分布式的发布 - 订阅消息传递系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。卡夫卡适合离线和在线消息消费。卡夫卡消息被保存在磁盘上并在集群内复制以防止数据丢失。Kafka建立在ZooKeeper同步服务之上。它与Apache Storm和Spark完美集成,用于实时流数据分析。
优点
以下是卡夫卡的一些好处 -
可靠性 - 卡夫卡是分布式,分区,复制和容错。
可扩展性 - Kafka消息系统无需停机即可轻松扩展。
耐久性 - Kafka使用分布式提交日志,这意味着消息尽可能快地保留在磁盘上,因此它是持久的。
性能 - Kafka对于发布和订阅消息都有很高的吞吐量。它保持稳定的性能,即使存储了许多TB的消息。
卡夫卡速度非常快,可确保零停机时间和零数据丢失。
用例
卡夫卡可用于许多用例。其中一些列在下面 -
指标 - Kafka通常用于运营监控数据。这涉及从分布式应用程序汇总统计数据以生成操作数据的集中式提要。
日志聚合解决方案 - Kafka可以在整个组织中使用,从多个服务中收集日志,并以标准格式向多个消费者提供。
流处理 - 流行的框架(如Storm和Spark Streaming)可以从主题读取数据,对其进行处理,并将处理后的数据写入新主题,以供用户和应用程序使用。卡夫卡的强耐久性在流加工方面也非常有用。
需要卡夫卡
Kafka是处理所有实时数据馈送的统一平台。Kafka支持低延迟消息传送并在出现机器故障时保证容错。它有能力处理大量不同的消费者。卡夫卡速度非常快,每秒执行200万次写入。卡夫卡会将所有数据保留在磁盘上,这意味着所有写入都会进入操作系统(RAM)的页面缓存。这使得从页面缓存向网络套接字传输数据非常高效。
深入卡夫卡之前,您必须了解主题,经纪人,制作人和消费者等主要术语。下图说明了主要术语,表格详细描述了图表组件。
在上图中,主题被配置为三个分区。分区1具有两个偏移因子0和1.分区2具有四个偏移因子0,1,2和3.分区3具有一个偏移因子0.复制品的id与托管它的服务器的id相同。
假设,如果该主题的复制因子设置为3,则Kafka将为每个分区创建3个相同的副本,并将它们放入群集中以使其可用于其所有操作。为了平衡集群中的负载,每个代理存储一个或多个这些分区。多个生产者和消费者可以同时发布和检索消息。
S.No | Components and Description |
1 | Topics 属于特定类别的消息流被称为主题。数据存储在主题中。 主题分为多个分区。对于每个主题,卡夫卡都保留一个分区的最小范围。每个这样的分区都以不可变的有序顺序包含消息。分区被实现为一组相同大小的段文件。 |
2 | Partition 主题可能有很多分区,所以它可以处理任意数量的数据。 |
3 | Partition offset 每个分区消息都有一个称为偏移量的唯一序列标识。 |
4 | Replicas of partition 副本只是分区的备份。副本从不读取或写入数据。它们用于防止数据丢失。 |
5 | Brokers 经纪人是简单的系统,负责维护公布的数据。每个代理可能每个主题有零个或多个分区。假设,如果一个主题和N个代理中有N个分区,则每个代理将有一个分区。 假设某个主题中有N个分区并且N个代理(n + m)多于N个,则第一个N代理将拥有一个分区,下一个M代理将不会拥有该特定主题的任何分区。 假设某个主题中有N个分区,且N个代理的数量少于N个(nm),则每个代理将拥有一个或多个分区共享。由于代理人之间的负载分配不均衡,不推荐这种情况。 |
6 | Kafka Cluster 卡夫卡拥有多个经纪人称为卡夫卡集群。Kafka集群可以在无需停机的情况下进行扩展。这些集群用于管理消息数据的持久性和复制。 |
7 | Producers 制片人是一个或多个卡夫卡主题的发布者。制片人向卡夫卡经纪人发送数据。每当生产者向经纪人发布消息时,经纪人只需将消息附加到最后一个段文件。实际上,该消息将被附加到分区。生产者也可以将消息发送到他们选择的分区。 |
8 | Consumers 消费者从经纪商那里读取数据。消费者通过从代理商处获取数据来订阅一个或多个主题并消费发布的消息。 |
9 | Leader Leader是负责所有分区读写的节点。每个分区都有一台服务器充当领导者。 |
10 | Follower 遵循领导指示的节点称为追随者。如果领导失败,其中一个追随者将自动成为新领导。追随者扮演正常的消费者角色,拉动消息并更新自己的数据存储。 |
看看下面的插图。它显示了Kafka的集群图。
下表描述了上图中显示的每个组件。
S.No | Components and Description |
1 | Broker Kafka集群通常由多个代理组成,以保持负载平衡。卡夫卡经纪人是无国籍的,所以他们使用ZooKeeper维护他们的集群状态。一个Kafka代理实例可以处理每秒数十万次的读写操作,每个Broker都可以处理TB消息,而不会影响性能。卡夫卡经纪人领导人选举可以由ZooKeeper完成。 |
2 | ZooKeeper ZooKeeper用于管理和协调Kafka经纪人。ZooKeeper服务主要用于通知生产者和消费者有关Kafka系统中任何新经纪人的存在或Kafka系统中经纪人的失败。根据Zookeeper收到的有关经纪人存在或失败的通知,生产者和消费者就会做出决定,并开始与其他经纪人协调他们的任务。 |
3 | Producers 生产者将数据推送给经纪商。新代理启动后,所有生产者都会搜索它并自动向该新代理发送消息。卡夫卡制片人不会等待经纪人的确认,并且可以像经纪人能够处理的那样快地发送消息。 |
4 | Consumers 由于Kafka代理是无状态的,这意味着消费者必须通过使用分区偏移量来维护消耗了多少消息。如果消费者确认特定的消息偏移量,则意味着消费者已经消费了所有先前的消息。消费者向代理发出一个异步拉取请求,以准备消耗字节缓冲区。消费者可以简单地通过提供偏移值来倒回或跳到分区中的任何点。消费者补偿值由ZooKeeper通知。 |
截至目前,我们已经讨论了卡夫卡的核心概念。现在让我们来看看Kafka的工作流程。
卡夫卡只是分成一个或多个分区的主题集合。Kafka分区是消息的线性排序序列,每个消息由其索引标识(称为偏移量)。Kafka集群中的所有数据都是不相关的分区联合。传入消息写在分区的末尾,消费者依次读取消息。通过将消息复制到不同的经纪商来提供耐久性。
Kafka以快速,可靠,持久的容错和零停机方式提供基于pub-sub和queue的消息传递系统。在这两种情况下,制作者只需将消息发送到一个主题,消费者就可以根据他们的需要选择任何一种消息系统。让我们按照下一节中的步骤来了解消费者如何选择他们选择的消息系统。
Pub-Sub消息传递的工作流程
以下是Pub-Sub Messaging的步骤明智的工作流程 -
生产者定期发送消息给一个主题。
卡夫卡经纪人将所有消息存储在为该特定主题配置的分区中。它确保消息在分区之间平均分配。如果制作者发送两条消息并且有两个分区,则Kafka将在第一个分区中存储一条消息,并在第二个分区中存储第二条消息。
消费者订阅特定主题。
一旦消费者订阅了一个主题,Kafka将向消费者提供该主题的当前偏移量,并且还将该偏移量保存在Zookeeper集合中。
消费者会定期请求卡夫卡(如100小时)收取新消息。
卡夫卡收到生产者的消息后,会将这些消息转发给消费者。
消费者将收到消息并进行处理。
一旦消息被处理,消费者将向卡夫卡经纪人发送确认。
卡夫卡收到确认后,会将偏移量更改为新值,并在Zookeeper中更新它。由于在Zookeeper中维护了偏移量,因此即使在服务器暴力期间,使用者也可以正确读取下一条消息。
上述流程将重复,直到消费者停止请求。
消费者可以随时选择倒带/跳到期望的主题偏移,并阅读所有后续消息。
队列消息/消费者组的工作流程
在队列消息系统而不是单个消费者中,具有相同组ID的一组消费者将订阅主题。简而言之,订阅具有相同组ID的主题的消费者被视为单个组,并且消息在他们之间共享。让我们检查一下这个系统的实际工作流程。
生产者定期向主题发送消息。
Kafka将所有消息存储在为特定主题配置的分区中,类似于之前的场景。
单个消费者预订特定主题,假定主题-01与组ID作为第1组。
Kafka与Pub-Sub Messaging一样与消费者进行交互,直到新消费者订阅同一主题Topic-01,其Group ID与Group-1相同。
一旦新消费者到达,卡夫卡将其操作切换到共享模式并在两个消费者之间共享数据。这种共享将持续到用户数量达到为该特定主题配置的分区数量。
一旦消费者数量超过分区数量,新消费者将不会收到任何进一步的消息,直到任何一个现有的消费者退订。这种情况的出现是因为卡夫卡的每个消费者都将被分配至少一个分区,并且一旦所有分区被分配给现有消费者,新消费者将不得不等待。
这个功能也被称为消费群。以同样的方式,卡夫卡将以非常简单和有效的方式提供这两个系统中最好的。
ZooKeeper的角色
Apache Kafka的关键依赖是Apache Zookeeper,它是一个分布式配置和同步服务。Zookeeper作为卡夫卡经纪人和消费者之间的协调接口。卡夫卡服务器通过Zookeeper集群共享信息。Kafka在Zookeeper中存储基本元数据,例如有关主题,经纪人,消费者偏移量(队列读取器)等的信息。
由于所有关键信息都存储在Zookeeper中,并且它通常在整个集群中复制这些数据,所以Kafka broker / Zookeeper的故障不会影响Kafka集群的状态。一旦Zookeeper重新启动,Kafka将恢复状态。这给Kafka带来零停机时间。卡夫卡经纪人之间的领导者选举也是通过在领导失败的情况下使用Zookeeper完成的。
要了解Zookeeper的更多信息,请参阅zookeeper
让我们继续进一步讨论如何在下一章安装Java,ZooKeeper和Kafka。
第1步 安装Java
第2步 安装ZooKeeper
第3步 - Apache Kafka安装
让我们继续以下步骤在您的机器上安装Kafka。
步骤3.1 - 下载Kafka
要在您的机器上安装Kafka,请点击下面的链接 -
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
现在最新版本,即 - kafka_2.11_0.9.0.0.tgz将被下载到您的机器上。
第3.2步 - 提取tar文件
使用以下命令提取tar文件 -
$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz $ cd kafka_2.11.0.9.0.0 |
现在您已经在您的机器上下载了最新版本的Kafka。
步骤3.3 - 启动服务器
您可以通过提供以下命令启动服务器 -
$ bin/kafka-server-start.sh config/server.properties |
服务器启动后,您会在屏幕上看到以下响应 -
$ bin/kafka-server-start.sh config/server.properties [2016-01-02 15:37:30,410] INFO KafkaConfig values: request.timeout.ms = 30000 log.roll.hours = 168 inter.broker.protocol.version = 0.9.0.X log.preallocate = false security.inter.broker.protocol = PLAINTEXT ……………………………………………. ……………………………………………. |
第4步 - 停止服务器
执行所有操作后,可以使用以下命令停止服务器 -
$ bin/kafka-server-stop.sh config/server.properties |
现在我们已经讨论了Kafka安装,我们可以在下一章学习如何在Kafka上执行基本操作。
首先,让我们开始实施单节点单代理配置,然后我们将设置迁移到单节点多代理配置。
希望你现在已经在你的机器上安装了Java,ZooKeeper和Kafka。在转移到Kafka集群设置之前,首先您需要启动ZooKeeper,因为Kafka集群使用ZooKeeper。
启动ZooKeeper
打开一个新终端并输入以下命令 -
bin/zookeeper-server-start.sh config/zookeeper.properties |
要启动Kafka Broker,请键入以下命令 -
bin/kafka-server-start.sh config/server.properties |
启动Kafka Broker后,在ZooKeeper终端上输入命令jps,您将看到以下响应 -
821 QuorumPeerMain 928 Kafka 931 Jps |
现在你可以看到在终端上运行的两个守护进程,其中QuorumPeerMain是ZooKeeper守护进程,另一个守护进程是Kafka守护进程。
单节点单代理配置
在这个配置中你有一个ZooKeeper和broker id实例。以下是配置它的步骤 -
创建一个Kafka主题 - Kafka提供了一个名为kafka-topics.sh的命令行实用程序来在服务器上创建主题。打开新的终端并键入下面的示例。
语法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name |
例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka |
我们只用一个分区和一个复制因子创建了一个名为Hello-Kafka的主题。以上创建的输出将与以下输出类似 -
输出 -Created topic “Hello-Kafka”
创建主题后,您可以在“卡夫卡经纪人终端”窗口中获取通知,并在config / server.properties文件的“/ tmp / kafka-logs /”中指定所创建主题的日志。
主题列表
要获得Kafka服务器中的主题列表,可以使用以下命令 -
语法
bin/kafka-topics.sh --list --zookeeper localhost:2181 |
输出
Hello-Kafka |
由于我们已经创建了一个主题,因此它只会列出Hello-Kafka。假设,如果您创建多个主题,您将在输出中获得主题名称。
启动制作人发送消息
语法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name |
从上面的语法来看,生产者命令行客户端需要两个主要参数 -
经纪商名单 - 我们想要发送消息的经纪商名单。在这种情况下,我们只有一个经纪人。Config / server.properties文件包含代理端口ID,因为我们知道我们的代理正在监听端口9092,因此您可以直接指定它。
主题名称 - 以下是主题名称的示例。
例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka |
制片人将等待stdin的输入并发布到Kafka集群。默认情况下,每个新行都作为新消息发布,然后在config / producer.properties文件中指定默认生产者属性。现在,您可以在终端中键入几行消息,如下所示。
输出
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka[2016-01-16 13:50:45,931] WARN property topic is not valid (kafka.utils.Verifia-bleProperties) Hello My first message |
My second message |
启动消费者接收消息
与生产者类似,默认的消费者属性在config / consumer.properties文件中指定。打开一个新终端并输入以下消息消息语法。
语法
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name --from-beginning |
例
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka --from-beginning |
输出
Hello My first message My second message |
最后,您可以从生产者的终端输入消息,并看到它们出现在消费者的终端中。到目前为止,您对使用单个代理的单节点群集有非常好的理解。现在让我们转到多个代理配置。
单节点多代理配置
在转到多代理群集设置之前,首先启动您的ZooKeeper服务器。
创建多个Kafka代理 - 我们在con-fig / server.properties中已经有一个Kafka代理实例。现在我们需要多个代理实例,因此将现有的server.prop- erties文件复制到两个新的配置文件中,并将其重命名为server-one.properties和server-two.properties。然后编辑两个新文件并分配以下更改 -
config/server-one.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # The port the socket server listens on port=9093 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-1 |
config/server-two.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=2 # The port the socket server listens on port=9094 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-2 |
启动多个代理 - 在三台服务器上完成所有更改后,打开三个新终端逐个启动每个代理。
Broker1 bin/kafka-server-start.sh config/server.properties Broker2 bin/kafka-server-start.sh config/server-one.properties Broker3 bin/kafka-server-start.sh config/server-two.properties |
现在我们有三台不同的经纪人在机器上运行。通过在ZooKeeper终端上输入jps来检查所有的守护进程,然后你会看到响应。
创建一个主题
让我们为此主题指定复制因子值为三,因为我们有三个不同的代理正在运行。如果您有两个代理商,则分配的副本值将为两个。
语法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic topic-name |
例
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication |
输出
created topic “Multibrokerapplication” |
将描述命令用于检查哪个代理对当前创建的主题听如下所示-
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation |
输出
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation
Topic:Multibrokerapplication PartitionCount:1 ReplicationFactor:3 Configs:
Topic:Multibrokerapplication Partition:0 Leader:0 Replicas:0,2,1 Isr:0,2,1 |
从上面的输出中,我们可以得出结论,第一行给出了所有分区的摘要,显示了我们已经选择的主题名称,分区计数和复制因子。在第二行中,每个节点将成为分区随机选择部分的领导者。
在我们的案例中,我们看到我们的第一个经纪人(带有broker.id 0)是领导者。然后副本:0,2,1意味着所有的券商复制的话题终于ISR是一组在同步副本。那么,这是复制品的一部分,目前活着并被领导者追上。
启动制作人发送消息
此过程与单个代理程序设置中的相同。
例
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication |
输出
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication [2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties) This is single node-multi broker demo This is the second message |
启动消费者接收消息
此过程与单个代理设置中显示的相同。
例
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion --from-beginning |
输出
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion —from-beginning This is single node-multi broker demo This is the second message |
主题基本操作
在本章中,我们将讨论各种基本的主题操作。
修改主题
正如您已经了解如何在Kafka集群中创建主题。现在让我们使用以下命令修改创建的主题
语法
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name --parti-tions count |
例
We have already created a topic “Hello-Kafka” with single partition count and one replica factor. Now using “alter” command we have changed the partition count. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --parti-tions 2 |
删除主题
要删除主题,您可以使用以下语法。
语法
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name |
例
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka |
输出
> Topic Hello-kafka marked for deletion |
注 -如果delete.topic.enable未设置为true,则这不会产生影响
让我们创建一个使用Java客户端发布和使用消息的应用程序。卡夫卡制作人客户端由以下API组成。
KafkaProducer API
让我们了解本节中最重要的一组Kafka生产者API。KafkaProducer API的核心部分是KafkaProducer类。KafkaProducer类提供了一个选项,用于将Kafka代理的构造函数与以下方法连接起来。
KafkaProducer类提供send方法来异步发送消息到主题。send()的签名如下
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback); |
ProducerRecord - 生产者管理等待发送的记录缓冲区。
Callback - 服务器确认记录时执行的用户提供的回调函数(null表示无回叫)。
KafkaProducer类提供了一种flush方法来确保所有先前发送的消息已经实际完成。冲洗方法的语法如下 -
public void flush() |
KafkaProducer类提供了partitionFor方法,该方法有助于获取给定主题的分区元数据。这可以用于自定义分区。这种方法的签名如下 -
public Map metrics() |
它返回生产者维护的内部度量图。
public void close() - KafkaProducer类提供close方法块,直到完成所有先前发送的请求。
Producer API
Producer API的核心部分是Producer类。Producer类提供了一个选项,通过以下方法在其构造函数中连接Kafka代理。
Producer类
生产者类提供send方法,使用以下签名将消息发送到单个或多个主题。
public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,”async”) ProducerConfig config = new ProducerConfig(prop); |
有两种类型的生产者 - 同步和异步。
同样的API配置也适用于Sync生产者。它们之间的区别是同步生产者直接发送消息,但在后台发送消息。当您想要更高的吞吐量时,首选异步生产者。在0.8之前的版本中,异步生产者没有回调send()来注册错误处理程序。这仅在当前版本0.9中可用。
public void close()
生产者类提供关闭方法来关闭所有卡夫卡经纪人的生产者池连接。
配置设置
生产者API的主要配置设置在下表中列出以便更好地理解 -
S.No | Configuration Settings and Description |
1 | client.id 确定生产者申请 |
2 | producer.type 无论是同步还是异步 |
3 | acks acks配置控制生产者请求下的标准被认为是完整的。 |
4 | retries 如果生产者请求失败,然后自动重试具有特定的值。 |
5 | bootstrap.servers 引导经纪人名单。 |
6 | linger.ms 如果你想减少请求的数量,你可以将linger.ms设置为大于某个值的值。 |
7 | key.serializer 串行器接口的关键。 |
8 | value.serializer 串行器接口的值。 |
9 | batch.size 缓冲区大小。 |
10 | buffer.memory 控制制造商可用于缓冲的总内存量。 |
ProducerRecord API
ProducerRecord是发送给Kafka cluster.ProducerRecord类的键/值对,用于使用以下签名创建包含分区,键和值对的记录。
public ProducerRecord (string topic, int partition, k key, v value) |
主题 - 将追加记录的用户定义的主题名称。
分区 - 分区计数
密钥 - 将包含在记录中的密钥。
价值 - 记录内容
public ProducerRecord (string topic, k key, v value) |
ProducerRecord类构造函数用于创建一个记录,其中包含键值对并且没有分区。
主题 - 创建一个分配记录的主题。
钥匙 - 记录的钥匙。
价值 - 记录内容。
public ProducerRecord (string topic, v value) |
ProducerRecord类创建没有分区和密钥的记录。
主题 - 创建主题。
价值 - 记录内容。
下表中列出了ProducerRecord类的方法 -
S.No | 类方法和描述 |
1 | public string topic() 主题将附加到记录。 |
2 | public K key() 将包含在记录中的密钥。如果没有这样的密钥,null将在这里重新转向。 |
3 | public V value() 记录内容。 |
4 | partition() 记录的分区计数 |
SimpleProducer应用程序
在创建应用程序之前,首先启动ZooKeeper和Kafka代理,然后使用create topic命令在Kafka代理中创建自己的主题。之后,创建一个名为Sim-pleProducer.java的Java类并输入以下代码。
//import util.properties packages import java.util.Properties;
//import simple producer packages import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord;
//Create java class named “SimpleProducer” public class SimpleProducer {
public static void main(String[] args) throws Exception{
// Check arguments length value if(args.length == 0){ System.out.println("Enter topic name”); return; }
//Assign topicName to string variable String topicName = args[0].toString();
// create instance for properties to access producer configs Properties props = new Properties();
//Assign localhost id props.put("bootstrap.servers", “localhost:9092");
//Set acknowledgements for producer requests. props.put("acks", “all");
//If the request fails, the producer can automatically retry, props.put("retries", 0);
//Specify buffer size in config props.put("batch.size", 16384);
//Reduce the no of requests less than 0 props.put("linger.ms", 1);
//The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serializa-tion.StringSerializer");
Producer<String, String> producer = new KafkaProducer <String, String>(props);
for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); System.out.println(“Message sent successfully”); producer.close(); } } |
编译 - 可以使用以下命令编译应用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java |
执行 - 可以使用以下命令执行应用程序。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name> |
输出
Message sent successfully To check the above output open new terminal and type Consumer CLI command to receive messages. >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning 1 2 3 4 5 6 7 8 9 10 |
简单的用户示例
截至目前,我们已经创建了一个制作人向Kafka集群发送消息。现在让我们创建一个使用者来使用来自Kafka集群的消息。KafkaConsumer API用于消费来自Kafka集群的消息。下面定义了KafkaConsumer类的构造函数。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs) |
configs - 返回消费者配置的地图。
KafkaConsumer类具有下表中列出的以下重要方法。
S.No | Method and Description |
1 | public java.util.Set<TopicPar-tition> assignment() 获取消费者当前分配的一组分区。 |
2 | public string subscription() 订阅给定的主题列表以获取动态签署的分区。 |
3 | public void sub-scribe(java.util.List<java.lang.String> topics, ConsumerRe-balanceListener listener) 订阅给定的主题列表以获取动态签署的分区。 |
4 | public void unsubscribe() 取消订阅给定分区列表中的主题。 |
5 | public void sub-scribe(java.util.List<java.lang.String> topics) 订阅给定的主题列表以获取动态签署的分区。如果给定的主题列表为空,则将其视为与取消订阅()相同。 |
6 | public void sub-scribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener) 参数模式以正则表达式的格式引用订阅模式,而侦听器参数从订阅模式中获取通知。 |
7 | public void as-sign(java.util.List<TopicParti-tion> partitions) 手动将分区列表分配给客户。 |
8 | poll() 获取使用其中一个订阅/分配API指定的主题或分区的数据。如果在轮询数据之前没有订阅主题,这将返回错误。 |
9 | public void commitSync() 提交最后一次poll()返回的主题和分区的所有sub-scribed列表的偏移量。同样的操作应用于commitAsyn()。 |
10 | public void seek(TopicPartition partition, long offset) 获取消费者将在下一个poll()方法中使用的当前偏移值。 |
11 | public void resume() 恢复暂停的分区。 |
12 | public void wakeup() 唤醒消费者。 |
ConsumerRecord API
ConsumerRecord API用于接收来自Kafka集群的记录。该API由主题名称,分区编号以及指向Kafka分区中记录的偏移量组成。ConsumerRecord类用于创建具有特定主题名称,分区计数和<key,value>对的消费者记录。它有以下签名。
public ConsumerRecord(string topic,int partition, long offset,K key, V value) |
主题 - 从Kafka集群收到的消费者记录的主题名称。
分区 - 主题的分区。
Key - 记录的关键字,如果不存在关键字null将被返回。
价值 - 记录内容。
ConsumerRecords API
ConsumerRecords API充当ConsumerRecord的容器。此API用于保留特定主题的每个分区的ConsumerRecord列表。它的构造函数定义如下。
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records) |
TopicPartition - 返回特定主题的分区映射。
记录 - ConsumerRecord的返回列表。
ConsumerRecords类定义了以下方法。
S.No | 方法和描述 |
1 | public int count() 所有主题的记录数。 |
2 | public Set partitions() 该记录集中包含数据的分区集(如果没有数据返回,则该集为空)。 |
3 | public Iterator iterator() 迭代器使您能够遍历集合,获取或重新移动元素。 |
4 | public List records() 获取给定分区的记录列表。 |
配置设置
消费者客户端API主配置设置的配置设置如下所示 -
S.No | Settings and Description |
1 | bootstrap.servers 引导经纪人名单。 |
2 | group.id 将个人消费者分配给组。 |
3 | enable.auto.commit 如果值为true,则为偏移启用自动提交,否则不提交。 |
4 | auto.commit.interval.ms 返回更新的消耗偏移量被写入ZooKeeper的频率。 |
5 | session.timeout.ms 指示在放弃并继续使用消息之前,Kafka将等待多少毫秒以等待ZooKeeper响应请求(读取或写入)。 |
SimpleConsumer应用程序
生产者申请步骤在这里保持不变。首先,启动您的ZooKeeper和Kafka经纪人。然后使用名为SimpleCon-sumer.java的Java类创建一个SimpleConsumer应用程序并键入以下代码。
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord;
public class SimpleConsumer { public static void main(String[] args) throws Exception { if(args.length == 0){ System.out.println("Enter topic name"); return; } //Kafka consumer configuration settings String topicName = args[0].toString(); Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props);
//Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName))
//print the topic name System.out.println("Subscribed to topic " + topicName); int i = 0;
while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records)
// print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } |
编译 - 可以使用以下命令编译应用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java |
执行 -可以使用以下命令执行应用程序
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name> |
输入 - 打开生产者CLI并发送一些消息给主题。您可以将smple输入作为'Hello Consumer'。
输出 - 以下将是输出。
Subscribed to topic Hello-Kafka offset = 3, key = null, value = Hello Consumer |
消费者组是来自Kafka主题的多线程或多机器消费。
消费者组
消费者可以通过使用相同的group.id加入一个组。
一个组的最大并行度是该组中的消费者的数量←分区的数量。
Kafka将一个主题的分区分配给组中的使用者,以便每个分区仅由组中的一位消费者使用。
卡夫卡保证只有群组中的单个消费者阅读消息。
消费者可以按照存储在日志中的顺序查看消息。
重新平衡消费者
添加更多流程/线程将导致Kafka重新平衡。如果任何消费者或代理未能向ZooKeeper发送心跳信号,则可以通过Kafka集群重新配置它。在这种重新平衡期间,Kafka会将可用分区分配给可用线程,可能会将分区移至另一个进程。
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup { public static void main(String[] args) throws Exception { if(args.length < 2){ System.out.println("Usage: consumer <topic> <groupname>"); return; }
String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serializa-tion.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0;
while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } |
编译
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java |
执行
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group >>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group |
在这里,我们创建了一个样本组名称作为my-group与两个消费者。同样,您可以在组中创建您的组和消费者数量。
输入
打开生产者CLI并发送一些消息,如 -
Test consumer group 01 Test consumer group 02 |
第一个过程的输出
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 01 |
第二个过程的输出
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 02 |
现在希望你能通过使用Java客户端演示了解SimpleConsumer和ConsumeGroup。现在您已经了解了如何使用Java客户端发送和接收消息。让我们在下一章继续卡夫卡与大数据技术的整合。
卡夫卡工具包装在“org.apache.kafka.tools.*”下。工具分为系统工具和复制工具。
系统工具
系统工具可以使用run class脚本从命令行运行。语法如下 -
bin/kafka-run-class.sh package.class - - options |
下面提到了一些系统工具 -
Kafka Migration Tool - 此工具用于将代理从一个版本迁移到另一个版本。
Mirror Maker - 此工具用于将一个Kafka集群镜像到另一个。
Consumer Offset Checker - 此工具显示指定的一组主题和使用者组的消费者组,主题,分区,偏移量,日志大小,所有者。
复制工具
卡夫卡复制是一个高层次的设计工具。添加复制工具的目的是提供更强的耐用性和更高的可用性。下面提到了一些复制工具 -
Create Topic Tool - 这会创建一个包含默认分区数量,复制因子的主题,并使用Kafka的默认方案执行副本分配。
List Topic Tool - 此工具列出给定主题列表的信息。如果在命令行中没有提供主题,该工具将查询Zookeeper以获取所有主题并列出它们的信息。该工具显示的字段是主题名称,分区,领导,副本,isr。
Add Partition Tool - 创建主题时,必须指定主题的分区数量。稍后,当话题量增加时,话题可能需要更多的分区。此工具有助于为特定主题添加更多分区,还可以手动添加分区的副本分配。
在本章中,我们将学习如何将Kafka与Apache Storm集成。
关于Storm
Storm最初是由Nathan Marz和BackType团队创建的。在很短的时间内,Apache Storm成为分布式实时处理系统的标准,允许您处理大量数据。Storm速度非常快,每个节点每秒处理超过一百万个元组的基准时钟。Apache Storm持续运行,从配置的源(Spouts)中消耗数据并将数据传递到处理管道(Bolts)。组合,喷嘴和螺栓构成拓扑。
与Storm整合
Kafka和Storm自然而然地相互补充,他们强大的合作能够实现快速移动大数据的实时流式分析。Kafka和Storm的整合使得开发者更容易从Storm拓扑中获取和发布数据流。
概念流程
喷口是溪流的来源。例如,喷口可以读取Kafka主题中的元组并将它们作为流发送。螺栓消耗输入流,处理并可能发射新的流。Bolts可以做任何事情,从运行功能,过滤元组,流聚合,流式连接,与数据库交谈等等。Storm拓扑中的每个节点并行执行。一个拓扑无限期地运行,直到你终止它。Storm会自动重新分配任何失败的任务。此外,即使机器停机并且信息丢失,Storm也可以保证不会丢失数据。
让我们详细介绍Kafka-Storm集成API。有三个主要课程将Kafka和Storm整合在一起。他们如下 -
BrokerHosts - ZkHosts&StaticHosts
BrokerHosts是一个接口,ZkHosts和StaticHosts是它的两个主要实现。ZkHosts用于通过在ZooKeeper中维护详细信息来动态跟踪卡夫卡经纪人,而StaticHosts用于手动/静态设置卡夫卡经纪人及其详细信息。ZkHosts是访问卡夫卡经纪人的简单而快捷的方式。
ZkHosts的签名如下 -
public ZkHosts(String brokerZkStr, String brokerZkPath) public ZkHosts(String brokerZkStr) |
其中brokerZkStr是ZooKeeper主机,brokerZkPath是维护Kafka代理细节的ZooKeeper路径。
KafkaConfig API
该API用于定义Kafka集群的配置设置。Kafka Con-fig的签名定义如下
public KafkaConfig(BrokerHosts hosts, string topic) |
主持人 - BrokerHosts可以是ZkHosts / StaticHosts。
主题 - 主题名称。
SpoutConfig API
Spoutconfig是KafkaConfig的扩展,支持额外的ZooKeeper信息。
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id) |
主机 - BrokerHosts可以是BrokerHosts接口的任何实现
主题 - 主题名称。
zkRoot - ZooKeeper根路径。
id -喷口存储在Zookeeper中消耗的偏移量的状态。该ID应该唯一标识你的喷口。
SchemeAsMultiScheme
SchemeAsMultiScheme是一个接口,它规定了从Kafka消耗的ByteBuffer如何转换为风暴元组。它来自MultiScheme并接受Scheme类的实现。Scheme类有很多实现,一个这样的实现是StringScheme,它将字节解析为一个简单的字符串。它还控制输出字段的命名。签名定义如下。
public SchemeAsMultiScheme(Scheme scheme) |
Scheme - 从kafka消耗的字节缓冲区。
KafkaSpout API
KafkaSpout是我们的喷口实现,它将与Storm整合。它从kafka主题获取消息并将其作为元组发送到Storm生态系统中。KafkaSpout从SpoutConfig获取配置细节。
以下是创建一个简单的卡夫卡喷嘴的示例代码。
// ZooKeeper connection string BrokerHosts hosts = new ZkHosts(zkConnString);
//Creating SpoutConfig Object SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName UUID.randomUUID().toString());
//convert the ByteBuffer to String. spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
//Assign SpoutConfig to KafkaSpout. KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); |
螺栓创建
Bolt是一个将元组作为输入,处理元组并生成新的元组作为输出的组件。Bolts将实施IRichBolt界面。在这个程序中,使用两个螺栓类WordSplitter-Bolt和WordCounterBolt来执行操作。
IRichBolt接口有以下方法 -
准备 - 为螺栓提供执行的环境。执行者将运行此方法来初始化喷口。
执行 - 处理输入的单个元组。
清理 - 当螺栓即将关闭时调用。
declareOutputFields - 声明元组的输出模式。
让我们创建SplitBolt.java,它实现了将句子分成单词和CountBolt.java的逻辑,它实现了逻辑来分离唯一的单词并计算它的出现次数。
SplitBolt.java
import java.util.Map;
import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.IRichBolt; import backtype.storm.task.TopologyContext;
public class SplitBolt implements IRichBolt { private OutputCollector collector;
@Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; }
@Override public void execute(Tuple input) { String sentence = input.getString(0); String[] words = sentence.split(" ");
for(String word: words) { word = word.trim();
if(!word.isEmpty()) { word = word.toLowerCase(); collector.emit(new Values(word)); }
}
collector.ack(input); }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); }
@Override public void cleanup() {}
@Override public Map<String, Object> getComponentConfiguration() { return null; }
} |
CountBolt.java
import java.util.Map; import java.util.HashMap;
import backtype.storm.tuple.Tuple; import backtype.storm.task.OutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.IRichBolt; import backtype.storm.task.TopologyContext;
public class CountBolt implements IRichBolt{ Map<String, Integer> counters; private OutputCollector collector;
@Override public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.counters = new HashMap<String, Integer>(); this.collector = collector; }
@Override public void execute(Tuple input) { String str = input.getString(0);
if(!counters.containsKey(str)){ counters.put(str, 1); }else { Integer c = counters.get(str) +1; counters.put(str, c); }
collector.ack(input); }
@Override public void cleanup() { for(Map.Entry<String, Integer> entry:counters.entrySet()){ System.out.println(entry.getKey()+" : " + entry.getValue()); } }
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
@Override public Map<String, Object> getComponentConfiguration() { return null; } } |
提交到拓扑
Storm拓扑基本上是一个Thrift结构。TopologyBuilder类提供了简单而简单的方法来创建复杂的拓扑。TopologyBuilder类具有设置喷口(setSpout)和设置螺栓(setBolt)的方法。最后,TopologyBuilder创建了拓扑学来创建拓朴学。shuffleGrouping和fieldsGrouping方法有助于设置喷嘴和螺栓的流分组。
本地群集 - 为了开发目的,我们可以使用LocalCluster对象创建本地群集,然后使用LocalCluster类的submitTopology方法提交拓扑。
KafkaStormSample.java
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.topology.TopologyBuilder;
import java.util.ArrayList; import java.util.List; import java.util.UUID;
import backtype.storm.spout.SchemeAsMultiScheme; import storm.kafka.trident.GlobalPartitionInformation; import storm.kafka.ZkHosts; import storm.kafka.Broker; import storm.kafka.StaticHosts; import storm.kafka.BrokerHosts; import storm.kafka.SpoutConfig; import storm.kafka.KafkaConfig; import storm.kafka.KafkaSpout; import storm.kafka.StringScheme;
public class KafkaStormSample { public static void main(String[] args) throws Exception{ Config config = new Config(); config.setDebug(true); config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 1); String zkConnString = "localhost:2181"; String topic = "my-first-topic"; BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig kafkaSpoutConfig = new SpoutConfig (hosts, topic, "/" + topic, UUID.randomUUID().toString()); kafkaSpoutConfig.bufferSizeBytes = 1024 * 1024 * 4; kafkaSpoutConfig.fetchSizeBytes = 1024 * 1024 * 4; kafkaSpoutConfig.forceFromStart = true; kafkaSpoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("kafka-spout", new KafkaSpout(kafkaSpoutCon-fig)); builder.setBolt("word-spitter", new SplitBolt()).shuffleGroup-ing("kafka-spout"); builder.setBolt("word-counter", new CountBolt()).shuffleGroup-ing("word-spitter");
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("KafkaStormSample", config, builder.create-Topology());
Thread.sleep(10000);
cluster.shutdown(); } } |
在移动编译之前,Kakfa-Storm集成需要馆长ZooKeeper客户端java库。策展人版本2.9.1支持Apache Storm 0.9.5版本(我们在本教程中使用)。下载下面指定的jar文件并将它放在java类路径中。
策展人客户2.9.1.jar
馆长框架-2.9.1.jar
在包含依赖文件后,使用以下命令编译程序,
javac -cp "/path/to/Kafka/apache-storm-0.9.5/lib/*" *.java |
执行
启动Kafka Producer CLI(在上一章中解释),创建一个名为my-first-topic的新主题并提供一些示例消息,如下所示 -
hello kafka storm spark test message another test message |
现在使用以下命令执行应用程序 -
java -cp“/path/to/Kafka/apache-storm-0.9.5/lib/*”:. KafkaStormSample |
此应用程序的输出示例如下所示 -
storm : 1 test : 2 spark : 1 another : 1 kafka : 1 hello : 1 message : 2 |
在本章中,我们将讨论如何将Apache Kafka与Spark Streaming API集成。
关于Spark
Spark Streaming API支持实时数据流的可扩展,高吞吐量,容错流处理。数据可以从Kafka,Flume,Twitter等许多来源获取,并且可以使用复杂算法进行处理,例如地图,缩小,连接和窗口等高级功能。最后,处理后的数据可以推送到文件系统,数据库和现场仪表板上。弹性分布式数据集(RDD)是Spark的基础数据结构。它是一个不可变的分布式对象集合。RDD中的每个数据集都被划分为逻辑分区,这些分区可以在集群的不同节点上进行计算。
与Spark整合
Kafka是Spark流媒体的潜在消息传递和集成平台。Kafka充当实时数据流的中心枢纽,并使用Spark Streaming中的复杂算法进行处理。数据处理完成后,Spark Streaming可以将结果发布到HDFS,数据库或仪表板中的另一个Kafka主题或商店中。下图描述了概念流程。
现在,让我们详细介绍Kafka-Spark API。
SparkConf API
它代表Spark应用程序的配置。用于将各种Spark参数设置为键值对。
SparkConf类具有以下方法 -
设置(字符串键,字符串值) - 设置配置变量。
删除(字符串键) - 从配置中删除键。
setAppName(字符串名称) - 为应用程序设置应用程序名称。
获取(字符串键) - 获取密钥
StreamingContext API
这是Spark功能的主要入口点。SparkContext表示与Spark群集的连接,并且可用于在群集上创建RDD,累加器和广播变量。签名的定义如下所示。
public StreamingContext(String master, String appName, Duration batchDuration, String sparkHome, scala.collection.Seq<String> jars, scala.collection.Map<String,String> environment) |
要连接的主群集URL(例如,mesos:// host:port,spark:// host:port,local [4])。
appName - 您的作业的名称,以显示在集群Web UI上
batchDuration - 流数据将被分成批次的时间间隔
public StreamingContext(SparkConf conf, Duration batchDuration) |
通过提供新的SparkContext所需的配置来创建StreamingContext。
conf - Spark参数
batchDuration - 流数据将被分成批次的时间间隔
KafkaUtils API
KafkaUtils API用于将Kafka集群连接到Spark流。该API具有如下定义的重要方法createStream签名。
public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream( StreamingContext ssc, String zkQuorum, String groupId, scala.collection.immutable.Map<String,Object> topics, StorageLevel storageLevel) |
上面显示的方法用于创建从Kafka Brokers中提取消息的输入流。
ssc - StreamingContext对象。
zkQuorum - Zookeeper仲裁。
groupId - 此消费者的组ID。
主题 - 返回要消费的主题地图。
storageLevel - 用于存储接收对象的存储级别。
KafkaUtils API还有另一种方法createDirectStream,它用于创建一个输入流,直接从Kafka Brokers中提取消息而不使用任何接收器。此流可以保证来自Kafka的每条消息都只包含在一次转换中。
示例应用程序在Scala中完成。要编译应用程序,请下载并安装sbt,scala构建工具(与maven类似)。主应用程序代码如下所示。
import java.util.HashMap
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, Produc-erRecord} import org.apache.spark.SparkConf import org.apache.spark.streaming._ import org.apache.spark.streaming.kafka._
object KafkaWordCount { def main(args: Array[String]) { if (args.length < 4) { System.err.println("Usage: KafkaWordCount <zkQuorum><group> <topics> <numThreads>") System.exit(1) }
val Array(zkQuorum, group, topics, numThreads) = args val sparkConf = new SparkConf().setAppName("KafkaWordCount") val ssc = new StreamingContext(sparkConf, Seconds(2)) ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1L)) .reduceByKeyAndWindow(_ + _, _ - _, Minutes(10), Seconds(2), 2) wordCounts.print()
ssc.start() ssc.awaitTermination() } } |
构建脚本
spark-kafka集成取决于火花,火花流和火花Kafka集成罐。创建一个新的文件build.sbt并指定应用程序的详细信息及其依赖关系。该SBT在编译和打包应用程序会下载必要的罐子。
name := "Spark Kafka Project" version := "1.0" scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0" libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0" |
编译/包装
运行以下命令来编译和打包应用程序的jar文件。我们需要将jar文件提交到spark控制台来运行应用程序。
sbt package |
提交给Spark
启动Kafka Producer CLI(在前一章中介绍),创建一个名为my-first-topic的新主题,并提供一些示例消息,如下所示。
Another spark test message |
运行以下命令将应用程序提交到火花控制台。
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-streaming -kafka_2.10:1.6.0 --class "KafkaWordCount" --master local[4] target/scala-2.10/spark -kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads> |
这个应用程序的输出示例如下所示。
spark console messages .. (Test,1) (spark,1) (another,1) (message,1) spark console message .. |
卡夫卡支持当今许多最好的工业应用。在本章中,我们将简要介绍一些卡夫卡最显着的应用。
Twitter是一种在线社交网络服务,提供发送和接收用户推文的平台。注册用户可以阅读和发布推文,但未注册的用户只能阅读推文。Twitter使用Storm-Kafka作为其流处理基础设施的一部分。
在LinkedIn上使用Apache Kafka来获取活动流数据和运营指标。Kafka消息传递系统可以为LinkedIn提供LinkedIn Newsfeed,LinkedIn Today等在线消息消费以及除Hadoop等离线分析系统之外的各种产品。卡夫卡的强大耐用性也是与LinkedIn相关的关键因素之一。
Netflix
Netflix是美国的按需互联网流媒体跨国提供商。Netflix使用Kafka进行实时监控和事件处理。
Mozilla
Mozilla是一个自由软件社区,由Netscape成员于1998年创建。Kafka即将取代Mozilla当前的生产系统的一部分,以收集来自最终用户浏览器的性能和使用数据,用于诸如遥测,测试引导等项目。
Oracle
Oracle通过名为OSB(Oracle Service Bus)的企业服务总线产品为Kafka提供本地连接,该产品允许开发人员利用OSB内置中介功能来实现分阶段数据管道。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。