赞
踩
依旧是公司用户画像项目,目前方案是将hive聚合之后的标签表全部倒入mysql,然后在ES建立索引,虽然限定了最大查询范围为90天的数据,但是面对千万级的用户量,90天的数据依旧是非常庞大,每天查询的效率依旧是在30分钟以上,所以准备对这块进行优化。
在公司层面进行调研之后发现,公司遗留了一个小的Hbase集群,集群配置:
1 active master, 1 backup masters, 2 servers。
但是问题是集群的版本为:1.1.2,非常低,在上篇文章中:
为了做用户画像的流程打通,本地建立的Hbase版本为1.3.6,spark版本为2.4,所以整套体系都不支持公司原来的hbase集群体系,为了保障之后的用户画像能落地,有两套解决方案:
1、公司层面的Hbase集群升级,由于历史包袱太重,之前的版本虽然老,但是依旧有部分的数据在上面跑,如果版本升级,后续对应的下游系统中间件可能会出现不兼容的问题,而且在Hbase层做适配需要调研太多的下游业务的使用场景,成本太高,所以未选用。
2、将本地的伪分布式Hbase进行降级,同时spark版本也进行降级处理。目前测试环境选定的Hbase版本为1.1.2,spark版本为2.1.1。
下面的文章主要是基于这两个版本的中间件,进行spark对Hbase的读写操作。
1、Hbase版本降级,1.1.2版本为2015年的版本
wget archive.apache.org/dist
2、按照上篇文章对Hbase进行配置和按照测试
访问页面:localhost:60010/master-
启动hbase shell,并插入数据:
Scan的结果:Row_key、列族、列名、时间戳、value值
实际结果用图标展现为:
转换为常见的关系型数据库的视角来看的话:
从表述中能看出来在关系型数据库中的存储同一份数据需要3行,而在Hbase是一行的,而且同一个列族是在同一个Store里面的,更加方便查询。
3、修改pom.xml文件
- <properties>
- <scala.version>2.11</scala.version>
- <spark.version>2.1.1</spark.version>
- <hadoop.version>2.7.7</hadoop.version>
- </properties>
-
- <repositories>
- <repository>
- <id>scala-tools.org</id>
- <name>Scala-Tools Maven2 Repository</name>
- <url>http://scala-tools.org/repo-releases</url>
- </repository>
- </repositories>
-
- <pluginRepositories>
- <pluginRepository>
- <id>scala-tools.org</id>
- <name>Scala-Tools Maven2 Repository</name>
- <url>http://scala-tools.org/repo-releases</url>
- </pluginRepository>
- </pluginRepositories>
-
- <dependencies>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>2.11.8</version>
- <!-- <scope>provided</scope>-->
- </dependency>
-
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.version}</artifactId>
- <version>${spark.version}</version>
- <!-- <scope>provided</scope>-->
- </dependency>
- <dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-sql_${scala.version}</artifactId>
- <version>${spark.version}</version>
- <!-- <scope>provided</scope>-->
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.14</version>
- </dependency>
-
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>4.4</version>
- <!--<scope>test</scope>-->
- </dependency>
- <dependency>
- <groupId>org.specs</groupId>
- <artifactId>specs</artifactId>
- <version>1.2.5</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-reflect</artifactId>
- <version>2.11.8</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-compiler</artifactId>
- <version>2.11.8</version>
- </dependency>
- <dependency>
- <groupId>org.scala-lang</groupId>
- <artifactId>scala-library</artifactId>
- <version>2.11.8</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>1.1.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-common</artifactId>
- <version>1.1.2</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>1.1.2</version>
- </dependency>
主要是添加hbase-client、hbase-server、hbase-common,并将spark版本修改为2.1.1。
3、spark读写Hbase测试
往Hbase里面写数据:
通过HTable中put方法:
- package Flink
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
- import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
- import org.apache.hadoop.hbase.mapred.TableOutputFormat
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.{SparkConf, SparkContext}
-
- import org.apache.hadoop.hbase.client.Put
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable
- import org.apache.hadoop.mapred.JobConf
- /** *
- *
- * @autor gaowei
- * @Date 2020-07-28 17:55
- */
- object HbaseT1 {
- def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
- val conf = HBaseConfiguration.create()
- conf.set("hbase.zookeeper.quorum",quorum)
- conf.set("hbase.zookeeper.property.clientPort",port)
-
- conf
- }
- def getHBaseAdmin(conf:Configuration,tableName:String) = {
- val admin = new HBaseAdmin(conf)
- if (!admin.isTableAvailable(tableName)) {
- val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
- admin.createTable(tableDesc)
- }
-
- admin
- }
- def getTable(conf:Configuration,tableName:String) = {
- new HTable(conf,tableName)
- }
- def main(args: Array[String]) {
- // 屏蔽不必要的日志显示在终端上
- Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
-
- val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
- val sc = new SparkContext(sparkConf)
-
- val tableName = "TEST.USER_INFO"
- val quorum = "localhost"
- val port = "2181"
-
- // 配置相关信息
-
- val conf = getHBaseConfiguration(quorum,port,tableName)
- conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
-
-
- val indataRDD = sc.makeRDD(Array("R6,jack,16,t1", "R7,Lucy,15,t2", "R8,mike,17,t3", "R9,Lily,14,t4"))
-
- indataRDD.foreachPartition(x=> {
- val conf = getHBaseConfiguration(quorum,port,tableName)
- conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
-
- val htable = getTable(conf,tableName)
-
- x.foreach(y => {
- val arr = y.split(",")
- val key = arr(0)
- val value = arr(1)
- val value1 = arr(2)
- val value2 = arr(3)
-
- val put = new Put(Bytes.toBytes(key))
- put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C1"),Bytes.toBytes(value))
- put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C2"),Bytes.toBytes(value1))
- put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C3"),Bytes.toBytes(value2))
- htable.put(put)
- })
- })
- sc.stop()
- }
-
- }
TableOutputFormat向HBase写数据
- package Flink
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
- import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable, Put, Result}
- import org.apache.hadoop.hbase.mapred.TableOutputFormat
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.{SparkConf, SparkContext}
-
- import org.apache.hadoop.hbase.client.Put
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable
- import org.apache.hadoop.mapred.JobConf
- /** *
- *
- * @autor gaowei
- * @Date 2020-07-28 17:55
- */
- object HbaseT1 {
- def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
- val conf = HBaseConfiguration.create()
- conf.set("hbase.zookeeper.quorum",quorum)
- conf.set("hbase.zookeeper.property.clientPort",port)
-
- conf
- }
- def getHBaseAdmin(conf:Configuration,tableName:String) = {
- val admin = new HBaseAdmin(conf)
- if (!admin.isTableAvailable(tableName)) {
- val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
- admin.createTable(tableDesc)
- }
-
- admin
- }
- def getTable(conf:Configuration,tableName:String) = {
- new HTable(conf,tableName)
- }
- def main(args: Array[String]) {
- // 屏蔽不必要的日志显示在终端上
- Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
-
- val sparkConf = new SparkConf().setAppName("HBaseWriteTest").setMaster("local[2]")
- val sc = new SparkContext(sparkConf)
-
- val tableName = "TEST.USER_INFO"
- val quorum = "localhost"
- val port = "2181"
-
- // 配置相关信息
-
- val conf = getHBaseConfiguration(quorum,port,tableName)
- conf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
-
-
- val indataRDD = sc.makeRDD(Array("R6,jack,16,t1", "R7,Lucy,15,t2", "R8,mike,17,t3", "R9,Lily,14,t4"))
-
- val jobConf = new JobConf()
- jobConf.setOutputFormat(classOf[TableOutputFormat])
- jobConf.set(TableOutputFormat.OUTPUT_TABLE,tableName)
- indataRDD.map(_.split(",")).map{arr => {
- val put = new Put(Bytes.toBytes(arr(0)))
- put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C1"),Bytes.toBytes(arr(1)))
- put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C2"),Bytes.toBytes(arr(2)))
- put.add(Bytes.toBytes("INFO"),Bytes.toBytes("C3"),Bytes.toBytes(arr(3)))
- (new ImmutableBytesWritable,put)
- }}.saveAsHadoopDataset(jobConf)
- sc.stop()
- }
-
- }
4、读Hbase的数据
- package Flink
- import org.apache.hadoop.hbase.mapreduce.TableInputFormat
- import org.apache.hadoop.hbase.util.Bytes
- import org.apache.hadoop.conf.Configuration
- import org.apache.hadoop.hbase.{HBaseConfiguration, HTableDescriptor, TableName}
- import org.apache.hadoop.hbase.client.{HBaseAdmin, HTable}
- import org.apache.log4j.{Level, Logger}
- import org.apache.spark.{SparkConf, SparkContext}
- /** *
- *
- * @autor gaowei
- * @Date 2020-07-27 17:06
- */
- object HbseTest {
-
- def getHBaseConfiguration(quorum:String, port:String, tableName:String) = {
- val conf = HBaseConfiguration.create()
- conf.set("hbase.zookeeper.quorum",quorum)
- conf.set("hbase.zookeeper.property.clientPort",port)
-
- conf
- }
- def getHBaseAdmin(conf:Configuration,tableName:String) = {
- val admin = new HBaseAdmin(conf)
- if (!admin.isTableAvailable(tableName)) {
- val tableDesc = new HTableDescriptor(TableName.valueOf(tableName))
- admin.createTable(tableDesc)
- }
-
- admin
- }
- def getTable(conf:Configuration,tableName:String) = {
- new HTable(conf,tableName)
- }
-
- def main(args: Array[String]): Unit = {
- Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
- val sparkConf = new SparkConf().setAppName("HBaseReadTest").setMaster("local[2]")
- val sc = new SparkContext(sparkConf)
-
- val tableName = "TEST.USER_INFO"
- val quorum = "localhost"
- val port = "2181"
-
- // 配置相关信息
- val conf = getHBaseConfiguration(quorum,port,tableName)
- conf.set(TableInputFormat.INPUT_TABLE,tableName)
-
- // HBase数据转成RDD
- val hBaseRDD = sc.newAPIHadoopRDD(conf,classOf[TableInputFormat],
- classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
- classOf[org.apache.hadoop.hbase.client.Result]).cache()
-
- // RDD数据操作
- val data = hBaseRDD.map(x => {
- val result = x._2
- val key = Bytes.toString(result.getRow)
- val value = Bytes.toString(result.getValue("INFO".getBytes,"C1".getBytes))
- val value1 = Bytes.toString(result.getValue("INFO".getBytes,"C2".getBytes))
- val value2 = Bytes.toString(result.getValue("INFO".getBytes,"C3".getBytes))
- (key,value,value1,value2)
- })
-
- data.foreach(println)
-
- sc.stop()
- }
-
- }
结果:
去Hbase查询的结果:
Hadoop本质上是:分布式文件系统(HDFS) + 分布式计算框架(Mapreduce) + 调度系统Yarn搭建起来的分布式大数据处理框架。
Hive:是一个基于Hadoop的数据仓库,适用于一些高延迟性的应用(离线开发),可以将结构化的数据文件映射为一张数据库表,并提供简单的sql查询功能。
Hive可以认为是MapReduce的一个包装,把好写的HQL转换为的MapReduce程序,本身不存储和计算数据,它完全依赖于HDFS和MapReduce,Hive中的表是纯逻辑表。hive需要用到hdfs存储文件,需要用到MapReduce计算框架。
HBase:是一个Hadoop的数据库,一个分布式、可扩展、大数据的存储。hbase是物理表,不是逻辑表,提供一个超大的内存hash表,搜索引擎通过它来存储索引,方便查询操作。
HBase可以认为是HDFS的一个包装。他的本质是数据存储,是个NoSql数据库;HBase部署于HDFS之上,并且克服了hdfs在随机读写方面的缺点,提高查询效率。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。