赞
踩
1、hbase-2.1.0介绍及分布式集群部署、HA集群部署、验证、硬件配置推荐
2、hbase-2.1.0 shell基本操作详解
3、HBase的java API基本操作(创建、删除表以及对数据的添加、删除、查询以及多条件查询)
4、HBase使用(namespace、数据分区、rowkey设计、原生api访问hbase)
5、Apache Phoenix(5.0.0-5.1.2) 介绍及部署、使用(基本使用、综合使用、二级索引示例)、数据分区示例
6、Base批量装载——Bulk load(示例一:基本使用示例)
7、Base批量装载-Bulk load(示例二:写千万级数据-mysql数据以ORCFile写入hdfs,然后导入hbase)
8、HBase批量装载-Bulk load(示例三:写千万级数据-mysql数据直接写成Hbase需要的数据,然后导入hbase)
本文主要介绍了通过java api操作hbase的基本示例。
本文依赖hbase环境可用。
本分主要分为2个部分,即maven依赖和源码示例。
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
将以下二个配置文件复制到resource目录中
hbase-site.xml
core-site.xml
注意:在哪个环境操作就使用哪个环境的配置文件,或者开发测试时直接在代码中设置zookeeper的地址
要操作Hbase也需要建立Hbase的连接。此处我们仍然使用TestNG来编写测试。使用@BeforeTest初始化HBase连接,创建admin对象、@AfterTest关闭连接。
import static org.junit.Assert.*; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; /** * 创建和删除表操作 * * @author alanchan * */ public class AdminTest { private Configuration configuration; private Connection connection; private Admin admin; private String table_Name = "TEST"; @Before public void beforeTest() throws IOException { configuration = HBaseConfiguration.create(); connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); } @Test public void createTableTest() throws IOException { TableName tableName = TableName.valueOf(table_Name); // 1. 判断表是否存在 if (admin.tableExists(tableName)) { // a) 存在,则退出 return; } // 构建表 // 2. 使用TableDescriptorBuilder.newBuilder构建表描述构建器 // TableDescriptor: 表描述器,描述这个表有几个列簇、其他的属性都是在这里可以配置 TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName); // 3. 使用ColumnFamilyDescriptorBuilder.newBuilder构建列簇描述构建器 // 创建列簇也需要有列簇的描述器,需要用一个构建起来构建ColumnFamilyDescriptor // 经常会使用到一个工具类:Bytes(hbase包下的Bytes工具类) // 这个工具类可以将字符串、long、double类型转换成byte[]数组 // 也可以将byte[]数组转换为指定类型 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder .newBuilder(Bytes.toBytes("C1")); // 4. 构建列簇描述,构建表描述 ColumnFamilyDescriptor cfDes = columnFamilyDescriptorBuilder.build(); // 建立表和列簇的关联 tableDescriptorBuilder.setColumnFamily(cfDes); TableDescriptor tableDescriptor = tableDescriptorBuilder.build(); // 5. 创建表 admin.createTable(tableDescriptor); assertTrue("表创建成功", admin.tableExists(tableName)); } @Test public void deleteTableTest() throws IOException { TableName tableName = TableName.valueOf(table_Name); // 1. 判断表是否存在 if (admin.tableExists(tableName)) { // 2.如果存在,则禁用表 admin.disableTable(tableName); // 3.再删除表 admin.deleteTable(tableName); } assertFalse("表删除成功", admin.tableExists(tableName)); } @After public void afterTest() throws IOException { admin.close(); connection.close(); } }
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; /** * 该示例是基于core-site.xml和hbase-site.xml文件没有的情况下,直接在代码中配置zookeeper信息 * * @author alanchan * */ public class AdminTestNoXmlConf { private Configuration configuration; private Connection connection; private Admin admin; private String table_Name = "TEST"; @Before public void beforeTest() throws IOException { configuration = HBaseConfiguration.create(); // 创建配置项,设置zookeeper的参数 configuration.set("hbase.zookeeper.quorum", "server1,server2,server3"); configuration.set("hbase.zookeeper.property.clientPort", "2181"); connection = ConnectionFactory.createConnection(configuration); admin = connection.getAdmin(); } @Test public void createTableTest() throws IOException { TableName tableName = TableName.valueOf(table_Name); // 1. 判断表是否存在 if (admin.tableExists(tableName)) { // a) 存在,则退出 return; } // 构建表 // 2. 使用TableDescriptorBuilder.newBuilder构建表描述构建器 // TableDescriptor: 表描述器,描述这个表有几个列簇、其他的属性都是在这里可以配置 TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(tableName); // 3. 使用ColumnFamilyDescriptorBuilder.newBuilder构建列簇描述构建器 // 创建列簇也需要有列簇的描述器,需要用一个构建起来构建ColumnFamilyDescriptor // 经常会使用到一个工具类:Bytes(hbase包下的Bytes工具类) // 这个工具类可以将字符串、long、double类型转换成byte[]数组 // 也可以将byte[]数组转换为指定类型 ColumnFamilyDescriptorBuilder columnFamilyDescriptorBuilder = ColumnFamilyDescriptorBuilder .newBuilder(Bytes.toBytes("C1")); // 4. 构建列簇描述,构建表描述 ColumnFamilyDescriptor cfDes = columnFamilyDescriptorBuilder.build(); // 建立表和列簇的关联 tableDescriptorBuilder.setColumnFamily(cfDes); TableDescriptor tableDescriptor = tableDescriptorBuilder.build(); // 5. 创建表 admin.createTable(tableDescriptor); assertTrue("表创建成功", admin.tableExists(tableName)); } @Test public void deleteTableTest() throws IOException { TableName tableName = TableName.valueOf(table_Name); // 1. 判断表是否存在 if (admin.tableExists(tableName)) { // 2.如果存在,则禁用表 admin.disableTable(tableName); // 3.再删除表 admin.deleteTable(tableName); } assertFalse("表删除成功", admin.tableExists(tableName)); } @After public void afterTest() throws IOException { admin.close(); connection.close(); } }
import java.io.IOException; import java.util.Iterator; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CompareOperator; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.util.Bytes; import org.junit.After; import org.junit.Before; import org.junit.Test; import lombok.extern.slf4j.Slf4j; /** * 添加、查询和删除数据。 * 修改数据可以看作是重新Put添加数据。 * * @author alanchan * */ @Slf4j public class OperatorTest { // Connection是线程安全的 private Connection connection; private TableName TABLE_NAME = TableName.valueOf("TEST"); @Before public void beforeTest() throws IOException { // 1. 使用HbaseConfiguration.create()创建Hbase配置 Configuration configuration = HBaseConfiguration.create(); // 2. 使用ConnectionFactory.createConnection()创建Hbase连接 connection = ConnectionFactory.createConnection(configuration); } @Test public void putTest() throws IOException { // 1. 使用Hbase连接获取Htable Table table = connection.getTable(TABLE_NAME); // 2. 构建ROWKEY、列簇名、列名 String rowkey = "4944191"; String columnFamily = "C1"; String columnName = "NAME"; String columnNameADDRESS = "ADDRESS"; String columnNameSEX = "SEX"; String columnNamePAY_DATE = "PAY_DATE"; String columnNameNUM_CURRENT = "NUM_CURRENT"; String columnNameNUM_PREVIOUS = "NUM_PREVIOUS"; String columnNameNUM_USAGE = "NUM_USAGE"; String columnNameTOTAL_MONEY = "TOTAL_MONEY"; String columnNameRECORD_DATE = "RECORD_DATE"; String columnNameLATEST_DATE = "LATEST_DATE"; // value: // 3. 构建Put对象(对应put命令) Put put = new Put(Bytes.toBytes(rowkey)); // 4. 添加姓名列 put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes("登卫红")); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnNameADDRESS), Bytes.toBytes("贵州省铜仁市德江县7单元267室")); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnNameSEX), Bytes.toBytes("男")); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnNamePAY_DATE), Bytes.toBytes("2020-05-10")); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnNameNUM_CURRENT), Bytes.toBytes("308.1")); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnNameNUM_PREVIOUS), Bytes.toBytes("283.1")); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnNameNUM_USAGE), Bytes.toBytes("25")); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnNameTOTAL_MONEY), Bytes.toBytes("150")); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnNameRECORD_DATE), Bytes.toBytes("2020-04-25")); put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnNameLATEST_DATE), Bytes.toBytes("2020-06-09")); // 5. 使用Htable表对象执行put操作 table.put(put); // 6. 关闭Htable表对象 // HTable是一个轻量级的对象,可以经常创建 // HTable它是一个非线程安全的API table.close(); } @Test public void getTest() throws IOException { // 1. 获取HTable Table table = connection.getTable(TABLE_NAME); // 2. 使用rowkey构建Get对象 Get get = new Get(Bytes.toBytes("4944191")); // 3. 执行get请求 Result result = table.get(get); // 4. 获取所有单元格 // 列出所有的单元格 List<Cell> cellList = result.listCells(); // 5. 打印rowkey byte[] rowkey = result.getRow(); log.info("rowkey={}", Bytes.toString(rowkey)); // 6. 迭代单元格列表 for (Cell cell : cellList) { // 将字节数组转换为字符串 // 获取列簇的名称 String cf = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); // 获取列的名称 String columnName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); // 获取值 String value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); log.info("列簇:列->值={}:{}->{}", cf, columnName, value); } // 7. 关闭表 table.close(); } @Test public void deleteTest() throws IOException { // 1. 获取HTable对象 Table table = connection.getTable(TABLE_NAME); // 2. 根据rowkey构建delete对象 Delete delete = new Delete(Bytes.toBytes("4944191")); // 3. 执行delete请求 table.delete(delete); // 4. 关闭表 table.close(); } @After public void afterTest() throws IOException { connection.close(); } // 查询2020年6月份所有用户的用水量 // // hbase(main):117:0> get 'WATER_BILL','9951726', {FORMATTER => 'toString'} // COLUMN CELL // C1:ADDRESS timestamp=1588911489455, value=安徽省宣城市市辖区13单元187室 // C1:LATEST_DATE timestamp=1588911489455, value=2019-07-03 // C1:NAME timestamp=1588911489455, value=检喜云 // C1:NUM_CURRENT timestamp=1588911489455, value=@}�fffff // C1:NUM_PREVIOUS timestamp=1588911489455, value=@z陙��� // C1:NUM_USAGE timestamp=1588911489455, value=@9 // C1:PAY_DATE timestamp=1588911489455, value=2020-09-26 // C1:RECORD_DATE timestamp=1588911489455, value=2019-07-18 // C1:SEX timestamp=1588911489455, value=男 // C1:TOTAL_MONEY timestamp=1588911489455, value=@`� @Test public void scanFilterTest() throws IOException { // 1. 获取表 Table table = connection.getTable(TABLE_NAME); // 2. 构建scan请求对象 Scan scan = new Scan(); // 3. 构建两个过滤器 // a) 构建两个日期范围过滤器(注意此处请使用RECORD_DATE——抄表日期比较 SingleColumnValueFilter startFilter = new SingleColumnValueFilter(Bytes.toBytes("C1"), Bytes.toBytes("RECORD_DATE"), CompareOperator.GREATER_OR_EQUAL, new BinaryComparator(Bytes.toBytes("2020-06-01"))); SingleColumnValueFilter endFilter = new SingleColumnValueFilter(Bytes.toBytes("C1"), Bytes.toBytes("RECORD_DATE"), CompareOperator.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("2020-06-30"))); // b) 构建过滤器列表 FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL, startFilter, endFilter); // 4. 执行scan扫描请求 scan.setFilter(filterList); ResultScanner resultScanner = table.getScanner(scan); Iterator<Result> iterator = resultScanner.iterator(); // 5. 迭代打印result while (iterator.hasNext()) { Result result = iterator.next(); // 列出所有的单元格 List<Cell> cellList = result.listCells(); // 5. 打印rowkey byte[] rowkey = result.getRow(); log.info("rowkey={}", Bytes.toString(rowkey)); // 6. 迭代单元格列表 for (Cell cell : cellList) { // 将字节数组转换为字符串 // 获取列簇的名称 String cf = Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()); // 获取列的名称 String columnName = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),cell.getQualifierLength()); String value = ""; if (columnName.equals("NUM_CURRENT") || columnName.equals("NUM_PREVIOUS") || columnName.equals("NUM_USAGE") || columnName.equals("TOTAL_MONEY")) { value = Bytes.toDouble(cell.getValueArray()) + ""; } else { // 获取值 value = Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()); } log.info("列簇:列->值={}:{}->{}", cf, columnName, value); } } // 7. 关闭ResultScanner resultScanner.close(); // 8. 关闭表 table.close(); } }
以上,完成了通过java api简单操作hbase的示例,如果需要更多更深入的使用,则需要参看官方文档。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。