赞
踩
- import org.apache.spark.sql.SparkSession
-
- val inputPath = "file:///Users/zz/Desktop/aa.sh"
- val rdd = spark.sparkContext.textFile(inputPath)
上面代码执行后,因为Spark的惰性机制,并不会真正执行,所以即使路径错误,此时也不会报错。
- val outputPath = "/Users/zz/Desktop/output"
- rdd.saveAsTextFile(outputPath)
- // 再次加载
- val rdd = spark.sparkContext.textFile(outputPath)
- val inputPath = "hdfs://localhost:9000/user/zz/aa.sh"
- val rdd = spark.sparkContext.textFile(inputPath)
- val outputPath = "hdfs://localhost:9000/user/zz/output"
- rdd.saveAsTextFile(outputPath)
JSON (JavaScript Object Notation) 是一种轻量级的数据交换格式。
aa.json
{"name": "aa"} {"name": "bb", "age":30}{"name": "cc", "age":18}
- val inputPath = "/Users/zenmen/Desktop/aa.json"
- val rdd = spark.sparkContext.textFile(inputPath)
- rdd.foreach(println)
JSON数据解析
- import scala.util.parsing.json.JSON
-
- val inputPath = "/Users/zenmen/Desktop/aa.json"
- val rdd = spark.sparkContext.textFile(inputPath)
- val result = rdd.map(JSON.parseFull(_))
-
- result.foreach({
- r => r match {
- case Some(map: Map[String, Any]) => println(map)
- case None => println("Parsing Failed!!!")
- case other => println("Unknown data structure: " + other)
- }
- })
输出
- Map(name -> aa)
- Map(name -> bb, age -> 30.0)
- Map(name -> cc, age -> 18.0)
HBase是google 对BigTable的开源实现,解决Google内部大规模网页搜索问题。
1. HBase是一个稀疏、多维度、排序的映射表,这张表的索引是行键、列族、列限定符和时间戳;
2. 每个值是一个未经解释的字符串,没有数据类型;
3. 用户在表中存储数据,每一行都有一个可排序的行键和任意多的列。
表:HBase采用表来组织数据,表由行和列组成,列划分为若干个列族。
行: 每个HBase表都有若干个行组成,每个行由 row key 来标志。
列族:1个HBase表被分成许多列族(Column Family)的集合,它是基本的访问控制单元。
列限定符:列族里的数据通过列限定符或列来定位。
时间戳:每个单元格都保存着同一份数据的多个版本,这些版本采用时间戳进行索引。
单元格:在HBase表中,通过行、列族和列限定符确定一个单元格cell,单元格中存储的数据没有数据类型,总被视为字节数据byte[]。
把HBase的lib目录下的一些jar文件拷贝到Spark中。
表名: student, 列族:info
- import org.apache.spark.sql.SparkSession
- import org.apache.hadoop.conf.Configuration
- //import org.apache.hadoop.hbase.HBaseConfiguration
- //import org.apache.hadoop.hbase._
- import org.apache.hadoop.hbase.client._
- import org.apache.hadoop.hbase.client.Result
- import org.apache.hadoop.hbase.mapreduce.TableInputFormat
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable
-
- object Test {
- def main(args: Array[String]): Unit = {
- val spark = SparkSession
- .builder()
- .appName("wc")
- .master("local")
- .getOrCreate()
-
- val conf = HBaseConfiguration.create()
- // 设置查询的表名
- conf.set(TableInputFormat.INPUT_TABLE, "student")
- val stuRDD = spark.sparkContext.newAPIHadoopRDD(conf,
- classOf[TableInputFormat],
- classOf[ImmutableBytesWritable],
- classOf[Result]
- )
- val count = stuRDD.count()
- print("Students RDD Count: " + count)
- // 缓存一下,以免后面重复生成
- stuRDD.cache()
- stuRDD.foreach({
- case(_,result) =>
- val key = Bytes.toString(result.getRow)
- val name = Bytes.toString(result.getValue("info".getBytes, "name".getBytes))
- val gender = Bytes.toString(result.getValue("info".getBytes, "gender".getBytes))
- val age = Bytes.toString(result.getValue("info".getBytes, "age".getBytes))
- println("Row key: " + key + "Name: " + name
- + "Gender: "+ gender + "Age: " + age)
- })
- }
- }
打包编译
在simple.sbt中写入下面内容,注意对应版本号!
采用sbt打包,submit提交运行
- import org.apache.spark.sql.SparkSession
- import org.apache.hadoop.hbase._
- import org.apache.hadoop.hbase.client.Result
- import org.apache.hadoop.hbase.client.Put
- import org.apache.hadoop.hbase.HBaseConfiguration
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable
- import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
- import org.apache.hadoop.hbase.mapreduce.Job
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.hadoop.mapreduce.Job
-
- object Test {
- def main(args: Array[String]): Unit = {
- val spark = SparkSession
- .builder()
- .appName("wc")
- .master("local")
- .getOrCreate()
-
- spark.sparkContext.hadoopConfiguration
- .set(TableOutputFormat.OUTPUT_TABLE, "student")
- val job = new Job(spark.sparkContext.hadoopConfiguration)
- job.setOutputKeyClass(classOf[ImmutableBytesWritable])
- job.setOutputValueClass(classOf[Result])
- job.setOutputFormatClass(classOf[
- TableOutputFormat[ImmutableBytesWritable]])
- // 构造两条数据
- val arr = Array("3,cc,M,26", "4,dd,W,30")
- val initRDD = spark.sparkContext.makeRDD(arr)
- .map(_.split(","))
- val rdd = initRDD
- .map{ arr => {
- val put = new Put(Bytes.toBytes(arr(0))) //行键
- put.add(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(arr(1)))
- put.add(Bytes.toBytes("info"), Bytes.toBytes("gender"), Bytes.toBytes(arr(2)))
- put.add(Bytes.toBytes("info"), Bytes.toBytes("age"), Bytes.toBytes(arr(3).toInt))
- (new ImmutableBytesWritable, put)
- }}
- rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())
- }
- }
运行
查看结果
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。