当前位置:   article > 正文

Spark读写Hbase(用户画像)

spark读写hbase

背景

依旧是公司用户画像项目,目前方案是将hive聚合之后的标签表全部倒入mysql,然后在ES建立索引,虽然限定了最大查询范围为90天的数据,但是面对千万级的用户量,90天的数据依旧是非常庞大,每天查询的效率依旧是在30分钟以上,所以准备对这块进行优化。

在公司层面进行调研之后发现,公司遗留了一个小的Hbase集群,集群配置:

1 active master, 1 backup masters, 2 servers。

但是问题是集群的版本为:1.1.2,非常低,在上篇文章中:

浅谈Hbase在用户画像上的应用

为了做用户画像的流程打通,本地建立的Hbase版本为1.3.6,spark版本为2.4,所以整套体系都不支持公司原来的hbase集群体系,为了保障之后的用户画像能落地,有两套解决方案:

1、公司层面的Hbase集群升级,由于历史包袱太重,之前的版本虽然老,但是依旧有部分的数据在上面跑,如果版本升级,后续对应的下游系统中间件可能会出现不兼容的问题,而且在Hbase层做适配需要调研太多的下游业务的使用场景,成本太高,所以未选用。

2、将本地的伪分布式Hbase进行降级,同时spark版本也进行降级处理。目前测试环境选定的Hbase版本为1.1.2,spark版本为2.1.1。

下面的文章主要是基于这两个版本的中间件,进行spark对Hbase的读写操作。

介绍

1、Hbase版本降级,1.1.2版本为2015年的版本

wget archive.apache.org/dist

2、按照上篇文章对Hbase进行配置和按照测试

访问页面:localhost:60010/master-

启动hbase shell,并插入数据:

Scan的结果:Row_key、列族、列名、时间戳、value值

实际结果用图标展现为:

转换为常见的关系型数据库的视角来看的话:

从表述中能看出来在关系型数据库中的存储同一份数据需要3行,而在Hbase是一行的,而且同一个列族是在同一个Store里面的,更加方便查询。

3、修改pom.xml文件

  1. <properties>
  2. <scala.version>2.11</scala.version>
  3. <spark.version>2.1.1</spark.version>
  4. <hadoop.version>2.7.7</hadoop.version>
  5. </properties>
  6. <repositories>
  7. <repository>
  8. <id>scala-tools.org</id>
  9. <name>Scala-Tools Maven2 Repository</name>
  10. <url>http://scala-tools.org/repo-releases</url>
  11. </repository>
  12. </repositories>
  13. <pluginRepositories>
  14. <pluginRepository>
  15. <id>scala-tools.org</id>
  16. <name>Scala-Tools Maven2 Repository</name>
  17. <url>http://scala-tools.org/repo-releases</url>
  18. </pluginRepository>
  19. </pluginRepositories>
  20. <dependencies>
  21. <dependency>
  22. <groupId>org.scala-lang</groupId>
  23. <artifactId>scala-library</artifactId>
  24. <version>2.11.8</version>
  25. <!-- <scope>provided</scope>-->
  26. </dependency>
  27. <dependency>
  28. <groupId>org.apache.spark</groupId>
  29. <artifactId>spark-core_${scala.version}</artifactId>
  30. <version>${spark.version}</version>
  31. <!-- <scope>provided</scope>-->
  32. </dependency>
  33. <dependency>
  34. <groupId>org.apache.spark</groupId>
  35. <artifactId>spark-sql_${scala.version}</artifactId>
  36. <version>${spark.version}</version>
  37. <!-- <scope>provided</scope>-->
  38. </dependency>
  39. <dependency>
  40. <groupId>log4j</groupId>
  41. <artifactId>log4j</artifactId>
  42. <version>1.2.14</version>
  43. </dependency>
  44. <dependency>
  45. <groupId>junit</groupId>
  46. <artifactId>junit</artifactId>
  47. <version>4.4</version>
  48. <!--<scope>test</scope>-->
  49. </dependency>
  50. <dependency>
  51. <groupId>org.specs</groupId>
  52. <artifactId>specs</artifactId>
  53. <version>1.2.5</version>
  54. <scope>test</scope>
  55. </dependency>
  56. <dependency>
  57. <groupId>org.scala-lang</groupId>
  58. <artifactId>scala-reflect</artifactId>
  59. <version>2.11.8</version>
  60. </dependency>
  61. <dependency>
  62. <groupId>org.scala-lang</groupId>
  63. <artifactId>scala-compiler</artifactId>
  64. <version>2.11.8</version>
  65. </dependency>
  66. <dependency>
  67. <groupId>org.scala-lang</groupId>
  68. <artifactId>scala-library</artifactId>
  69. <version>2.11.8</version>
  70. </dependency>
  71. <dependency>
  72. <groupId>org.apache.hbase</groupId>
  73. <artifactId>hbase-client</artifactId>
  74. <version>1.1.2</version>
  75. </dependency>
  76. <dependency>
  77. <groupId>org.apache.hbase</groupId>
  78. <artifactId>hbase-common</artifactId>
  79. <version>1.1.2</version>
  80. </dependency>
  81. <dependency>
  82. <groupId>org.apache.hbase</groupId>
  83. <artifactId>hbase-server</artifactId>
  84. <version>1.1.2</version>
  85. </dependency>

