当前位置:   article > 正文

Hbase Java编程

Hbase Java编程
1、需求与数据集

某某自来水公司,需要存储大量的缴费明细数据。以下截取了缴费明细的一部分内容。

用户id姓名用户地址性别缴费时间表示数(本次)表示数(上次)用量(立方)合计金额查表日期最迟缴费日期
4944191卫红红郑州市7单元267室2023-05-10308.1283.1251502023-04-252023-06-09

因为缴费明细的数据记录非常庞大,该公司的信息部门决定使用HBase来存储这些数据。并且,他们希望能够通过Java程序来访问这些数据。

2 、准备工作
2.1、创建IDEA Maven项目
groupIdcn.sias
artifactIdhbase_op
2.2、 导入pom依赖
   <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>6.14.3</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>
2.3、 复制HBase和Hadoop配置文件

将hbase-site.xml、core-site.xml、log4j.properties三个配置文件复制到项目的resource目录中

#导出配置文件
[root@hadoop102 ~]# sz /opt/module/hbase-2.5.5/conf/hbase-site.xml   
[root@hadoop102 ~]# sz /opt/module/hadoop-3.3.1/etc/hadoop/core-site.xml 
#从其它项目中复制log4j.properties

注意:请确认配置文件中的服务器节点hostname/ip地址配置正确

2.4 、创建包结构和类

在test目录创建 com.sias.hbase 包结构 ​ 创建TableAmdinTest类

2.5 、创建Hbase连接以及admin管理对象

要操作Hbase也需要建立Hbase的连接。此处我们仍然使用TestNG来编写测试。使用@BeforeTest初始化HBase连接,创建admin对象、@AfterTest关闭连接。

实现步骤:

  1. 使用HbaseConfiguration.create()创建Hbase配置

  2. 使用ConnectionFactory.createConnection()创建Hbase连接

  3. 要创建表,需要基于Hbase连接获取admin管理对象

  4. 使用admin.close、connection.close关闭连接

参考代码:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
​
import java.io.IOException;
​
/**
 * @BeforeTest初始化HBase连接,创建admin对象、@AfterTest关闭连接。
 */
public class TableAmdinTest {
    private Configuration configuration;
    private Connection connection;
    private Admin admin;
    @BeforeTest
    public void beforeTest() throws IOException {
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
        admin = connection.getAdmin();
    }
    @AfterTest
    public void afterTest() throws IOException {
        admin.close();
        connection.close();
    }
}
3、 需求一:使用Java代码创建表

实现步骤:

  1. 判断表是否存在

    a)存在,则退出

  2. 使用TableDescriptorBuilder.newBuilder构建表描述构建器

  3. 使用ColumnFamilyDescriptorBuilder.newBuilder构建列蔟描述构建器

  4. 构建列蔟描述,构建表描述

  5. 创建表

参考代码:

 @Test
 public void createTableTest() throws IOException {
        // 表名
        String TABLE_NAME = "WATER_BILL";
        // 列蔟名
        String COLUMN_FAMILY = "C1";
        // 1. 判断表是否存在
        if(admin.tableExists(TableName.valueOf(TABLE_NAME))) {
            return;
        }
        // 2. 构建表描述构建器
        TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE_NAME));
        // 3. 构建列蔟描述构建器
        ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(COLUMN_FAMILY));
        // 4. 构建列蔟描述
        ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptorBuilder.build();
        // 5. 构建表描述
        // 添加列蔟
        tableDescriptorBuilder.setColumnFamily(columnFamilyDescriptor);
        TableDescriptor tableDescriptor = tableDescriptorBuilder.build();
        // 6. 创建表
        admin.createTable(tableDescriptor);
}
4、 需求三:使用Java代码删除表

实现步骤:

  1. 判断表是否存在

  2. 如果存在,则禁用表

  3. 再删除表

