当前位置:   article > 正文

Apache NiFi指南

apache nifi
一、什么是NiFi

NiFi是美国国家安全局开发并使用了8年的可视化数据集成产品,2014年NAS将其贡献给了Apache社区,2015年成为Apache顶级项目。

是一个基于Web图形界面,通过拖拽、连接、配置完成基于流程的编程,实现数据采集等功能的数据处理与分发系统。

说明:

  • Apache NiFi 是为数据流设计

  • 支持高度可配置指示图的数据路由、转换和系统中介逻辑

  • 支持从多种数据源动态拉取数据

  • NiFi是基于Java的,使用Maven支持包的构建管理

  • NiFi基于Web方式工作,后台在服务器上进行调度

  • 用户可以为数据处理定义为一个流程,然后进行处理,后台具有数据处理引擎、任务调度等组件。

二、NiFi架构

NiFi在主机操作系统上的JVM中执行。JVM上NiFi的主要组件如下:

1.FlowFile(信息流) FlowFile代表每个被系统处理的数据对象。每个FlowFile由两部分组成:属性和内容。内容是数据本身,属性是与数据相关的key-value键值对,用于描述数据 ​ 

2.Processor(黑盒) FlowFile Processor数据流处理器是nifi中真正处理工作的,例可以用来创建、发送、接受、转换、路由、分割、合并、处理 FlowFiles,数据流处理器可以接收上游的flow的attribute,以及content。数据流处理器可以处理0至多个流,并给出相应的反馈,比如提交或者回滚。Processor可以访问给定的FlowFile ​ 

3.Connection(有界缓冲区) 提供Processors之间的连接,作为Processors之间的缓冲队列。用来定义Processors之间的执行关系,并允许不同Processors之间以不同的速度进行交互 ​ 

4.Process Group(子网) 一个特定集合的Processors与他们之间的连接关系形成一个ProcessGroup ​ 

5.Flow Controller(调度) 流量控制器负责管理有多少处理器的连接和管理线程以及分配资源,他作为不同处理器之间的数据流交换代理 ​ 

6.Reporting Task Reporting Task是一种后台运行的组件,可将Metrics指标、监控信息、内部NiFi状态发送到外部 ​ 7.Funnel 漏斗是一个NiFi组件,用于将来自多个连接的数据组合成单个连接
三、NiFi的参数配置及目录结构
1.解压安装以后的Nifi目录如下:

  • bin目录下放置了 整个系统的控制脚本

  • lib目录下放置的Nifi自带的一个个nar程序包(其实就是Nifi内置的一个个组件)和它本身的程序所需要的加载编译等等的底层包

  • state是运行期间的一些数据

  • docs和work 是Nifi的一些官方文档和学习样例

2.conf目录下放置的是Nifi的配置文件如下:

​基本的使用,只需要注意两个文件就好:

  • nifi-properties文件,这个文件基本就是整个Nifi的配置中心,里面包含很多的基本配置,例如启动端口啊、内存分配啊等等

  • flow.xml.gz,这个文件主要是你整个nifi使用的全记录,解释的通俗点,如果你遇到了这么一个问题 “ 我在一台机器上部署了一个Nifi,并且进行了一段时间的使用,建立了很多流程和功能,这时候,需要换到别的机器的Nifi上进行开发”,你建立那些肯定不能挨个再在新环境上来一遍啊,这时候只需要把这个flow.xml.gz替换到新机器的Nifi环境里,重启新环境的Nifi就可以了。

3.logs目录里放的是Nifi运行后的主要的日志:
  • nifi-app.log 整个应用的运行日志

  • nifi-bootstrap.log 底层类加载一系列的日志

  • nifi-user.log 用户的访问操作日志

四、Apache NiFi处理器的类别介绍

NiFi包含许多不同的处理器。这些处理器提供了可从众多不同系统中提取数据、路由、转换、处理、拆分和聚合数据以及将数据分发到多个系统的功能。

介绍一些最常用的处理器,按功能对它们进行分类。

