当前位置:   article > 正文

Spark RDD操作之Action操作_sparkrdd的action操作’

sparkrdd的action操作’

first

def first(): T
  • 1

first返回RDD中的第一个元素,不排序。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[33] at makeRDD at :21
 
scala> rdd1.first
res14: (String, String) = (A,1)
 
scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21
 
scala> rdd1.first
res8: Int = 10
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

count

def count(): Long
  • 1

count返回RDD中的元素数量。

scala> var rdd1 = sc.makeRDD(Array(("A","1"),("B","2"),("C","3")),2)
rdd1: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[34] at makeRDD at :21
 
scala> rdd1.count
res15: Long = 3
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

reduce

def reduce(f: (T, T) ⇒ T): T
  • 1

根据映射函数f,对RDD中的元素进行二元计算,返回计算结果。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
 
scala> rdd1.reduce(_ + _)
res18: Int = 55
 
scala> var rdd2 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[38] at makeRDD at :21
 
scala> rdd2.reduce((x,y) => {
     |       (x._1 + y._1,x._2 + y._2)
     |     })
res21: (String, Int) = (CBBAA,6)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

collect

def collect(): Array[T]
  • 1

collect用于将一个RDD转换成数组。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[36] at makeRDD at :21
 
scala> rdd1.collect
res23: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
  • 1
  • 2
  • 3
  • 4
  • 5

take

def take(num: Int): Array[T]
  • 1

take用于获取RDD中从0到num-1下标的元素,不排序。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 
scala> rdd1.take(1)
res0: Array[Int] = Array(10)                                                    
 
scala> rdd1.take(2)
res1: Array[Int] = Array(10, 4)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

top

def top(num: Int)(implicit ord: Ordering[T]): Array[T]
  • 1

top函数用于从RDD中,按照默认(降序)或者指定的排序规则,返回前num个元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 
scala> rdd1.top(1)
res2: Array[Int] = Array(12)
 
scala> rdd1.top(2)
res3: Array[Int] = Array(12, 10)
 
//指定排序规则
scala> implicit val myOrd = implicitly[Ordering[Int]].reverse
myOrd: scala.math.Ordering[Int] = scala.math.Ordering$$anon$4@767499ef
 
scala> rdd1.top(1)
res4: Array[Int] = Array(2)
 
scala> rdd1.top(2)
res5: Array[Int] = Array(2, 3)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

takeOrdered

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
  • 1

takeOrdered和top类似,只不过以和top相反的顺序返回元素。

scala> var rdd1 = sc.makeRDD(Seq(10, 4, 2, 12, 3))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[40] at makeRDD at :21
 
scala> rdd1.top(1)
res4: Array[Int] = Array(12)
 
scala> rdd1.top(2)
res5: Array[Int] = Array(12, 10)
 
scala> rdd1.takeOrdered(1)
res6: Array[Int] = Array(2)
 
scala> rdd1.takeOrdered(2)
res7: Array[Int] = Array(2, 3)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
  • 1

aggregate用户聚合RDD中的元素,先使用seqOp将RDD中每个分区中的T类型元素聚合成U类型,再使用combOp将之前每个分区聚合后的U类型聚合成U类型,特别注意seqOp和combOp都会使用zeroValue的值,zeroValue的类型为U。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.mapPartitionsWithIndex{
        (partIdx,iter) => {
          var part_map = scala.collection.mutable.Map[String,List[Int]]()
            while(iter.hasNext){
              var part_name = "part_" + partIdx;
              var elem = iter.next()
              if(part_map.contains(part_name)) {
                var elems = part_map(part_name)
                elems ::= elem
                part_map(part_name) = elems
              } else {
                part_map(part_name) = List[Int]{elem}
              }
            }
            part_map.iterator
           
        }
      }.collect
res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 第一个分区中包含5,4,3,2,1

  • 第二个分区中包含10,9,8,7,6

scala> rdd1.aggregate(1)(
     |           {(x : Int,y : Int) => x + y}, 
     |           {(a : Int,b : Int) => a + b}
     |     )