代码实现:

 // 删除表
 @Test
 public void dropTable() throws IOException {
     // 表名
     TableName tableName = TableName.valueOf("WATER_BILL");
     // 1. 判断表是否存在
     if(admin.tableExists(tableName)) {
         // 2. 禁用表
         admin.disableTable(tableName);
         // 3. 删除表
         admin.deleteTable(tableName);
     }
}
5、 需求二:往表中插入一条数据
5.1 、创建类

在com.sias.hbase.test包中创建DataOpTest类,初始化Hbase连接,在@BeforeTest中初始化HBase连接,在@AfterTest中关闭Hbase连接。

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
​
import java.io.IOException;
​
public class DataOpTest {
    private Configuration configuration;
    private Connection connection;
​
    @BeforeTest
    public void beforeTest() throws IOException {
        configuration = HBaseConfiguration.create();
        connection = ConnectionFactory.createConnection(configuration);
    }
​
    @AfterTest
    public void afterTest() throws IOException {
        connection.close();
    }
}
5.2、插入姓名列数据

在表中插入一个行,该行只包含一个列。

ROWKEY姓名(列名:NAME)
4944191卫红红

实现步骤:

  1. 使用Hbase连接获取Htable

  2. 构建ROWKEY、列蔟名、列名

  3. 构建Put对象(对应put命令)

  4. 添加姓名列

  5. 使用Htable表对象执行put操作

  6. 关闭Htable表对象

代码实现:

 @Test
 public void addTest() throws IOException {
    // 1.使用Hbase连接获取Htable
    TableName waterBillTableName = TableName.valueOf("WATER_BILL");
    Table waterBillTable = connection.getTable(waterBillTableName);
​
    // 2.构建ROWKEY、列蔟名、列名
    String rowkey = "4944191";
    String cfName = "C1";
    String colName = "NAME";
​
    // 3.构建Put对象(对应put命令)
    Put put = new Put(Bytes.toBytes(rowkey));
​
    // 4.添加姓名列
    put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colName)
                , Bytes.toBytes("卫红红"));
​
    // 5.使用Htable表对象执行put操作
    waterBillTable.put(put);
    // 6. 关闭表
    waterBillTable.close();
}
5.3、 查看HBase中的数据
hbase:009:0> get 'WATER_BILL','4944191',{FORMATTER => 'toString'}
5.4、插入其他列
列名****说明****值****
ADDRESS用户地址河南省郑州市新郑市7单元267室
SEX性别
PAY_DATE缴费时间2023-05-10
NUM_CURRENT表示数(本次)308.1
NUM_PREVIOUS表示数(上次)283.1
NUM_USAGE用量(立方)25
TOTAL_MONEY合计金额150
RECORD_DATE查表日期2020-04-25
LATEST_DATE最迟缴费日期2020-06-09

代码实现:

  @Test
    public void addTest() throws IOException {
        // 1.使用Hbase连接获取Htable
        TableName waterBillTableName = TableName.valueOf("WATER_BILL");
        Table waterBillTable = connection.getTable(waterBillTableName);
​
        // 2.构建ROWKEY、列蔟名、列名
        String rowkey = "4944191";
        String cfName = "C1";
        String colName = "NAME";
        String colADDRESS = "ADDRESS";
        String colSEX = "SEX";
        String colPAY_DATE = "PAY_DATE";
        String colNUM_CURRENT = "NUM_CURRENT";
        String colNUM_PREVIOUS = "NUM_PREVIOUS";
        String colNUM_USAGE = "NUM_USAGE";
        String colTOTAL_MONEY = "TOTAL_MONEY";
        String colRECORD_DATE = "RECORD_DATE";
        String colLATEST_DATE = "LATEST_DATE";
​
        // 3.构建Put对象(对应put命令)
        Put put = new Put(Bytes.toBytes(rowkey));
​
        // 4.添加姓名列
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colName)
                , Bytes.toBytes("卫红红"));
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colADDRESS)
                , Bytes.toBytes("河南省郑州市新郑市7单元267室"));
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colSEX)
                , Bytes.toBytes("男"));
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colPAY_DATE)
                , Bytes.toBytes("2020-05-10"));
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colNUM_CURRENT)
                , Bytes.toBytes("308.1"));
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colNUM_PREVIOUS)
                , Bytes.toBytes("283.1"));
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colNUM_USAGE)
                , Bytes.toBytes("25"));
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colTOTAL_MONEY)
                , Bytes.toBytes("150"));
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colRECORD_DATE)
                , Bytes.toBytes("2020-04-25"));
        put.addColumn(Bytes.toBytes(cfName)
                , Bytes.toBytes(colLATEST_DATE)
                , Bytes.toBytes("2020-06-09"));
