package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

case class Student(name: String, age: Int)

object DataTypeTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // A DataSet of case class instances
    val case_input = env.fromElements(Student("LiMing", 16), Student("Zhangsan", 18))
    // A DataSet of Scala tuples
    val tuple_input = env.fromElements(("LiMing", 16), ("Zhangsan", 18))

    case_input.print()
    tuple_input.print()
  }
}
Output:
Student(LiMing,16)
Student(Zhangsan,18)
(LiMing,16)
(Zhangsan,18)
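Flink handles POJO classes more efficiently and conveniently than general-purpose classes. A class is treated as a POJO when it is public, has a public no-argument constructor, and every field is either public or reachable through a getter and a setter. In Scala this means the constructor parameters must be declared with var; plain constructor parameters are private, immutable fields, and Flink falls back to treating the class as a generic type.
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment, createTypeInformation}

// `var` fields get a public getter and setter, which the POJO rules require
class Student(var name: String, var age: Int) {

  // Public no-argument constructor, also required for a POJO
  def this() = this("default_name", 0)

  override def toString: String = s"name:${name}, age:${age}"
}

object DataTypeTest {

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val class_input = env.fromElements(new Student("LiMing", 16), new Student("Zhangsan", 18))
    class_input.print()
  }
}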
Output:
name:LiMing, age:16
name:Zhangsan, age:18
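One way to check whether Flink really treats a class as a POJO is to look at the TypeInformation it derives for that class. The sketch below is only an illustration: Person, PojoCheck and isPojo are hypothetical names that do not appear in the original post, and it assumes a Flink 1.x release that still ships the Scala API.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.PojoTypeInfo
import org.apache.flink.api.scala.createTypeInformation

// Hypothetical class for illustration: `var` fields plus a public no-arg constructor
class Person(var name: String, var age: Int) {
  def this() = this("default_name", 0)
}

object PojoCheck {

  // True when Flink derived a PojoTypeInfo rather than, for example, a generic type
  def isPojo(info: TypeInformation[_]): Boolean =
    info.isInstanceOf[PojoTypeInfo[_]]

  def main(args: Array[String]): Unit = {
    val info = createTypeInformation[Person]
    println(s"type info: $info, treated as POJO: ${isPojo(info)}")
  }
}
```

If the check reports false for a class of your own, the usual suspects are constructor parameters without var or a missing no-argument constructor.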
Flink supports all of Scala's native data types, such as Int, String and Double; they are represented by BasicTypeInfo.
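As a rough illustration of the statement above, the following sketch asks Flink for the type information of a few native types and prints whether each one is reported as a basic type. BasicTypeCheck is a hypothetical object name, and the snippet assumes a Flink 1.x release with the Scala API on the classpath.

```scala
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.scala.createTypeInformation

object BasicTypeCheck {

  def main(args: Array[String]): Unit = {
    // Type information derived by the Scala API for three native types
    val infos = Seq[TypeInformation[_]](
      createTypeInformation[Int],
      createTypeInformation[String],
      createTypeInformation[Double]
    )

    infos.foreach { info =>
      val basic = info.isBasicType
      val viaBasicTypeInfo = info.isInstanceOf[BasicTypeInfo[_]]
      println(s"$info -> isBasicType = $basic, BasicTypeInfo = $viaBasicTypeInfo")
    }
  }
}
```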
todo
todo
todo
Every Scala data type has a corresponding TypeInformation class in Flink. A TypeInformation describes the Scala type and creates the serializer for it.
package devBase

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object DataTypeTest {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // Alternatively, inside a rich function, use getRuntimeContext.getExecutionConfig
    val config = senv.getConfig

    // Type information and serializer for a basic type
    val stringInfo: TypeInformation[String] = createTypeInformation[String]
    val stringSerializer: TypeSerializer[String] = stringInfo.createSerializer(config)

    // Type information and serializer for a tuple type
    val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
    val tupleSerializer: TypeSerializer[(String, Double)] = tupleInfo.createSerializer(config)
  }
}
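To make the role of the serializer more concrete, here is a minimal sketch that writes a value to bytes with a TypeSerializer and reads it back. SerializerRoundTrip and roundTrip are hypothetical names introduced for this example; it assumes a Flink 1.x release where createSerializer still accepts an ExecutionConfig, and it uses Flink's in-memory DataOutputSerializer and DataInputDeserializer instead of a running job.

```scala
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.core.memory.{DataInputDeserializer, DataOutputSerializer}

object SerializerRoundTrip {

  // Serialize `value` into a byte buffer, then deserialize a fresh copy from it
  def roundTrip[T](value: T, serializer: TypeSerializer[T]): T = {
    val out = new DataOutputSerializer(64) // initial buffer size in bytes, grows on demand
    serializer.serialize(value, out)
    val in = new DataInputDeserializer(out.getCopyOfBuffer)
    serializer.deserialize(in)
  }

  def main(args: Array[String]): Unit = {
    // A standalone config; inside a job you would use senv.getConfig instead
    val config = new ExecutionConfig
    val tupleSerializer = createTypeInformation[(String, Double)].createSerializer(config)

    val copy = roundTrip(("LiMing", 16.0), tupleSerializer)
    println(copy)
  }
}
```

This is the same kind of work Flink performs whenever records cross the network or are written to state, which is why the derived TypeInformation matters for performance.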
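For a generic type parameter, Flink cannot know the concrete data type at the point where the method is defined. Declaring a TypeInformation context bound on the type parameter makes the caller supply the type information, as in the following example: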
package devBase

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream

object DataTypeTest {

  // The context bound [T : TypeInformation] requires an implicit TypeInformation[T] at the call site
  def selectFirst[T : TypeInformation](input: DataStream[(T, _)]): DataStream[T] = {
    input.map(_._1)
  }

  def main(args: Array[String]): Unit = {

  }
}