当前位置:   article > 正文

flink PojoCsvInputFormat 处理 csv 文件数据问题小结

csvinputformat

背景

使用flink DataStream api 来处理一批数据, 数据本身就是保存在csv文件里面, 需要将csv文件里面的每一行转换为相应的POJO实例, 查询flink提供的FileInputFormat相关的类和接口(PojoCsvInputFormat)可以完成从CSV到DataStream的转换.但是在使用这个类的时候遇到一些问题, 而且网上也没有相关的答案,本文把这些问题以及解决的办法记录下来,希望能给遇到相似问题的同学一些参考.

准备工作

首先是csv测试文件,这里简单的使用两个字段age和name

age,name
20,hello
30,world
40,world
  • 1
  • 2
  • 3
  • 4

POJO类型

class MyItem(val name:String, val age: Int)
  • 1

注意这个地方只是定义了一个简单的类用于演示, 其实这个类严格来讲不是POJO类型.

flink处理逻辑代码

val parent = this.getClass.getClassLoader.getResource("").getPath

    val filePath = parent + "myitems.csv"
    val path = new Path(filePath)
  
    val nameFiled = new PojoField(classOf[MyItem].getDeclaredField("name"), BasicTypeInfo.STRING_TYPE_INFO)
    val ageField = new PojoField(classOf[MyItem].getDeclaredField("age"), BasicTypeInfo.INT_TYPE_INFO)
    val pojoTypeInfo = new PojoTypeInfo[MyItem](classOf[MyItem], List(nameFiled, ageField).asJava)
 
    val myFileFormat = new PojoCsvInputFormat[MyItem](path, pojoTypeInfo)
    myFileFormat.setSkipFirstLineAsHeader(true)
    val dataStream: DataStream[MyItem] = env.readFile(myFileFormat, filePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 10000)

    dataStream.map(item => {
        println(item.name + " " + item.age)
    })
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

首先是PojoCsvInputFormat的构造器,重载的构造器很多,核心的地方就是制定文件的Path以及POJO类型的TypeInfomation(PojoTypeInfo)
比如其中最简单的构造器

public PojoCsvInputFormat(Path filePath, PojoTypeInfo<OUT> pojoTypeInfo) {
        this(filePath, "\n", ",", pojoTypeInfo);
    }
  • 1
  • 2
  • 3

行划分符号和列划分符号都是使用默认的回车和逗号.
使用此构造器需要去构建PojoTypeInfo 这里的OUT就是我们要指定的转换的POJO类型,这里我们使用最原始的调用构造器的方式构建实例.

public PojoTypeInfo(Class<T> typeClass, List<PojoField> fields)
  • 1

PojoTypeInfo 接收一个类型参数 和一个字段描述的List. 由于编写的是Scala程序因此 类型参数使用 classOf[MyItem]传递, 字段的描述参数List 是一个JAVA的List类型, 当前程序是scala 所需需要完成scala List类型到 JAVA List类型的转换, 引入JavaConverters即可

import scala.collection.JavaConverters._
  • 1

基本上涉及到Scala类型和JAVA类型转换需要引入这个object,不过可能不同的Scala版本完成转换的包有一点差别,这里不再赘述.
在看这个PojoField类型

public PojoField(Field field, TypeInformation<?> type)
  • 1

就是对这个POJO类型的字段描述,第一个Field参数就是常规反射操作中的Field可以通过反射来获取,第二个参数就是对这个Field值类型的描述,POJO类型MyItem里面定义了String类型的name 和 Int类型的age因此这个地方只需要BasicTypeInfo.STRING_TYPE_INFO 和
BasicTypeInfo.INT_TYPE_INFO

因此构建这个PojoCsvInputFormat的完整代码如下:

 val nameFiled = new PojoField(classOf[MyItem].getDeclaredField("name"), BasicTypeInfo.STRING_TYPE_INFO)
    val ageField = new PojoField(classOf[MyItem].getDeclaredField("age"), BasicTypeInfo.INT_TYPE_INFO)
    val pojoTypeInfo = new PojoTypeInfo[MyItem](classOf[MyItem], List(nameFiled, ageField).asJava)
 
    val myFileFormat = new PojoCsvInputFormat[MyItem](path, pojoTypeInfo)
  • 1
  • 2
  • 3
  • 4
  • 5

反射获取Field

