当前位置:   article > 正文

nifi将mysql数据导到hdfs_用Nifi 从web api 取数据到HDFS

nifi session.write

import org.apache.commons.io.IOUtils

import java.nio.charset.*

import java.text.SimpleDateFormat;

import java.lang.StringBuilder;

import java.util.Calendar;

def flowFile = session.create()

def days = 10000

flowFile = session.write(flowFile, {inputStream, outputStream ->

SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");

Calendar cal = Calendar.getInstance();

StringBuilder sb = new StringBuilder();

cal.add(Calendar.DATE,1)

for(int i = 0; i < days; i++) {

cal.add(Calendar.DATE, -1);

sb.append(sdf.format(cal.getTime()) + "\n" );

}

//println(sb);

outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8))

} as StreamCallback)

//flowFile = session.putAttribute(flowFile, 'filename', 'get_date')

session.transfer(flowFile, REL_SUCCESS)

3. 用SplitText生成每行一个的日期

Line Split Count    1

4. 用ExtractText 取到日期参数

fb9d9343185b9c36c04efc5af5602912.png

5. 用UpdateAttribute生成url及filename

a14895af26f9e64b2e3903d816b28bfe.png

这里一定要设置filename,不然,所有的文件名都一样,最后只能成功插入一个记录到HDFS

6.  用InvokeHttp获取数据

aedd46ffb8fc37525cf5dd4c9e7cb0f2.png

aaf6b0ede560661a7a173c5dc864f6e4.png

7. 添加一个 RouteOnContent来过滤空数据

5e324e8d84325442a88a95e8cd572f00.png

8. 用PutHDFS把数据插入到HDFS

8fb317d7056ae2564eecc110ba949ab5.png

注意这里的Directory 要加上/, 不然就插入到user/root/nifi下了,而不是files下在的nifi了。

9. 每天更新数据

5ea640a2d7f4bdcb585becf0f10e2b6e.png

每天20点更新数据

代码小改下:

def count = 1

NIFI 中国社区 QQ群:595034369

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/519546
推荐阅读
相关标签
  

闽ICP备14008679号