当前位置:   article > 正文

通达信 day文件 解析_Flink内置数据源:文件数据源

通达信 day文件 java

1915160313884a7315ce03f15440ec0c.png

数据源是Flink DataSet API希望从其中获取数据的地方。它可以以文件或Java集合的形式出现。DataSet API支持许多内置的数据源函数。它还支持编写自定义数据源函数,因此不支持的任何东西都可以轻松编程。

首先,让我们理解其内置的数据源函数。

基于文件的数据源

Flink支持从文件中读取数据。它逐行读取数据并将其作为字符串返回。以下是我们可以用来读取数据的内置函数:

  • readTextFile(String path)/TextInputFormat:
    • 从路径指定的文件中传输数据。默认情况下,它将读取TextInputFormat并逐行读取字符串。
  • readTextFileWithValue(String path)/TextValueInputFormat :
    • 从路径指定的文件中传输数据。它返回可变字符串。
  • readCsvFile(String path)/CsvInputFormat :
    • 从逗号分隔的文件中读取数据。它返回Java POJO或tuples或case class对象。
  • readFileofPremitives(path,class)/readFileofPremitives(path,delimiter)/PrimitiveInputFormat :
    • 这将把新行解析为基本数据类型,如字符串或整数。
  • readFileofPremitives(path,delimiter,class)/PrimitiveInputFormat
  • readHadoopFile(FileInputFormat, Key, Value, path):
    • 它使用给定的FileInputFormat、Key类和Value类从指定路径读取文件。它将解析后的值作为元组Tuple2<Key,Value>返回。
  • readSequenceFile(Key, Value, path)/SequenceFileInputFormat:
    • 创建一个JobConf并使用给定的SequenceFileInputFormat、Key类和Value类从指定路径读取文件。它将解析后的值作为元组Tuple2<Key,Value>返回。

【示例】读取文件数据源中的数据

Java代码:

  1. package com.xueai8.ch04;
  2. import org.apache.flink.api.common.functions.MapFunction;
  3. import org.apache.flink.api.java.DataSet;
  4. import org.apache.flink.api.java.ExecutionEnvironment;
  5. /**
  6. * Created by www.xueai8.com
  7. * 文件数据源
  8. */
  9. public class FileSourceDemo01 {
  10. public static void main(String[] args) throws Exception {
  11. // 设置批处理执行环境
  12. final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  13. // 首先从环境中获取一些数据,比如:
  14. String textPath = "src/wc.txt";
  15. DataSet<String> text = env.readTextFile(textPath).map(String::toLowerCase);
  16. text.print();
  17. }
  18. }

执行程序,输出结果如下所示:

  1. good good study
  2. day day up

Scala代码:

  1. package com.xueai8.ch04
  2. import org.apache.flink.api.scala._
  3. /**
  4. * Created by www.xueai8.com
  5. * 文件数据源
  6. */
  7. object FileSourceDemo01 {
  8. def main(args: Array[String]): Unit = {
  9. // 设置批处理执行环境
  10. val env = ExecutionEnvironment.getExecutionEnvironment
  11. // 得到输入数据
  12. val textPath = "src/wc.txt"
  13. val text = env.readTextFile(textPath)
  14. // 对数据进行转换
  15. // text.map( _.toLowerCase).print()
  16. text.flatMap { _.toLowerCase.split("W+") }
  17. .map { (_, 1) }
  18. .groupBy(0)
  19. .sum(1)
  20. .print()
  21. }
  22. }

执行程序,输出结果如下所示:

  1. (up,1)
  2. (day,2)
  3. (good,2)
  4. (study,1)
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小蓝xlanll/article/detail/171095
推荐阅读
相关标签
  

闽ICP备14008679号