注意反射这个地方使用的是getDeclaredField而不是 getField. 我们都知道使用getField能获取所有的公共字段, 而getDeclaredField是可以获取私有字段. 这个地方如果使用功能getField是无法获取相应的字段. 因为在scala类中声明一个类的field其实是私有的,类外通过instance.field方式访问这个字段其实是调用类的getter方法. 这个可以通过编译scala文件 到字节码 在转换成等价的JAVA代码验证, 比如这个MyItem的POJO类型, 使用scalac ./MyItem.scala·编译代码, 再使用javap -private org.cl.c4.p5.MyItem转换成等价JAVA代码如下:

public class org.cl.c4.p5.MyItem {
  private final java.lang.String name;
  private final int age;
  public java.lang.String name();
  public int age();
  public org.cl.c4.p5.MyItem(java.lang.String, int);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

可见name和 age 两个字段都是private的 由于他们在MyItem定义的时候声明为val类型所以他们只有同名的两个getter方法(name()和 age()) 关于scala的getter setter部分此处不再赘述. 总之因为编译后字段其实是私有的,所以此处使用getDeclaredField而不是getField方法获取反射的Field.

CSV文件格式问题

如果你不爱折腾, 那么可能你遇不到我下面描述的问题. 首先将csv文件的第一行去掉

20,hello
30,world
40,world
  • 1
  • 2
  • 3

同时代码里面第一行跳过的设置也注释掉

//myFileFormat.setSkipFirstLineAsHeader(true)
  • 1

再运行代码,出现如下错误

Caused by: org.apache.flink.api.common.io.ParseException: Line could not be parsed: '20,hello'
ParserError NUMERIC_VALUE_ILLEGAL_CHARACTER 
Expect field types: class java.lang.Integer, class java.lang.String 
  • 1
  • 2
  • 3

这里需要提到如果使用excel保存csv有三种csv format可选

1. CSV UTF-8 (Comma delimited)
2. Macintosh Comma Seperated
3. MS-DOS Comma Seperated
  • 1
  • 2
  • 3

出现上述问题是因为保存format选则的是CSV UTF-8格式
如果保存成为 第二种Macintosh Comma Seperated, debug发现只能够打印一条数据 然后程序就终止了
value
打印的name也比较奇怪有一个\r的字符 后面跟了一个整数30, 应该是下一行的age的值.

如果保存成为MS-DOS格式程序能够正常的运行. 因此三种格式的CSV是有明显的差异的.
stack over flow 已经讨论过此问题参考What’s the difference between - CSV (MS-Dos), CSV (Macintosh), CSV (comma delimited)
以及
Comma-Delimited and MS-DOS CSV Variations

总结一下三种CSV格式的主要区别还是在于CSV文件中的文本的编码问题, Macintosh 是对应mac系统的 DOS是针对微软的DOS系统的.
而CSV UTF-8应该是通用的一种格式 UTF-8编码CSV文件中的文本数据. 所以理论上考虑系统兼容性问题,还是采用CSV UTF-8格式保存CSV.

这里需要提一下Macintosh CSV 格式问题. 开发用的mac电脑, csv也保存成了改系统下的csv(excel save as Macintosh csv) 但是程序不能正常运行. 原因在于Mac 系统下换行符号是\r. 而PojoCsvInputFormat默认使用的是\n作为换行符号. 所以更改一下构造函数

val myFileFormat = new PojoCsvInputFormat[MyItem](path, "\r", ",", pojoTypeInfo)
  • 1

将CSV文件保存为 Macintosh csv 运行结果正确

hello 20
world 30
world 40
  • 1
  • 2
  • 3

但是这样仅仅能在mac 系统下正常运行.

如果将CSV文件正规化加上第一行列名的描述, 且代码里面处理的时候跳过第一行 且文件保存成为 CSV UTF-8 格式 程序正常运行

myFileFormat.setSkipFirstLineAsHeader(true)
  • 1

运行结果

hello 20
world 30
world 40
  • 1
  • 2
  • 3

综上所述, 如果CSV经过excel操作 那么还是保存成为CSV UTF-8 格式以兼容任意系统, 且CSV 的文件第一行列名不可少,需要规范.

excel 保存任意分隔符的csv文件

题外话, csv默认的分隔符是, 但是某些情况下csv文件中的文本含有逗号因此要更改默认分隔符号. Python 生成csv可以指定分隔符号,因此不用考虑这个问题.但是如果csv是通过excel保存生成的,需要考虑保存的时候指定分割符号. 但是目前来看如果要更改分割符号 只能够更改系统区域(-_-||) 所以个人觉得处理这中含有特殊符号的文本的CSV还是使用Python来进行处理.

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/413122
推荐阅读
相关标签
  

闽ICP备14008679号