赞
踩
/**
 * Bulk-loads rows from a SQL Server table into an HBase table via HFiles.
 *
 * Reads the source table over JDBC in 16 partitions, builds sorted
 * (rowkey, KeyValue) pairs, writes them out as HFiles through
 * `HFileOutputFormat`, then moves the files into the live regions with
 * `LoadIncrementalHFiles` (classic HBase bulk-load pattern).
 *
 * @param sc        active SparkContext
 * @param tableName target HBase table name
 */
def load2hbase2(sc: SparkContext, tableName: String): Unit = {
  val sqlContext = new SQLContext(sc)

  // JDBC connection properties for the SQL Server source.
  val properties = new Properties()
  properties.put("user", "xxx")
  properties.put("password", "xxx")
  properties.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  val url = "jdbc:sqlserver://xxxx:1433;databaseName=xxxx"

  // Partition predicates: sID is a UUID string, so its first character is one
  // of the 16 hex digits (0-9, a-f). One predicate per digit splits the JDBC
  // read into 16 roughly even partitions.
  val predicates = "0123456789abcdef".map(c => s"substring(sID,1,1)='$c'").toArray

  // BUG FIX: the ZooKeeper quorum/port must be set on the configuration
  // BEFORE the HTable is constructed — the original code created the HTable
  // first, so the connection could not see these settings.
  val conf = HBaseConfiguration.create()
  conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  conf.set("hbase.zookeeper.quorum", "xxx")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  val table = new HTable(conf, tableName)

  // Configure the MapReduce job for incremental (HFile) output; this wires
  // up total-order partitioning against the table's current region boundaries.
  val job = Job.getInstance(conf)
  job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setMapOutputValueClass(classOf[KeyValue])
  HFileOutputFormat.configureIncrementalLoad(job, table)

  // Columns: 0=sID (rowkey), 1=sCID, 2=sCName (city filter), 3=sBID_BL, 4=sBID_Sd.
  val sdf = sqlContext.read.jdbc(url, "(select sID,sCID,sCName,sBID_BL,sBID_Sd from table) t", predicates, properties)
  val rdd = sdf.flatMap { row =>
    val cells = new ArrayBuffer[(ImmutableBytesWritable, KeyValue)](4)
    // Only keep rows whose city is in the configured whitelist.
    if (row.length >= 5 && Prepare.citys.contains(getValue(row.getString(2)))) {
      val rkBytes = Bytes.toBytes(getValue(row.getString(0)))
      val rk = new ImmutableBytesWritable(rkBytes)
      // Qualifiers are emitted in lexicographic order (SbidBl < SbidSd < Scid
      // < Scname) — HFile output requires cells within a row to be sorted.
      cells += ((rk, new KeyValue(rkBytes, "all".getBytes(), "SbidBl".getBytes(), Bytes.toBytes(getValue(row.getString(3))))))
      cells += ((rk, new KeyValue(rkBytes, "all".getBytes(), "SbidSd".getBytes(), Bytes.toBytes(getValue(row.getString(4))))))
      cells += ((rk, new KeyValue(rkBytes, "all".getBytes(), "Scid".getBytes(), Bytes.toBytes(getValue(row.getString(1))))))
      cells += ((rk, new KeyValue(rkBytes, "all".getBytes(), "Scname".getBytes(), Bytes.toBytes(getValue(row.getString(2))))))
    }
    cells
  }.filter(_._1 != null).sortBy(_._1) // HFiles require globally sorted rowkeys

  // Unique staging directory so reruns never collide with earlier output.
  val stagingDir = "/tmp/test_loadhbase/" + System.currentTimeMillis()
  // NOTE(review): "mapred.output.dir" is the legacy key; on Hadoop 2+ it is
  // deprecation-mapped to "mapreduce.output.fileoutputformat.outputdir".
  job.getConfiguration.set("mapred.output.dir", stagingDir)
  rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

  // Atomically move the generated HFiles into the table's regions.
  val bulkLoader = new LoadIncrementalHFiles(job.getConfiguration)
  bulkLoader.doBulkLoad(new Path(stagingDir), table)
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。