赞
踩
NiFi用来处理数据集成场景的数据分发,BS结构的图形化。Apache NiFi 是一个易于使用,功能强大且可靠的系统,用于处理和分发数据。可以自动化管理系统间的数据流。它使用高度可配置的指示图来管理数据路由、转换和系统中介逻辑,支持从多种数据源动态拉取数据。NiFi原来是NSA的一个项目,目前已经代码开源,是Apache基金会的顶级项目之一。
NiFi是基于Java的,使用Maven支持包的构建管理。 NiFi基于Web方式工作,后台在服务器上进行调度。用户可以将数据处理定义为一个流程,然后进行处理,NiFi后台具有数据处理引擎、任务调度等组件。
简单的说,NiFi就是为了解决不同系统间数据自动流通问题而建立的。
虽然dataflow这个术语在各种场景都有被使用,但我们在这里使用它来表示不同系统间的自动化的可管理的信息流。自企业拥有多个系统开始,一些系统会有数据生成,一些系统要消费数据,而不同系统之间数据的流通问题就出现了。这些问题出现的相应的解决方案已经被广泛的研究和讨论,其中企业集成eip就是一个全面且易于使用的方案。
数据转换、路由、数据库操作、属性提取、系统交互、数据提取、数据监听和发送、拆分聚合、HTTP。为了创建有效的数据流处理流程,用户必须了解可用的处理器类型。NiFi包含许多不同的处理器。这些处理器提供了可从众多不同系统中提取数据,路由,转换,处理,拆分和聚合数据以及将数据分发到多个系统的功能。
几乎每个NiFi版本中可用的处理器数量都在增加。因此,我们不会尝试在这里介绍每一个可用的处理器,但我们将重点介绍一些最常用的处理器,按功能对它们进行分类。
每个FlowFile都拥有多个属性,这些属性将在FlowFile的生命周期中发生变化。FlowFile的概念非常强大,并提供三个主要优点。
每个FlowFile都有一组属性:
需要注意的是uuid,entryDate,lineageStartDate,和fileSize属性是系统生成的,不能改变。
NiFi提供了几种不同的处理器,用于从FlowFiles中提取属性。我们在之前的处理器分类中已经提到过。这是构建自定义处理器的一个非常常见的用例,其实编写处理器是为了理解特定的数据格式,并从FlowFile的内容中提取相关信息,创建属性来保存该信息,以便可以决定如何路由或处理数据。
NIFI除了提供能够将特定信息从FlowFile内容提取到属性中的处理器之外,NIFI还允许用户将自定义属性添加到每个FlowFile中的特定位置。UpdateAttribute就是专为此目的而设计。用户可以通过单击属性选项卡右上角的+按钮,在配置对话框中向处理器添加新属性。然后UI会提示用户输入属性的名称,然后输入值。对于此UpdateAttribute处理的每个FlowFile,都会添加用户自定义属性。Attribute的名称将与添加的属性的名称相同。
属性的值也可以包含表达式语言。这样就允许基于其他属性修改或添加属性。例如,如果我们想要将处理文件的主机名和日期添加到文件名之前,我们可以通过添加 h o s t n a m e ( ) − {hostname()}- hostname()−{now():format(‘yyyy-dd-MM’)}-${filename}来实现来实现。刚开始大家可能不太理解这是什么意思,在后续的课程中我们会进行讲解。
除了添加一组自定义的属性外,UpdateAttribute还具有一个高级UI,允许用户配置一组规则,以便在何时添加属性。要访问此功能,请在配置对话框的属性选项卡中,单击Advanced对话框底部的按钮。将弹出此处理器特定的UI界面。在此UI中,用户可以配置规则引擎,实质上是指定必须匹配的规则,以便将已配置的属性添加到FlowFile。
NiFi最强大的功能之一是能够根据属性路由FlowFiles。执行此操作的主要机制是RouteOnAttribute。此处理器与UpdateAttribute一样,通过添加用户自定义的属性进行配置。通过单击处理器的配置对话框中属性选项卡右上角的+按钮,可以添加任意数量的属性。
每个FlowFile的属性将与配置的属性进行比较,以确定FlowFile是否满足指定的条件。每个属性的值应该是一个表达式语言并返回一个布尔值。下面的【表达式语言/在Property值中使用attribute】会对表达式语言进行补充。
在评估针对FlowFile的属性提供的表达式语言之后,处理器根据所选的路由策略确定如何路由FlowFile。最常见的策略是"Route to Property name"策略。选择此策略后,处理器将为配置的每个属性公开关系(可拖拽出去指向下一个处理器)。如果FlowFile的属性满足给定的表达式,则FlowFile的副本将路由到相应的Relationship。例如,如果我们有一个名为"begin-with-r"的新属性和值"$ {filename:startsWith(‘r’)}"的表达式,那么任何文件名以字母’r’开头的FlowFile将是路由到那个关系。所有其他FlowFiles将被路由到"unmatched"关系。
当我们从FlowFiles的内容中提取属性并添加用户定义的属性时,除非我们有一些可以使用它们的机制,否则它们不会作为运算符进行计算。NiFi表达式语言允许我们在配置流时访问和操作FlowFile属性值。并非所有处理器属性都允许使用表达式语言,但很多处理器都可以。为了确定属性是否支持表达式语言,用户可以将鼠标悬停在处理器配置对话框的属性选项卡中的[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Jcq68veG-1622119703458)(./images/问号.png)]图标上,然后会有一个提示,显示属性的描述,默认值(如果有)以及属性是否支持表达式语言。
对于支持表达式语言的属性,可以通过在 开始标记 ${ 和结束标记 } 中添加表达式来使用它。表达式可以像属性名一样简单。例如,要引用uuid Attribute,我们可以简单地使用 u u i d 。 如 果 属 性 名 称 以 字 母 以 外 的 任 何 字 符 开 头 , 或 者 包 含 除 数 字 , 字 母 , 句 号 ( . ) 或 下 划 线 ( ) 以 外 的 字 符 , 则 需 要 加 引 号 。 例 如 , {uuid}。如果属性名称以字母以外的任何字符开头,或者包含除数字,字母,句号(.)或下划线(_)以外的字符,则需要加引号。例如, uuid。如果属性名称以字母以外的任何字符开头,或者包含除数字,字母,句号(.)或下划线()以外的字符,则需要加引号。例如,{My Attribute Name} 将无效,但${‘My Attribute Name’}将引用属性My Attribute Name。
除了引用属性值之外,我们还可以对这些属性执行许多功能和比较。例如,如果我们想检查filename属性是否不分大小写(大写或小写)地包含字母’r’,我们可以使用表达式来完成${filename:toLower():contains(‘r’)}。请注意,函数由冒号分隔。我们可以将任意数量的函数链接在一起,以构建更复杂的表达式。重要的是要明白,即使我们正在调用filename:toLower(),这也不会改变filename属性的值,而只是返回给我们一个新的值。
我们也可以在一个表达式中嵌入另一个表达式。例如,如果我们想要将attr1 Attribute 的值与attr2 Attribute的值进行比较,我们可以使用以下表达式来执行此操作:${attr1:equals( ${attr2} )}。
表达式语言包含许多不同的函数,官方文档Expression Language Guide。
此外,此表达式语言指南内置于应用程序中,以便用户可以轻松查看哪些功能可用,并在输入时查看其文档。设置支持表达式语言的属性的值时,如果光标位于表达式语言的开始和结束标记内,则在关键字上按 Ctrl + Space 将弹出所有可用的函数(快捷键冲突被占用会无法使用此功能),并将提供自动填充的功能。单击或使用键盘上下键指向弹出窗口中列出的某个功能会有提示,提示解释了该功能的作用,它所期望的参数以及函数的返回类型。
除了使用FlowFile属性外,还可以定义表达式语言使用的自定义属性。定义自定义属性为处理和配置数据流提供了额外的灵活性。
ExecuteScript : 执行脚本处理器, 支持: clojure, ecmascript, groovy, lua, python, ruby
QueryDatabaseTable : 数据库查询处理器, 支持: mysql
ConvertAvroToJSON : avro 数据格式转换为 json
SplitJson : 将JSON文件拆分为多个单独的FlowFiles, 用于由JsonPath表达式指定的数组元素。
EvaluateJsonPath : 根据FlowFile的内容评估一个或多个JsonPath表达式。这些表达式的结果将分配给FlowFile属性,或者写入FlowFile本身的内容,具体取决于处理器的配置。
ReplaceText : 文本组装与替换, 支持正则表达式
PutHDFS : 将FlowFile数据写入Hadoop分布式文件系统(HDFS)
PutHiveQL : 执行hive ddl/dml命令, 如: insert, update
PublishKafka_2_0 : 根据配置将消息发送到kafka topic
SelectHiveQL : 执行hive select 语句并获取结果
PutSQL : 执行SQL的insert或update命令
GetFile : 从目录中的文件创建FlowFiles。
PutFile : 将FlowFile数据写入文件
GetHDFS : 从Hadoop分布式文件系统获取文件
CaptureChangeMySQL : 从MySQL数据库中检索更改数据捕获(CDC)事件。CDC事件包括INSERT,UPDATE,DELETE操作。事件作为单个流文件输出,这些文件按操作发生的时间排序。
ExecuteStreamCommand : 一般用于执行sh脚本
当数据在NiFi中流经您的数据流处理流程时,了解您的系统执行情况以评估您是否需要更多资源以及评估当前资源的运行状况非常重要。NiFi提供了一些监控系统的机制。
在组件工具栏下的NiFi屏幕顶部附近有一个条形,称为状态栏。它包含一些关于NiFi当前健康状况的重要统计数据。活动线程的数量可以指示NiFi当前的工作状态,排队统计数据表示当前在整个流程中排队的FlowFile数量以及这些FlowFiles的总大小。
如果NiFi实例位于群集中,我们还会在此处看到一个指示器,告诉我们群集中有多少节点以及当前连接的节点数。在这种情况下,活动线程的数量和队列大小指示当前连接的所有节点的所有总和。
画布上的每个处理器,进程组(Group)和远程进程组都提供了有关组件处理了多少数据的若干统计信息。这些统计信息提供有关在过去五分钟内处理了多少数据的信息。这是一个滚动窗口,允许我们查看处理器消耗的FlowFiles数量,以及处理器发出的FlowFiles数量。
处理器之间的连接还会显示当前排队的项目数。
查看这些指标的历史值以及(如果是群集的)不同节点相互比较也可能很有价值。我们可以右键单击组件并选择Stats菜单项查看此信息,nifi会向我们展示一个图表,该图表涵盖自NiFi启动以来的时间,或最多24小时,以较少者为准(通过更改属性文件中的配置,可以扩展或减少此处显示的时间量)
在此对话框的右上角有一个下拉列表,允许用户选择他们正在查看的指标。底部的图表允许用户选择图表的较小部分进行放大。
除了为每个组件提供的统计信息之外,用户还想知道流程是否出现问题。虽然我们可以监视日志中的任何内容,但在屏幕上弹出通知会更方便。如果处理器将日志级别设置为WARNING或ERROR,我们将在处理器的右上角看到"Bulletin Indicator"。此指示器看起来像一个粘滞便笺,将在事件发生后持续显示五分钟。将鼠标悬停在公告上可提供有关所发生情况的信息,以便用户无需筛选日志消息即可查找。如果是在集群中,公告还会指示是集群中的哪个节点发布了公告。我们还可以在处理器的"配置"对话框的"设置"选项卡中更改公告的日志级别。
如果框架发布了公告,我们还会在屏幕右上方突出显示公告指示符。在全局菜单中是公告板选项(Bulletin Board)。单击此选项我们将看到公告板,在那里我们可以看到NiFi实例中出现的所有公告,并可以根据组件,消息等进行过滤。
NiFi对其摄取的每个数据保持非常精细的细节。当数据通过系统处理并被转换,路由,拆分,聚合和分发到其他端点时,这些信息都存储在NiFi的Provenance Repository中。为了搜索和查看此信息,我们可以从全局菜单中选择数据源(Data Provenance)。会弹出一个表格,列出我们搜索过的Provenance事件:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-hhChdR96-1622119768301)(./images/provenance-table.png)]
此表列出了最近发生的1,000个Provenance事件(尽管事件发生后可能需要几秒钟才能处理信息)。在此对话框中,有一个Search按钮,允许用户搜索特定处理器发生的事件,按文件名或UUID或其他几个字段搜索特定的FlowFile。在nifi.properties文件中提供了配置这些属性中的哪些属性可编入索引或可作搜索条件的功能。此外,配置文件还允许您指定将被索引的FlowFile属性。因此,您可以指定哪些属性对您的特定数据流很重要,并使这些属性可搜索。
一旦我们执行了搜索,我们的表格将仅展示与搜索条件匹配的事件。在这里,我们可以选择细节图标[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-4QuQG0E6-1622119768303)(
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。