Spark's usual function for reading files is textFile, which returns only the file contents; hadoopFile, by contrast, also keeps partition-level attributes (such as the underlying InputSplit) available in the RDD.
val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
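For contrast, textFile over the same path yields only the line contents (a minimal sketch; input is assumed to be the same HDFS path as above):
// textFile returns RDD[String]: just the lines, with no access to split information
val lines = sc.textFile(input)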
One of the parameters of mapPartitionsWithInputSplit, the InputSplit, gives quick access to the path, length, and other attributes of the file that a partition's iterator is reading.
hadoopRDD.mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => { ... })
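For example, a partition-level pass that only inspects the split could look like this (a minimal sketch; getPath, getStart and getLength are standard FileSplit accessors, not part of the original post):
val splitInfo = hadoopRDD.mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
  val split = inputSplit.asInstanceOf[FileSplit]
  // path, byte offset and length of the split this partition reads
  Iterator((split.getPath.toString, split.getStart, split.getLength))
})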
Source code:
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, JobConf, TextInputFormat}
import org.apache.spark.rdd.HadoopRDD
import org.apache.spark.sql.SparkSession
import scala.util.parsing.json.JSONObject
object GeneratedIndex {
  def main(args: Array[String]): Unit = {
    val sc = SparkSession.builder().master("yarn").getOrCreate().sparkContext
    val input = args(0)
    val jvpDate = args(1)
    val tableName = args(2)

    val fileRDD = sc.hadoopFile[LongWritable, Text, TextInputFormat](input)
    val hadoopRDD = fileRDD.asInstanceOf[HadoopRDD[LongWritable, Text]]

    // Attach the source-file path to each record
    val fileAdnLine = hadoopRDD.mapPartitionsWithInputSplit((inputSplit: InputSplit, iterator: Iterator[(LongWritable, Text)]) => {
      val file = inputSplit.asInstanceOf[FileSplit]
      val fp = file.getPath.toString() // HDFS path of the file this partition reads
      val start = fp.lastIndexOf("-")
      val end = fp.lastIndexOf(".")
      iterator.map(x => {
        val lines = x._2.toString.split("\\|") // split the line on "|"
        val uid = lines(2)
        val pid = lines(5).substring(0, 3)
        // pair pid with the part of the file name between the last "-" and "."
        val map = List((pid, fp.slice(start + 1, end)))
        (uid, map)
      })
    })
  }
}
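The snippet ends here; judging by the HBase imports (TableOutputFormat, Put), the original job presumably goes on to write these pairs to HBase, but that part is not shown. Purely as an illustration of consuming fileAdnLine (an assumption, not the author's code), the per-uid lists could be merged and inspected like this:
// Hypothetical follow-up: merge the (pid, file-tag) lists per uid and look at a sample
val merged = fileAdnLine.reduceByKey(_ ++ _)
merged.take(10).foreach(println)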