赞
踩
数据源是Flink DataSet API希望从其中获取数据的地方。它可以以文件或Java集合的形式出现。DataSet API支持许多内置的数据源函数。它还支持编写自定义数据源函数,因此不支持的任何东西都可以轻松编程。
首先,让我们理解其内置的数据源函数。
Flink支持从文件中读取数据。它逐行读取数据并将其作为字符串返回。以下是我们可以用来读取数据的内置函数:
Java代码:
- package com.xueai8.ch04;
-
- import org.apache.flink.api.common.functions.MapFunction;
- import org.apache.flink.api.java.DataSet;
- import org.apache.flink.api.java.ExecutionEnvironment;
-
-
- /**
- * Created by www.xueai8.com
- * 文件数据源
- */
- public class FileSourceDemo01 {
- public static void main(String[] args) throws Exception {
- // 设置批处理执行环境
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- // 首先从环境中获取一些数据,比如:
- String textPath = "src/wc.txt";
- DataSet<String> text = env.readTextFile(textPath).map(String::toLowerCase);
- text.print();
- }
- }
执行程序,输出结果如下所示:
- good good study
- day day up
Scala代码:
- package com.xueai8.ch04
-
- import org.apache.flink.api.scala._
-
- /**
- * Created by www.xueai8.com
- * 文件数据源
- */
- object FileSourceDemo01 {
- def main(args: Array[String]): Unit = {
- // 设置批处理执行环境
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- // 得到输入数据
- val textPath = "src/wc.txt"
- val text = env.readTextFile(textPath)
-
- // 对数据进行转换
- // text.map( _.toLowerCase).print()
- text.flatMap { _.toLowerCase.split("W+") }
- .map { (_, 1) }
- .groupBy(0)
- .sum(1)
- .print()
- }
- }
执行程序,输出结果如下所示:
- (up,1)
- (day,2)
- (good,2)
- (study,1)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。