当前位置:   article > 正文

spark3.0.1版本查询Hbase数据库例子

spark3.0.1版本查询Hbase数据库例子

需求背景

现有需求,需要采用spark查询hbase数据库的数据同步到中间分析库,记录spark集成hbase的简单例子代码

  1. import org.apache.hadoop.hbase.HBaseConfiguration
  2. import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
  3. import org.apache.hadoop.hbase.util.Bytes
  4. import org.apache.spark.sql.SparkSession
  5. object ReadHBaseData {
  6. def main(args: Array[String]): Unit = {
  7. // 创建SparkSession
  8. val spark = SparkSession.builder()
  9. .appName("ReadHBaseData")
  10. .master("local")
  11. .getOrCreate()
  12. // 创建HBase配置
  13. val conf = HBaseConfiguration.create()
  14. // 设置HBase连接参数
  15. conf.set("hbase.zookeeper.quorum", "localhost")
  16. conf.set("hbase.zookeeper.property.clientPort", "2181")
  17. // 创建HBase连接
  18. val connection = ConnectionFactory.createConnection(conf)
  19. // 创建HBase表
  20. val tableName = "my_table"
  21. val table = connection.getTable(TableName.valueOf(tableName))
  22. // 创建HBase扫描对象
  23. val scan = new Scan()
  24. // 设置要读取的列族和列
  25. scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column1"))
  26. scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column2"))
  27. // 执行HBase扫描
  28. val scanner = table.getScanner(scan)
  29. // 遍历扫描结果并将结果转换为RDD
  30. val rdd = spark.sparkContext.parallelize(scanner.iterator().asScala.map(result => {
  31. val rowKey = Bytes.toString(result.getRow)
  32. val value1 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1")))
  33. val value2 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column2")))
  34. (rowKey, value1, value2)
  35. }).toList)
  36. // 将RDD转换为DataFrame
  37. val df = spark.createDataFrame(rdd).toDF("rowKey", "value1", "value2")
  38. // 显示DataFrame内容
  39. df.show()
  40. // 关闭HBase连接
  41. scanner.close()
  42. table.close()
  43. connection.close()
  44. // 关闭SparkSession
  45. spark.stop()
  46. }
  47. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/663327
推荐阅读
相关标签
  

闽ICP备14008679号