
Spark: Reading SQL Server Data and Bulk-Loading It into HBase

The function below reads a SQL Server table through JDBC in 16 parallel partitions, converts each row into HBase KeyValues, writes them out as HFiles, and then bulk-loads the result into an HBase table.
import java.util.Properties

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.Path
import org.apache.hadoop.hbase.{HBaseConfiguration, KeyValue}
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{HFileOutputFormat, LoadIncrementalHFiles, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

def load2hbase2(sc: SparkContext, tableName: String): Unit = {
  val sqlContext = new SQLContext(sc)

  // JDBC connection settings for SQL Server
  val properties = new Properties()
  properties.put("user", "xxx")
  properties.put("password", "xxx")
  properties.put("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
  val url = "jdbc:sqlserver://xxxx:1433;databaseName=xxxx"

  // Partition predicates: sID is a UUID string, so its first character is one of
  // 0-9 / a-f, which splits the source table into 16 JDBC read partitions.
  val predicates = "0123456789abcdef".map(c => s"substring(sID,1,1)='$c'").toArray

  // Set the ZooKeeper quorum before creating the HTable; otherwise the table
  // handle is built against the default configuration.
  val conf = HBaseConfiguration.create()
  conf.set("hbase.zookeeper.quorum", "xxx")
  conf.set("hbase.zookeeper.property.clientPort", "2181")
  conf.set(TableOutputFormat.OUTPUT_TABLE, tableName)
  val table = new HTable(conf, tableName)

  // Configure the job to emit HFiles split along the table's region boundaries
  val job = Job.getInstance(conf)
  job.setMapOutputKeyClass(classOf[ImmutableBytesWritable])
  job.setMapOutputValueClass(classOf[KeyValue])
  HFileOutputFormat.configureIncrementalLoad(job, table)

  // One read task per predicate: 16 parallel JDBC scans of the source table
  val sdf = sqlContext.read.jdbc(url, "(select sID,sCID,sCName,sBID_BL,sBID_Sd from table) t", predicates, properties)

  // Map each row to four KeyValues; getValue and Prepare.citys are project
  // helpers (value extraction and a city whitelist). The qualifiers are emitted
  // in byte order (SbidBl < SbidSd < Scid < Scname), as HFiles require.
  val rdd = sdf.flatMap(x => {
    val arr = new ArrayBuffer[(ImmutableBytesWritable, KeyValue)](4)
    if (x.length >= 5 && Prepare.citys.contains(getValue(x.getString(2)))) {
      val rkbytes = Bytes.toBytes(getValue(x.getString(0)))
      val rkim = new ImmutableBytesWritable(rkbytes)
      arr += ((rkim, new KeyValue(rkbytes, "all".getBytes(), "SbidBl".getBytes(), Bytes.toBytes(getValue(x.getString(3))))))
      arr += ((rkim, new KeyValue(rkbytes, "all".getBytes(), "SbidSd".getBytes(), Bytes.toBytes(getValue(x.getString(4))))))
      arr += ((rkim, new KeyValue(rkbytes, "all".getBytes(), "Scid".getBytes(), Bytes.toBytes(getValue(x.getString(1))))))
      arr += ((rkim, new KeyValue(rkbytes, "all".getBytes(), "Scname".getBytes(), Bytes.toBytes(getValue(x.getString(2))))))
    }
    arr
  }).filter(x => x._1 != null).sortBy(_._1) // bulk load requires globally sorted row keys

  // Write the sorted KeyValues as HFiles into a unique temp directory
  val p = "/tmp/test_loadhbase/" + System.currentTimeMillis()
  job.getConfiguration.set("mapred.output.dir", p)
  rdd.saveAsNewAPIHadoopDataset(job.getConfiguration)

  // Move the generated HFiles into the regions (no WAL, no memstore writes)
  val bulkLoader = new LoadIncrementalHFiles(job.getConfiguration)
  bulkLoader.doBulkLoad(new Path(p), table)
}
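The listing references two project helpers, Prepare.citys and getValue, whose definitions are not part of the post. A minimal sketch consistent with how they are used (the behavior and the city names are assumptions, not the author's code) might look like:

// Hypothetical stand-ins for the author's project helpers.
object Prepare {
  // Assumed: whitelist of city names used to filter rows
  val citys: Set[String] = Set("cityA", "cityB")
}

// Assumed: null-safe extraction of a raw column value
def getValue(s: String): String = if (s == null) "" else s.trim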

 

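A minimal driver sketch for calling the function (assumptions: the HBase table already exists with column family "all", and the app name and table name below are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("sqlserver-bulkload-hbase")
val sc = new SparkContext(conf)
load2hbase2(sc, "xxx_table") // placeholder table name
sc.stop()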