
Flink (3): DataSet Source, Transform and Sink Operators, and Counters


I. Overview

A Flink batch (DataSet) program flows through Source, Transform, and Sink operators. This post walks through all three and finishes with counters (accumulators).

II. Source

[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources]

1. File-based sources

(1)readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.

(2)readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.

(3)readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples, case class objects, or POJOs. Supports the basic java types and their Value counterparts as field types.

(4)readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer using the given delimiter.

(5)readSequenceFile(Key, Value, path) / SequenceFileInputFormat - Creates a JobConf and reads file from the specified path with type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.

2. Collection-based sources

(1)fromCollection(Seq) - Creates a data set from a Seq. All elements in the collection must be of the same type.

(2)fromCollection(Iterator) - Creates a data set from an Iterator. The class specifies the data type of the elements returned by the iterator.

(3)fromElements(elements: _*) - Creates a data set from the given sequence of objects. All objects must be of the same type.

(4)fromParallelCollection(SplittableIterator) - Creates a data set from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.

(5)generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel

3. Compressed files

Flink reads the following compression formats transparently, based on the file extension. Note from the table below that none of them is parallelizable, so each compressed file is read by a single task.

Compression method | File extensions | Parallelizable
DEFLATE            | .deflate        | no
GZip               | .gz, .gzip      | no
Bzip2              | .bz2            | no
XZ                 | .xz             | no
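Nothing special is needed in code: a compressed file is passed to readTextFile like any other path. A minimal sketch, assuming a hypothetical gzip file data/access.log.gz:

import org.apache.flink.api.scala._

// Minimal sketch: reading a gzip-compressed text file (hypothetical path).
object CompressedSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Decompressed transparently based on the .gz extension, but read by a single task
    env.readTextFile("data/access.log.gz").first(10).print()
  }
}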


4. Example code

package com.bd.flink._1203DataSet

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration

/**
  * Created by Administrator on 2019/12/3.
  */
object DataSetDataSourceApp {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // From a collection
//    fromCollection(env)
    // From a file
//    fromTextFile(env)
    // From a folder
//    fromTextFileFolder(env)
    // From a CSV file
//    fromCsvFile(env)
    // Recursively from nested folders
//    fromRecursiveFiles(env)
    // From compressed files
    fromCompressFile(env)
  }

  /**
    * 1. Create from a collection
    * @param env
    */
  def fromCollection(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val data = 1 to 10
    env.fromCollection(data).print()
  }

  /**
    * 2. Read from a file
    * @param env
    */
  def fromTextFile(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filepath = "data/hello.txt"
    env.readTextFile(filepath).print()
  }

  /**
    * 3. Read from a folder
    * @param env
    */
  def fromTextFileFolder(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filepath = "data/"
    env.readTextFile(filepath).print()
  }

  /**
    * 4. Read from a CSV file
    * @param env
    */
  def fromCsvFile(env: ExecutionEnvironment): Unit = {
    import org.apache.flink.api.scala._
    val filepath = "data/people.csv"
    // Parse the CSV into tuples: all columns
    env.readCsvFile[(String, Int, String)](filepath, ignoreFirstLine = true).print()
    // Parse the CSV into tuples: only columns 1 and 3 (includedFields selects the columns)
    env.readCsvFile[(String, String)](filepath, ignoreFirstLine = true, includedFields = Array(0, 2)).print()
    // Parse into a case class
    case class MyClass(name: String, age: Int)
    env.readCsvFile[MyClass](filepath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()
    // Parse into a Java POJO (People is a POJO class defined elsewhere in the package)
    // Output:
    // People{name='Bob', age=32, work='Developer'}
    // People{name='Jorge', age=30, work='Developer'}
    env.readCsvFile[People](filepath, ignoreFirstLine = true, pojoFields = Array("name", "age", "work")).print()
  }

  /**
    * 5. Read a folder recursively
    * @param env
    */
  // Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources
  def fromRecursiveFiles(env: ExecutionEnvironment): Unit = {
    val filepath = "data/"
//    env.readTextFile(filepath).print()
//    println("---------------- separator ------------------")
    val parameters = new Configuration
    parameters.setBoolean("recursive.file.enumeration", true)
    env.readTextFile(filepath).withParameters(parameters).print()
  }

  /**
    * 6. Read from compressed files
    * @param env
    */
  // Supported extensions: .deflate, .gz, .gzip, .bz2, .xz
  def fromCompressFile(env: ExecutionEnvironment): Unit = {
    val filePath = "data/compress"
    env.readTextFile(filePath).print() // readTextFile decompresses supported formats directly
  }
}

III. Transformation

[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#dataset-transformations]

1. Basic operators

Map, FlatMap, MapPartition, Filter, Reduce, etc.
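The example program in the next subsection exercises most of these. Since it does not show Reduce, here is a minimal, self-contained sketch of reduce and reduceGroup (the object name and data are mine):

import org.apache.flink.api.scala._

// Minimal sketch of reduce / reduceGroup on a DataSet of Ints.
object ReduceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = env.fromCollection(List(1, 2, 3, 4, 5))

    // reduce: repeatedly combines two elements into one; prints 15
    data.reduce(_ + _).print()

    // reduceGroup: receives all elements of the group (here the whole DataSet) at once; prints 15
    data.reduceGroup(iter => iter.sum).print()
  }
}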
2. Example code

package com.bd.flink._1203DataSet

import org.apache.flink.api.common.operators.Order
import org.apache.flink.api.scala.ExecutionEnvironment

import scala.collection.mutable.ListBuffer
// Implicit conversions
import org.apache.flink.api.scala._

/**
  * Created by Administrator on 2019/12/3.
  */
object DataSetTransformation {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
//    mapFunction(env)
//    filterFunction(env)
//    firstFunction(env)
//    flatMapFunction(env)
//    joinFunction(env)
//    outerjoinFunction(env)
    crossFunction(env)
  }

  // 1. map
  def mapFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 10))
    // Option 1
//    data.map(x => x * 2).print()
    println("=============== separator =============")
    // Option 2
    data.map(_ * 2).print()
  }

  // 2. filter
  def filterFunction(env: ExecutionEnvironment): Unit = {
    val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 10))
    println("=============== separator =============")
    data.filter(_ > 2).print()
  }

  // 3. mapPartition
  // DataSource of 100 elements whose results should be written to a database
  def mapPartitionFunction(env: ExecutionEnvironment): Unit = {
    val students = new ListBuffer[String]
    for (i <- 1 to 100) {
      students.append("student: " + i)
    }
    val data = env.fromCollection(students)
    // mapPartition opens one database connection per partition,
    // whereas map would open one connection per element
    data.mapPartition(partition => {
      // Get the connection, e.g. connect = DBUtils.getConnection
      val connect = "this is a database connection"
      // Store every element of the partition via that connection
      partition.map(element => connect + " -> " + element)
    }).print()
  }

  // 4. first-n
  def firstFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[(Int, String)]()
    info.append((1, "hadoop"))
    info.append((1, "yarn"))
    info.append((3, "mapreduce"))
    info.append((3, "hbase"))
    info.append((5, "spark"))
    info.append((5, "storm"))
    info.append((5, "solr"))
    info.append((5, "zookeeper"))
    val data = env.fromCollection(info)
    // Print the first three elements
    data.first(3).print()
    // Group, then take the first two of each group
    data.groupBy(0).first(2).print()
    // Group, sort within each group by name (ascending), then take the first two
    data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
  }

  // 5. flatMap (plus a WordCount)
  def flatMapFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[String]()
    info.append("hadoop,spark")
    info.append("flink,spark")
    info.append("flume,spark")
    val data = env.fromCollection(info)
//    data.print()
//    data.map(x => x.split(",")).print()
    // flatMap flattens the arrays produced by split
//    data.flatMap(_.split(",")).print()
    // WordCount
    data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
  }

  // 6. distinct
  def distinctFunction(env: ExecutionEnvironment): Unit = {
    val info = ListBuffer[String]()
    info.append("hadoop,spark")
    info.append("flink,spark")
    info.append("flume,spark")
    val data = env.fromCollection(info)
    data.flatMap(_.split(",")).distinct().print()
  }

  // 7. join
  def joinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]()
    info1.append((1, "hadoop"))
    info1.append((1, "yarn"))
    info1.append((3, "mapreduce"))
    info1.append((3, "hbase"))
    val info2 = ListBuffer[(Int, String)]()
    info2.append((1, "Shanghai"))
    info2.append((2, "Beijing"))
    info2.append((4, "Shenzhen"))
    info2.append((5, "WuHan"))
    // From the docs: tuple fields are used as keys; where(...) picks the join field of the
    // first input and equalTo(...) the join field of the second input, e.g.
    //   val result = input1.join(input2).where(0).equalTo(1)
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    // Part 1: prints
    // ((1,hadoop),(1,Shanghai))
    // ((1,yarn),(1,Shanghai))
    data1.join(data2).where(0).equalTo(0).print()
    // Part 2: prints flattened tuples
    // (1,hadoop,Shanghai)
    // (1,yarn,Shanghai)
    data1.join(data2).where(0).equalTo(0).apply((first, second) => {
      (first._1, first._2, second._2)
    }).print()
  }

  // 8. outer join
  def outerjoinFunction(env: ExecutionEnvironment): Unit = {
    val info1 = ListBuffer[(Int, String)]()
    info1.append((1, "hadoop"))
    info1.append((1, "yarn"))
    info1.append((3, "mapreduce"))
    info1.append((3, "hbase"))
    val info2 = ListBuffer[(Int, String)]()
    info2.append((1, "Shanghai"))
    info2.append((2, "Beijing"))
    info2.append((4, "Shenzhen"))
    info2.append((5, "WuHan"))
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
//    data1.join(data2).where(0).equalTo(0).print()
    // (1) Left outer join: the right side may be null
    data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) { // watch out for the null case
        (first._1, first._2, "-")
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
    // (2) Right outer join: the left side may be null
    data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (first == null) { // watch out for the null case
        (second._1, "-", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
    // (3) Full outer join: either side may be null
    data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
      if (second == null) {
        (first._1, first._2, "-")
      } else if (first == null) {
        (second._1, "-", second._2)
      } else {
        (first._1, first._2, second._2)
      }
    }).print()
  }

  // 9. cross (Cartesian product)
  // Result: 2 * 3 = 6 pairs
  // (皇马,1)
  // (皇马,4)
  // (皇马,2)
  // (巴萨,1)
  // (巴萨,4)
  // (巴萨,2)
  def crossFunction(env: ExecutionEnvironment): Unit = {
    val info1 = List("皇马", "巴萨")
    val info2 = List(1, 4, 2)
    val data1 = env.fromCollection(info1)
    val data2 = env.fromCollection(info2)
    data1.cross(data2).print()
  }
}


IV. Sink

[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sinks]

1. Basic operators

writeAsText, writeAsCsv, print, write, output
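The example in the next subsection only uses writeAsText. As a minimal sketch (output path made up), writeAsCsv is called the same way but requires a DataSet of tuples or case classes, one field per CSV column:

import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

// Minimal sketch of writeAsCsv; the output path is illustrative.
object CsvSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    // writeAsCsv needs a tuple (or case class) DataSet
    val pairs = env.fromCollection(1 to 5).map(i => (i, i * i))
    // row delimiter "\n", field delimiter ",", overwrite any existing output
    pairs.writeAsCsv("data/output-csv", "\n", ",", WriteMode.OVERWRITE)
    env.execute("CSV Sink Sketch")
  }
}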

2. Example code

package com.bd.flink._1203DataSet

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.core.fs.FileSystem

/**
  * Created by Administrator on 2019/12/4.
  */
object DataSetSink {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val data = 1.to(10)
    import org.apache.flink.api.scala._
    val text = env.fromCollection(data)
    val filepath = "data/output"

    // writeAsText
    // FileSystem.WriteMode.OVERWRITE overwrites any existing output
    // With the default parallelism of 1 this writes a single file
    // (commented out here because both sinks would write to the same path):
//    text.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE)
    // With parallelism 2 it writes two files under the data/output directory
    text.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE).setParallelism(2)

    env.execute("DataSet Sink")
  }
}

 

V. Counters

1. Purpose

Count elements across all parallel subtasks (partitions) as a single global total, instead of each partition keeping its own local count.

2. Implementation

package com.bd.flink._1203DataSet

import org.apache.flink.api.common.accumulators.LongCounter
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.FileSystem

/**
  * Created by Administrator on 2019/12/4.
  * Counter (accumulator)
  */
object DataSetCounter {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.api.scala._
    val data = env.fromElements("hadoop", "hdfs", "yarn", "spark", "flume", "flink")

    // A naive counter that does NOT behave as expected:
    // with parallelism > 1 each subtask counts only the elements of its own partition
//    data.map(new RichMapFunction[String, Long]() {
//      var counter = 0L
//      override def map(in: String): Long = {
//        counter = counter + 1
//        println("counter: " + counter)
//        counter
//      }
//    }).setParallelism(3).print()

    // Improved version: use an accumulator
    val result = data.map(new RichMapFunction[String, String]() {
      // Step 1: define the counter
      val counterAcc = new LongCounter()

      override def open(parameters: Configuration): Unit = {
        // Step 2: register the counter
        getRuntimeContext.addAccumulator("ele-counts-scala", counterAcc)
      }

      override def map(in: String): String = {
        counterAcc.add(1)
//        println("counter: " + counterAcc)
        in
      }
    }) //.setParallelism(3)

//    result.print()
    val filepath = "data/output-scala-count"
    // Write to files; the accumulator result stays correct regardless of the sink parallelism
    result.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE).setParallelism(3)

    // Execute the job and obtain the JobExecutionResult
    val jobResult = env.execute("CounterAcc")
    // Step 3: fetch the counter value from the job result
    val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")
    println("num: " + num)
  }
}

 
