赞
踩
某某自来水公司,需要存储大量的缴费明细数据。以下截取了缴费明细的一部分内容。
用户id | 姓名 | 用户地址 | 性别 | 缴费时间 | 表示数(本次) | 表示数(上次) | 用量(立方) | 合计金额 | 查表日期 | 最迟缴费日期 |
---|---|---|---|---|---|---|---|---|---|---|
4944191 | 卫红红 | 郑州市7单元267室 | 男 | 2023-05-10 | 308.1 | 283.1 | 25 | 150 | 2023-04-25 | 2023-06-09 |
因为缴费明细的数据记录非常庞大,该公司的信息部门决定使用HBase来存储这些数据。并且,他们希望能够通过Java程序来访问这些数据。
groupId | cn.sias |
---|---|
artifactId | hbase_op |
<dependencies> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> <dependency> <groupId>org.testng</groupId> <artifactId>testng</artifactId> <version>6.14.3</version> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <target>1.8</target> <source>1.8</source> </configuration> </plugin> </plugins> </build>
将hbase-site.xml、core-site.xml、log4j.properties三个配置文件复制到项目的resource目录中
#导出配置文件 [root@hadoop102 ~]# sz /opt/module/hbase-2.5.5/conf/hbase-site.xml [root@hadoop102 ~]# sz /opt/module/hadoop-3.3.1/etc/hadoop/core-site.xml #从其它项目中复制log4j.properties
注意:请确认配置文件中的服务器节点hostname/ip地址配置正确
在test目录创建 com.sias.hbase 包结构 创建TableAmdinTest类
要操作Hbase也需要建立Hbase的连接。此处我们仍然使用TestNG来编写测试。使用@BeforeTest初始化HBase连接,创建admin对象、@AfterTest关闭连接。
实现步骤:
使用HbaseConfiguration.create()创建Hbase配置
使用ConnectionFactory.createConnection()创建Hbase连接
要创建表,需要基于Hbase连接获取admin管理对象
使用admin.close、connection.close关闭连接
参考代码:
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import java.io.IOException; /** * @BeforeTest初始化HBase连接,创建admin对象、@AfterTest关闭连接。 */ public class TableAmdinTest { private Configuration configuration; private Connection connection; private Admin admin; @BeforeTest public void beforeTest() throws IOException { configuration = HBaseConfiguration.create(); connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); } @AfterTest public void afterTest() throws IOException { admin.close(); connection.close(); } }
实现步骤:
判断表是否存在
a)存在,则退出
使用TableDescriptorBuilder.newBuilder构建表描述构建器
使用ColumnFamilyDescriptorBuilder.newBuilder构建列蔟描述构建器
构建列蔟描述,构建表描述
创建表
参考代码:
@Test public void createTableTest() throws IOException { // 表名 String TABLE_NAME = "WATER_BILL"; // 列蔟名 String COLUMN_FAMILY = "C1"; // 1. 判断表是否存在 if(admin.tableExists(TableName.valueOf(TABLE_NAME))) { return; } // 2. 构建表描述构建器 TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME)); // 3. 构建列蔟描述构建器 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(COLUMN_FAMILY)); // 4. 构建列蔟描述 ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build(); // 5. 构建表描述 // 添加列蔟 tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor); TableDescriptor tableDescriptor = tableDescriptorBuilder.build(); // 6. 创建表 admin.createTable(tableDescriptor); }
实现步骤:
判断表是否存在
如果存在,则禁用表
再删除表
代码实现:
// 删除表 @Test public void dropTable() throws IOException { // 表名 TableName tableName = TableName.valueOf("WATER_BILL"); // 1. 判断表是否存在 if(admin.tableExists(tableName)) { // 2. 禁用表 admin.disableTable(tableName); // 3. 删除表 admin.deleteTable(tableName); } }
在com.sias.hbase.test包中创建DataOpTest类,初始化Hbase连接,在@BeforeTest中初始化HBase连接,在@AfterTest中关闭Hbase连接。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.testng.annotations.AfterTest; import org.testng.annotations.BeforeTest; import java.io.IOException; public class DataOpTest { private Configuration configuration; private Connection connection; @BeforeTest public void beforeTest() throws IOException { configuration = HBaseConfiguration.create(); connection = ConnectionFactory.createConnection(configuration); } @AfterTest public void afterTest() throws IOException { connection.close(); } }
在表中插入一个行,该行只包含一个列。
ROWKEY | 姓名(列名:NAME) |
---|---|
4944191 | 卫红红 |
实现步骤:
使用Hbase连接获取Htable
构建ROWKEY、列蔟名、列名
构建Put对象(对应put命令)
添加姓名列
使用Htable表对象执行put操作
关闭Htable表对象
代码实现:
@Test public void addTest() throws IOException { // 1.使用Hbase连接获取Htable TableName waterBillTableName = TableName.valueOf("WATER_BILL"); Table waterBillTable = connection.getTable(waterBillTableName); // 2.构建ROWKEY、列蔟名、列名 String rowkey = "4944191"; String cfName = "C1"; String colName = "NAME"; // 3.构建Put对象(对应put命令) Put put = new Put(Bytes.toBytes(rowkey)); // 4.添加姓名列 put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colName) , Bytes.toBytes("卫红红")); // 5.使用Htable表对象执行put操作 waterBillTable.put(put); // 6. 关闭表 waterBillTable.close(); }
hbase:009:0> get 'WATER_BILL','4944191',{FORMATTER => 'toString'}
列名**** | 说明**** | 值**** |
---|---|---|
ADDRESS | 用户地址 | 河南省郑州市新郑市7单元267室 |
SEX | 性别 | 男 |
PAY_DATE | 缴费时间 | 2023-05-10 |
NUM_CURRENT | 表示数(本次) | 308.1 |
NUM_PREVIOUS | 表示数(上次) | 283.1 |
NUM_USAGE | 用量(立方) | 25 |
TOTAL_MONEY | 合计金额 | 150 |
RECORD_DATE | 查表日期 | 2020-04-25 |
LATEST_DATE | 最迟缴费日期 | 2020-06-09 |
代码实现:
@Test public void addTest() throws IOException { // 1.使用Hbase连接获取Htable TableName waterBillTableName = TableName.valueOf("WATER_BILL"); Table waterBillTable = connection.getTable(waterBillTableName); // 2.构建ROWKEY、列蔟名、列名 String rowkey = "4944191"; String cfName = "C1"; String colName = "NAME"; String colADDRESS = "ADDRESS"; String colSEX = "SEX"; String colPAY_DATE = "PAY_DATE"; String colNUM_CURRENT = "NUM_CURRENT"; String colNUM_PREVIOUS = "NUM_PREVIOUS"; String colNUM_USAGE = "NUM_USAGE"; String colTOTAL_MONEY = "TOTAL_MONEY"; String colRECORD_DATE = "RECORD_DATE"; String colLATEST_DATE = "LATEST_DATE"; // 3.构建Put对象(对应put命令) Put put = new Put(Bytes.toBytes(rowkey)); // 4.添加姓名列 put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colName) , Bytes.toBytes("卫红红")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colADDRESS) , Bytes.toBytes("河南省郑州市新郑市7单元267室")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colSEX) , Bytes.toBytes("男")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colPAY_DATE) , Bytes.toBytes("2020-05-10")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colNUM_CURRENT) , Bytes.toBytes("308.1")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colNUM_PREVIOUS) , Bytes.toBytes("283.1")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colNUM_USAGE) , Bytes.toBytes("25")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colTOTAL_MONEY) , Bytes.toBytes("150")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colRECORD_DATE) , Bytes.toBytes("2020-04-25")); put.addColumn(Bytes.toBytes(cfName) , Bytes.toBytes(colLATEST_DATE) , Bytes.toBytes("2020-06-09")); // 5.使用Htable表对象执行put操作 waterBillTable.put(put); // 6. 关闭表 waterBillTable.close(); }
查询rowkey为4944191的所有列的数据,并打印出来。
实现步骤:
获取HTable
使用rowkey构建Get对象
执行get请求
获取所有单元格
打印rowkey
迭代单元格列表
关闭表
代码实现:
@Test public void getOneTest() throws IOException { // 1. 获取HTable TableName waterBillTableName = TableName.valueOf("WATER_BILL"); Table waterBilltable = connection.getTable(waterBillTableName); // 2. 使用rowkey构建Get对象 Get get = new Get(Bytes.toBytes("4944191")); // 3. 执行get请求 Result result = waterBilltable.get(get); // 4. 获取所有单元格 List<Cell> cellList = result.listCells(); // 打印rowkey System.out.println("rowkey => " + Bytes.toString(result.getRow())); // 5. 迭代单元格列表 for (Cell cell : cellList) { // 打印列蔟名 System.out.print(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())); System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } // 6. 关闭表 waterBilltable.close(); }
删除rowkey为4944191的整条数据。
实现步骤:
获取HTable对象
根据rowkey构建delete对象
执行delete请求
关闭表
代码实现:
@Test public void deleteOneTest() throws IOException { // 1. 获取HTable对象 Table waterBillTable = connection.getTable(TableName.valueOf("WATER_BILL")); // 2. 根据rowkey构建delete对象 Delete delete = new Delete(Bytes.toBytes("4944191")); // 3. 执行delete请求 waterBillTable.delete(delete); // 4. 关闭表 waterBillTable.close(); }
在资料中,有一份10W的抄表数据文件,我们需要将这里面的数据导入到HBase中。
在HBase中,有一个Import的MapReduce作业,可以专门用来将数据文件导入到HBase中。
用法:
hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径
将资料中数据文件上传到Linux中
再将文件上传到hdfs中
[root@hadoop102 ~]# cd /opt/data [root@hadoop102 data]# rz [root@hadoop102 data]# rz [root@hadoop102 data]# ls NEWS_VISIT_CNT.txt ORDER_INFO.txt part-m-00000_10w [root@hadoop102 data]# hadoop fs -mkdir -p /water_bill/output_ept_10W [root@hadoop102 data]# hadoop fs -put part-m-00000_10w /water_bill/output_ept_10W
3.使用以下方式来进行数据导入
[root@hadoop102 data]# hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/output_ept_10W
[root@hadoop102 data]# hbase org.apache.hadoop.hbase.mapreduce.Export WATER_BILL /water_bill/output_ept_10W_export
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。