赞
踩
Spark
的数据读取及数据保存可以从两个维度来作区分:文件格式以及文件系统:
Text
文件、Json
文件、csv
文件、Sequence
文件以及 Object
文件HDFS、Hbase
以及数据库text/hdfs
类型的文件读都可以用 textFile(path)
,保存使用 saveAsTextFile(path)
// 读取本地文件,必须保证每个节点都有该文件
val rdd = sc.textFile("./xx.txt")
// 保存到 hdfs
rdd.saveAsTextFile(hdfs://hadoop1:9000/test/info.json)
读 json
文件主要是需要解析其 json
格式,一般采用:SparkSQL
,也可以使用 fastjson、scala.util.parsing.json.JSON
scala> val rdd = sc.textFile("hdfs://hadoop1:9000/test/info.json")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://hadoop1:9000/test/info.json MapPartitionsRDD[45] at textFile at <console>:24
scala> rdd.collect()
res39: Array[String] = Array({"age": 0, "name": "rose0"}, {"age": 1, "name": "rose1"}, {"age": 2, "name": "rose2"}, {"age": 3, "name": "rose3"}, {"age": 4, "name": "rose4"}, {"age": 5, "name": "rose5"}, {"age": 6, "name": "rose6"}, {"age": 7, "name": "rose7"}, {"age": 8, "name": "rose8"}, {"age": 9, "name": "rose9"}, {"age": 10, "name": "rose10"}, {"age": 11, "name": "rose11"}, {"age": 12, "name": "rose12"}, {"age": 13, "name": "rose13"}, {"age": 14, "name": "rose14"}, {"age": 15, "name": "rose15"}, {"age": 16, "name": "rose16"}, {"age": 17, "name": "rose17"}, {"age": 18, "name": "rose18"}, {"age": 19, "name": "rose19"}, {"age": 20, "name": "rose20"}, {"age": 21, "name": "rose21"}, {"age": 22, "name": "rose22"}, {"age": 23, "name": "rose23"}, {"age": 24, "name": "rose24"}, {"age": 25, "...
scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON
// 解析到的结果其实就是 Option 组成的数组, Option 存储的就是 Map 对象
scala> val rdd2 = rdd.map(JSON.parseFull).collect()
rdd2: Array[Option[Any]] = Array(Some(Map(age -> 0.0, name -> rose0)), Some(Map(age -> 1.0, name -> rose1)), Some(Map(age -> 2.0, name -> rose2)), Some(Map(age -> 3.0, name -> rose3)), Some(Map(age -> 4.0, name -> rose4)), Some(Map(age -> 5.0, name -> rose5)), Some(Map(age -> 6.0, name -> rose6)), Some(Map(age -> 7.0, name -> rose7)), Some(Map(age -> 8.0, name -> rose8)), Some(Map(age -> 9.0, name -> rose9)), Some(Map(age -> 10.0, name -> rose10)), Some(Map(age -> 11.0, name -> rose11)), Some(Map(age -> 12.0, name -> rose12)), Some(Map(age -> 13.0, name -> rose13)), Some(Map(age -> 14.0, name -> rose14)), Some(Map(age -> 15.0, name -> rose15)), Some(Map(age -> 16.0, name -> rose16)), Some(Map(age -> 17.0, name -> rose17)), Some(Map(age -> 18.0, name -> rose18)), Some(Map(age -> 19.0, na...
SequenceFile
文件是 Hadoop
用来存储二进制形式的 key-value
对而设计的一种平面文件(Flat File)
val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))
// // 保存 SequenceFile
// rdd1.saveAsSequenceFile("test_sequence")
// 读取时需要指定读取数据的数据类型 [String, Int]
val rdd2 = sc.sequenceFile[String, Int]("test_sequence")
rdd2.collect().foreach(println)
运行结果:
(a,1)
(b,2)
(c,3)
对象文件是将对象序列化后保存的文件,采用 Java
的序列化机制,可以通过 objectFile[k,v](path)
函数接收一个路径,读取对象文件,返回对应的 RDD
,也可以通过调用saveAsObjectFile()
实现对对象文件的输出
// 保存
val rdd1 = sc.parallelize(Array(("a", 1),("b", 2),("c", 3)))
rdd1.saveAsObjectFile("test_object_file")
// 读取
val rdd2 = sc.objectFile[(String, Int)]("test_object_file")
rdd2.collect().foreach(println)
Spark
的整个生态系统与 Hadoop
完全兼容的,所以对于 Hadoop
所支持的文件类型或者数据库类型,Spark
也同样支持。
Hadoop
有新旧两套 API
接口,为了能够兼容 Spark
也有两套,分别为:HadoopRDD 、newHadoopRDD
,两个接口函数的参数分别为:
org.apache.hadoop.mapred.InputFormat、org.apache.hadoop.mapreduce.InputFormat(NewInputFormat)
[K, V]
中的 K
[K, V]
中的 V
RDD
的 partition
数据的最小值,若没有指定,系统使用默认值 defaultMinSplits
在 Hadoop
中以压缩形式存储的数据,不需要指定解压方式就能够进行读取,因为Hadoop
本身有一个解压器会根据压缩文件的后缀推断解压算法进行解压
如果用Spark
从Hadoop
中读取某种类型的数据不知道怎么读取的时候,上网查找一个使用map-reduce
的时候是怎么读取这种这种数据的,然后再将对应的读取方式改写成上面的hadoopRDD
和newAPIHadoopRDD
两个类就行了
Spark
读取 HBase
:
org.apache.hadoop.hbase.mapreduce.TableInputFormat
org.apache.hadoop.hbase.io.ImmutableBytesWritable
,值的类型为 org.apache.hadoop.hbase.client.Result
连接集群
spark
应用需要连接到zookeeper
集群,然后借助zookeeper
访问hbase
。一般可以通过两种方式连接到zookeeper
:
hbase-site.xml
文件加入 classpath
HBaseConfiguration
实例中设置<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> <version>1.3.1</version> <exclusions> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api-2.5</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>1.3.1</version> <exclusions> <exclusion> <groupId>org.mortbay.jetty</groupId> <artifactId>servlet-api-2.5</artifactId> </exclusion> <exclusion> <groupId>javax.servlet</groupId> <artifactId>servlet-api</artifactId> </exclusion> </exclusions> </dependency>
从 hbase
读取数据转化成 RDD
package top.midworld.spark1016.hbase_access import java.util import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration} import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.apache.spark.sql.SparkSession import org.json4s.jackson.Serialization import scala.collection.mutable object HbaseRead { def main(args: Array[String]): Unit = { val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate() val sc = session.sparkContext // 连接 HBase 的配置 val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3") hbaseConf.set(TableInputFormat.INPUT_TABLE, "t2") // 读取数据 val rdd1 = sc.newAPIHadoopRDD( hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result] ) val resultRDD = rdd1.map { // it 封装了 rowkey、item 里面才是数据 case (it, results) => { // 定义一个可变 map val resultMap = mutable.Map[String, Any]() // rowkey 添加到 resultMap 中 resultMap += "rowKey" -> Bytes.toString(it.get()) // 将其他数据添加到 resultMap 中 val cells: util.List[Cell] = results.listCells() import scala.collection.JavaConversions._ for (cell <- cells) { // 列名 - 列值 val key = Bytes.toString(CellUtil.cloneQualifier(cell)) val value = Bytes.toString(CellUtil.cloneValue(cell)) resultMap += key -> value } // 把map转成json json4s(json4scala) implicit val df = org.json4s.DefaultFormats Serialization.write(resultMap) } } resultRDD.collect().foreach(println) sc.stop() } }
运行结果:
{"alias2":"jun2","rowKey":"10004"}
{"alias4":"jun4","rowKey":"10011"}
{"alias5":"jun5","rowKey":"10016"}
其他应用:
val count = rdd1.count()
println("rdd1 RDD Count:" + count)
rdd1.cache() // 缓存,避免 rdd 重新计算
rdd1.foreach({
case (_, results) => {
val rowKey = Bytes.toString(results.getRow)
// info 为列族、alias2 为列名
val name = Bytes.toString(results.getValue("info".getBytes, "alias2".getBytes))
println(rowKey, name)
}
})
上面使用 IDEA
在 Windows
上测试,在 Linux
运行,需要将 spark
程序打包为 jar
包,常用的方法有:maven、sbt
,这里采用 sbt
:
1、环境准备
Hadoop、zookeeper、spark、hbase
集群hbase/lib
中的一些 jar
包拷贝到 spark/jars/hbase
中// 在 spark 安装目录 jars 中新建 hbase/ 目录
cd /home/hadoop/apps/spark-2.2.0/jars/
mkdir hbase
cd hbase
// 拷贝以下 jar 包到 spark/jars/hbase 中
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/hbase*.jar .
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/guava-12.0.1.jar .
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/htrace-core-3.1.0-incubating.jar .
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/protobuf-java-2.5.0.jar .
[hadoop@hadoop1 hbase]$ cp /home/hadoop/apps/hbase-1.2.6/lib/metrics-core-2.2.0.jar .
注意:缺少
metrics-core-2.2.0.jar
会报Caused by: java.lang.ClassNotFoundException: com.yammer.metrics.core.Gauge
,可以参考:https://blog.csdn.net/u010842515/article/details/51451883
2、编写 spark
程序:
cd /home/hadoop/apps/spark-2.2.0/mycode/
mkdir hbase
cd hbase
mkdir -p src/main/scala/
vim SparkOperateHBase.scala
spark
程序内容:
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import java.util import org.apache.hadoop.hbase.{Cell, CellUtil, HBaseConfiguration} import org.apache.hadoop.hbase.client.Result import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.util.Bytes import org.json4s.jackson.Serialization import scala.collection.mutable object SparkOperateHBase { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("Simple Application") val sc = new SparkContext(conf) // 连接 HBase 的配置 val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3") hbaseConf.set(TableInputFormat.INPUT_TABLE, "t2") // 读取数据 val rdd1 = sc.newAPIHadoopRDD( hbaseConf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result] ) val resultRDD = rdd1.map { // it 封装了 rowkey、item 里面才是数据 case (it, results) => { // 定义一个可变 map val resultMap = mutable.Map[String, Any]() // rowkey 添加到 resultMap 中 resultMap += "rowKey" -> Bytes.toString(it.get()) // 将其他数据添加到 resultMap 中 val cells: util.List[Cell] = results.listCells() import scala.collection.JavaConversions._ for (cell <- cells) { // 列名 - 列值 val key = Bytes.toString(CellUtil.cloneQualifier(cell)) val value = Bytes.toString(CellUtil.cloneValue(cell)) resultMap += key -> value } // 把map转成json json4s(json4scala) implicit val df = org.json4s.DefaultFormats Serialization.write(resultMap) } } resultRDD.collect().foreach(println) sc.stop() } }
3、编写 sbt
程序:
vim simple.sbt
// libraryDependencies 为 spark 程序中用到的依赖包
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.2.0"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.2.6"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.2.6"
4、编译打包:
// jar 包位置 /home/hadoop/apps/spark-2.2.0/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
cd /home/hadoop/apps/spark-2.2.0/mycode/hbase
// 编译打包
/home/hadoop/apps/sbt/run.sh package
5、提交 spark
任务:
cd /home/hadoop/apps/spark-2.2.0/mycode/hbase
/home/hadoop/apps/spark-2.2.0/bin/spark-submit --driver-class-path /home/hadoop/apps/spark-2.2.0/jars/hbase/*:/home/hadoop/apps/hbase-1.2.6/conf/ --class "SparkOperateHBase" /home/hadoop/apps/spark-2.2.0/mycode/hbase/target/scala-2.11/simple-project_2.11-1.0.jar
package top.midworld.spark1016.hbase_access import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.{Put, Result} import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapred.TableReduce import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.mapreduce.Job import org.apache.spark.sql.SparkSession object HBaseWrite { def main(args: Array[String]): Unit = { val session = SparkSession.builder.appName("create_rdd").master("local[2]").getOrCreate() val sc = session.sparkContext // 连接 HBase 的配置 val hbaseConf = HBaseConfiguration.create() hbaseConf.set("hbase.zookeeper.quorum", "hadoop1,hadoop2,hadoop3") hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "stu") // 往 hbase 写入数据 val job = Job.getInstance(hbaseConf) job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Put]) val dataRDD = sc.parallelize( List( ("10017", "scala", "1"), ("10018", "spark", "2"), ("10019", "java", "3") ) ) // 将 rdd 封装为 TableReduce 格式 val hbaseRDD = dataRDD.map { case (rowKey, name, age) => // 设置 rowKey val rk = new ImmutableBytesWritable() rk.set(Bytes.toBytes(rowKey)) // 添加数据 val put = new Put(Bytes.toBytes(rowKey)) // 分别为列族:info、列名:name、值:name/age put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name)) put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(age)) // return 返回 (rk, put) } // 写入 hbaseRDD.saveAsNewAPIHadoopDataset(job.getConfiguration) sc.stop() } }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。