当前位置:   article > 正文

Spark数据读写--HDFS、HBase、Json_spark java读取hdfs文件写hbase中

spark java读取hdfs文件写hbase中

1. 本地文件的读写

1.1 读文件

  1. import org.apache.spark.sql.SparkSession
  2. val inputPath = "file:///Users/zz/Desktop/aa.sh"
  3. val rdd = spark.sparkContext.textFile(inputPath)

上面代码执行后,因为Spark的惰性机制,并不会真正执行,所以即使路径错误,此时也不会报错。

1.2 写文件

  1. val outputPath = "/Users/zz/Desktop/output"
  2. rdd.saveAsTextFile(outputPath)
  3. // 再次加载
  4. val rdd = spark.sparkContext.textFile(outputPath)

 2. HDFS文件的读写

  1. val inputPath = "hdfs://localhost:9000/user/zz/aa.sh"
  2. val rdd = spark.sparkContext.textFile(inputPath)
  3. val outputPath = "hdfs://localhost:9000/user/zz/output"
  4. rdd.saveAsTextFile(outputPath)

 3. JSON文件的读写

JSON (JavaScript Object Notation) 是一种轻量级的数据交换格式。

aa.json

{"name": "aa"} {"name": "bb", "age":30}{"name": "cc", "age":18}

  1. val inputPath = "/Users/zenmen/Desktop/aa.json"
  2. val rdd = spark.sparkContext.textFile(inputPath)
  3. rdd.foreach(println)

JSON数据解析

  1. import scala.util.parsing.json.JSON
  2. val inputPath = "/Users/zenmen/Desktop/aa.json"
  3. val rdd = spark.sparkContext.textFile(inputPath)
  4. val result = rdd.map(JSON.parseFull(_))
  5. result.foreach({
  6. r => r match {
  7. case Some(map: Map[String, Any]) => println(map)
  8. case None => println("Parsing Failed!!!")
  9. case other => println("Unknown data structure: " + other)
  10. }
  11. })

输出

  1. Map(name -> aa)
  2. Map(name -> bb, age -> 30.0)
  3. Map(name -> cc, age -> 18.0)

4. HBase 读写

4.1 HBase简介

HBase是google 对BigTable的开源实现,解决Google内部大规模网页搜索问题。

1. HBase是一个稀疏、多维度、排序的映射表,这张表的索引是行键、列族、列限定符和时间戳

2. 每个值是一个未经解释的字符串,没有数据类型;

3. 用户在表中存储数据,每一行都有一个可排序的行键和任意多的列。

表:HBase采用表来组织数据,表由行和列组成,列划分为若干个列族。

行: 每个HBase表都有若干个行组成,每个行由 row key 来标志。

列族:1个HBase表被分成许多列族(Column Family)的集合,它是基本的访问控制单元。

列限定符:列族里的数据通过列限定符或列来定位。

时间戳:每个单元格都保存着同一份数据的多个版本,这些版本采用时间戳进行索引。

单元格:在HBase表中,通过行、列族和列限定符确定一个单元格cell,单元格中存储的数据没有数据类型,总被视为字节数据byte[]。

 

4.2 配置Spark

把HBase的lib目录下的一些jar文件拷贝到Spark中。

 4.3 读HBase文件

表名: student, 列族:info

  1. import org.apache.spark.sql.SparkSession
  2. import org.apache.hadoop.conf.Configuration
  3. //import org.apache.hadoop.hbase.HBaseConfiguration
  4. //import org.apache.hadoop.hbase._
  5. import org.apache.hadoop.hbase.client._
  6. import org.apache.hadoop.hbase.client.Result
  7. import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  8. import org.apache.hadoop.hbase.util.Bytes
  9. import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  10. object Test {
  11. def main(args: Array[String]): Unit = {
  12. val spark = SparkSession
  13. .builder()
  14. .appName("wc")
  15. .master("local")
  16. .getOrCreate()
  17. val conf = HBaseConfiguration.create()
  18. // 设置查询的表名
  19. conf.set(TableInputFormat.INPUT_TABLE, "student")
  20. val stuRDD = spark.sparkContext.newAPIHadoopRDD(conf,
  21. classOf[TableInputFormat],
  22. classOf[ImmutableBytesWritable],
  23. classOf[Result]
  24. )
  25. val count = stuRDD.count()
  26. print("Students RDD Count: " + count)
  27. // 缓存一下,以免后面重复生成
  28. stuRDD.cache()
  29. stuRDD.foreach({
  30. case(_,result) =>
  31. val key = Bytes.toString(result.getRow)
  32. val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
  33. val gender = Bytes.toString(result.getValue("info".getBytes, "gender".getBytes))
  34. val age = Bytes.toString(result.getValue("info".getBytes, "age".getBytes))
  35. println("Row key: " + key + "Name: " + name
  36. + "Gender: "+ gender + "Age: " + age)
  37. })
  38. }
  39. }

 打包编译

在simple.sbt中写入下面内容,注意对应版本号!

 采用sbt打包,submit提交运行

4.4 写HBase文件

  1. import org.apache.spark.sql.SparkSession
  2. import org.apache.hadoop.hbase._
  3. import org.apache.hadoop.hbase.client.Result
  4. import org.apache.hadoop.hbase.client.Put
  5. import org.apache.hadoop.hbase.HBaseConfiguration
  6. import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  7. import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
  8. import org.apache.hadoop.hbase.mapreduce.Job
  9. import org.apache.hadoop.hbase.util.Bytes
  10. import org.apache.hadoop.mapreduce.Job
  11. object Test {
  12. def main(args: Array[String]): Unit = {
  13. val spark = SparkSession
  14. .builder()
  15. .appName("wc")
  16. .master("local")
  17. .getOrCreate()
  18. spark.sparkContext.hadoopConfiguration
  19. .set(TableOutputFormat.OUTPUT_TABLE, "student")
  20. val job = new Job(spark.sparkContext.hadoopConfiguration)
  21. job.setOutputKeyClass(classOf[ImmutableBytesWritable])
  22. job.setOutputValueClass(classOf[Result])
  23. job.setOutputFormatClass(classOf[
  24. TableOutputFormat[ImmutableBytesWritable]])
  25. // 构造两条数据
  26. val arr = Array("3,cc,M,26", "4,dd,W,30")
  27. val initRDD = spark.sparkContext.makeRDD(arr)
  28. .map(_.split(","))
  29. val rdd = initRDD
  30. .map{ arr => {
  31. val put = new Put(Bytes.toBytes(arr(0))) //行键
  32. put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
  33. put.add(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2)))
  34. put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt))
  35. (new ImmutableBytesWritable, put)
  36. }}
  37. rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
  38. }
  39. }

运行

 查看结果

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/菜鸟追梦旅行/article/detail/662979
推荐阅读
相关标签
  

闽ICP备14008679号