res17: Int = 58
  • 1
  • 2
  • 3
  • 4
  • 5

结果为什么是58,看下面的计算过程:

  • 先在每个分区中迭代执行 (x : Int,y : Int) => x + y 并且使用zeroValue的值1

  • 即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16

  • part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41

  • 再将两个分区的结果合并(a : Int,b : Int) => a + b ,并且使用zeroValue的值1

  • 即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58

再比如:

scala> rdd1.aggregate(2)(
     |           {(x : Int,y : Int) => x + y}, 
     |           {(a : Int,b : Int) => a * b}
     |     )
res18: Int = 1428
  • 1
  • 2
  • 3
  • 4
  • 5

这次zeroValue=2

part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17

part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42

最后:zeroValuepart_0part_1 = 2 * 17 * 42 = 1428

因此,zeroValue即确定了U的类型,也会对结果产生至关重要的影响,使用时候要特别注意。

fold

def fold(zeroValue: T)(op: (T, T) ⇒ T): T
  • 1

fold是aggregate的简化,将aggregate中的seqOp和combOp使用同一个函数op。

var rdd1 = sc.makeRDD(1 to 10, 2)
scala> rdd1.fold(1)(
     |       (x,y) => x + y    
     |     )
res19: Int = 58
 
##结果同上面使用aggregate的第一个例子一样,即:
scala> rdd1.aggregate(1)(
     |           {(x,y) => x + y}, 
     |           {(a,b) => a + b}
     |     )
res20: Int = 58
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

lookup

def lookup(key: K): Seq[V]
  • 1

lookup用于(K,V)类型的RDD,指定K值,返回RDD中该K对应的所有V值。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21
 
scala> rdd1.lookup("A")
res0: Seq[Int] = WrappedArray(0, 2)
 
scala> rdd1.lookup("B")
res1: Seq[Int] = WrappedArray(1, 2)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

countByKey

def countByKey(): Map[K, Long]
  • 1

countByKey用于统计RDD[K,V]中每个K的数量。

scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("B",3)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[7] at makeRDD at :21
 
scala> rdd1.countByKey
res5: scala.collection.Map[String,Long] = Map(A -> 2, B -> 3)
  • 1
  • 2
  • 3
  • 4
  • 5

foreach

def foreach(f: (T) ⇒ Unit): Unit
  • 1

foreach用于遍历RDD,将函数f应用于每一个元素。
但要注意,如果对RDD执行foreach,只会在Executor端有效,而并不是Driver端。
比如:rdd.foreach(println),只会在Executor的stdout中打印出来,Driver端是看不到的。
在Spark1.4中是这样,不知道是否真如此。
这时候,使用accumulator共享变量与foreach结合,倒是个不错的选择。

scala> var cnt = sc.accumulator(0)
cnt: org.apache.spark.Accumulator[Int] = 0
 
scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
 
scala> rdd1.foreach(x => cnt += x)
 
scala> cnt.value
res51: Int = 55
 
scala> rdd1.collect.foreach(println) 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

foreachPartition

def foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
  • 1

foreachPartition和foreach类似,只不过是对每一个分区使用f。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[5] at makeRDD at :21
 
scala> var allsize = sc.accumulator(0)
size: org.apache.spark.Accumulator[Int] = 0
 

scala>     rdd1.foreachPartition { x => {
     |       allsize += x.size
     |     }}
 
scala> println(allsize.value)
10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

sortBy

def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
  • 1

sortBy根据给定的排序k函数将RDD中的元素进行排序。

scala> var rdd1 = sc.makeRDD(Seq(3,6,7,1,2,0),2)
 
scala> rdd1.sortBy(x => x).collect
res1: Array[Int] = Array(0, 1, 2, 3, 6, 7) //默认升序
 
scala> rdd1.sortBy(x => x,false).collect
res2: Array[Int] = Array(7, 6, 3, 2, 1, 0)  //降序
 
//RDD[K,V]类型
scala>var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
 
scala> rdd1.sortBy(x => x).collect
res3: Array[(String, Int)] = Array((A,1), (A,2), (B,3), (B,6), (B,7))
 