1.数据转换
- CompressContent:压缩或解压
- ConvertCharacterSet:将用于编码内容的字符集从一个字符集转换为另一个字符集
- EncryptContent:加密或解密
- ReplaceText:使用正则表达式修改文本内容
- TransformXml:应用XSLT转换XML内容
- JoltTransformJSON:应用JOLT规范来转换JSON内容
2.路由和调解
- ControlRate:限制流程中数据流经某部分的速率
- DetectDuplicate:根据一些用户定义的标准去监视发现重复的FlowFiles。通常与HashContent一起使用
- DistributeLoad:通过只将一部分数据分发给每个用户定义的关系来实现负载平衡或数据抽样
- MonitorActivity:当用户定义的时间段过去而没有任何数据流经此节点时发送通知。(可选)在数据流恢复时发送通知。
- RouteOnAttribute:根据FlowFile包含的属性路由FlowFile。
- ScanAttribute:扫描FlowFile上用户定义的属性集,检查是否有任何属性与用户定义的字典匹配。
- RouteOnContent:根据FlowFile的内容是否与用户自定义的正则表达式匹配。如果匹配,则FlowFile将路由到已配置的关系。
- ScanContent:在流文件的内容中搜索用户定义字典中存在的术语,并根据这些术语的存在或不存在来路由。字典可以由文本条目或二进制条目组成。
- ValidateXml:以XML模式验证XML内容; 根据用户定义的XML Schema,判断FlowFile的内容是否有效,进而来路由FlowFile。
3.数据库访问
- ConvertJSONToSQL:将JSON文档转换为SQL INSERT或UPDATE命令,然后将其传递给PutSQL处理器
- ExecuteSQL:执行用户定义的SQL SELECT命令,将结果写入Avro格式的FlowFile
- PutSQL:通过执行FlowFile内容定义的SQL DDM语句来更新数据库
- SelectHiveQL:针对Apache Hive数据库执行用户定义的HiveQL SELECT命令,将结果以Avro或CSV格式写入FlowFile
- PutHiveQL:通过执行由FlowFile的内容定义的HiveQL DDM语句来更新Hive数据库
4.属性提取
- EvaluateJsonPath:用户提供JSONPath表达式(与用于XML解析/提取的XPath类似),然后根据JSON内容评估这些表达式,以替换FlowFile内容或将该值提取到用户命名的属性中。
- EvaluateXPath:用户提供XPath表达式,然后根据XML内容评估这些表达式,以替换FlowFile内容,或将该值提取到用户命名的属性中。
- EvaluateXQuery:用户提供XQuery查询,然后根据XML内容评估此查询,以替换FlowFile内容或将该值提取到用户命名的属性中。
- ExtractText:用户提供一个或多个正则表达式,然后根据FlowFile的文本内容进行评估,然后将提取的值作为用户命名的属性添加。
- HashAttribute:对用户定义的现有属性列表的并置执行散列函数。
- HashContent:对FlowFile的内容执行散列函数,并将哈希值作为属性添加。
- IdentifyMimeType:评估FlowFile的内容,以便确定FlowFile封装的文件类型。该处理器能够检测许多不同的MIME类型,例如图像,文字处理器文档,文本和压缩格式等
- UpdateAttribute:更新Attribute
5.系统交互
- ExecuteProcess:运行用户定义的Operating System命令。进程的StdOut被重定向,使得写入StdOut的内容成为出站FlowFile的内容。该处理器是源处理器 - 其输出预计将生成一个新的FlowFile,并且系统调用预期不会接收输入。为了向进程提供输入,请使用ExecuteStreamCommand处理器。
- ExecuteStreamCommand:运行用户定义的Operating System命令。FlowFile的内容可选地流式传输到进程的StdIn。写入StdOut的内容成为hte出站FlowFile的内容。该处理器不能使用源处理器 - 它必须被馈送进入FlowFiles才能执行其工作。要使用源处理器执行相同类型的功能,请参阅ExecuteProcess Processor。
6.数据接入
- GetFile:将文件的内容从本地磁盘(或网络连接的磁盘)流入NiFi。
- GetFTP:通过FTP将远程文件的内容下载到NiFi中。
- GetSFTP:通过SFTP将远程文件的内容下载到NiFi中。
- GetJMSQueue:从JMS队列中下载消息,并根据JMS消息的内容创建一个FlowFile。也可以将JMS属性复制为属性。
- GetJMSTopic:从JMS主题下载消息,并根据JMS消息的内容创建一个FlowFile。也可以将JMS属性复制为属性。此处理器支持持久和非持久订阅。
- GetHTTP:将基于HTTP或HTTPS的远程URL的内容下载到NiFi中。处理器将记住ETag和Last-Modified Date,以确保数据不会持续摄取。
- ListenHTTP:启动HTTP(或HTTPS)服务器并监听传入连接。对于任何传入的POST请求,请求的内容将作为FlowFile写出,并返回200响应码。
- ListenUDP:侦听传入的UDP数据包,并为每个数据包或每包数据包创建一个FlowFile(取决于配置),并将FlowFile发送到成功关系。
- GetHDFS:在HDFS中监视用户指定的目录。每当一个新的文件进入HDFS,它被复制到NiFi中。该处理器仅在主节点上运行,如果在群集中运行。为了从HDFS中复制数据并保持原样,或者从集群中的多个节点流出数据,请参阅ListHDFS处理器。
- ListHDFS / FetchHDFS:ListHDFS监视HDFS中用户指定的目录,并发出一个FlowFile,其中包含遇到的每个文件的文件名。然后它通过分布式缓存通过整个NiFi集群来保持此状态。然后,这些FlowFiles可以跨群集扇出,并发送到FetchHDFS处理器,该处理器负责获取这些文件的实际内容,并发出包含从HDFS获取的内容的FlowFiles。
- FetchS3Object:从Amazon Web Services(AWS)简单存储服务(S3)中获取对象的内容。出站FlowFile包含从S3接收的内容。
- GetKafka:从Apache Kafka获取消息,专门用于0.8.x版本。消息可以作为每个消息的FlowFile发出,或者可以使用用户指定的分隔符进行批处理。
- GetMongo:对MongoDB执行用户指定的查询,并将内容写入新的FlowFile。
- GetTwitter:允许用户注册一个过滤器来收听Twitter”garden hose” 或Enterprise endpoint,为收到的每个推文创建一个FlowFile。
7.数据出口/发送数据
- PutEmail:向配置的收件人发送电子邮件。FlowFile的内容可选择作为附件发送。
- PutFile:将 FlowFile的内容写入本地(或网络连接)文件系统上的目录。
- PutFTP:将 FlowFile的内容复制到远程FTP服务器。
- PutSFTP:将 FlowFile的内容复制到远程SFTP服务器。
- PutJMS:将 FlowFile的内容作为JMS消息发送到JMS代理,可选择基于属性添加JMS属性。
- PutSQL:作为SQL DDL语句(INSERT,UPDATE或DELETE)执行 FlowFile的内容。FlowFile的内容必须是有效的SQL语句。可以使用属性作为参数,以便FlowFile的内容可以参数化SQL语句,以避免SQL注入攻击。
- PutKafka:将一个FlowFile的内容作为消息传递给Apache Kafka,专门用于0.8.x版本。FlowFile可以作为单个消息发送,或者可以指定分隔符,例如新行,以便为单个FlowFile发送许多消息。
- PutMongo:将 FlowFile的内容作为INSERT或UPDATE发送到Mongo。
8.分割和聚合
- SplitText:SplitText采用单个FlowFile,其内容为文本,并根据配置的行数将其拆分为1个或更多个FlowFiles。例如,处理器可以配置为将FlowFile拆分成许多FlowFiles,每个FlowFiles只有1行。
- SplitJson:允许用户将由数组或许多子对象组成的JSON对象拆分为每个JSON元素的FlowFile。
- SplitXml:允许用户将XML消息拆分成许多FlowFiles,每个FlowFiles都包含原始的段。当通过“包装”元素连接几个XML元素时,通常使用这种方法。然后,该处理器允许将这些元素分割成单独的XML元素。
- UnpackContent:解压缩不同类型的归档格式,如ZIP和TAR。存档中的每个文件随后作为单个FlowFile传输。
- MergeContent:该处理器负责将许多FlowFiles合并到一个FlowFile中。FlowFiles可以通过将其内容与可选的页眉,页脚和分隔符连接起来,或者通过指定ZIP或TAR等存档格式来合并。FlowFiles可以根据一个共同的属性进行合并,如果被其他Splitting进程拆分,则可以进行“碎片整理”。每个bin的最小和最大大小都是基于元素数量或FlowFiles内容的总大小进行用户指定的,并且可以分配可选的超时,以便FlowFiles只会等待其指定的时间。
- SegmentContent:根据一些配置的数据大小,将FlowFile分段到潜在的许多较小的FlowFiles中。拆分不是针对任何分隔符而是基于字节偏移来执行的。这是在传送FlowFiles之前使用的,以便通过并行发送许多不同的片段来提供更低的延迟。另一方面,这些FlowFiles可以由MergeContent处理器使用碎片整理模式进行重新组合。
- SplitContent:将单个FlowFile拆分为潜在的许多FlowFiles,类似于SegmentContent。但是,对于SplitContent,分割不是在任意字节边界上执行,而是指定要分割内容的字符串。
9.HTTP
- GetHTTP:将基于HTTP或HTTPS的远程URL的内容下载到NiFi中。处理器将记住ETag和Last-Modified Date,以确保数据不会持续摄取。
- ListenHTTP:启动HTTP(或HTTPS)服务器并监听传入连接。对于任何传入的POST请求,请求的内容将作为FlowFile写出,并返回200个响应。
- InvokeHTTP:执行由用户配置的HTTP请求。该处理器比GetHTTP和PostHTTP更加通用,但需要更多的配置。该处理器不能用作源处理器,并且必须具有传入的FlowFiles才能被触发以执行其任务。
- PostHTTP:执行HTTP POST请求,发送FlowFile的内容作为消息的正文。这通常与ListenHTTP一起使用,以便在不能使用Site to Site的情况下(例如,当节点不能直接访问,但能够通过HTTP进行通信时)在两个不同的NiFi实例之间传输数据)。注意:HTTP可用作站点到站点运输协议除了现有的RAW Socket传输。它还支持HTTP代理。推荐使用HTTP Site to Site,因为它具有更高的可扩展性,并且可以使用输入/输出端口提供双向数据传输,具有更好的用户认证和授权。
- HandleHttpRequest / HandleHttpResponse:HandleHttpRequest处理器是一个源处理器,它与ListenHTTP类似地启动嵌入式HTTP(S)服务器。但是,它不会向客户端发送响应。相反,FlowFile与HTTP请求的主体一起发送,作为其作为属性的所有典型Servlet参数,标头等的内容和属性。HandleHttpResponse可以在FlowFile处理完成后将响应发送回客户端。这些处理器总是被期望彼此结合使用,并允许用户在NiFi内直观地创建Web服务。
五、Apache NiFi FlowFile属性