主要是添加hbase-client、hbase-server、hbase-common,并将spark版本修改为2.1.1。

3、spark读写Hbase测试

往Hbase里面写数据:

  • 通过HTable中put方法:

  1. package Flink
  2. import org.apache.hadoop.conf.Configuration
  3. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
  4. import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
  5. import org.apache.hadoop.hbase.mapred.TableOutputFormat
  6. import org.apache.hadoop.hbase.util.Bytes
  7. import org.apache.log4j.{Level, Logger}
  8. import org.apache.spark.{SparkConf, SparkContext}
  9. import org.apache.hadoop.hbase.client.Put
  10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  11. import org.apache.hadoop.mapred.JobConf
  12. /** *
  13. *
  14. * @autor gaowei
  15. * @Date 2020-07-28 17:55
  16. */
  17. object HbaseT1 {
  18. def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
  19. val conf = HBaseConfiguration.create()
  20. conf.set("hbase.zookeeper.quorum",quorum)
  21. conf.set("hbase.zookeeper.property.clientPort",port)
  22. conf
  23. }
  24. def getHBaseAdmin(conf:Configuration,tableName:String) = {
  25. val admin = new HBaseAdmin(conf)
  26. if (!admin.isTableAvailable(tableName)) {
  27. val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
  28. admin.createTable(tableDesc)
  29. }
  30. admin
  31. }
  32. def getTable(conf:Configuration,tableName:String) = {
  33. new HTable(conf,tableName)
  34. }
  35. def main(args: Array[String]) {
  36. // 屏蔽不必要的日志显示在终端上
  37. Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  38. val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
  39. val sc = new SparkContext(sparkConf)
  40. val tableName = "TEST.USER_INFO"
  41. val quorum = "localhost"
  42. val port = "2181"
  43. // 配置相关信息
  44. val conf = getHBaseConfiguration(quorum,port,tableName)
  45. conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
  46. val indataRDD = sc.makeRDD(Array("R6,jack,16,t1", "R7,Lucy,15,t2", "R8,mike,17,t3", "R9,Lily,14,t4"))
  47. indataRDD.foreachPartition(x=> {
  48. val conf = getHBaseConfiguration(quorum,port,tableName)
  49. conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
  50. val htable = getTable(conf,tableName)
  51. x.foreach(y => {
  52. val arr = y.split(",")
  53. val key = arr(0)
  54. val value = arr(1)
  55. val value1 = arr(2)
  56. val value2 = arr(3)
  57. val put = new Put(Bytes.toBytes(key))
  58. put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C1"),Bytes.toBytes(value))
  59. put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C2"),Bytes.toBytes(value1))
  60. put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C3"),Bytes.toBytes(value2))
  61. htable.put(put)
  62. })
  63. })
  64. sc.stop()
  65. }
  66. }
  • TableOutputFormat向HBase写数据

  1. package Flink
  2. import org.apache.hadoop.conf.Configuration
  3. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
  4. import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
  5. import org.apache.hadoop.hbase.mapred.TableOutputFormat
  6. import org.apache.hadoop.hbase.util.Bytes
  7. import org.apache.log4j.{Level, Logger}
  8. import org.apache.spark.{SparkConf, SparkContext}
  9. import org.apache.hadoop.hbase.client.Put
  10. import org.apache.hadoop.hbase.io.ImmutableBytesWritable
  11. import org.apache.hadoop.mapred.JobConf
  12. /** *
  13. *
  14. * @autor gaowei
  15. * @Date 2020-07-28 17:55
  16. */
  17. object HbaseT1 {
  18. def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
  19. val conf = HBaseConfiguration.create()
  20. conf.set("hbase.zookeeper.quorum",quorum)
  21. conf.set("hbase.zookeeper.property.clientPort",port)
  22. conf
  23. }
  24. def getHBaseAdmin(conf:Configuration,tableName:String) = {
  25. val admin = new HBaseAdmin(conf)
  26. if (!admin.isTableAvailable(tableName)) {
  27. val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
  28. admin.createTable(tableDesc)
  29. }
  30. admin
  31. }
  32. def getTable(conf:Configuration,tableName:String) = {
  33. new HTable(conf,tableName)
  34. }
  35. def main(args: Array[String]) {
  36. // 屏蔽不必要的日志显示在终端上
  37. Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  38. val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
  39. val sc = new SparkContext(sparkConf)
  40. val tableName = "TEST.USER_INFO"
  41. val quorum = "localhost"
  42. val port = "2181"
  43. // 配置相关信息
  44. val conf = getHBaseConfiguration(quorum,port,tableName)
  45. conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
  46. val indataRDD = sc.makeRDD(Array("R6,jack,16,t1", "R7,Lucy,15,t2", "R8,mike,17,t3", "R9,Lily,14,t4"))
  47. val jobConf = new JobConf()
  48. jobConf.setOutputFormat(classOf[TableOutputFormat])
  49. jobConf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
  50. indataRDD.map(_.split(",")).map{arr => {
  51. val put = new Put(Bytes.toBytes(arr(0)))
  52. put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C1"),Bytes.toBytes(arr(1)))
  53. put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C2"),Bytes.toBytes(arr(2)))
  54. put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C3"),Bytes.toBytes(arr(3)))
  55. (new ImmutableBytesWritable,put)
  56. }}.saveAsHadoopDataset(jobConf)
  57. sc.stop()
  58. }
  59. }

