当前位置:   article > 正文

Flink读取HDFS上的Parquet文件生成DataSet_avro schema must be a record.

avro schema must be a record.

首先打开Flink的官方网站,查看一下DataSet已支持的数据源:
1、File-Based:

  1. readTextFile(path) / TextInputFormat - Reads files line wise and returns them as Strings.
  2. readTextFileWithValue(path) / TextValueInputFormat - Reads files line wise and returns them as StringValues. StringValues are mutable strings.
  3. readCsvFile(path) / CsvInputFormat - Parses files of comma (or another char) delimited fields. Returns a DataSet of tuples or POJOs. Supports the basic java types and their Value counterparts as field types.
  4. readFileOfPrimitives(path, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer.
  5. readFileOfPrimitives(path, delimiter, Class) / PrimitiveInputFormat - Parses files of new-line (or another char sequence) delimited primitive data types such as String or Integer using the given delimiter.

2、collection-Based:

  1. fromCollection(Collection) - Creates a data set from a Java.util.Collection. All elements in the collection must be of the same type.
  2. fromCollection(Iterator, Class) - Creates a data set from an iterator. The class specifies the data type of the elements returned by the iterator.
  3. fromElements(T ...) - Creates a data set from the given sequence of objects. All objects must be of the same type.
  4. fromParallelCollection(SplittableIterator, Class) - Creates a data set from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
  5. generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel.

3、Generic:

  1. readFile(inputFormat, path) / FileInputFormat - Accepts a file input format.
  2. createInput(inputFormat) / InputFormat - Accepts a generic input format.

 显然,Parquet的读写是第3种--泛型模式,其关键是构建好InputFormat接口的Parquet实现类。Github上clone下flink的源码,IDEA打开并找到InputFormat接口,查看其继承关系。

 

RichInputFormat是InputFormat接口的关键实现类,它是一个抽象类,其主要在接口上增加了Flink的运行上下文。

接着打开RichInputFormat抽象类的继承关系:

很明显,FileInputFormat里的ParquetInputFormat就是我们想要的。可以看到,FileInputFormat除了支持Parquet,还支持了Avro、Orc等格式。接着往下跟:

ParquetInputFormat支持Map、Pojo、Row类型,分别对应嵌套、简单实体类、列表,这里选常用的ParquetPojoInputFormat进去看一下:

  1. /**
  2. * An implementation of {@link ParquetInputFormat} to read POJO records from Parquet files.
  3. */
  4. public class ParquetPojoInputFormat<E> extends ParquetInputFormat<E> {
  5. private static final Logger LOG = LoggerFactory.getLogger(ParquetPojoInputFormat.class);
  6. private final Class<E> pojoTypeClass;
  7. private final TypeSerializer<E> typeSerializer;
  8. private transient Field[] pojoFields;
  9. public ParquetPojoInputFormat(Path filePath, MessageType messageType, PojoTypeInfo<E> pojoTypeInfo) {
  10. super(filePath, messageType);
  11. this.pojoTypeClass = pojoTypeInfo.getTypeClass();
  12. this.typeSerializer = pojoTypeInfo.createSerializer(new ExecutionConfig());
  13. final Map<String, Field> fieldMap = new HashMap<>();
  14. findAllFields(pojoTypeClass, fieldMap);
  15. selectFields(fieldMap.keySet().toArray(new String[0]));
  16. }
  17. ...
  18. }

构造函数需要传入3个参数:

  • Path filePath

文件路径,传入路径字符串pathString,然后new Path(pathString)即可。

  • MessageType messageType

Parquet文件的Schema,这个有点难搞,后面我们主要讲这个参数如何构建。

  • PojoTypeInfo<E> pojoTypeInfo

简单实体类的TypeInfo,直接PojoTypeInfo.of(E.class)即可。

要获取Parquet文件的Schema,主要有3种方法:

  • 通过Schema字符串构建
  1. /**
  2. * @description 根据schema字符串创建MessageType
  3. *
  4. * @param schema
  5. * @return org.apache.parquet.schema.MessageType
  6. */
  7. public static MessageType build(String schema){
  8. return MessageTypeParser.parseMessageType(schema);
  9. }

该种方法需要我们为每个实体类均提前构建好schema字符串,不方便,不用。

  • 根据已有parquet文件构建

讲解这种方式前,大家需要了解下Parquet的文件结构:

一个Parquet文件是由一个header以及一个或多个block块组成,以一个footer结尾。header中只包含一个4个字节的数字PAR1用来识别整个Parquet文件格式。文件中所有的metadata都存在于footer中。footer中的metadata包含了格式的版本信息,schema信息、key-value paris以及所有block中的metadata信息。footer中最后两个字段为一个以4个字节长度的footer的metadata,以及同header中包含的一样的PAR1。

读取一个Parquet文件时,需要完全读取Footer的meatadata,Parquet格式文件不需要读取sync markers这样的标记分割查找,因为所有block的边界都存储于footer的metadata中。

所以,只需要读取目标目录中的任何一个Parquet文件即可拿到footer信息,解析footer即可得到Schema。

  1. /**
  2. * @description 根据Parquet文件创建schema
  3. *
  4. * @param parquetFilePath
  5. * @return org.apache.parquet.schema.MessageType
  6. */
  7. public static MessageType buildFromFile(Path parquetFilePath) throws Exception {
  8. Configuration configuration = new Configuration();
  9. ParquetMetadata readFooter = null;
  10. readFooter = ParquetFileReader.readFooter(configuration,
  11. parquetFilePath, ParquetMetadataConverter.NO_FILTER);
  12. MessageType schema =readFooter.getFileMetaData().getSchema();
  13. return schema;
  14. }
  15. /**
  16. * @description 根据Parquet文件创建schema
  17. *
  18. * @param pathUrl
  19. * @return org.apache.parquet.schema.MessageType
  20. */
  21. public static MessageType buildFromFile(String pathUrl) throws Exception {
  22. Path parquetFilePath = new Path(pathUrl);
  23. return buildFromFile(parquetFilePath);
  24. }

该方法勉强满足我们的需求,但每次读取parquet目录,需要先取一个文件拿来"解剖"拿到Schema才能读取全部文件,听起来就不够优雅,不用。

  • 反射

基本思想是通过反射得到实体类各个字段的类型,然后对应成Parquet Schema里的类型。

下面为简单示例:

  1. /**
  2. * @description 根据类信息动态解析成MessageType
  3. *
  4. * @param clazz
  5. * @param messageName
  6. * @return org.apache.parquet.schema.MessageType
  7. */
  8. public static <T> MessageType build(Class<T> clazz, String messageName){
  9. Field[] fields = clazz.getDeclaredFields();
  10. Types.MessageTypeBuilder builder = Types.buildMessage();
  11. for(Field field: fields){
  12. field.setAccessible(true);
  13. String[] fieldTypes = field.getType().toString().toLowerCase().split("\\.");
  14. String fieldType = fieldTypes[fieldTypes.length-1];
  15. String fieldName = field.getName();
  16. if("string".equals(fieldType)){
  17. builder.required(PrimitiveType.PrimitiveTypeName.BINARY).as(OriginalType.UTF8).named(fieldName);
  18. }else if("int".equals(fieldType) || "integer".equals(fieldType) || "short".equals(fieldType)){
  19. builder.required(PrimitiveType.PrimitiveTypeName.INT32).named(fieldName);
  20. }else if("long".equals(fieldType)){
  21. builder.required(PrimitiveType.PrimitiveTypeName.INT64).named(fieldName);
  22. }else if("float".equals(fieldType)){
  23. builder.required(PrimitiveType.PrimitiveTypeName.FLOAT).named(fieldName);
  24. }else if("double".equals(fieldType)){
  25. builder.required(PrimitiveType.PrimitiveTypeName.DOUBLE).named(fieldName);
  26. }else if("boolean".equals(fieldType)){
  27. builder.required(PrimitiveType.PrimitiveTypeName.BOOLEAN).named(fieldName);
  28. }else {
  29. builder.required(PrimitiveType.PrimitiveTypeName.BINARY).named(fieldName);
  30. }
  31. }
  32. return builder.named(messageName);
  33. }
  34. /**
  35. * @description 根据类信息动态解析成MessageType
  36. *
  37. * @param clazz
  38. * @return org.apache.parquet.schema.MessageType
  39. */
  40. public static <T> MessageType build(Class<T> clazz){
  41. String className = clazz.getName();
  42. return build(clazz, className);
  43. }

上面的代码勉强可以应付大部分的实体类,但当实体类的字段为Timestamp、Date、List、Map等非基础类型时,则需要继续丰富上述代码,同时还需要考虑字符编码等各类问题才能使上述方法变得健壮。

我,当然可以手动撸一个出来,不怂!!!

但是,太麻烦,肯定有现成的轮子了,何不拿来直接用一下。

回想之前DataStream持久化Parquet文件的时候,曾经用过反射来生成Parquet的Schema,去瞅一眼:

  1. // ParquetAvroWriters.java
  2. public static <T> ParquetWriterFactory<T> forReflectRecord(Class<T> type) {
  3. // 基于类反射拿到Avro的Schema
  4. final String schemaString = ReflectData.get().getSchema(type).toString();
  5. final ParquetBuilder<T> builder = (out) -> createAvroParquetWriter(schemaString, ReflectData.get(), out);
  6. return new ParquetWriterFactory<>(builder);
  7. }

怎么又扯出来Avro了,其实就是用了下Avro的相关转换类来完成Parquet文件Schema的生成。

基本逻辑就是,Avro工具类基于实体类的Class通过反射生成Avro文件Schema,然后Avro工具类再将Avro文件Schema转化成Parquet文件的Schema。

跟踪一下Avro的相关代码,一通乱点,盲猜到上述Avro工具类为AvroSchemaConverter。

点进去看一下其主要方法:

哈哈哈哈哈哈,果然是你,ok,大功告成,调用AvroSchemaConverter工具类的convert方法即可将Avro的Schema转化为Parquet的Schema,即MessageType。

  1. public MessageType convert(Schema avroSchema) {
  2. if (!avroSchema.getType().equals(Type.RECORD)) {
  3. throw new IllegalArgumentException("Avro schema must be a record.");
  4. } else {
  5. return new MessageType(avroSchema.getFullName(), this.convertFields(avroSchema.getFields()));
  6. }
  7. }

封装成一个方法:

  1. public class MessageTypeBuilderByAvro{
  2. public static <T> MessageType build(Class<T> tClass){
  3. Schema avroSchema = ReflectData.get().getSchema(tClass);
  4. MessageType messageType = new AvroSchemaConverter().convert(avroSchema);
  5. return messageType;
  6. }
  7. }

自此,ParquetPojoInputFormat的3个参数均完美构建,下面就可以在Flink中愉快的读取HDFS上的Parquet文件并生成DataSet了,美滋滋!!!

  1. DataSource<E> PojoDataSource = env.createInput(new ParquetPojoInputFormat<E>(
  2. new Path(pathString),
  3. MessageTypeBuilderByAvro.build(E.class),
  4. (PojoTypeInfo<E>) PojoTypeInfo.of(E.class)
  5. ));


 

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/2023面试高手/article/detail/413132
推荐阅读
相关标签
  

闽ICP备14008679号