//按照V进行降序排序
scala> rdd1.sortBy(x => x._2,false).collect
res4: Array[(String, Int)] = Array((B,7), (B,6), (B,3), (A,2), (A,1))
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

saveAsTextFile

def saveAsTextFile(path: String): Unit

def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit
  • 1
  • 2
  • 3

saveAsTextFile用于将RDD以文本文件的格式存储到文件系统中。

codec参数可以指定压缩的类名。

var rdd1 = sc.makeRDD(1 to 10,2)
scala> rdd1.saveAsTextFile("hdfs://cdh5/tmp/lxw1234.com/") //保存到HDFS
hadoop fs -ls /tmp/lxw1234.com
Found 2 items
-rw-r--r--   2 lxw1234 supergroup        0 2015-07-10 09:15 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--   2 lxw1234 supergroup        21 2015-07-10 09:15 /tmp/lxw1234.com/part-00000
 
hadoop fs -cat /tmp/lxw1234.com/part-00000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

注意:如果使用rdd1.saveAsTextFile(“file:///tmp/lxw1234.com”)将文件保存到本地文件系统,那么只会保存在Executor所在机器的本地目录。

//指定压缩格式保存

rdd1.saveAsTextFile(“hdfs://cdh5/tmp/lxw1234.com/”,classOf[com.hadoop.compression.lzo.LzopCodec])

hadoop fs -ls /tmp/lxw1234.com
-rw-r--r--   2 lxw1234 supergroup    0 2015-07-10 09:20 /tmp/lxw1234.com/_SUCCESS
-rw-r--r--   2 lxw1234 supergroup    71 2015-07-10 09:20 /tmp/lxw1234.com/part-00000.lzo
 
hadoop fs -text /tmp/lxw1234.com/part-00000.lzo
  • 1
  • 2
  • 3
  • 4
  • 5

saveAsSequenceFile

saveAsSequenceFile用于将RDD以SequenceFile的文件格式保存到HDFS上。
用法同saveAsTextFile。

saveAsObjectFile

def saveAsObjectFile(path: String): Unit
  • 1

saveAsObjectFile用于将RDD中的元素序列化成对象,存储到文件中。

对于HDFS,默认采用SequenceFile保存。

var rdd1 = sc.makeRDD(1 to 10,2)
rdd1.saveAsObjectFile("hdfs://cdh5/tmp/lxw1234.com/")
 
hadoop fs -cat /tmp/lxw1234.com/part-00000
SEQ !org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritableT
  • 1
  • 2
  • 3
  • 4
  • 5

saveAsHadoopFile

def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], codec: Class[_ <: CompressionCodec]): Unit
def saveAsHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: JobConf = …, codec: Option[Class[_ <: CompressionCodec]] = None): Unit
  • 1
  • 2

saveAsHadoopFile是将RDD存储在HDFS上的文件中,支持老版本Hadoop API。

可以指定outputKeyClass、outputValueClass以及压缩格式。
每个分区输出一个文件。

var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
 
rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
 
rdd1.saveAsHadoopFile("/tmp/lxw1234.com/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]],
                      classOf[com.hadoop.compression.lzo.LzopCodec])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

saveAsHadoopDataset

def saveAsHadoopDataset(conf: JobConf): Unit
  • 1

saveAsHadoopDataset用于将RDD保存到除了HDFS的其他存储中,比如HBase。
在JobConf中,通常需要关注或者设置五个参数:
文件的保存路径、key值的class类型、value值的class类型、RDD的输出格式(OutputFormat)、以及压缩相关的参数。

  • 使用saveAsHadoopDataset将RDD保存到HDFS中
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
 
 
 
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
var jobConf = new JobConf()
jobConf.setOutputFormat(classOf[TextOutputFormat[Text,IntWritable]])
jobConf.setOutputKeyClass(classOf[Text])
jobConf.setOutputValueClass(classOf[IntWritable])
jobConf.set("mapred.output.dir","/tmp/lxw1234/")
rdd1.saveAsHadoopDataset(jobConf)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

