当前位置:   article > 正文

Flink之数据类型详解_flink case type

flink case type

一、数据类型支持

Flink支持非常完善的数据类型,数据类型的描述信息都是由TypeInformation定义,比较常用的TypeInformation有BasicTypeInfo、TupleTypeInfo、CaseClassTypeInfo、PojoTypeInfo类等。TypeInformation主要作用是为了在Flink系统内有效地对数据结构类型进行管理,能够在分布式计算过程中对数据的类型进行管理和推断。同时基于对数据的类型信息管理,Flink内部对数据存储也进行了相应的性能优化。

Flink能够支持任意的Java或Scala的数据类型,不用像Hadoop中的org.apache.hadoop.io.Writable而实现特定的序列化和反序列化接口,从而让用户能够更加容易使用已有的数据结构类型。另外使用TypeInformation管理数据类型信息,能够在数据处理之前将数据类型推断出来,而不是真正在触发计算后才识别出,这样能够及时有效地避免用户在使用Flink编写应用的过程中的数据类型问题。

1.1、 原生数据类型

Flink通过实现BasicTypeInfo数据类型,能够支持任意Java 原生基本类型(装箱)或 String 类型,例如Integer、String、Double等,如以下代码所示,通过从给定的元素集中创建DataStream数据集。

image.png

Flink实现另外一种TypeInfomation是BasicArrayTypeInfo,对应的是Java基本类型数组(装箱)或 String对象的数组,如下代码通过使用Array数组和List集合创建DataStream数据集。

image.png

1.2、Java Tuples类型

通过定义TupleTypeInfo来描述Tuple类型数据,Flink在Java接口中定义了元祖类(Tuple)供用户使用。Flink Tuples是固定长度固定类型的Java Tuple实现,不支持空值存储。目前支持任意的Flink Java Tuple类型字段数量上限为25如果字段数量超过上限,可以通过继承Tuple类的方式进行拓展。如下代码所示,创建Tuple数据类型数据集。

image.png

1.3、Scala Case Class类型

Flink通过实现CaseClassTypeInfo支持任意的Scala Case Class,包括Scala tuples类型,支持的字段数量上限为22,支持通过字段名称和位置索引获取指标,不支持存储空值。如下代码实例所示,定义WordCount Case Class数据类型,然后通过fromElements方法创建input数据集,调用keyBy()方法对数据集根据word字段重新分区。

image.png

通过使用Scala Tuple创建DataStream数据集,其他的使用方式和Case Class相似。需要注意的是,如果根据名称获取字段,可以使用Tuple中的默认字段名称。

image.png

注:不同于Scala,Java中的元组是从“0”开始计数。 

  1. Tuple2<String, Integer> tuple = new Tuple2("Flink", 1);
  2. System.out.println(tuple.getField(0).toString());
  3. System.out.println(tuple.getField(1).toString());
  4. System.out.println("-----------------------------");
  5. tuple.setField("Scala",0);
  6. // 类型擦除,可以设置非Integer类型的数据
  7. tuple.setField("java",1);
  8. // 推荐获取元组值的方式
  9. System.out.println(tuple.f0);
  10. System.out.println(tuple.f1);

1.4、POJOs类型

POJOs类可以完成复杂数据结构的定义,Flink通过实现PojoTypeInfo来描述任意的POJOs,包括Java 和Scala类。在Flink中使用POJOs类可以通过字段名称获取字段,例如dataStream.join(otherStream).where("name").equalTo("personName"),对于用户做数据处理则非常透明和简单,如代码清单3-2所示。如果在Flink中使用POJOs数据类型,需要遵循以下要求:

  • POJOs 类必须是Public修饰且必须独立定义,不能是内部类;
  • POJOs类中必须含有默认空构造器;
  • POJOs类中所有的Fields必须是Public或者具有Public修饰的getter和setter方法;
  • POJOs类中的字段类型必须是Flink支持的。

image.png

定义好POJOs Class后,就可以在Flink环境中使用了,如下代码所示,使用fromElements接口构建Person类的数据集。POJOs类仅支持字段名称指定字段,如代码中通过Person name来指定Keyby字段

image.png

Scala POJOs数据结构定义如下,使用方式与Java POJOs相同。

image.png

1.5、Flink Value类型

Value数据类型实现了org.apache.flink.types.Value,其中包括read()和write()两个方法完成序列化和反序列化操作,相对于通用的序列化工具会有着比较高效的性能。目前Flink提供了內建的Value类型有IntValue、DoubleValue以及StringValue等,用户可以结合原生数据类型和Value类型使用。

1.6、 特殊数据类型

在Flink中也支持一些比较特殊的数据数据类型,例如Scala中的List、Map、Either、Option、Try数据类型,以及Java中Either数据类型,还有Hadoop的Writable数据类型。如下代码所示,创建Map和List类型数据集。这种数据类型使用场景不是特别广泛,主要原因是数据中的操作相对不像POJOs类那样方便和透明,用户无法根据字段位置或者名称获取字段信息,同时要借助Types Hint帮助Flink推断数据类型信息,关于Tyeps Hmt介绍可以参考下一小节。

image.png

二、TypeInformation信息获取

通常情况下Flink都能正常进行数据类型推断,并选择合适的serializers以及comparators。但在某些情况下却无法直接做到,例如定义函数时如果使用到了泛型,JVM就会出现类型擦除的问题,使得Flink并不能很容易地获取到数据集中的数据类型信息。同时在Scala API和Java API中,Flink分别使用了不同的方式重构了数据类型信息。

2.1、 Scala API类型信息

