Requirement background
There is an existing requirement to use Spark to query data from an HBase database and sync it into an intermediate analysis database. This post records a simple code example of integrating Spark with HBase.
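For reference, a minimal sbt dependency setup that the example below assumes. The version numbers here are illustrative assumptions and should be matched to the Spark and HBase versions on your cluster:

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"       % "3.3.2",   // assumed version; match your cluster
  "org.apache.hbase" %  "hbase-client"    % "2.4.17",  // assumed version; match your HBase
  "org.apache.hbase" %  "hbase-mapreduce" % "2.4.17"   // provides TableInputFormat, used further below
)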
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Scan}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

import scala.collection.JavaConverters._

object ReadHBaseData {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession
    val spark = SparkSession.builder()
      .appName("ReadHBaseData")
      .master("local")
      .getOrCreate()

    // Create the HBase configuration
    val conf = HBaseConfiguration.create()

    // Set the HBase connection parameters
    conf.set("hbase.zookeeper.quorum", "localhost")
    conf.set("hbase.zookeeper.property.clientPort", "2181")

    // Open the HBase connection
    val connection = ConnectionFactory.createConnection(conf)

    // Get a handle to the HBase table
    val tableName = "my_table"
    val table = connection.getTable(TableName.valueOf(tableName))

    // Create the scan object
    val scan = new Scan()

    // Restrict the scan to the column family and columns we need
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column1"))
    scan.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("column2"))

    // Run the scan
    val scanner = table.getScanner(scan)

    // Iterate over the scan results on the driver and turn them into an RDD
    // (assumes both columns are present on every row; getValue returns null otherwise)
    val rdd = spark.sparkContext.parallelize(scanner.iterator().asScala.map(result => {
      val rowKey = Bytes.toString(result.getRow)
      val value1 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1")))
      val value2 = Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column2")))
      (rowKey, value1, value2)
    }).toList)

    // Convert the RDD into a DataFrame
    val df = spark.createDataFrame(rdd).toDF("rowKey", "value1", "value2")

    // Show the DataFrame contents
    df.show()

    // Close the HBase resources
    scanner.close()
    table.close()
    connection.close()

    // Stop the SparkSession
    spark.stop()
  }
}
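One caveat with the example above: the scan runs entirely on the driver, and parallelize then redistributes the rows, which does not scale to large tables. The usual alternative is HBase's TableInputFormat (from the hbase-mapreduce artifact), which lets each Spark partition scan an HBase region directly. A minimal sketch, reusing the spark and conf values from the example above:

import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Tell TableInputFormat which table to scan (reusing `conf` from the example above)
conf.set(TableInputFormat.INPUT_TABLE, "my_table")

// Each Spark partition reads one HBase region directly,
// so rows are not funneled through the driver
val hbaseRDD = spark.sparkContext.newAPIHadoopRDD(
  conf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])

// Map each Result to plain strings before building the DataFrame,
// since Result itself is not serializable
val df2 = spark.createDataFrame(hbaseRDD.map { case (_, result) =>
  (Bytes.toString(result.getRow),
   Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column1"))),
   Bytes.toString(result.getValue(Bytes.toBytes("cf"), Bytes.toBytes("column2"))))
}).toDF("rowKey", "value1", "value2")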
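To finish the sync into the intermediate analysis database, the DataFrame can be written out through Spark's built-in JDBC sink. A minimal sketch: the URL, table name, and credentials below are placeholders, and the matching JDBC driver (here MySQL, purely as an assumption) must be on the classpath:

// Hypothetical connection details for the intermediate analysis database; replace with your own
df.write
  .format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/analysis_db")
  .option("dbtable", "hbase_sync")
  .option("user", "sync_user")
  .option("password", "***")
  .mode("append")  // append to the target table; use "overwrite" for a full refresh
  .save()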