The Flink batch DataSet processing flow consists of Source, Transform, and Sink operators.
[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources]
1. Creating from files (a short sketch follows the list below)
(1)readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.
(2)readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.
(3)readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples, case class objects, or POJOs. Supports the basic java types and their Value counterparts as field types.
(4)readFileOfPrimitives(path, delimiter) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer using the given delimiter.
(5)readSequenceFile(Key, Value, path) / SequenceFileInputFormat - Creates a JobConf and reads file from the specified path with type SequenceFileInputFormat, Key class and Value class and returns them as Tuple2<Key, Value>.
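
A minimal sketch of two of these file-based sources, assuming hypothetical input files data/hello.txt (plain text) and data/strings.txt (comma-separated values):

```scala
import org.apache.flink.api.scala._

object FileSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // readTextFile: one String element per line
    val lines = env.readTextFile("data/hello.txt")
    lines.print()

    // readFileOfPrimitives: primitive values separated by a custom delimiter, here a comma
    val values = env.readFileOfPrimitives[String]("data/strings.txt", ",")
    values.print()
  }
}
```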
2. Creating from collections (a short sketch follows the list below)
(1)fromCollection(Seq) - Creates a data set from a Seq. All elements in the collection must be of the same type.
(2)fromCollection(Iterator) - Creates a data set from an Iterator. The class specifies the data type of the elements returned by the iterator.
(3)fromElements(elements: _*) - Creates a data set from the given sequence of objects. All objects must be of the same type.
(4)fromParallelCollection(SplittableIterator) - Creates a data set from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
(5)generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel
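
A minimal sketch of the collection-based sources, using NumberSequenceIterator from flink-core as the SplittableIterator:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.util.NumberSequenceIterator

object CollectionSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // fromElements: a DataSet from an explicit sequence of objects of the same type
    env.fromElements("hadoop", "spark", "flink").print()

    // generateSequence: the numbers 1 to 10, generated in parallel
    env.generateSequence(1, 10).print()

    // fromParallelCollection: a parallel DataSet backed by a SplittableIterator
    env.fromParallelCollection(new NumberSequenceIterator(1L, 20L)).print()
  }
}
```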
3. Compressed files
By default these can all be read directly; Flink decompresses them transparently based on the file extension (see the sketch after the table).
| Compression method | File extensions | Parallelizable |
| --- | --- | --- |
| DEFLATE | .deflate | no |
| GZip | .gz, .gzip | no |
| Bzip2 | .bz2 | no |
| XZ | .xz | no |
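
Compressed input needs no special handling; a minimal sketch, assuming a hypothetical file data/logs.gz:

```scala
import org.apache.flink.api.scala._

object CompressedSourceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // The .gz extension is detected and the file is decompressed transparently,
    // but a single GZip file is not splittable, so it is read by one subtask only.
    env.readTextFile("data/logs.gz").print()
  }
}
```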
4. Example code

    package com.bd.flink._1203DataSet

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.configuration.Configuration

    /**
      * Created by Administrator on 2019/12/3.
      */
    object DataSetDataSourceApp {

      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // Create from a collection
        // fromCollection(env)

        // Create from a file
        // fromTextFile(env)

        // Create from a folder
        // fromTextFileFolder(env)

        // Create from a CSV file
        // fromCsvFile(env)

        // Read files recursively
        // fromRecursiveFiles(env)

        // Read compressed files
        fromCompressFile(env)
      }

      /**
        * 1. Create from a collection
        * @param env
        */
      def fromCollection(env: ExecutionEnvironment): Unit = {
        import org.apache.flink.api.scala._
        val data = 1 to 10
        env.fromCollection(data).print()
      }

      /**
        * 2. Read data from a file
        * @param env
        */
      def fromTextFile(env: ExecutionEnvironment): Unit = {
        import org.apache.flink.api.scala._
        val filepath = "data//hello.txt"
        env.readTextFile(filepath).print()
      }

      /**
        * 3. Read data from a folder
        * @param env
        */
      def fromTextFileFolder(env: ExecutionEnvironment): Unit = {
        import org.apache.flink.api.scala._
        val filepath = "data//"
        env.readTextFile(filepath).print()
      }

      /**
        * 4. Read from a CSV file
        * @param env
        */
      def fromCsvFile(env: ExecutionEnvironment): Unit = {
        import org.apache.flink.api.scala._
        val filepath = "data//people.csv"
        // Parse the CSV into tuples: all columns
        env.readCsvFile[(String, Int, String)](filepath, ignoreFirstLine = true).print()
        // Parse the CSV into tuples: only columns 1 and 3 (includedFields selects the columns)
        env.readCsvFile[(String, String)](filepath, ignoreFirstLine = true, includedFields = Array(0, 2)).print()

        // Parse into a case class
        case class MyClass(name: String, age: Int)
        env.readCsvFile[MyClass](filepath, ignoreFirstLine = true, includedFields = Array(0, 1)).print()

        // Parse into a Java POJO
        // Output:
        // People{name='Bob', age=32, work='Developer'}
        // People{name='Jorge', age=30, work='Developer'}
        env.readCsvFile[People](filepath, ignoreFirstLine = true, pojoFields = Array("name", "age", "work")).print()
      }

      /**
        * 5. Read the contents of a folder recursively
        * @param env
        */
      // Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sources
      def fromRecursiveFiles(env: ExecutionEnvironment): Unit = {
        val filepath = "data//"
        // env.readTextFile(filepath).print()
        // println("---------------- separator ------------------")
        val parameters = new Configuration
        parameters.setBoolean("recursive.file.enumeration", true)
        env.readTextFile(filepath).withParameters(parameters).print()
      }

      /**
        * 6. Read from compressed files
        * @param env
        */
      // Supported extensions: .deflate, .gz, .gzip, .bz2, .xz
      def fromCompressFile(env: ExecutionEnvironment): Unit = {
        val filePath = "data//compress"
        env.readTextFile(filePath).print() // readTextFile can read compressed files directly
      }

    }
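
The readCsvFile[People] call above relies on a People POJO that is not shown in the post. A hypothetical Scala equivalent that satisfies Flink's POJO rules (public no-argument constructor plus JavaBean-style accessors) could look like this:

```scala
import scala.beans.BeanProperty

// Hypothetical stand-in for the People class used by readCsvFile[People] above.
// @BeanProperty generates the getters/setters that Flink's POJO analyzer expects.
class People {
  @BeanProperty var name: String = _
  @BeanProperty var age: Int = 0
  @BeanProperty var work: String = _

  override def toString: String = s"People{name='$name', age=$age, work='$work'}"
}
```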
[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#dataset-transformations]
1. Basic operators
Map, FlatMap, MapPartition, Filter, Reduce, etc.
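
Reduce is listed above but not exercised in the example below; a minimal sketch of a grouped reduce (summing the second tuple field per key):

```scala
import org.apache.flink.api.scala._

object ReduceSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val words = env.fromElements(("spark", 1), ("flink", 1), ("spark", 1))

    // reduce: repeatedly combine two elements of a group into one
    words.groupBy(0)
      .reduce((a, b) => (a._1, a._2 + b._2))
      .print()
  }
}
```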
2. Example code

    package com.bd.flink._1203DataSet

    import org.apache.flink.api.common.operators.Order
    import org.apache.flink.api.scala.ExecutionEnvironment

    import scala.collection.mutable.ListBuffer
    // Implicit conversions
    import org.apache.flink.api.scala._

    /**
      * Created by Administrator on 2019/12/3.
      */
    object DataSetTransformation {

      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment

        // mapFunction(env)
        // filterFunction(env)
        // firstFunction(env)
        // flatMapFunction(env)
        // joinFunction(env)
        // outerjoinFunction(env)
        crossFunction(env)
      }

      // 1. map
      def mapFunction(env: ExecutionEnvironment): Unit = {
        val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 10))

        // Approach 1
        // data.map(x => x * 2).print()
        print("=============== separator =============")
        // Approach 2
        data.map(_ * 2).print()
      }

      // 2. filter
      def filterFunction(env: ExecutionEnvironment): Unit = {
        val data = env.fromCollection(List(1, 2, 3, 4, 5, 6, 7, 8, 10))

        // Approach 1
        // data.filter(x => x > 2).print()
        print("=============== separator =============")
        // Approach 2
        data.filter(_ > 2).print()
      }

      // 3. mapPartition
      // DataSource of 100 elements whose results are inserted into a database
      def mapPartitionFunction(env: ExecutionEnvironment): Unit = {

        val students = new ListBuffer[String]
        for (i <- 1 to 100) {
          students.append("student: " + i)
        }

        val data = env.fromCollection(students)

        // mapPartition: one database connection per partition, whereas map would open one connection per element
        data.mapPartition(partition => {
          // Obtain the connection, e.g. connect = DBUtils.getConnection
          val connect = "this is a database connection"
          // Store every element of the partition through this connection
          partition.map(element => connect + " -> " + element)
        }).print()
      }

      // 4. first-n
      def firstFunction(env: ExecutionEnvironment): Unit = {

        val info = ListBuffer[(Int, String)]()
        info.append((1, "hadoop"))
        info.append((1, "yarn"))
        info.append((3, "mapreduce"))
        info.append((3, "hbase"))
        info.append((5, "spark"))
        info.append((5, "storm"))
        info.append((5, "solr"))
        info.append((5, "zookeeper"))
        val data = env.fromCollection(info)
        // Print the first three elements
        data.first(3).print()
        // After grouping, take the first two elements of each group
        data.groupBy(0).first(2).print()
        // After grouping, sort within each group
        // Sort by name in ascending order
        data.groupBy(0).sortGroup(1, Order.ASCENDING).first(2).print()
      }

      def flatMapFunction(env: ExecutionEnvironment): Unit = {
        val info = ListBuffer[String]()
        info.append("hadoop,spark")
        info.append("flink,spark")
        info.append("flume,spark")
        val data = env.fromCollection(info)

        // data.print()
        // data.map(x => x.split(",")).print()
        // Effect of flatMap
        // data.flatMap(_.split(",")).print()
        // WordCount implementation
        data.flatMap(_.split(",")).map((_, 1)).groupBy(0).sum(1).print()
      }

      // distinct
      def distinctFunction(env: ExecutionEnvironment): Unit = {
        val info = ListBuffer[String]()
        info.append("hadoop,spark")
        info.append("flink,spark")
        info.append("flume,spark")
        val data = env.fromCollection(info)

        data.flatMap(_.split(",")).distinct().print()
      }

      // join
      def joinFunction(env: ExecutionEnvironment): Unit = {
        val info1 = ListBuffer[(Int, String)]()
        info1.append((1, "hadoop"))
        info1.append((1, "yarn"))
        info1.append((3, "mapreduce"))
        info1.append((3, "hbase"))

        val info2 = ListBuffer[(Int, String)]()
        info2.append((1, "Shanghai"))
        info2.append((2, "Beijing"))
        info2.append((4, "Shenzhen"))
        info2.append((5, "WuHan"))

        // Example and explanation:
        // In this case tuple fields are used as keys. "0" is the join field on the first tuple
        // "1" is the join field on the second tuple.
        // val result = input1.join(input2).where(0).equalTo(1)

        val data1 = env.fromCollection(info1)
        val data2 = env.fromCollection(info2)
        // Part 1: the output is
        // ((1,hadoop),(1,Shanghai))
        // ((1,yarn),(1,Shanghai))
        data1.join(data2).where(0).equalTo(0).print()

        // Part 2: the output is flattened into tuples
        // (1,hadoop,Shanghai)
        // (1,yarn,Shanghai)
        data1.join(data2).where(0).equalTo(0).apply((first, second) => {
          (first._1, first._2, second._2)
        }).print()
      }

      // outer join
      def outerjoinFunction(env: ExecutionEnvironment): Unit = {
        val info1 = ListBuffer[(Int, String)]()
        info1.append((1, "hadoop"))
        info1.append((1, "yarn"))
        info1.append((3, "mapreduce"))
        info1.append((3, "hbase"))

        val info2 = ListBuffer[(Int, String)]()
        info2.append((1, "Shanghai"))
        info2.append((2, "Beijing"))
        info2.append((4, "Shenzhen"))
        info2.append((5, "WuHan"))

        // Example and explanation:
        // In this case tuple fields are used as keys. "0" is the join field on the first tuple
        // "1" is the join field on the second tuple.
        // val result = input1.join(input2).where(0).equalTo(1)

        val data1 = env.fromCollection(info1)
        val data2 = env.fromCollection(info2)
        // Part 1: the output would be
        // ((1,hadoop),(1,Shanghai))
        // ((1,yarn),(1,Shanghai))
        // data1.join(data2).where(0).equalTo(0).print()

        // Part 2: (1) left outer join, output as tuples
        data1.leftOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
          if (second == null) { // note: the right side can be null
            (first._1, first._2, "-")
          } else {
            (first._1, first._2, second._2)
          }
        }).print()

        // Part 2: (2) right outer join, output as tuples
        data1.rightOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
          if (first == null) { // note: the left side can be null
            (second._1, "-", second._2)
          } else {
            (first._1, first._2, second._2)
          }
        }).print()

        // Part 2: (3) full outer join, output as tuples
        data1.fullOuterJoin(data2).where(0).equalTo(0).apply((first, second) => {
          if (second == null) {
            (first._1, first._2, "-")
          } else if (first == null) { // note: either side can be null
            (second._1, "-", second._2)
          } else {
            (first._1, first._2, second._2)
          }
        }).print()
      }

      // Cartesian product
      // Result: 2 * 3 = 6 elements
      // (皇马,1)
      // (皇马,4)
      // (皇马,2)
      // (巴萨,1)
      // (巴萨,4)
      // (巴萨,2)
      def crossFunction(env: ExecutionEnvironment): Unit = {
        val info1 = List("皇马", "巴萨")
        val info2 = List(1, 4, 2)

        val data1 = env.fromCollection(info1)
        val data2 = env.fromCollection(info2)

        data1.cross(data2).print()
      }

    }
[Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/batch/#data-sinks]
1. Basic operators
writeAsText, writeAsCsv, print, write, output
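
The example below only uses writeAsText; a minimal sketch of writeAsCsv for a tuple DataSet, with a hypothetical output path data/output-csv:

```scala
import org.apache.flink.api.scala._
import org.apache.flink.core.fs.FileSystem.WriteMode

object CsvSinkSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val pairs = env.fromElements((1, "hadoop"), (2, "spark"), (3, "flink"))

    // writeAsCsv(path, rowDelimiter, fieldDelimiter, writeMode)
    pairs.writeAsCsv("data/output-csv", "\n", ",", WriteMode.OVERWRITE)

    // File sinks are lazy; execute() triggers the job
    env.execute("writeAsCsv sketch")
  }
}
```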
2. Example code

    package com.bd.flink._1203DataSet

    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.core.fs.FileSystem
    import org.apache.flink.core.fs.FileSystem.WriteMode

    /**
      * Created by Administrator on 2019/12/4.
      */
    object DataSetSink {

      def main(args: Array[String]): Unit = {

        val env = ExecutionEnvironment.getExecutionEnvironment
        val data = 1.to(10)

        import org.apache.flink.api.scala._
        val text = env.fromCollection(data)
        val filepath = "data//output"

        // writeAsText
        // The FileSystem.WriteMode.OVERWRITE parameter overwrites any existing output
        // By default the result is written to a single file
        text.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE)

        // With parallelism 2, the output is written as two files under the data//output directory
        text.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE).setParallelism(2)

        env.execute("DataSet Sink")
      }
    }
1. Goal
Count elements across all partitions with a single, global counter, instead of each partition keeping its own count.
2. Implementation code

    package com.bd.flink._1203DataSet

    import org.apache.flink.api.common.accumulators.LongCounter
    import org.apache.flink.api.common.functions.RichMapFunction
    import org.apache.flink.api.scala.ExecutionEnvironment
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.core.fs.FileSystem

    /**
      * Created by Administrator on 2019/12/4.
      * Counter (accumulator)
      */
    object DataSetCounter {

      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
        val data = env.fromElements("hadoop", "hdfs", "yarn", "spark", "flume", "flink")

        // A counter that does not behave as expected:
        // if the parallelism is not 1, each parallel instance only counts the elements of its own partition
        // data.map(new RichMapFunction[String, Long]() {
        //   var counter = 0L
        //   override def map(in: String): Long = {
        //     counter = counter + 1
        //     println("counter: " + counter)
        //     counter
        //   }
        // }).setParallelism(3).print()

        // Improvement: use an accumulator
        val result = data.map(new RichMapFunction[String, String]() {

          // Step 1: define the accumulator
          val counterAcc = new LongCounter()

          override def open(parameters: Configuration): Unit = {
            // Step 2: register the accumulator
            getRuntimeContext.addAccumulator("ele-counts-scala", counterAcc)
          }

          override def map(in: String): String = {
            counterAcc.add(1)
            // println("counter: " + counterAcc)
            in
          }
        }) // .setParallelism(3)
        // result.print()

        val filepath = "data//output-scala-count"
        // Write the result to files (the sink parallelism does not affect the accumulator)
        result.writeAsText(filepath, FileSystem.WriteMode.OVERWRITE).setParallelism(3)

        // Execute and obtain the JobExecutionResult
        val jobResult = env.execute("CounterAcc")
        // Step 3: retrieve the accumulator result
        val num = jobResult.getAccumulatorResult[Long]("ele-counts-scala")

        println("num: " + num)
      }

    }