每个FlowFile都拥有多个属性,这些属性将在FlowFile的生命周期中发生变化。FlowFile三个主要优点:

- 它允许用户在流中做出路由决策,以便满足某些条件的FlowFiles可以与其他FlowFiles进行不同地处理。这可以由RouteOnAttribute和其他类似的处理器完成的。
- 利用属性配置处理器:处理器的配置依赖于数据本身。例如,PutFile能够使用Attributes来知道每个FlowFile的存储位置,而每个FlowFile的目录和文件名属性可能不同(结合表达式语言,比如每个流都有filename属性,组件中就可以这样配置文件名:${filename},就可以获取到当前FlowFIle中filename的属性值)。
- 属性提供了有关数据的极有价值的上下文。在查看FlowFile的Provenance数据时非常有用,它允许用户搜索符合特定条件的Provenance数据,并且还允许用户在检查Provenance事件的详细信息时查看此上下文。通过简单地浏览该上下文,用户能够知道为什么以这样或那样的方式处理数据。
1、共同属性
- 每个FlowFile都有一组属性:
-- filename:可用于将数据存储到本地或远程文件系统的文件名。
-- path:可用于将数据存储到本地或远程文件系统的目录的名称。
-- uuid:一个通用唯一标识符,用于区分FlowFile与系统中的其他FlowFiles。
-- entryDate:FlowFile进入系统的日期和时间(即已创建)。此属性的值是一个数字,表示自1970年1月1日午夜(UTC)以来的毫秒数。
  lineageStartDate:任何时候克隆,合并或拆分FlowFile,都会导致创建子FlowFile。该值表示当前FlowFile最早的祖先进入系统的日期和时间。该值是一个数字,表示自1970年1月1日午夜(UTC)以来的毫秒数。