4、读Hbase的数据

  1. package Flink
  2. import org.apache.hadoop.hbase.mapreduce.TableInputFormat
  3. import org.apache.hadoop.hbase.util.Bytes
  4. import org.apache.hadoop.conf.Configuration
  5. import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
  6. import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable}
  7. import org.apache.log4j.{Level, Logger}
  8. import org.apache.spark.{SparkConf, SparkContext}
  9. /** *
  10. *
  11. * @autor gaowei
  12. * @Date 2020-07-27 17:06
  13. */
  14. object HbseTest {
  15. def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
  16. val conf = HBaseConfiguration.create()
  17. conf.set("hbase.zookeeper.quorum",quorum)
  18. conf.set("hbase.zookeeper.property.clientPort",port)
  19. conf
  20. }
  21. def getHBaseAdmin(conf:Configuration,tableName:String) = {
  22. val admin = new HBaseAdmin(conf)
  23. if (!admin.isTableAvailable(tableName)) {
  24. val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
  25. admin.createTable(tableDesc)
  26. }
  27. admin
  28. }
  29. def getTable(conf:Configuration,tableName:String) = {
  30. new HTable(conf,tableName)
  31. }
  32. def main(args: Array[String]): Unit = {
  33. Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  34. val sparkConf = new SparkConf().setAppName("HBaseReadTest").setMaster("local[2]")
  35. val sc = new SparkContext(sparkConf)
  36. val tableName = "TEST.USER_INFO"
  37. val quorum = "localhost"
  38. val port = "2181"
  39. // 配置相关信息
  40. val conf = getHBaseConfiguration(quorum,port,tableName)
  41. conf.set(TableInputFormat.INPUT_TABLE,tableName)
  42. // HBase数据转成RDD
  43. val hBaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],
  44. classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
  45. classOf[org.apache.hadoop.hbase.client.Result]).cache()
  46. // RDD数据操作
  47. val data = hBaseRDD.map(x => {
  48. val result = x._2
  49. val key = Bytes.toString(result.getRow)
  50. val value = Bytes.toString(result.getValue("INFO".getBytes,"C1".getBytes))
  51. val value1 = Bytes.toString(result.getValue("INFO".getBytes,"C2".getBytes))
  52. val value2 = Bytes.toString(result.getValue("INFO".getBytes,"C3".getBytes))
  53. (key,value,value1,value2)
  54. })
  55. data.foreach(println)
  56. sc.stop()
  57. }
  58. }

结果:

去Hbase查询的结果:

结论:

Hadoop本质上是:分布式文件系统(HDFS) + 分布式计算框架(Mapreduce) + 调度系统Yarn搭建起来的分布式大数据处理框架。

Hive:是一个基于Hadoop的数据仓库,适用于一些高延迟性的应用(离线开发),可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能。

Hive可以认为是MapReduce的一个包装,把好写的HQL转换为的MapReduce程序,本身不存储和计算数据,它完全依赖于HDFS和MapReduce,Hive中的表是纯逻辑表。hive需要用到hdfs存储文件,需要用到MapReduce计算框架。

HBase:是一个Hadoop的数据库,一个分布式、可扩展、大数据的存储。hbase是物理表,不是逻辑表,提供一个超大的内存hash表,搜索引擎通过它来存储索引,方便查询操作。

HBase可以认为是HDFS的一个包装。他的本质是数据存储,是个NoSql数据库;HBase部署于HDFS之上,并且克服了hdfs在随机读写方面的缺点,提高查询效率。

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小惠珠哦/article/detail/874999
推荐阅读
相关标签
  

闽ICP备14008679号