
Spark: Getting the HDFS Path of the Data While Reading It


Spark's usual function for reading files is textFile, which returns only the file contents. Reading with hadoopFile instead keeps the partition-level attributes of the underlying input splits in the RDD, including the path of the file each partition was read from:

val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
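
For context, textFile is itself a thin wrapper over hadoopFile that keeps only the value half of each (key, value) pair, which is why the path information is discarded. Roughly (a sketch of the idea, not Spark's exact source):

val lines = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
  .map(pair => pair._2.toString) // essentially what sc.textFile(input) does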

The RDD returned by hadoopFile can be cast to HadoopRDD, whose mapPartitionsWithInputSplit passes each partition's InputSplit to your function; cast it to FileSplit and you get quick access to the path, start offset, and length of the file backing the iterator:

hadoopRDD.mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => ...)
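
As a minimal sketch (variable names are illustrative), tagging every line with the path of the file it came from looks like this; the full program below applies the same pattern:

val withPath = hadoopRDD.mapPartitionsWithInputSplit(
  (split: InputSplit, iter: Iterator[(LongWritable, Text)]) => {
    val path = split.asInstanceOf[FileSplit].getPath.toString // HDFS path of this split
    iter.map { case (_, line) => (path, line.toString) }
  })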

Full source:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.sql.SparkSession

object GeneratedIndex {

  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder().master("yarn").getOrCreate().sparkContext

    val input = args(0)
    val jvpDate = args(1)     // not used in this excerpt
    val tableName = args(2)   // not used in this excerpt

    // hadoopFile keeps the underlying HadoopRDD, so split metadata stays reachable
    val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
    val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]

    // Tag each record with information derived from the file it belongs to
    val fileAndLine = hadoopRDD.mapPartitionsWithInputSplit(
      (inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
        val file = inputSplit.asInstanceOf[FileSplit]
        val fp = file.getPath.toString // HDFS path the data was read from
        // the part-file sequence number sits between the last '-' and the last '.'
        val start = fp.lastIndexOf("-")
        val end = fp.lastIndexOf(".")
        iterator.map(x => {
          val lines = x._2.toString.split("\\|") // split the '|'-delimited record
          val uid = lines(2)
          val pid = lines(5).substring(0, 3)
          val map = List((pid, fp.slice(start + 1, end)))
          (uid, map)
        })
      })
    // fileAndLine is only defined here; an action (e.g. saveAsTextFile) would trigger the read
  }
}
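
To make the string handling concrete, here is a hypothetical walk-through (path and record invented for illustration). The slice extracts whatever sits between the last '-' and the last '.' of the path, which for standard part files is the sequence number:

val fp = "hdfs://nameservice1/logs/part-00042.txt" // hypothetical path
val start = fp.lastIndexOf("-")                    // last '-' in the path
val end = fp.lastIndexOf(".")                      // last '.' in the path
fp.slice(start + 1, end)                           // "00042"

val fields = "2020-01-01|click|u123|x|y|P01abc".split("\\|") // hypothetical record
val uid = fields(2)                    // "u123"
val pid = fields(5).substring(0, 3)    // "P01"
// resulting element: ("u123", List(("P01", "00042")))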
