当前位置:   article > 正文

Flink的数据类型和序列化(Scala版)_scala kryo序列化 unable to resolve type variable

scala kryo序列化 unable to resolve type variable

1. 数据类型

1.1 Tuple和case class

package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment,createTypeInformation}

case class Student(name:String, age:Int)

object DataTypeTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val case_input= env.fromElements(Student("LiMing",16), Student("Zhangsan",18))
    val tuple_input = env.fromElements(("LiMing",16),("Zhangsan",18))

    case_input.print()
    tuple_input.print()
  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

执行结果:

Student(LiMing,16)
Student(Zhangsan,18)
(LiMing,16)
(Zhangsan,18)
  • 1
  • 2
  • 3
  • 4
  • Tuple用TupleTypeInfo进行表示,case class用CaseClassTypeInfo进行表示

1.2 POJOs类

Flink处理POJOs类比普通的类更高效和易用,满足以下条件的类即是POJOs类:

  1. 类是访问权限是public
  2. 类有一个无参的默认构造器
  3. 类的字段访问权限都是public,且字段类型被Flink注册的序列化所支持
  • POJOs类在Flink中用PojoTypeInfo所表示,并用PojoSerializer进行序列化(可以配置用Kryo进行序列化)
package devBase

import org.apache.flink.api.scala.{ExecutionEnvironment,createTypeInformation}

class Student(name:String, age:Int) {
  def this() {
    this("default_name", 0)
  }

  override def toString: String = {
    s"name:${name}, age:${age}"
  }
}

object DataTypeTest {

  def main(args: Array[String]): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment

    val class_iput= env.fromElements(new Student("LiMing",16),
      new Student("Zhangsan",18))

    class_iput.print()
  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

执行结果:

name:LiMing, age:16
name:Zhangsan, age:18
  • 1
  • 2

1.3 原生数据类型

Flink支持所有Scala的原生数据类型,比如Int、String、Double; 用BasicTypeInfo进行表示

1.4 普通class

  • Flink支持不是POJOs类型的普通class(除了字段不能被序列化的class,比如字段类型为file pointers、I/O streams、native resources)
  • 不能访问普通class的字段
  • 使用Kryo对普通class进行序列化

1.5 Values

todo

1.6 Hadoop Writables

todo

1.7 Special Types

todo

2. TypeInformation类

scala的所有数据类型在Flink中都有对应的TypeInformation类,TypeInformation类对Scala的数据类型进行描述并生成序列化器

2.1 创建TypeInformation和TypeSerializer

package devBase

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment



object DataTypeTest {

  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // 或者在rich函数中通过getRuntimeContext.getExecutionConfig
    val config = senv.getConfig

    val stringInfo: TypeInformation[String] = createTypeInformation[String]
    val stringSerializer:TypeSerializer[String] = stringInfo.createSerializer(config)

    val tupleInfo: TypeInformation[(String, Double)] = createTypeInformation[(String, Double)]
    val tupleSerializer:TypeSerializer[(String, Double)] = tupleInfo.createSerializer(config)


  }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

2.2 泛型参数用TypeInformation表示

对于泛型参数,Flink并不知道具体的数据类型,可以参考下面:

package devBase

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala.DataStream

object DataTypeTest {

  def selectFirst[T : TypeInformation](input: DataStream[(T, _)]) : DataStream[T] = {
    input.map(_._1)
  }

  def main(args: Array[String]): Unit = {


  }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/繁依Fanyi0/article/detail/767583
推荐阅读
相关标签
  

闽ICP备14008679号