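1. NiFi 1.9.2 introduction, standalone deployment, and basic verification
2. NiFi application example: using GetFile and PutFile
3. NiFi processor introduction, common FlowFile attributes, templates, and viewing runtime information
4. Cluster deployment and verification, monitoring, and node management
5. NiFi FlowFile examples and NiFi template examples
6. NiFi application scenario: offline synchronization of MySQL data to HDFS
7. NiFi combined scenario: converting JSON data queried from MySQL to txt and storing it in HDFS
8. NiFi combined scenario: monitoring the MySQL binlog with NiFi for real-time synchronization to Hive
9. NiFi combined scenario: configuring Kafka data synchronization through NiFi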
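This article describes how to synchronize data into Hive in real time by monitoring the MySQL binlog. Because Hive executes INSERT statements slowly, this approach is generally not used in practice.
It assumes that working Hive, MySQL, and NiFi environments are available.
The article has four parts: the implementation flow, processor descriptions, operation, and verification.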
Using NiFi to monitor the MySQL binlog and synchronize to Hive in real time.
It covers insert, update, and delete operations.
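As a rough illustration of what the insert, update, and delete branches do, the Python sketch below turns one change event into a SQL string. The template in this article pulls values out of the event with JSONPath expressions such as `$.database`, `$.table_name`, and `$.columns[0].value`, then builds the statement text with ReplaceText. The sketch only mimics that idea outside NiFi. The `type` field, the per-column `name` field, the sample event, and the function names `sql_literal` and `event_to_sql` are assumptions made for illustration, not part of the template.

```python
import json

# Hypothetical sample event. The layout is assumed from the JSONPath
# expressions used in the template ($.database, $.table_name,
# $.columns[i].value); the "type" and "name" fields are assumptions.
SAMPLE_EVENT = json.dumps({
    "type": "delete",
    "database": "test",
    "table_name": "dms_content_t",
    "columns": [
        {"name": "content_id", "value": 7},
        {"name": "title", "value": "hello"},
    ],
})


def sql_literal(value):
    """Render a value as SQL text: numbers bare, everything else quoted.

    Like the ReplaceText values in the template, this does no escaping,
    so it is only suitable for illustration.
    """
    if value is None:
        return "NULL"
    if isinstance(value, (int, float)) and not isinstance(value, bool):
        return str(value)
    return "'" + str(value) + "'"


def event_to_sql(event_json):
    """Build one SQL statement from one change event (illustrative only)."""
    event = json.loads(event_json)
    table = f"{event['database']}.{event['table_name']}"
    cols = event["columns"]
    key = cols[0]  # the template reads the key from $.columns[0].value
    where = f"{key['name']} = {sql_literal(key['value'])}"
    kind = event["type"]
    if kind == "insert":
        names = ", ".join(c["name"] for c in cols)
        values = ", ".join(sql_literal(c["value"]) for c in cols)
        return f"insert into {table} ({names}) values ({values})"
    if kind == "update":
        assignments = ", ".join(
            f"{c['name']} = {sql_literal(c['value'])}" for c in cols[1:]
        )
        return f"update {table} set {assignments} where {where}"
    if kind == "delete":
        return f"delete from {table} where {where}"
    raise ValueError(f"unsupported event type: {kind}")


if __name__ == "__main__":
    print(event_to_sql(SAMPLE_EVENT))
    # delete from test.dms_content_t where content_id = 7
```

Producing one statement per change event is also why this pattern is slow on Hive: every row-level change becomes its own statement.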
<?xml version="1.0" encoding="UTF-8" standalone="yes"?> <template encoding-version="1.2"> <description></description> <groupId>4a34a13e-0186-1000-0000-00001568fc6d</groupId> <name>mysql_to_hive_realtime</name> <snippet> <connections> <id>32407c91-6021-3f17-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>41f509e4-9f75-3c4d-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> <selectedRelationships>matched</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>227f5c84-8d6b-320e-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>41722019-baff-39eb-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>f3510126-b3a9-3ef8-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> 
<selectedRelationships>success</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>2e7030a3-7059-3df9-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>4757dbca-d994-361c-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>c9ec9f4d-7a7c-3181-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> <selectedRelationships>unmatched</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>ab7d7202-e5c1-3329-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>50567837-8a1d-323a-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>2e7030a3-7059-3df9-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> 
<selectedRelationships>matched</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>877cd3d6-0995-358a-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>7a9089aa-7109-39f0-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>407717a9-fefe-3fe3-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> <selectedRelationships>matched</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>d069d5bb-0e44-3aed-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>89d7ab4a-6b9a-3b57-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>c9ec9f4d-7a7c-3181-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> 
<selectedRelationships>failure</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>2e7030a3-7059-3df9-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>9ced34ab-5a13-3440-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>877cd3d6-0995-358a-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name>R_E</name> <selectedRelationships>insert</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>ab7d7202-e5c1-3329-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>c3c8ef8f-aff4-3b1a-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>227f5c84-8d6b-320e-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> 
<selectedRelationships>delete</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>ab7d7202-e5c1-3329-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>c4d6b504-fd48-398d-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>ab7d7202-e5c1-3329-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name>C_R</name> <selectedRelationships>success</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>9217073a-1e76-3aeb-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>1</zIndex> </connections> <connections> <id>d6f466e6-40a6-311d-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>c9ec9f4d-7a7c-3181-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> 
<selectedRelationships>failure</selectedRelationships> <selectedRelationships>retry</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>f3510126-b3a9-3ef8-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>e238c257-c5ca-32eb-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>d069d5bb-0e44-3aed-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> <selectedRelationships>update</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>ab7d7202-e5c1-3329-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>eb46aaa0-f073-3cd1-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>f3510126-b3a9-3ef8-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> 
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> <selectedRelationships>success</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>41f509e4-9f75-3c4d-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>f755b13f-2d28-37c2-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>f3510126-b3a9-3ef8-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> <loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> <selectedRelationships>success</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>407717a9-fefe-3fe3-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <connections> <id>fb7797c7-aa5e-3b1a-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <backPressureDataSizeThreshold>1 GB</backPressureDataSizeThreshold> <backPressureObjectThreshold>10000</backPressureObjectThreshold> <destination> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>c9ec9f4d-7a7c-3181-0000-000000000000</id> <type>PROCESSOR</type> </destination> <flowFileExpiration>0 sec</flowFileExpiration> <labelIndex>1</labelIndex> <loadBalanceCompression>DO_NOT_COMPRESS</loadBalanceCompression> <loadBalancePartitionAttribute></loadBalancePartitionAttribute> <loadBalanceStatus>LOAD_BALANCE_NOT_CONFIGURED</loadBalanceStatus> 
<loadBalanceStrategy>DO_NOT_LOAD_BALANCE</loadBalanceStrategy> <name></name> <selectedRelationships>failure</selectedRelationships> <selectedRelationships>unmatched</selectedRelationships> <source> <groupId>cc71fffc-d0fd-3a74-0000-000000000000</groupId> <id>877cd3d6-0995-358a-0000-000000000000</id> <type>PROCESSOR</type> </source> <zIndex>0</zIndex> </connections> <controllerServices> <id>83f7fb9e-5a36-305c-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <bundle> <artifact>nifi-hive-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <comments></comments> <descriptors> <entry> <key>hive-db-connect-url</key> <value> <name>hive-db-connect-url</name> </value> </entry> <entry> <key>hive-config-resources</key> <value> <name>hive-config-resources</name> </value> </entry> <entry> <key>hive-db-user</key> <value> <name>hive-db-user</name> </value> </entry> <entry> <key>hive-db-password</key> <value> <name>hive-db-password</name> </value> </entry> <entry> <key>hive-max-wait-time</key> <value> <name>hive-max-wait-time</name> </value> </entry> <entry> <key>hive-max-total-connections</key> <value> <name>hive-max-total-connections</name> </value> </entry> <entry> <key>Validation-query</key> <value> <name>Validation-query</name> </value> </entry> <entry> <key>kerberos-credentials-service</key> <value> <identifiesControllerService>org.apache.nifi.kerberos.KerberosCredentialsService</identifiesControllerService> <name>kerberos-credentials-service</name> </value> </entry> <entry> <key>Kerberos Principal</key> <value> <name>Kerberos Principal</name> </value> </entry> <entry> <key>Kerberos Keytab</key> <value> <name>Kerberos Keytab</name> </value> </entry> </descriptors> <name>HiveConnectionPool</name> <persistsState>false</persistsState> <properties> <entry> <key>hive-db-connect-url</key> <value>jdbc:hive2://server4:10000</value> </entry> <entry> <key>hive-config-resources</key> 
<value>/usr/local/bigdata/testdata/hdfs-site.xml,/usr/local/bigdata/testdata/core-site.xml,/usr/local/bigdata/testdata/hive-site.xml</value> </entry> <entry> <key>hive-db-user</key> <value>alanchan</value> </entry> <entry> <key>hive-db-password</key> </entry> <entry> <key>hive-max-wait-time</key> <value>500 millis</value> </entry> <entry> <key>hive-max-total-connections</key> <value>8</value> </entry> <entry> <key>Validation-query</key> </entry> <entry> <key>kerberos-credentials-service</key> </entry> <entry> <key>Kerberos Principal</key> </entry> <entry> <key>Kerberos Keytab</key> </entry> </properties> <state>ENABLED</state> <type>org.apache.nifi.dbcp.hive.HiveConnectionPool</type> </controllerServices> <controllerServices> <id>f8602325-c430-3690-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <bundle> <artifact>nifi-distributed-cache-services-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <comments></comments> <descriptors> <entry> <key>Server Hostname</key> <value> <name>Server Hostname</name> </value> </entry> <entry> <key>Server Port</key> <value> <name>Server Port</name> </value> </entry> <entry> <key>SSL Context Service</key> <value> <identifiesControllerService>org.apache.nifi.ssl.SSLContextService</identifiesControllerService> <name>SSL Context Service</name> </value> </entry> <entry> <key>Communications Timeout</key> <value> <name>Communications Timeout</name> </value> </entry> </descriptors> <name>DistributedMapCacheClientService</name> <persistsState>false</persistsState> <properties> <entry> <key>Server Hostname</key> <value>server1</value> </entry> <entry> <key>Server Port</key> <value>4557</value> </entry> <entry> <key>SSL Context Service</key> </entry> <entry> <key>Communications Timeout</key> <value>30 secs</value> </entry> </properties> <state>ENABLED</state> <type>org.apache.nifi.distributed.cache.client.DistributedMapCacheClientService</type> </controllerServices> 
<processors> <id>227f5c84-8d6b-320e-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> <x>2.570618773470528</x> <y>551.1887534671108</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Destination</key> <value> <name>Destination</name> </value> </entry> <entry> <key>Return Type</key> <value> <name>Return Type</name> </value> </entry> <entry> <key>Path Not Found Behavior</key> <value> <name>Path Not Found Behavior</name> </value> </entry> <entry> <key>Null Value Representation</key> <value> <name>Null Value Representation</name> </value> </entry> <entry> <key>content_id</key> <value> <name>content_id</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Destination</key> <value>flowfile-attribute</value> </entry> <entry> <key>Return Type</key> <value>auto-detect</value> </entry> <entry> <key>Path Not Found Behavior</key> <value>ignore</value> </entry> <entry> <key>Null Value Representation</key> <value>empty string</value> </entry> <entry> <key>content_id</key> <value>$.columns[0].value</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>EvaluateJsonPath_delete</name> <relationships> <autoTerminate>true</autoTerminate> <name>failure</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>matched</name> </relationships> <relationships> <autoTerminate>true</autoTerminate> <name>unmatched</name> 
</relationships> <state>STOPPED</state> <style/> <type>org.apache.nifi.processors.standard.EvaluateJsonPath</type> </processors> <processors> <id>2e7030a3-7059-3df9-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> <x>936.336601121247</x> <y>827.0900163650513</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Regular Expression</key> <value> <name>Regular Expression</name> </value> </entry> <entry> <key>Replacement Value</key> <value> <name>Replacement Value</name> </value> </entry> <entry> <key>Character Set</key> <value> <name>Character Set</name> </value> </entry> <entry> <key>Maximum Buffer Size</key> <value> <name>Maximum Buffer Size</name> </value> </entry> <entry> <key>Replacement Strategy</key> <value> <name>Replacement Strategy</name> </value> </entry> <entry> <key>Evaluation Mode</key> <value> <name>Evaluation Mode</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Regular Expression</key> <value>(?s)(^.*$)</value> </entry> <entry> <key>Replacement Value</key> <value>insert into test.dms_content_t (content_id ,title , publisher ,click_num , picture ,rcmd ,stick ,page_title , page_keyword ,page_descr ,publish_time ,update_time ,create_time ,create_user_id ,modify_time ,modify_user_id ,status ,type ) values( ${content_id}, '${title}', '${publisher}', ${click_num}, '${picture}', ${rcmd}, ${stick}, '${page_title}', '${page_keyword}', '${page_descr}', '${publish_time}', '${update_time}', '${create_time}', ${create_user_id}, '${modify_time}', ${modify_user_id}, ${status}, ${type} )</value> </entry> <entry> <key>Character Set</key> 
<value>UTF-8</value> </entry> <entry> <key>Maximum Buffer Size</key> <value>1 MB</value> </entry> <entry> <key>Replacement Strategy</key> <value>Regex Replace</value> </entry> <entry> <key>Evaluation Mode</key> <value>Entire text</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>ReplaceText_insert</name> <relationships> <autoTerminate>false</autoTerminate> <name>failure</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>success</name> </relationships> <state>STOPPED</state> <style/> <type>org.apache.nifi.processors.standard.ReplaceText</type> </processors> <processors> <id>407717a9-fefe-3fe3-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> <x>503.83906293000393</x> <y>826.243760585218</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Regular Expression</key> <value> <name>Regular Expression</name> </value> </entry> <entry> <key>Replacement Value</key> <value> <name>Replacement Value</name> </value> </entry> <entry> <key>Character Set</key> <value> <name>Character Set</name> </value> </entry> <entry> <key>Maximum Buffer Size</key> <value> <name>Maximum Buffer Size</name> </value> </entry> <entry> <key>Replacement Strategy</key> <value> <name>Replacement Strategy</name> </value> </entry> <entry> <key>Evaluation Mode</key> <value> <name>Evaluation Mode</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> 
<entry> <key>Regular Expression</key> <value>(?s)(^.*$)</value> </entry> <entry> <key>Replacement Value</key> <value>UPDATE test.dms_content_t SET title = '${title}', publisher = '${publisher}' , click_num = ${click_num}, picture = '${picture}', rcmd = ${rcmd}, stick =${stick}, page_title = '${page_title}', page_keyword = '${page_keyword}', page_descr = '${page_descr}', publish_time = '${publish_time}', update_time = '${update_time}', create_time = '${create_time}', create_user_id = ${create_user_id}, modify_time = '${modify_time}', modify_user_id = ${modify_user_id}, status = ${status}, type = ${type} WHERE content_id = ${content_id}</value> </entry> <entry> <key>Character Set</key> <value>UTF-8</value> </entry> <entry> <key>Maximum Buffer Size</key> <value>1 MB</value> </entry> <entry> <key>Replacement Strategy</key> <value>Regex Replace</value> </entry> <entry> <key>Evaluation Mode</key> <value>Entire text</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>ReplaceText</name> <relationships> <autoTerminate>true</autoTerminate> <name>failure</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>success</name> </relationships> <state>STOPPED</state> <style/> <type>org.apache.nifi.processors.standard.ReplaceText</type> </processors> <processors> <id>41f509e4-9f75-3c4d-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> <x>0.0</x> <y>823.6731217176821</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Regular Expression</key> 
<value> <name>Regular Expression</name> </value> </entry> <entry> <key>Replacement Value</key> <value> <name>Replacement Value</name> </value> </entry> <entry> <key>Character Set</key> <value> <name>Character Set</name> </value> </entry> <entry> <key>Maximum Buffer Size</key> <value> <name>Maximum Buffer Size</name> </value> </entry> <entry> <key>Replacement Strategy</key> <value> <name>Replacement Strategy</name> </value> </entry> <entry> <key>Evaluation Mode</key> <value> <name>Evaluation Mode</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Regular Expression</key> <value>(?s)(^.*$)</value> </entry> <entry> <key>Replacement Value</key> <value>delete from test.dms_content_t where content_id = ${content_id}</value> </entry> <entry> <key>Character Set</key> <value>UTF-8</value> </entry> <entry> <key>Maximum Buffer Size</key> <value>1 MB</value> </entry> <entry> <key>Replacement Strategy</key> <value>Regex Replace</value> </entry> <entry> <key>Evaluation Mode</key> <value>Entire text</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>ReplaceText_delete</name> <relationships> <autoTerminate>true</autoTerminate> <name>failure</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>success</name> </relationships> <state>STOPPED</state> <style/> <type>org.apache.nifi.processors.standard.ReplaceText</type> </processors> <processors> <id>877cd3d6-0995-358a-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> <x>937.5049055646064</x> <y>550.9260149002075</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> 
<version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Destination</key> <value> <name>Destination</name> </value> </entry> <entry> <key>Return Type</key> <value> <name>Return Type</name> </value> </entry> <entry> <key>Path Not Found Behavior</key> <value> <name>Path Not Found Behavior</name> </value> </entry> <entry> <key>Null Value Representation</key> <value> <name>Null Value Representation</name> </value> </entry> <entry> <key>click_num</key> <value> <name>click_num</name> </value> </entry> <entry> <key>content</key> <value> <name>content</name> </value> </entry> <entry> <key>content_id</key> <value> <name>content_id</name> </value> </entry> <entry> <key>create_time</key> <value> <name>create_time</name> </value> </entry> <entry> <key>create_user_id</key> <value> <name>create_user_id</name> </value> </entry> <entry> <key>database</key> <value> <name>database</name> </value> </entry> <entry> <key>modify_time</key> <value> <name>modify_time</name> </value> </entry> <entry> <key>modify_user_id</key> <value> <name>modify_user_id</name> </value> </entry> <entry> <key>page_descr</key> <value> <name>page_descr</name> </value> </entry> <entry> <key>page_keyword</key> <value> <name>page_keyword</name> </value> </entry> <entry> <key>page_title</key> <value> <name>page_title</name> </value> </entry> <entry> <key>picture</key> <value> <name>picture</name> </value> </entry> <entry> <key>publish_time</key> <value> <name>publish_time</name> </value> </entry> <entry> <key>publisher</key> <value> <name>publisher</name> </value> </entry> <entry> <key>rcmd</key> <value> <name>rcmd</name> </value> </entry> <entry> <key>status</key> <value> <name>status</name> </value> </entry> <entry> <key>stick</key> <value> <name>stick</name> </value> </entry> <entry> <key>table_name</key> <value> <name>table_name</name> </value> </entry> <entry> 
<key>title</key> <value> <name>title</name> </value> </entry> <entry> <key>type</key> <value> <name>type</name> </value> </entry> <entry> <key>update_time</key> <value> <name>update_time</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Destination</key> <value>flowfile-attribute</value> </entry> <entry> <key>Return Type</key> <value>auto-detect</value> </entry> <entry> <key>Path Not Found Behavior</key> <value>ignore</value> </entry> <entry> <key>Null Value Representation</key> <value>empty string</value> </entry> <entry> <key>click_num</key> <value>$.columns[3].value</value> </entry> <entry> <key>content</key> <value>$.columns[7].value</value> </entry> <entry> <key>content_id</key> <value>$.columns[0].value</value> </entry> <entry> <key>create_time</key> <value>$.columns[13].value</value> </entry> <entry> <key>create_user_id</key> <value>$.columns[14].value</value> </entry> <entry> <key>database</key> <value>$.database</value> </entry> <entry> <key>modify_time</key> <value>$.columns[15].value</value> </entry> <entry> <key>modify_user_id</key> <value>$.columns[16].value</value> </entry> <entry> <key>page_descr</key> <value>$.columns[10].value</value> </entry> <entry> <key>page_keyword</key> <value>$.columns[9].value</value> </entry> <entry> <key>page_title</key> <value>$.columns[8].value</value> </entry> <entry> <key>picture</key> <value>$.columns[4].value</value> </entry> <entry> <key>publish_time</key> <value>$.columns[11].value</value> </entry> <entry> <key>publisher</key> <value>$.columns[2].value</value> </entry> <entry> <key>rcmd</key> <value>$.columns[5].value</value> </entry> <entry> <key>status</key> <value>$.columns[17].value</value> </entry> <entry> <key>stick</key> <value>$.columns[6].value</value> </entry> <entry> <key>table_name</key> <value>$.table_name</value> </entry> <entry> <key>title</key> 
<value>$.columns[1].value</value> </entry> <entry> <key>type</key> <value>$.columns[18].value</value> </entry> <entry> <key>update_time</key> <value>$.columns[12].value</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>EvaluateJsonPath_insert</name> <relationships> <autoTerminate>false</autoTerminate> <name>failure</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>matched</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>unmatched</name> </relationships> <state>STOPPED</state> <style/> <type>org.apache.nifi.processors.standard.EvaluateJsonPath</type> </processors> <processors> <id>9217073a-1e76-3aeb-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> <x>498.3604353497626</x> <y>0.0</y> </position> <bundle> <artifact>nifi-cdc-mysql-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>capture-change-mysql-hosts</key> <value> <name>capture-change-mysql-hosts</name> </value> </entry> <entry> <key>capture-change-mysql-driver-class</key> <value> <name>capture-change-mysql-driver-class</name> </value> </entry> <entry> <key>capture-change-mysql-driver-locations</key> <value> <name>capture-change-mysql-driver-locations</name> </value> </entry> <entry> <key>capture-change-mysql-username</key> <value> <name>capture-change-mysql-username</name> </value> </entry> <entry> <key>capture-change-mysql-password</key> <value> <name>capture-change-mysql-password</name> </value> </entry> <entry> <key>capture-change-mysql-server-id</key> <value> 
<name>capture-change-mysql-server-id</name> </value> </entry> <entry> <key>capture-change-mysql-db-name-pattern</key> <value> <name>capture-change-mysql-db-name-pattern</name> </value> </entry> <entry> <key>capture-change-mysql-name-pattern</key> <value> <name>capture-change-mysql-name-pattern</name> </value> </entry> <entry> <key>capture-change-mysql-max-wait-time</key> <value> <name>capture-change-mysql-max-wait-time</name> </value> </entry> <entry> <key>capture-change-mysql-dist-map-cache-client</key> <value> <identifiesControllerService>org.apache.nifi.distributed.cache.client.DistributedMapCacheClient</identifiesControllerService> <name>capture-change-mysql-dist-map-cache-client</name> </value> </entry> <entry> <key>capture-change-mysql-retrieve-all-records</key> <value> <name>capture-change-mysql-retrieve-all-records</name> </value> </entry> <entry> <key>capture-change-mysql-include-begin-commit</key> <value> <name>capture-change-mysql-include-begin-commit</name> </value> </entry> <entry> <key>capture-change-mysql-include-ddl-events</key> <value> <name>capture-change-mysql-include-ddl-events</name> </value> </entry> <entry> <key>capture-change-mysql-state-update-interval</key> <value> <name>capture-change-mysql-state-update-interval</name> </value> </entry> <entry> <key>capture-change-mysql-init-seq-id</key> <value> <name>capture-change-mysql-init-seq-id</name> </value> </entry> <entry> <key>capture-change-mysql-init-binlog-filename</key> <value> <name>capture-change-mysql-init-binlog-filename</name> </value> </entry> <entry> <key>capture-change-mysql-init-binlog-position</key> <value> <name>capture-change-mysql-init-binlog-position</name> </value> </entry> </descriptors> <executionNode>PRIMARY</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>capture-change-mysql-hosts</key> <value>192.168.10.37:3306</value> </entry> <entry> <key>capture-change-mysql-driver-class</key> 
<value>com.mysql.jdbc.Driver</value> </entry> <entry> <key>capture-change-mysql-driver-locations</key> <value>/usr/local/bigdata/testdata/mysql-connector-java-5.1.44.jar</value> </entry> <entry> <key>capture-change-mysql-username</key> <value>root</value> </entry> <entry> <key>capture-change-mysql-password</key> </entry> <entry> <key>capture-change-mysql-server-id</key> </entry> <entry> <key>capture-change-mysql-db-name-pattern</key> <value>smp</value> </entry> <entry> <key>capture-change-mysql-name-pattern</key> <value>dms_content_t</value> </entry> <entry> <key>capture-change-mysql-max-wait-time</key> <value>30 seconds</value> </entry> <entry> <key>capture-change-mysql-dist-map-cache-client</key> <value>f8602325-c430-3690-0000-000000000000</value> </entry> <entry> <key>capture-change-mysql-retrieve-all-records</key> <value>true</value> </entry> <entry> <key>capture-change-mysql-include-begin-commit</key> <value>false</value> </entry> <entry> <key>capture-change-mysql-include-ddl-events</key> <value>false</value> </entry> <entry> <key>capture-change-mysql-state-update-interval</key> <value>0 seconds</value> </entry> <entry> <key>capture-change-mysql-init-seq-id</key> </entry> <entry> <key>capture-change-mysql-init-binlog-filename</key> </entry> <entry> <key>capture-change-mysql-init-binlog-position</key> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>86400 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>CaptureChangeMySQL</name> <relationships> <autoTerminate>false</autoTerminate> <name>success</name> </relationships> <state>STOPPED</state> <style/> <type>org.apache.nifi.cdc.mysql.processors.CaptureChangeMySQL</type> </processors> <processors> <id>ab7d7202-e5c1-3329-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> <x>503.7869490216376</x> 
<y>221.10748767852783</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Routing Strategy</key> <value> <name>Routing Strategy</name> </value> </entry> <entry> <key>delete</key> <value> <name>delete</name> </value> </entry> <entry> <key>insert</key> <value> <name>insert</name> </value> </entry> <entry> <key>update</key> <value> <name>update</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Routing Strategy</key> <value>Route to Property name</value> </entry> <entry> <key>delete</key> <value>${cdc.event.type:equals("delete")}</value> </entry> <entry> <key>insert</key> <value>${cdc.event.type:equals("insert")}</value> </entry> <entry> <key>update</key> <value>${cdc.event.type:equals("update")}</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>RouteOnAttribute</name> <relationships> <autoTerminate>false</autoTerminate> <name>delete</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>insert</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>unmatched</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>update</name> </relationships> <state>STOPPED</state> <style/> <type>org.apache.nifi.processors.standard.RouteOnAttribute</type> </processors> <processors> <id>c9ec9f4d-7a7c-3181-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> 
<x>1644.0293196271064</x> <y>667.1878252029419</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Log Level</key> <value> <name>Log Level</name> </value> </entry> <entry> <key>Log Payload</key> <value> <name>Log Payload</name> </value> </entry> <entry> <key>Attributes to Log</key> <value> <name>Attributes to Log</name> </value> </entry> <entry> <key>attributes-to-log-regex</key> <value> <name>attributes-to-log-regex</name> </value> </entry> <entry> <key>Attributes to Ignore</key> <value> <name>Attributes to Ignore</name> </value> </entry> <entry> <key>attributes-to-ignore-regex</key> <value> <name>attributes-to-ignore-regex</name> </value> </entry> <entry> <key>Log prefix</key> <value> <name>Log prefix</name> </value> </entry> <entry> <key>character-set</key> <value> <name>character-set</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Log Level</key> <value>info</value> </entry> <entry> <key>Log Payload</key> <value>false</value> </entry> <entry> <key>Attributes to Log</key> </entry> <entry> <key>attributes-to-log-regex</key> <value>.*</value> </entry> <entry> <key>Attributes to Ignore</key> </entry> <entry> <key>attributes-to-ignore-regex</key> </entry> <entry> <key>Log prefix</key> </entry> <entry> <key>character-set</key> <value>UTF-8</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>LogAttribute</name> <relationships> <autoTerminate>true</autoTerminate> 
<name>success</name> </relationships> <state>RUNNING</state> <style/> <type>org.apache.nifi.processors.standard.LogAttribute</type> </processors> <processors> <id>d069d5bb-0e44-3aed-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> <x>505.1243669870492</x> <y>548.6181273364</y> </position> <bundle> <artifact>nifi-standard-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Destination</key> <value> <name>Destination</name> </value> </entry> <entry> <key>Return Type</key> <value> <name>Return Type</name> </value> </entry> <entry> <key>Path Not Found Behavior</key> <value> <name>Path Not Found Behavior</name> </value> </entry> <entry> <key>Null Value Representation</key> <value> <name>Null Value Representation</name> </value> </entry> <entry> <key>click_num</key> <value> <name>click_num</name> </value> </entry> <entry> <key>content</key> <value> <name>content</name> </value> </entry> <entry> <key>content_id</key> <value> <name>content_id</name> </value> </entry> <entry> <key>create_time</key> <value> <name>create_time</name> </value> </entry> <entry> <key>create_user_id</key> <value> <name>create_user_id</name> </value> </entry> <entry> <key>modify_time</key> <value> <name>modify_time</name> </value> </entry> <entry> <key>modify_user_id</key> <value> <name>modify_user_id</name> </value> </entry> <entry> <key>page_descr</key> <value> <name>page_descr</name> </value> </entry> <entry> <key>page_keyword</key> <value> <name>page_keyword</name> </value> </entry> <entry> <key>page_title</key> <value> <name>page_title</name> </value> </entry> <entry> <key>picture</key> <value> <name>picture</name> </value> </entry> <entry> <key>publish_time</key> <value> <name>publish_time</name> </value> </entry> <entry> <key>publisher</key> <value> 
<name>publisher</name> </value> </entry> <entry> <key>rcmd</key> <value> <name>rcmd</name> </value> </entry> <entry> <key>status</key> <value> <name>status</name> </value> </entry> <entry> <key>stick</key> <value> <name>stick</name> </value> </entry> <entry> <key>title</key> <value> <name>title</name> </value> </entry> <entry> <key>type</key> <value> <name>type</name> </value> </entry> <entry> <key>update_time</key> <value> <name>update_time</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Destination</key> <value>flowfile-attribute</value> </entry> <entry> <key>Return Type</key> <value>auto-detect</value> </entry> <entry> <key>Path Not Found Behavior</key> <value>ignore</value> </entry> <entry> <key>Null Value Representation</key> <value>empty string</value> </entry> <entry> <key>click_num</key> <value>$.columns[3].value</value> </entry> <entry> <key>content</key> <value>$.columns[7].value</value> </entry> <entry> <key>content_id</key> <value>$.columns[0].value</value> </entry> <entry> <key>create_time</key> <value>$.columns[13].value</value> </entry> <entry> <key>create_user_id</key> <value>$.columns[14].value</value> </entry> <entry> <key>modify_time</key> <value>$.columns[15].value</value> </entry> <entry> <key>modify_user_id</key> <value>$.columns[16].value</value> </entry> <entry> <key>page_descr</key> <value>$.columns[10].value</value> </entry> <entry> <key>page_keyword</key> <value>$.columns[9].value</value> </entry> <entry> <key>page_title</key> <value>$.columns[8].value</value> </entry> <entry> <key>picture</key> <value>$.columns[4].value</value> </entry> <entry> <key>publish_time</key> <value>$.columns[11].value</value> </entry> <entry> <key>publisher</key> <value>$.columns[2].value</value> </entry> <entry> <key>rcmd</key> <value>$.columns[5].value</value> </entry> <entry> <key>status</key> <value>$.columns[17].value</value> 
</entry> <entry> <key>stick</key> <value>$.columns[6].value</value> </entry> <entry> <key>title</key> <value>$.columns[1].value</value> </entry> <entry> <key>type</key> <value>$.columns[18].value</value> </entry> <entry> <key>update_time</key> <value>$.columns[12].value</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>EvaluateJsonPath_update</name> <relationships> <autoTerminate>true</autoTerminate> <name>failure</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>matched</name> </relationships> <relationships> <autoTerminate>true</autoTerminate> <name>unmatched</name> </relationships> <state>STOPPED</state> <style/> <type>org.apache.nifi.processors.standard.EvaluateJsonPath</type> </processors> <processors> <id>f3510126-b3a9-3ef8-0000-000000000000</id> <parentGroupId>cc71fffc-d0fd-3a74-0000-000000000000</parentGroupId> <position> <x>508.31115147638445</x> <y>1174.3765625631154</y> </position> <bundle> <artifact>nifi-hive-nar</artifact> <group>org.apache.nifi</group> <version>1.9.2</version> </bundle> <config> <bulletinLevel>WARN</bulletinLevel> <comments></comments> <concurrentlySchedulableTaskCount>1</concurrentlySchedulableTaskCount> <descriptors> <entry> <key>Hive Database Connection Pooling Service</key> <value> <identifiesControllerService>org.apache.nifi.dbcp.hive.HiveDBCPService</identifiesControllerService> <name>Hive Database Connection Pooling Service</name> </value> </entry> <entry> <key>hive-batch-size</key> <value> <name>hive-batch-size</name> </value> </entry> <entry> <key>hive-charset</key> <value> <name>hive-charset</name> </value> </entry> <entry> <key>statement-delimiter</key> <value> <name>statement-delimiter</name> </value> </entry> <entry> <key>rollback-on-failure</key> <value> 
<name>rollback-on-failure</name> </value> </entry> </descriptors> <executionNode>ALL</executionNode> <lossTolerant>false</lossTolerant> <penaltyDuration>30 sec</penaltyDuration> <properties> <entry> <key>Hive Database Connection Pooling Service</key> <value>83f7fb9e-5a36-305c-0000-000000000000</value> </entry> <entry> <key>hive-batch-size</key> <value>100</value> </entry> <entry> <key>hive-charset</key> <value>UTF-8</value> </entry> <entry> <key>statement-delimiter</key> <value>;</value> </entry> <entry> <key>rollback-on-failure</key> <value>false</value> </entry> </properties> <runDurationMillis>0</runDurationMillis> <schedulingPeriod>0 sec</schedulingPeriod> <schedulingStrategy>TIMER_DRIVEN</schedulingStrategy> <yieldDuration>1 sec</yieldDuration> </config> <executionNodeRestricted>false</executionNodeRestricted> <name>PutHiveQL</name> <relationships> <autoTerminate>false</autoTerminate> <name>failure</name> </relationships> <relationships> <autoTerminate>false</autoTerminate> <name>retry</name> </relationships> <relationships> <autoTerminate>true</autoTerminate> <name>success</name> </relationships> <state>RUNNING</state> <style/> <type>org.apache.nifi.processors.hive.PutHiveQL</type> </processors> </snippet> <timestamp>02/16/2023 09:04:16 GMT</timestamp> </template>
CaptureChangeMySQL -> RouteOnAttribute -> EvaluateJsonPath -> ReplaceText -> PutHiveQL
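To write through the PutHiveQL processor in this flow, the target must be a transactional table stored as ORC. Transactional-table support has to be enabled in hive-site.xml, as follows: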
<!-- Enable transactional table support -->
<property>
  <name>hive.support.concurrency</name>
  <value>true</value>
</property>
<property>
  <name>hive.exec.dynamic.partition.mode</name>
  <value>nonstrict</value>
</property>
<property>
  <name>hive.txn.manager</name>
  <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
</property>
<property>
  <name>hive.compactor.initiator.on</name>
  <value>true</value>
</property>
<property>
  <name>hive.compactor.worker.threads</name>
  <value>1</value>
</property>
<property>
  <name>hive.in.test</name>
  <value>true</value>
</property>
Without these settings, the following exception is raised: This command is not allowed on an ACID table test.dms_content_t with a non-ACID transaction manager
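As a quick sanity check before starting the flow, the settings listed above can be verified programmatically. The following is a minimal Python sketch, assuming hive-site.xml uses the usual Hadoop <configuration>/<property> layout. The function name missing_acid_settings and the inline sample document are illustrative only and are not part of Hive or NiFi.

```python
import xml.etree.ElementTree as ET

# Expected values, copied from the hive-site.xml fragment in this article.
REQUIRED = {
    "hive.support.concurrency": "true",
    "hive.exec.dynamic.partition.mode": "nonstrict",
    "hive.txn.manager": "org.apache.hadoop.hive.ql.lockmgr.DbTxnManager",
    "hive.compactor.initiator.on": "true",
    "hive.compactor.worker.threads": "1",
    "hive.in.test": "true",
}

def missing_acid_settings(xml_text):
    """Return {name: (expected, actual)} for settings that are absent or differ."""
    root = ET.fromstring(xml_text)
    actual = {
        (prop.findtext("name") or "").strip(): (prop.findtext("value") or "").strip()
        for prop in root.iter("property")
    }
    return {
        name: (expected, actual.get(name))
        for name, expected in REQUIRED.items()
        if actual.get(name) != expected
    }

# Hypothetical, deliberately incomplete configuration used for the demonstration.
sample = """<configuration>
  <property><name>hive.support.concurrency</name><value>true</value></property>
  <property><name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value></property>
</configuration>"""

for name, (expected, found) in missing_acid_settings(sample).items():
    print(f"{name}: expected {expected!r}, found {found!r}")
```

To check a real file, read its text and pass it to missing_acid_settings; an empty result means every setting from the fragment is present with the expected value.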
CREATE TABLE myhive.nifi_hive(id int,name string,day_time string)
clustered by (id) into 2 buckets
STORED AS ORC
TBLPROPERTIES('transactional'='true');
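CaptureChangeMySQL retrieves Change Data Capture (CDC) events from a MySQL database. CDC events include INSERT, UPDATE, and DELETE operations. Each event is output as an individual FlowFile, ordered by the time at which the operation occurred.
The property list for this processor gives each property and its default value, whether the property supports the NiFi Expression Language, and whether the property is considered "sensitive", meaning its value will be encrypted. Before entering a value in a sensitive property, make sure the nifi.properties file has an entry for nifi.sensitive.props.key.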
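DistributedMapCacheServer provides a map (key/value) cache that can be accessed over a socket. Interaction with this service is typically done through a DistributedMapCacheClient service.
The property list gives each property and its default value.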
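DistributedMapCacheClientService provides the ability to communicate with a DistributedMapCacheServer. It can be used to share a map (key/value cache) between the nodes of a NiFi cluster.
The property list gives each property and its default value.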
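RouteOnAttribute evaluates Expression Language expressions against the attributes of each FlowFile and routes the FlowFile according to the result. It is typically used to implement conditional logic.
The property list gives default values (where a property has one) and whether each property supports the Expression Language.
The processor also allows the user to add dynamic properties by specifying a name and a value.
Dynamic relationships are created according to how the user configures the processor.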
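PutHiveQL executes a HiveQL DDL/DML command (for example, UPDATE or INSERT). The content of the incoming FlowFile is expected to be the HiveQL command to execute. The command may use ? as a parameter placeholder. In that case, the parameters must exist as FlowFile attributes named hiveql.args.N.type and hiveql.args.N.value, where N is a positive integer. hiveql.args.N.type is expected to be a number indicating the JDBC type. The content of the FlowFile is expected to be UTF-8 encoded.
The property list gives each property and its default value.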
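The parameter naming convention described above can be illustrated with a short Python sketch. It only builds the attribute map that a parameterized statement would need; it does not talk to NiFi or Hive. The helper name hiveql_args and the sample values are assumptions made for illustration, while the type codes 4 and 12 are the java.sql.Types constants for INTEGER and VARCHAR.

```python
# JDBC type codes from java.sql.Types.
JDBC_INTEGER = 4
JDBC_VARCHAR = 12

def hiveql_args(params):
    """Build hiveql.args.N.type / hiveql.args.N.value attributes, with N starting at 1."""
    attrs = {}
    for n, (jdbc_type, value) in enumerate(params, start=1):
        attrs[f"hiveql.args.{n}.type"] = str(jdbc_type)
        attrs[f"hiveql.args.{n}.value"] = str(value)
    return attrs

# FlowFile content: the statement with one ? per parameter.
statement = "insert into myhive.nifi_hive (id, name, day_time) values (?, ?, ?)"

# Sample values only; day_time is a string column in the Hive table created earlier.
attributes = hiveql_args([
    (JDBC_INTEGER, 7),
    (JDBC_VARCHAR, "testName5"),
    (JDBC_VARCHAR, "2020-02-24"),
])

# Each ? needs exactly one type/value pair.
assert statement.count("?") == len(attributes) // 2

for key in sorted(attributes):
    print(key, "=", attributes[key])
```

Compared with splicing values directly into the statement text, placeholders keep quoting problems out of the SQL, at the cost of having to set two attributes per parameter upstream.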
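HiveConnectionPool provides a database connection pooling service for Apache Hive. Connections can be requested from the pool and are returned after use.
The property list gives each property and its default value, and whether the property supports the NiFi Expression Language.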
MySQL 5.7 or later is required.
mysql> show variables like '%log_bin%';
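-- Show the binlog-related settings
show variables like '%binlog%';
-- Check whether the binlog is enabled
show variables like '%log_bin%';
-- List the binlog files
show binary logs;
-- Show the binlog file currently being written
show master status;
show binlog events in 'mysqlbin.000102' from 1 limit 0,10;

vi /etc/my.cnf
# Append the following to the end of the file:
# server_id: ID of this node; any value works for a single node, but it must be unique across nodes
server_id = 1
# log_bin sets the base name of the binlog files to mysql-bin, and with it their storage path
log_bin = mysql-bin
binlog_format = row
# Log only the named database; adjust as needed. If omitted, all databases on this server are logged
binlog_do_db=test

# Restart the MySQL service
systemctl restart mysqld.service
# or
service mysqld restart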
The binlog is enabled when the query above reports log_bin as ON. Enabling it on Windows works the same way.
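The my.cnf settings can also be checked offline before restarting MySQL. The sketch below is a minimal Python example under several assumptions: the options live in the [mysqld] section, there are no inline comments after values, and a missing option is treated as a problem (which is stricter than MySQL's own defaults). The function names read_mysqld_options and binlog_problems are hypothetical.

```python
def read_mysqld_options(cnf_text):
    """Collect key/value options from the [mysqld] section of a my.cnf text."""
    options = {}
    section = None
    for raw in cnf_text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", ";")):
            continue  # skip blank lines and comments
        if line.startswith("[") and line.endswith("]"):
            section = line[1:-1].strip().lower()
            continue
        if section != "mysqld":
            continue
        key, _, value = line.partition("=")
        # MySQL accepts dashes and underscores interchangeably in option names.
        options[key.strip().replace("-", "_")] = value.strip()
    return options

def binlog_problems(options):
    """List the reasons why row-based binlog capture would not work."""
    problems = []
    if not options.get("server_id"):
        problems.append("server_id is not set")
    if "log_bin" not in options:
        problems.append("log_bin is not set")
    if options.get("binlog_format", "").lower() != "row":
        problems.append("binlog_format is not row")
    return problems

# Sample text mirroring the settings shown in this article.
sample_cnf = """
[mysqld]
server_id = 1
log-bin = mysql-bin
binlog_format = row
"""

print(binlog_problems(read_mysqld_options(sample_cnf)) or "binlog settings look fine")
```

The authoritative check remains the show variables query against the running server; this sketch only catches typos in the file before a restart.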
To make verification easier, a process group named mysql_hive_realtime was created.
The CaptureChangeMySQL configuration requires the DistributedMapCacheClientService and DistributedMapCacheServer controller services.
MySQL Hosts = 192.168.10.44:3306
MySQL Driver Class Name = com.mysql.jdbc.Driver
MySQL Driver Location(s) = /usr/local/bigdata/testdata/mysql-connector-java-5.1.44.jar
Username = root
Password = 888888
Include Begin/Commit Events = true
Include DDL Events = true
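RouteOnAttribute can consume with multiple threads; set the number of concurrent tasks after assessing your server's hardware and the rate at which the data changes.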
NiFi Expression Language
Official NiFi Expression Language guide: https://nifi.apache.org/docs/nifi-docs/html/expression-language-guide.html
The equals function of the NiFi Expression Language
equals
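The routing rules in the template compare the cdc.event.type attribute with a fixed string, for example ${cdc.event.type:equals("insert")}. The Python sketch below imitates that decision outside NiFi so the behaviour is easy to see; it is not NiFi code, and the names RULES and route are made up for illustration. It assumes the "Route to Property name" strategy, where a FlowFile that matches no rule goes to unmatched.

```python
# One rule per dynamic property: relationship name -> value that
# cdc.event.type must equal, mirroring ${cdc.event.type:equals("...")}.
RULES = {
    "insert": "insert",
    "update": "update",
    "delete": "delete",
}

def route(attributes):
    """Return the relationships a FlowFile with these attributes would be routed to."""
    event_type = attributes.get("cdc.event.type")
    # equals is an exact, case-sensitive comparison.
    matched = [name for name, expected in RULES.items() if event_type == expected]
    return matched or ["unmatched"]

print(route({"cdc.event.type": "insert"}))
print(route({"cdc.event.type": "INSERT"}))
print(route({"cdc.event.type": "ddl"}))
```

Because equals is case-sensitive, a value such as INSERT would not match the insert rule; the Expression Language guide also lists equalsIgnoreCase for cases where that matters.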
{
  "type": "insert",
  "timestamp": 1582484253000,
  "binlog_filename": "mysql-bin.000005",
  "binlog_position": 375,
  "database": "nifi_test",
  "table_name": "nifi_hive_streaming",
  "table_id": 108,
  "columns": [
    { "id": 1, "name": "id", "column_type": 4, "value": 7 },
    { "id": 2, "name": "name", "column_type": 12, "value": "testName5" },
    { "id": 3, "name": "day_time", "column_type": 91, "value": "2020-02-24" }
  ]
}
Processors such as EvaluateJsonPath can use JsonPath expressions to extract values flexibly.
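For the sample event above, paths of the form $.columns[N].value pick out one column value each. The following Python sketch shows what those paths resolve to, using plain dictionary and list indexing instead of a JsonPath library. The event is an abbreviated copy of the sample, and the attribute names id, name, and day_time are chosen here to match the Hive table. Values are converted to strings because FlowFile attributes are strings.

```python
import json

# Abbreviated copy of the sample CDC event (fields not used here are omitted).
event = json.loads("""
{"type": "insert", "database": "nifi_test", "table_name": "nifi_hive_streaming",
 "columns": [
   {"id": 1, "name": "id", "column_type": 4, "value": 7},
   {"id": 2, "name": "name", "column_type": 12, "value": "testName5"},
   {"id": 3, "name": "day_time", "column_type": 91, "value": "2020-02-24"}
 ]}
""")

# Hand-written equivalents of the JsonPath expressions.
attributes = {
    "id": str(event["columns"][0]["value"]),        # $.columns[0].value
    "name": str(event["columns"][1]["value"]),      # $.columns[1].value
    "day_time": str(event["columns"][2]["value"]),  # $.columns[2].value
}

# Index-based paths depend on the column order of the source table;
# looking values up by column name gives the same result for this event.
by_name = {col["name"]: str(col["value"]) for col in event["columns"]}

print(attributes)
print(by_name == attributes)
```

Index-based paths are short but break silently if a column is added in the middle of the MySQL table, so the mapping is worth rechecking after any schema change.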
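JsonPath expressions
Much as XPath addresses parts of an XML document, a JsonPath expression is typically used to look up or set values in JSON by path.
The "root member object" in JsonPath is always referred to as $, regardless of whether it is an object or an array.
Expressions can be written in either "dot-notation" or "bracket-notation", for example: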
$.store.book[0].title
$['store']['book'][0]['title']
# The two forms are equivalent
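Functions can be invoked at the tail end of a JsonPath expression; the input to a function is the result of the expression.
Filters are logical expressions used to filter arrays.
A typical filter looks like [?(@.age > 18)], where @ stands for the array element currently being tested.
Several filter conditions can be combined with the logical operators && and ||.
For example:
[?(@.price < 10 && @.category == 'fiction')]
String literals must be enclosed in single or double quotes:
[?(@.color == 'blue')] or [?(@.color == "blue")]
Example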
{ "store": { "book": [ { "category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95 }, { "category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99 }, { "category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99 }, { "category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99 } ], "bicycle": { "color": "red", "price": 19.95 } }, "expensive": 10 }
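To make the notation concrete, the Python sketch below resolves the two path forms against the example document and expresses the price/category filter as a list comprehension. It is a deliberately small illustration, not a JsonPath implementation: the helper get_path is hypothetical and supports only child names and array indexes, and the filter is rewritten by hand rather than parsed.

```python
import json
import re

# One path step: .name, [0], or ['name'].
TOKEN = re.compile(r"\.(\w+)|\[(\d+)\]|\['([^']*)'\]")

def get_path(doc, path):
    """Resolve a simple JsonPath (child names and array indexes only)."""
    if not path.startswith("$"):
        raise ValueError("a JsonPath starts at the root, $")
    pos, node = 1, doc
    while pos < len(path):
        match = TOKEN.match(path, pos)
        if match is None:
            raise ValueError(f"unsupported syntax at offset {pos}: {path!r}")
        dot_name, index, quoted_name = match.groups()
        node = node[int(index)] if index is not None else node[dot_name or quoted_name]
        pos = match.end()
    return node

store = json.loads("""{"store": {"book": [
  {"category": "reference", "author": "Nigel Rees", "title": "Sayings of the Century", "price": 8.95},
  {"category": "fiction", "author": "Evelyn Waugh", "title": "Sword of Honour", "price": 12.99},
  {"category": "fiction", "author": "Herman Melville", "title": "Moby Dick", "isbn": "0-553-21311-3", "price": 8.99},
  {"category": "fiction", "author": "J. R. R. Tolkien", "title": "The Lord of the Rings", "isbn": "0-395-19395-8", "price": 22.99}
], "bicycle": {"color": "red", "price": 19.95}}, "expensive": 10}""")

# Dot notation and bracket notation resolve to the same node.
dot = get_path(store, "$.store.book[0].title")
bracket = get_path(store, "$['store']['book'][0]['title']")

# [?(@.price < 10 && @.category == 'fiction')] written as a comprehension,
# where `book` plays the role of @ (the current array element).
cheap_fiction = [
    book["title"]
    for book in get_path(store, "$.store.book")
    if book["price"] < 10 and book["category"] == "fiction"
]

print(dot, bracket == dot, cheap_fiction)
```

In NiFi itself the expressions are evaluated by the processor; a sketch like this is mainly useful for checking a path against sample data before configuring EvaluateJsonPath.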
Run the flow and check the output.
Configure ReplaceText
Replacement Value = insert into myhive.nifi_hive (id,name,day_time) values (${id},'${name}','${day_time}')
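The effect of this replacement value can be previewed outside NiFi. The Python sketch below substitutes plain ${name} references with attribute values, which is only a small subset of the Expression Language (no functions), and it assumes a missing attribute expands to an empty string. The helpers expand and escape_single_quotes are hypothetical; the second one assumes Hive's backslash escaping inside string literals.

```python
import re

TEMPLATE = ("insert into myhive.nifi_hive (id,name,day_time) "
            "values (${id},'${name}','${day_time}')")

def expand(template, attributes):
    """Replace each ${name} with the attribute value (empty string if missing)."""
    return re.sub(r"\$\{(\w+)\}", lambda m: attributes.get(m.group(1), ""), template)

def escape_single_quotes(value):
    """Hypothetical helper: escape ' so a value cannot end the SQL string literal early."""
    return value.replace("'", "\\'")

# Attribute values as extracted from the sample CDC event.
attrs = {"id": "7", "name": "testName5", "day_time": "2020-02-24"}
print(expand(TEMPLATE, attrs))

# A value containing a quote must be escaped before it is spliced into the statement.
risky = dict(attrs, name=escape_single_quotes("O'Brien"))
print(expand(TEMPLATE, risky))
```

Splicing values into the statement text is simple but fragile when values contain quotes; the ? placeholders supported by PutHiveQL avoid that class of problem.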
Start the processor and check the result.
(Omitted.)
Database Connection URL = jdbc:hive2://192.168.10.44:10000
Hive Configuration Resources = /export/download/config/core-site.xml,/export/download/config/hdfs-site.xml,/export/download/config/hive-site.xml
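The hive-site.xml file must have the transactional-table settings enabled.
Once configuration is complete, remember to enable the HiveConnectionPool controller service.
Finally, verify that the data has been written to the Hive table correctly.
Pay particular attention to the Hive configuration and to the statement that creates the transactional table.
In practice this approach is not widely used, because writing to Hive through individual INSERT statements is too slow.
This completes the walkthrough of using NiFi to read MySQL changes from the binlog in real time and write them to Hive.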