结果:

hadoop fs -cat /tmp/lxw1234/part-00000
A       2
A       1
hadoop fs -cat /tmp/lxw1234/part-00001
B       6
B       3
B       7
 
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 保存数据到HBASE

HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
  • 1
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapred.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.mapred.TableOutputFormat
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 
var conf = HBaseConfiguration.create()
    var jobConf = new JobConf(conf)
    jobConf.set("hbase.zookeeper.quorum","zkNode1,zkNode2,zkNode3")
    jobConf.set("zookeeper.znode.parent","/hbase")
    jobConf.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
    jobConf.setOutputFormat(classOf[TableOutputFormat])
    
    var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
    rdd1.map(x => 
      {
        var put = new Put(Bytes.toBytes(x._1))
        put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
        (new ImmutableBytesWritable,put)
      }
    ).saveAsHadoopDataset(jobConf)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 结果
hbase(main):005:0> scan 'lxw1234'
ROW     COLUMN+CELL                                                                                                
 A       column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x02                                              
 B       column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x06                                              
 C       column=f1:c1, timestamp=1436504941187, value=\x00\x00\x00\x07                                              
3 row(s) in 0.0550 seconds
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

saveAsNewAPIHadoopFile

def saveAsNewAPIHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]): Unit

def saveAsNewAPIHadoopFile(path: String, keyClass: Class[_], valueClass: Class[_], outputFormatClass: Class[_ <: OutputFormat[_, _]], conf: Configuration = self.context.hadoopConfiguration): Unit
  • 1
  • 2
  • 3

saveAsNewAPIHadoopFile用于将RDD数据保存到HDFS上,使用新版本Hadoop API。

用法基本同saveAsHadoopFile。

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import org.apache.hadoop.io.Text
import org.apache.hadoop.io.IntWritable
 
var rdd1 = sc.makeRDD(Array(("A",2),("A",1),("B",6),("B",3),("B",7)))
rdd1.saveAsNewAPIHadoopFile("/tmp/lxw1234/",classOf[Text],classOf[IntWritable],classOf[TextOutputFormat[Text,IntWritable]])
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

saveAsNewAPIHadoopDataset

def saveAsNewAPIHadoopDataset(conf: Configuration): Unit
  • 1

作用同saveAsHadoopDataset,只不过采用新版本Hadoop API。

以写入HBase为例:

HBase建表:

create ‘lxw1234′,{NAME => ‘f1′,VERSIONS => 1},{NAME => ‘f2′,VERSIONS => 1},{NAME => ‘f3′,VERSIONS => 1}
  • 1

完整的Spark应用程序:

package com.lxw1234.test
 
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import SparkContext._
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.client.Put
 
object Test {
  def main(args : Array[String]) {
   val sparkConf = new SparkConf().setMaster("spark://lxw1234.com:7077").setAppName("lxw1234.com")
   val sc = new SparkContext(sparkConf);
   var rdd1 = sc.makeRDD(Array(("A",2),("B",6),("C",7)))
   
    sc.hadoopConfiguration.set("hbase.zookeeper.quorum ","zkNode1,zkNode2,zkNode3")
    sc.hadoopConfiguration.set("zookeeper.znode.parent","/hbase")
    sc.hadoopConfiguration.set(TableOutputFormat.OUTPUT_TABLE,"lxw1234")
    var job = new Job(sc.hadoopConfiguration)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
    
    rdd1.map(
      x => {
        var put = new Put(Bytes.toBytes(x._1))
        put.add(Bytes.toBytes("f1"), Bytes.toBytes("c1"), Bytes.toBytes(x._2))
        (new ImmutableBytesWritable,put)
      }    
    ).saveAsNewAPIHadoopDataset(job.getConfiguration)
    
    sc.stop()   
  }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

注意:保存到HBase,运行时候需要在SPARK_CLASSPATH中加入HBase相关的jar包。

本文内容由网友自发贡献,转载请注明出处:【wpsshop博客】
推荐阅读
相关标签
  

闽ICP备14008679号