-- fileSize:此属性表示FlowFile内容占用的字节数。
- 需要注意的是uuid,entryDate,lineageStartDate,和fileSize属性是系统生成的,不能改变
2、提取属性
- NiFi提供了几种不同的处理器,用于从FlowFiles中提取属性。这是构建自定义处理器的一个非常常见的用例,其实编写处理器是为了理解特定的数据格式,并从FlowFile的内容中提取相关信息,创建属性来保存该信息,以便可以决定如何路由或处理数据。
3、添加用户自定义的属性
- NIFI除了提供能够将特定信息从FlowFile内容提取到属性中的处理器之外,NIFI还允许用户将自定义属性添加到每个FlowFile中的特定位置。UpdateAttribute就是专为此目的而设计。用户可以通过单击属性选项卡右上角的+按钮,在配置对话框中向处理器添加新属性。然后UI会提示用户输入属性的名称,然后输入值。对于此UpdateAttribute处理的每个FlowFile,都会添加用户自定义属性。Attribute的名称将与添加的属性的名称相同。
- 属性的值也可以包含表达式语言。这样就允许基于其他属性修改或添加属性。例如,如果我们想要将处理文件的主机名和日期添加到文件名之前,我们可以通过添加{now():format(‘yyyy-dd-MM’)}-${filename}来实现来实现。刚开始大家可能不太理解这是什么意思,在后续的课程中我们会进行讲解。
- 除了添加一组自定义的属性外,UpdateAttribute还具有一个高级UI,允许用户配置一组规则,以便在何时添加属性。要访问此功能,请在配置对话框的属性选项卡中,单击Advanced对话框底部的按钮。将弹出此处理器特定的UI界面。在此UI中,用户可以配置规则引擎,实质上是指定必须匹配的规则,以便将已配置的属性添加到FlowFile。
4、属性路由
- NiFi最强大的功能之一是能够根据属性路由FlowFiles。执行此操作的主要机制是RouteOnAttribute。此处理器与UpdateAttribute一样,通过添加用户自定义的属性进行配置。通过单击处理器的配置对话框中属性选项卡右上角的+按钮,可以添加任意数量的属性。
- 每个FlowFile的属性将与配置的属性进行比较,以确定FlowFile是否满足指定的条件。每个属性的值应该是一个表达式语言并返回一个布尔值。下面的【表达式语言/在Property值中使用attribute】会对表达式语言进行补充。
- 在评估针对FlowFile的属性提供的表达式语言之后,处理器根据所选的路由策略确定如何路由FlowFile。最常见的策略是"Route to Property name"策略。选择此策略后,处理器将为配置的每个属性公开关系(可拖拽出去指向下一个处理器)。如果FlowFile的属性满足给定的表达式,则FlowFile的副本将路由到相应的Relationship。例如,如果我们有一个名为"begin-with-r"的新属性和值"$ {filename:startsWith(‘r’)}"的表达式,那么任何文件名以字母’r’开头的FlowFile将是路由到那个关系。所有其他FlowFiles将被路由到"unmatched"关系。
5、表达式语言/在Property值中使用attribute
- 当我们从FlowFiles的内容中提取属性并添加用户定义的属性时,除非我们有一些可以使用它们的机制,否则它们不会作为运算符进行计算。NiFi表达式语言允许我们在配置流时访问和操作FlowFile属性值。并非所有处理器属性都允许使用表达式语言,但很多处理器都可以。为了确定属性是否支持表达式语言,用户可以将鼠标悬停在处理器配置对话框的属性选项卡中的图标上,然后会有一个提示,显示属性的描述,默认值(如果有)以及属性是否支持表达式语言。
- 对于支持表达式语言的属性,可以通过在 开始标记 ${ 和结束标记 } 中添加表达式来使用它。表达式可以像属性名一样简单。例如,要引用uuid Attribute,我们可以简单地使用 {My Attribute Name} 将无效,但${‘My Attribute Name’}将引用属性My Attribute Name。
- 除了引用属性值之外,我们还可以对这些属性执行许多功能和比较。例如,如果我们想检查filename属性是否不分大小写(大写或小写)地包含字母’r’,我们可以使用表达式来完成${filename:toLower():contains(‘r’)}。函数由冒号分隔。我们可以将任意数量的函数链接在一起,以构建更复杂的表达式。即使我们正在调用filename:toLower(),这也不会改变filename属性的值,而只是返回给我们一个新的值。
- 我们也可以在一个表达式中嵌入另一个表达式。如果想要将attr1 Attribute 的值与attr2 Attribute的值进行比较,我们可以使用以下表达式来执行此操作:${attr1:equals( ${attr2} )}。
- 表达式语言包含许多不同的函数,官方文档Expression Language Guide。
- 此外,此表达式语言指南内置于应用程序中,以便用户可以轻松查看哪些功能可用,并在输入时查看其文档。设置支持表达式语言的属性的值时,如果光标位于表达式语言的开始和结束标记内,则在关键字上按 Ctrl + Space 将弹出所有可用的函数(快捷键冲突被占用会无法使用此功能),并将提供自动填充的功能。单击或使用键盘上下键指向弹出窗口中列出的某个功能会有提示,提示解释了该功能的作用,它所期望的参数以及函数的返回类型。
6、表达式语言中的自定义属性
- 除了使用FlowFile属性外,还可以定义表达式语言使用的自定义属性。定义自定义属性为处理和配置数据流提供了额外的灵活性
六、NiFi操作
Nifi的页面使用