​
        // 5.使用Htable表对象执行put操作
        waterBillTable.put(put);
​
        // 6. 关闭表
        waterBillTable.close();
    }
6、需求三:查看一条数据

查询rowkey为4944191的所有列的数据,并打印出来。

实现步骤:

  1. 获取HTable

  2. 使用rowkey构建Get对象

  3. 执行get请求

  4. 获取所有单元格

  5. 打印rowkey

  6. 迭代单元格列表

  7. 关闭表

代码实现:

 @Test
    public void getOneTest() throws IOException {
        // 1. 获取HTable
        TableName waterBillTableName = TableName.valueOf("WATER_BILL");
        Table waterBilltable = connection.getTable(waterBillTableName);
        // 2. 使用rowkey构建Get对象
        Get get = new Get(Bytes.toBytes("4944191"));
        // 3. 执行get请求
        Result result = waterBilltable.get(get);
        // 4. 获取所有单元格
        List<Cell> cellList = result.listCells();
        // 打印rowkey
        System.out.println("rowkey => " + Bytes.toString(result.getRow()));
        // 5. 迭代单元格列表
        for (Cell cell : cellList) {
            // 打印列蔟名
            System.out.print(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()));
            System.out.println(" => " + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
​
        }
        // 6. 关闭表
        waterBilltable.close();
    }
7 、需求四:删除一条数据

删除rowkey为4944191的整条数据。

实现步骤:

  1. 获取HTable对象

  2. 根据rowkey构建delete对象

  3. 执行delete请求

  4. 关闭表

代码实现:

    @Test
    public void deleteOneTest() throws IOException {
        // 1. 获取HTable对象
        Table waterBillTable = connection.getTable(TableName.valueOf("WATER_BILL"));
        // 2. 根据rowkey构建delete对象
        Delete delete = new Delete(Bytes.toBytes("4944191"));
        // 3. 执行delete请求
        waterBillTable.delete(delete);
        // 4. 关闭表
        waterBillTable.close();
    }
8、 需求五:导入数据
8.1、需求

在资料中,有一份10W的抄表数据文件,我们需要将这里面的数据导入到HBase中。

8.2 、Import JOB

在HBase中,有一个Import的MapReduce作业,可以专门用来将数据文件导入到HBase中。

用法:

hbase org.apache.hadoop.hbase.mapreduce.Import 表名 HDFS数据文件路径
8.3、 导入数据
  1. 将资料中数据文件上传到Linux中

  2. 再将文件上传到hdfs中

[root@hadoop102 ~]# cd /opt/data
[root@hadoop102 data]# rz                                                       
[root@hadoop102 data]# rz
[root@hadoop102 data]# ls
NEWS_VISIT_CNT.txt  ORDER_INFO.txt  part-m-00000_10w
[root@hadoop102 data]# hadoop fs -mkdir -p /water_bill/output_ept_10W
[root@hadoop102 data]# hadoop fs -put part-m-00000_10w /water_bill/output_ept_10W

3.使用以下方式来进行数据导入

[root@hadoop102 data]# hbase org.apache.hadoop.hbase.mapreduce.Import WATER_BILL /water_bill/output_ept_10W
8.4、导出数据
[root@hadoop102 data]# hbase org.apache.hadoop.hbase.mapreduce.Export WATER_BILL /water_bill/output_ept_10W_export
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/在线问答5/article/detail/914402
推荐阅读
相关标签
  

闽ICP备14008679号