Scala API通过使用Manifest和类标签,在编译器运行时获取类型信息,即使是在函数定义中使用了泛型,也不会像Java API出现类型擦除的问题,这使得Scala API具有非常精密的类型管理机制。同时在Flink中使用到Scala Macros框架,在编译代码的过程中推断函数输入参数和返回值的类型信息,同时在Flink中注册成TypeInformation以支持上层计算算子使用。

当使用Scala API开发Flink应用,如果使用到Flink已经通过TypeInformation定义的数据类型,TypeInformation类不会自动创建,而是使用隐式参数的方式引入,代码不会直接抛出编码异常,但是当启动Flink应用程序时就会报”could not find implicit value for evidence parameter of type TypeInformation”的错误。这时需要将TypeInformation类隐式参数引入到当前程序环境中,

代码实例如下:

import org.apache.flink.api.scala._

2.2、Java API类型信息

由于Java的泛型会出现类型擦除问题,Flink通过Java反射机制尽可能重构类型信息,例如使用函数签名以及子类的信息等。同时类型推断在当输出类型依赖于输入参数类型时相对比较容易做到,但是如果函数的输出类型不依赖于输入参数的类型信息,这个时候就需要借助于类型提示(Ctype Himts)来告诉系统函数中传入的参数类型信息和输出参数信息。如代码清单3-3通过在returns方法中传入TypeHint实例指定输出参数类型,帮助Flink系统对输出类型进行数据类型参数的推断和收集。

image.png

在使用Java API定义POJOs类型数据时,PojoTypeInformation为POJOs类中的所有字段创建序列化器,对于标准的类型,例如Integer、String、Long等类型是通过Flink自带的序列化器进行数据序列化,对于其他类型数据都是直接调用Kryo序列化工具来进行序列化。

通常情况下,如果Kryo序列化工具无法对POJOs类序列化时,可以使用Avro对POJOs类进行序列化,如下代码通过在ExecutionConfig中调用enableForceAvro()来开启Avro序列化

image.png

如果用户想使用Kryo序列化工具来序列化POJOs所有字段,则在ExecutionConfig中调用enableForceKryo()来开启Kryo序列化。

image.png

如果默认的Kryo序列化类不能序列化POJOs对象,通过调用ExecutionConfig的addDefault-KryoSerializer()方法向Kryo中添加自定义的序列化器。

image.png

2.3、自定义TypeInformation

除了使用已有的TypeInformation所定义的数据格式类型之外,用户也可以自定义实现TypeInformation,来满足的不同的数据类型定义需求。Flink提供了可插拔的Type Information Factory让用户将自定义的TypeInformation注册到Flink类型系统中。如下代码所示只需要通过实现org.apache.flink.api.common.typeinfo.TypeInfoFactory接口,返回相应的类型信息。

通过@TypeInfo注解创建数据类型,定义CustomTuple数据类型。

image.png

然后定义CustomTypeInfoFactory类继承于TypeInfoFactory,参数类型指定CustomTuple。最后重写createTypeInfo方法,创建的CustomTupleTypeInfo就是CustomTuple数据类型TypeInformation。

image.png

三、FAQ

3.1、flink使用map算子返回Tuple3时,如果不指定returns则会报错

  1. StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  2. StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
  3. Properties kafkaProp = new Properties();
  4. FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), kafkaProp);
  5. DataStream<Tuple3<Integer, String, Integer>> dataStream = env
  6. .addSource(myConsumer)
  7. .map(record -> {
  8. JSONObject jsonObject = JSON.parseObject(record);
  9. return new Tuple3<>(jsonObject.getInteger("id"), jsonObject.getString("name"), jsonObject.getInteger("age"));
  10. });
  11. env.execute();

运行上述代码,错误信息如下:

  1. Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(TestFlinkTable.java:43)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
  2. at org.apache.flink.streaming.api.transformations.StreamTransformation.getOutputType(StreamTransformation.java:420)
  3. at org.apache.flink.streaming.api.datastream.DataStream.getType(DataStream.java:175)
  4. at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:217)
  5. at com.miaoke.sync.test.TestFlinkTable.main(TestFlinkTable.java:50)
  6. Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple3' are missing. In many cases lambda methods don't provide enough information for automatic type extraction when Java generics are involved. An easy workaround is to use an (anonymous) class instead that implements the 'org.apache.flink.api.common.functions.MapFunction' interface. Otherwise the type has to be specified explicitly using type information.
  7. at org.apache.flink.api.java.typeutils.TypeExtractionUtils.validateLambdaType(TypeExtractionUtils.java:350)
  8. at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:579)
  9. at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:175)
  10. at org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:585)
  11. at com.miaoke.sync.test.TestFlinkTable.main(TestFlinkTable.java:43)

根据错误提示,加上returns(),则正常通过

  1. DataStream<Tuple3<Integer, String, Integer>> dataStream = env
  2. .addSource(myConsumer)
  3. .map(record -> {
  4. JSONObject jsonObject = JSON.parseObject(record);
  5. return new Tuple3<>(jsonObject.getInteger("id"), jsonObject.getString("name"), jsonObject.getInteger("age"));
  6. }).returns(Types.TUPLE(Types.INT, Types.STRING, Types.INT));

常见的 returns 的使用:

  1. .returns(Types.TUPLE(Types.INT, TypeInformation.of(Alarm.class)))
  2. .returns(Types.TUPLE(Types.INT,Types.INT))
  3. .returns(Types.STRING)
  4. .returns(TypeInformation.of(String.class))
  5. .returns(new TypeHint<Tuple2<String, String>>(){})
  6. .returns(TypeInformation.of(new TypeHint<Tuple2<ConsumerRecord, String>>() {}))
  7. .returns(SomeType.class)

推荐文章:https://blog.csdn.net/QcloudCommunity/article/details/82623022

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/空白诗007/article/detail/767541
推荐阅读
相关标签
  

闽ICP备14008679号