Nifi默认启动端口是8080,使用windows下就bin目录下双击 run-nifi.bat ,Linux下就在/bin目录下,执行 ./nifi.sh start

主页面介绍

img

进入主页面后,它整体是一个画布的形式,最上方是公共导航栏,左侧那个Navigate没啥用,就是一个全局视角,下面的Operate是组件控制面板,可以进行单个组件的控制,也可以选中一片组件进行统一的启动,停止等。

面板介绍

img

导航栏分为两部分,上半部分是提供给我们工作的,下半部分是对整个Nifi环境下的一个监控信息。

img

是处理器(Processor),用鼠标单击拖出到画布上,便会出现处理器(Processor)菜单

img

是组,什么叫组呢,当你拉了很多处理器(Processor),形成了一个完整的流程的时候,我们可以单独把这块划分成一个整体,这时候就要用组把它包裹起来。

img

有了组以后,组和组之间可能也需要联通、通信,这时候就可以用入口和出口,把它们放在组内

img

需要配合Operate中的 上传使用,主要是用来迁移模板的

img

是集群Nifi进行数据通信的时候用的

img

是个便签,用来写个备注

img

是漏斗,主要作用就是把四散的数据可以汇集在一起。

Nifi的基本工作方式

Nifi其实就是一个数据接入、处理、清洗、分发的系统,它的工作方式就是将数据看作水管中的水,它是顺着某个流程管道流动,在这中间,可以在任意节点处堵截这个“水流”,并对它进行改造,然后放回管道继续向下流去。

