[Spark] Reading Local Files

Recently I had a requirement: a Spark job needs to read a local CSV file, concatenate its fields into rowkeys (rk), and then fetch the corresponding values from HBase for further processing. It took me a long time to get this working, so here is a record of the approaches I tried:
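For context, the downstream HBase lookup looks roughly like the following. This is only a minimal sketch of the HBase client API; the table name `my_table` and the column family/qualifier `cf:value` are hypothetical placeholders, not from the original job:

    import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
    import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
    import org.apache.hadoop.hbase.util.Bytes

    // Table name, column family, and qualifier below are hypothetical placeholders
    val hbaseConf = HBaseConfiguration.create()
    val connection = ConnectionFactory.createConnection(hbaseConf)
    val table = connection.getTable(TableName.valueOf("my_table"))

    // Concatenate CSV fields into a rowkey and fetch the row from HBase
    val rk = Seq("field1", "field2").mkString("_")
    val result = table.get(new Get(Bytes.toBytes(rk)))
    val value = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("value")))

    table.close()
    connection.close()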

1. Writing to a temporary file

Spark can read local files, but once the job is packaged into a jar it will not read files bundled inside the jar directly; the resource has to be written out to a temporary file first and read from there. So:

    import java.io.{BufferedOutputStream, FileOutputStream}
    import java.nio.file.Files

    // Read the CSV file from inside the jar as an InputStream
    val inputStream = this.getClass.getResourceAsStream(s"/source/fileName.csv")
    // Create a temp file to hold the CSV contents
    val tempFile = Files.createTempFile("temp-source", ".csv")
    val tempFilePath = tempFile.toAbsolutePath.toString
    val outputStream = new FileOutputStream(tempFilePath)
    val bufferedOutputStream = new BufferedOutputStream(outputStream)
    // Copy the InputStream into the temp file
    try {
      val buffer = new Array[Byte](1024)
      var bytesRead = -1
      while ({
        bytesRead = inputStream.read(buffer)
        bytesRead != -1
      }) {
        bufferedOutputStream.write(buffer, 0, bytesRead)
      }
    } finally {
      bufferedOutputStream.close()
      outputStream.close()
    }
    println(s"Temp file created at: ${tempFilePath}")
    // Read the temp CSV file into a DataFrame
    val csvDF = spark.read.option("header", "true")
      .csv(s"file://${tempFilePath}")
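As an aside, the manual buffer loop above can be replaced by a single java.nio call; this is just a sketch of the equivalent, assuming the same `inputStream` and `tempFile` as above:

    import java.nio.file.{Files, StandardCopyOption}

    // Copies the whole InputStream into the temp file in one call
    Files.copy(inputStream, tempFile, StandardCopyOption.REPLACE_EXISTING)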

2. Uploading the temp file to HDFS

Packaged into a jar and run on the cluster, the approach above failed with an error. My guess was that, with the job running on the cluster, the driver side could not read the temp file created on the local disk. So the next attempt was to upload the temp file to HDFS and read it from there:

    import org.apache.hadoop.fs.{FileSystem, Path}

    // Upload the temp file to HDFS
    val hdfsPath = new Path("hdfs-source-csv.csv")
    FileSystem.get(spark.sparkContext.hadoopConfiguration).copyFromLocalFile(new Path(tempFilePath), hdfsPath)
    println(s"File uploaded to HDFS at: ${hdfsPath.toString}")
    // Read the file back from HDFS
    val csvDF = spark.read.option("header", "true")
      .csv(s"${hdfsPath}")

But this still failed, presumably because the job lacked permission to write to HDFS.
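If the permission guess is right, one way to narrow it down is to make the target path explicit and write under the user's HDFS home directory, which the user normally owns. A minimal sketch, assuming the same `spark` session and `tempFilePath` as above:

    import org.apache.hadoop.fs.{FileSystem, Path}

    val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
    // Relative paths resolve against the home directory anyway; making it
    // explicit shows exactly where the write is going and who must own it
    val target = new Path(fs.getHomeDirectory, "hdfs-source-csv.csv")
    println(s"Writing to: $target")
    fs.copyFromLocalFile(new Path(tempFilePath), target)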

So the only option left was to try a different angle:

1. Convert the CSV into SQL, load it into a temporary PG table, then read it back from PG (a minimal sketch follows this list)

2. Convert the CSV into a Map, turn the Map into an RDD, and continue processing from there

3. Give up, tell the boss I can't do this for this kind of salary, and let someone else take over
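For option 1, "convert the CSV into SQL" would mean generating INSERT statements on the driver and then reading the temp table back through Spark's JDBC source. A minimal sketch, assuming `rows: List[List[String]]` holds the parsed records, and using a hypothetical URL, credentials, and two-column table `tmp_csv` (none of these come from the original job):

    import java.sql.DriverManager

    // Connection details and table name are hypothetical placeholders
    val conn = DriverManager.getConnection(
      "jdbc:postgresql://pg-host:5432/mydb", "spark_user", "secret")
    val stmt = conn.prepareStatement("INSERT INTO tmp_csv (id, name) VALUES (?, ?)")
    rows.foreach { r =>
      stmt.setString(1, r.head)
      stmt.setString(2, r(1))
      stmt.addBatch()
    }
    stmt.executeBatch()
    stmt.close()
    conn.close()

    // Read the temp table back as a DataFrame through Spark's JDBC source
    val pgDF = spark.read.format("jdbc")
      .option("url", "jdbc:postgresql://pg-host:5432/mydb")
      .option("dbtable", "tmp_csv")
      .option("user", "spark_user")
      .option("password", "secret")
      .load()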

3. Reading the CSV and converting it to a Map

    package scala.test

    import org.apache.commons.lang3.StringUtils

    import scala.collection.mutable.ArrayBuffer
    import scala.util.parsing.combinator._

    // A small CSV parser built on Scala's parser combinators
    object CSVParser extends RegexParsers {
      override protected val whiteSpace = """[ \t]""".r
      def field: Parser[String] = quoted | nonQuoted
      def quoted: Parser[String] = "\"" ~> """[^"\n]*""".r <~ "\""
      def nonQuoted: Parser[String] = """[^,\n]*""".r
      def record: Parser[List[String]] = repsep(field, ",")
      def records: Parser[List[List[String]]] = repsep(record, "\n")
      def apply(input: String): ParseResult[List[List[String]]] = parseAll(records, input)
    }

    // Read the CSV and turn each data row into a header -> value Map
    object Main {
      def main(args: Array[String]): Unit = {
        val arrayBuffer: ArrayBuffer[List[String]] = new ArrayBuffer[List[String]]()
        val source = scala.io.Source.fromInputStream(this.getClass.getResourceAsStream("/真实停电汇总.csv"))
        source.getLines().foreach(line => {
          CSVParser(line) match {
            case CSVParser.Success(records, _) =>
              records.foreach(record => arrayBuffer.append(record))
            case CSVParser.Failure(msg, _) => println("Parsing failed: " + msg)
            case CSVParser.Error(msg, _) => println("Error: " + msg)
          }
        })
        // First row is the header; zip each data row with it to build a Map
        val head = arrayBuffer.head
        arrayBuffer.tail.foreach(item => {
          if (StringUtils.isNotEmpty(item.head))
            println(head.zip(item).filter(x => StringUtils.isNotEmpty(x._1)).toMap)
        })
      }
    }
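To make the output concrete, here is what the parser yields for a made-up two-column input (the header and row are hypothetical, not from the real file):

    // Hypothetical input:
    //   id,name
    //   1,"foo"
    val header = List("id", "name")
    val row: List[String] = CSVParser("1,\"foo\"") match {
      case CSVParser.Success(records, _) => records.head // List("1", "foo")
      case _ => Nil
    }
    println(header.zip(row).toMap) // Map(id -> 1, name -> foo)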

4. Converting the Map into an RDD

Method 1:

    import spark.implicits._

    var seq = Seq[Map[String, String]]()
    arrayBuffer.tail.foreach(item => {
      // Collect the per-row Maps into one Seq
      if (StringUtils.isNotEmpty(item.head))
        seq :+= head.zip(item).filter(x => StringUtils.isNotEmpty(x._1)).toMap
    })
    val rdd = spark.sparkContext.parallelize(seq).repartition(12)
    rdd.map(item => {
      (item("id"), item("name"))
    }).toDF("id", "name")
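One caveat: `item("id")` throws a `NoSuchElementException` on an executor if a row is missing that key, which surfaces as an opaque task failure. `getOrElse` is the defensive variant (same `id`/`name` columns as above):

    rdd.map(item => (item.getOrElse("id", ""), item.getOrElse("name", "")))
      .toDF("id", "name")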

Method 2:

    import org.apache.spark.sql.Column
    import spark.implicits._

    val head = arrayBuffer.head
    val body = arrayBuffer.tail
    val rdd = spark.sparkContext.parallelize(body).repartition(12).persist()
    rdd.toDF("COL").select(selectCol($"COL", head): _*)

    // Turn each element of the list column into its own named column
    def selectCol(col: Column, head: List[String]): ArrayBuffer[Column] = {
      val arrayBuffer = new ArrayBuffer[Column]()
      for (i <- head.indices) {
        arrayBuffer.append(col(i).as(head(i)))
      }
      arrayBuffer
    }
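For reference, `$"COL"(i)` is `Column.apply`, which desugars to `getItem(i)` on the array column, so each list element is pulled out as a named top-level column. For a hypothetical header `List("id", "name")` the generated select is equivalent to:

    rdd.toDF("COL").select($"COL"(0).as("id"), $"COL"(1).as("name"))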
