赞
踩
Flink支持非常完善的数据类型,数据类型的描述信息都是由TypeInformation定义,比较常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo等。
1、BasicTypeInfo
//创建Int类型的数据集
val env = StreamExecutionEnvironment.getExecutionEnvironment
val IntStream:DataStream[Int] = env.fromElements(4,51,2,7)
//创建String类型的数据集
val StringStream:DataStream[String] = env.fromElements("hello","flink")
2、BasicArrayTypeInfo
//通过从数组中创建数据集
val ArrayStream:DataStream[Int] = env.fromCollection(Array(8,5,2,31))
//通过List集合创建数据集
val ListStream:DataStream[Int] = env.fromCollection(List(6,23,63,9))
//通过new Tuple2创建元组数据集
val TupleStream:DataStream[Tuple2[String,Int]] = env.fromElements(new Tuple2("唐太宗",1),new Tuple2("汉武帝",2))
Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持通过字段名称和位置索引获取指标,不支持存储空值
//scala样例类
case class bing(id:Int,name:String)
object TableTeat {
def main(args: Array[String]): Unit = {
//val senv = EnvironmentSettings.newInstance().inStreamingMode().build()
val senv = StreamExecutionEnvironment.getExecutionEnvironment
//val tenv = TableEnvironment.create(senv)
val input = senv.fromElements(bing(1,"成吉思汗"),bing(2,"松赞干布"))
input.print()
senv.execute()
}
}
POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括Java类和Scala类
public class Person{
//字段具有public修饰符
public String name;
public int age;
//具有默认空构造器
public Person(){
}
public Person(String name,int age){
this.name = name;
this.age = age;
}
}
class Person(var name:String,var age:Int){
def this(){
this(null,-1)
}
}
定义好后,就可以在Flink环境中使用
val personStream = env.fromElements(new Person("刘病己",14),new Person("刘秀",25))
personStream.keyBy("name")
val mapStream = env.fromElements(Map("name" -> "朱元璋","age" -> "18"),Map("name"-> "朱棣","age" -> "24"))
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。