这里的节点,其实就是Nifi的Processor,然后,节点和节点直接的通道,在Nifi里叫Relationship。在nifi中,都是一个个的流程(处理器+管道),形成一个数据的处理通路。

img

像这个例子,GetFile组件负责从一个文件里读取数据,然后把读到的数据通过管道传到ExecuteScript组件(这个组件支持用脚本代码处理数据),经过ExecuteScript之后,流向PutFile组件(将数据写入到指定文件中)。

选择处理器

img

通过“组件商城” 图标进行处理器的选择, 处理器是最常用的组件,因为它负责数据的流入,流出,路由和操作。有许多不同类型的处理器。实际上,这是NiFi中非常常见的扩展点,这意味着许多供应商可能会实现自己的处理器来执行其所需的任何功能。将处理器拖动到画布上时,会向用户显示一个对话框:

img

这里可以通过处理器的包、处理器的属性、处理器的名称等维度进行组件的筛选、选择。选中后,双击则可拖拉至画布中。

组件状态

img

  • 状态:显示处理器的当前状态。以下指标是可能的:

    • img

      正在运行:处理器当前正在运行。

    • img

      已停止:处理器有效并已启用但未运行。

    • img

      无效:处理器已启用但当前无效且无法启动。将鼠标悬停在此图标上将提供工具提示,指示处理器无效的原因。一般情况下是需要我们完成必须的配置

    • img

      已禁用:处理器未运行,在启用之前无法启动。此状态不表示处理器是否有效。

  • 名称:这是处理器的用户定义名称。默认情况下组件的名称与它的Type相同。在示例中,此值为"ExecuteGroovyScript",是一个专门用于执行Groovy脚本的组件。

  • 任务:此处理器当前正在执行的任务数。此数字受处理器配置对话框的计划选项卡中的并发任务设置的约束。在这里,我们可以看到处理器当前正在执行一项任务。如果NiFi实例是集群的,则此值表示当前正在集群中的所有节点上执行的任务数。

  • 实时日志:这里是用于监控当前处理器状态的,当处理器内部出现问题,一般会在此处显示错误日志

  • 数据流入流出看板:这里主要是展示处理数据过程中数据的流入流出情况,Nifi默认是5分钟更新一次页面上的看板情况,当然用户也可以在画布空白处,鼠标右键选择刷新,以达到实时查看的效果。

    • In:处理器从其传入处理器的队列中提取的数据量。此值表示为count size,其中count是从队列中提取的FlowFiles的数量,size是这些FlowFiles内容的总大小

    • Read/Write:处理器从磁盘读取并写入磁盘的FlowFile内容的总大小。这提供了有关此处理器所需的I/O性能的有用信息。某些处理器可能只读取数据而不写入任何内容,而某些处理器不会读取数据但只会写入数据。其他可能既不会读取也不会写入数据,而某些处理器会读取和写入数据。

    • Out:处理器已传输到其出站连接的数据量。这不包括处理器自行删除的FlowFiles,也不包括路由到自动终止的连接的FlowFiles。与上面的"In"指标一样,此值表示为count size,其中count是已转移到出站Connections的FlowFiles的数量,size是这些FlowFiles内容的总大小。

    • Tasks/Time:此处理器在过去5分钟内被触发运行的次数,以及执行这些任务所花费的时间。时间格式为hour:minute:second。请注意,所花费的时间可能超过五分钟,因为许多任务可以并行执行。例如,如果处理器计划运行60个并发任务,并且每个任务都需要一秒钟才能完成,则所有60个任务可能会在一秒钟内完成。但是,在这种情况下,我们会看到时间指标显示它需要60秒,而不是1秒。

组件配置

Nifi的处理器,一般都有四个标签页,分别是SETTINGS,SCHEDULING,PROPERITIES,COMMENTS(除了PROPERITIES之外,另外三个几乎是通用的)。

SETTINGS(通用配置)

img

  • Name是用户自定义的名称,Id、Type、Bundle这三个是这个处理器组件所属的代码包等基本信息,Enable是控制组件由启用到禁用

    img

    状态的切换。

  • 最右边包含自动终止关系(Automatically Terminate Relationships)部分。此处列出了处理器定义的每个关系及其描述。为了使处理器被视为有效且能够运行,处理器定义的每个关系必须连接到下游组件或自动终止。我们可以通过选中它,例如图中选中Failure一样,来表示我们弃用这个输出,也就是不需要它指向下一个组件,这样这个处理器就变成只有一个对外输出数据的Relationship了。

  • 接下来是两个用于配置Penalty Duration和Yield Duration的对话框。在处理一条数据(FlowFile)的正常过程中,可能发生事件,该事件指示处理器此时不能处理数据但是数据可以在稍后进行处理。在发生这种情况时,处理器可以选择Penalize FlowFile。这将阻止FlowFile在一段时间内被处理。例如,如果处理器要将数据推送到远程服务,但远程服务已经有一个与处理器指定的文件名同名的文件,则处理器可能会惩罚FlowFile。Penalty Duration允许DFM指定FlowFile应该受到多长时间的惩罚。默认值为30 seconds。(简单理解为推后一段时间再处理),类似的处理器可以确定存在某种情况,处理器没法进行处理数据。例如,如果处理器要将数据推送到远程服务并且该服务没有响应。这样的话处理器应该Yield,这将阻止处理器运行一段时间。通过设置Yield Duration来指定该时间段。默认值为1 second。

  • 最下方Bulletin Level可以简单的理解为组件的日志输出等级的选择,有选择地进行日志等级输出

SCHEDULING(处理器调度)

img

这一标签页,代表的就是如何驱动处理器,或者说处理器的运作方式:

  • 第一个配置选项是调度策略(Scheduling Strategy)。调度有三种可能的选项:

    • Timer driven:这是默认模式。处理器将定期运行。即多久运行一次,运行处理器的时间间隔由Run Schedule选项定义(当Run Schedule为0时,则代表瞬时执行)。

    • Event driven:选择此模式时,将由一个事件触发处理器运行,当FlowFiles进入连接此处理器的Connections时,将产生这个事件。此模式目前被认为是实验性的,并非所有处理器都支持。选择此模式时,Run Schedule选项不可配置。此外,只有此模式下Concurrent Tasks选项可以设置为0。这种情况,线程数仅受管理员配置的事件驱动线程池的大小限制。

    • CRON驱动:这是定时执行模式,即通过cron表达式,进行定时运行的控制。

  • 线程的分配(Concurrent Tasks):可以控制处理器将使用的线程数。换句话说,它控制此处理器应同时处理多少个FlowFiles。增加此值通常会使处理器在相同的时间内处理更多数据。但是,它是通过使用其他处理器无法使用的系统资源来实现此目的。这基本上提供了处理器的相对权重 - 应该将多少系统资源分配给此处理器而不是其他处理器。该字段适用于大多数处理器。但是,某些类型的处理器只能使用单个任务进行调度。

  • 关于Execution,执行设置用于确定处理器将被调度执行的节点。选择"All Nodes"将导致在集群中的每个节点上调度此处理器。选择"Primary Node"将导致此处理器仅在主节点上进行调度。一般单节点的情况下,我们都使用Primary Node

  • "Run Duration"选项卡的右侧包含一个用于选择运行持续时间的滑块。这可以控制处理器每次触发时应安排运行的时间。在滑块的左侧,标记为"Lower latency(较低延迟)",而右侧标记为"Higher throughput(较高吞吐量)"。处理器完成运行后,必须更新存储库才能将FlowFiles传输到下一个Connection。更新存储库的成本很高,因此在更新存储库之前可以立即完成的工作量越多,处理器可以处理的工作量就越多(吞吐量越高)。这意味着在上一批数据处理更新此存储库之前,Processor是无法开始处理接下来的FlowFiles。结果是,延迟时间会更长(从开始到结束处理FlowFile所需的时间会更长)。因此,滑块提供了一个频谱,DFM可以从中选择支持较低延迟或较高吞吐量。

COMMENTS(备注区)

这块把它称之为”备注区“,即用来为用户提供一个区域,以包含适用于此组件的任何注释。

PROPERITIES(属性区)

想了解对应组件的具体属性配置可以参考官网文档:Apache NiFi Documentation

队列管道操作

img

对于队列管道,它即是数据从一个处理器流向另一个处理器的中间队列,最多的用处就是用来监控数据是否正常流通,以及在开发使用过程中,可能调试定位问题等需要查看一下管道的数据,这里主要从管道的来源、手动清空、查看数据、设置超时清空、删除来描述一下对于管道队列

管道的来源:

管道的建立十分简单,两个组件进行一下拖拉连线即可,管道建立后,就需要选择前置处理器选用哪个Relationship输出的数据作为管道的源头,也就是上面配置项那里的Relationship。

手动清空管道:

管道内的数据承载是有限的,有些时候(阻塞或者需要删除组件)需要进行手动清空管道的数据,操作方式是:选中管道,右键会出现:

img

查看数据

查看管道中的数据可以选中管道,右键后的 List queue选项

img

设置超时清空

当有些组件处理速度过慢,导致阻塞(允许数据丢失的情况下),我们不能挨个进行手动的清空,这时候可以 在管道的 右键 configure 选项中进入管道的配置页面

img

在FlowFile Expiration进行超时自动清空的设置,默认为0是不做自动清空

删除

一般删除处理器之前,是需要断开所有与其关联的管道,即删除管道,删除时如果管道中有数据,需要手动制空后,选则 Delete。

Nifi的迁移

使用Nifi的过程中,当进行了一系列的开发,想要对绘制的各种流程图,以及其中的配置、代码进行备份或者迁移的时候,Nifi本身提供了很友好的迁移方式。

局部备份迁移

  如果仅对部分流程进行备份,可以对选中的区域,使用

img

进行创建模板,在另一Nifi中,使用

img

上传模板,选择

img

样例进行复原。

整体备份迁移

当整个Nifi的全景图需要进行备份或者迁移的时候,可以对nifi安装目录下的/conf/flow.xml.gz文件进行复制和替换,然后重启被替换的Nifi,即可以还原nifi之前的流程和模板

(注:未进行认证设置的Nifi的flow.xml.gz是无法直接在配置了认证权限的nifi上使用的!)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/889192
推荐阅读
相关标签
  

闽ICP备14008679号