The approaches mainly break down into:
1. the native Java API that HBase itself provides, which is fairly low-level but efficient;
2. Spark's HBase integration;
3. Flink's HBase integration;
4. the JDBC route offered by the third-party Phoenix layer.
The Phoenix JDBC approach can also be invoked from within Spark and Flink jobs.
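The Phoenix route is not demonstrated elsewhere in this post, so here is a minimal, hedged sketch of what it can look like. It assumes the Phoenix client jar is on the classpath; the ZooKeeper address zk01:2181 and the demo table and columns are made up for illustration. The rest of this section walks through the native API first, then the Spark integration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixJdbcSketch {
    public static void main(String[] args) throws Exception {
        // Phoenix thick-client JDBC URL: jdbc:phoenix:<zookeeper quorum>[:<port>]
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:zk01:2181");
             Statement stmt = conn.createStatement()) {
            stmt.executeUpdate("CREATE TABLE IF NOT EXISTS demo (id VARCHAR PRIMARY KEY, name VARCHAR)");
            stmt.executeUpdate("UPSERT INTO demo VALUES ('1', 'hbase')");
            conn.commit(); // Phoenix connections do not auto-commit by default
            try (ResultSet rs = stmt.executeQuery("SELECT id, name FROM demo")) {
                while (rs.next()) {
                    System.out.println(rs.getString(1) + " -> " + rs.getString(2));
                }
            }
        }
    }
}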
First, the native HBase Java API. Creating a table:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTableTest {
    public static void main(String[] args) throws IOException {
        // HBase connection configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");       // ZooKeeper address
        conf.set("hbase.zookeeper.property.clientPort", "2181");  // ZooKeeper client port

        String tableName = "emp";
        String[] family = { "basicinfo", "deptinfo" };

        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
        // Build the table descriptor and attach the column families
        HTableDescriptor hbaseTableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        for (int i = 0; i < family.length; i++) {
            hbaseTableDesc.addFamily(new HColumnDescriptor(family[i]));
        }

        // Create the table only if it does not exist yet
        if (hbaseAdmin.tableExists(TableName.valueOf(tableName))) {
            System.out.println("TableExists!");
            /*
             * System.exit(status) shuts down the whole JVM: status 0 means a normal exit,
             * any non-zero status means an abnormal exit. Unlike return, which only goes
             * back to the caller, System.exit() terminates the entire application.
             */
            System.exit(0);
        } else {
            hbaseAdmin.createTable(hbaseTableDesc);
            System.out.println("Create table Success!");
        }
    }
}
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class DeleteMyTable {
    public static void main(String[] args) throws IOException {
        String tableName = "mytb";
        delete(tableName);
    }

    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    public static void delete(String tableName) throws IOException {
        HBaseAdmin hAdmin = new HBaseAdmin(getConfiguration());
        if (hAdmin.tableExists(tableName)) {
            try {
                hAdmin.disableTable(tableName);
                hAdmin.deleteTable(tableName);
                System.err.println("Delete table Success");
            } catch (IOException e) {
                System.err.println("Delete table Failed ");
            }
        } else {
            System.err.println("table not exists");
        }
    }
}
Consider an e-commerce site that keeps a buyer information table, buyer, in its backend. Every time a new user registers, the backend produces one log record and writes it into HBase.
The record format is: user ID (buyer_id), registration date (reg_date), registration IP (reg_ip), and buyer status (buyer_status, 0 = frozen, 1 = normal), separated by "\t". Sample data:
buyer_id    reg_date    reg_ip    buyer_status
20385,2010-05-04,124.64.242.30,1
20386,2010-05-05,117.136.0.172,1
20387,2010-05-06,114.94.44.230,1
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutData {
    public static void main(String[] args) throws MasterNotRunningException,
            ZooKeeperConnectionException, IOException {
        String tableName = "mytb";
        String columnFamily = "mycf";

        put(tableName, "20385", columnFamily, "2010-05-04:reg_ip", "124.64.242.30");
        put(tableName, "20385", columnFamily, "2010-05-04:buyer_status", "1");
        put(tableName, "20386", columnFamily, "2010-05-05:reg_ip", "117.136.0.172");
        put(tableName, "20386", columnFamily, "2010-05-05:buyer_status", "1");
        put(tableName, "20387", columnFamily, "2010-05-06:reg_ip", "114.94.44.230");
        put(tableName, "20387", columnFamily, "2010-05-06:buyer_status", "1");
    }

    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    public static void put(String tableName, String row, String columnFamily,
            String column, String data) throws IOException {
        HTable table = new HTable(getConfiguration(), tableName);
        Put put = new Put(Bytes.toBytes(row));
        put.add(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
        table.put(put);
        System.err.println("SUCCESS");
    }
}
Note: constructing an HTable by hand is deprecated. Obtain table instances from a Connection instead, via Connection.getTable(TableName).
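As a quick, hedged sketch of what the put helper above looks like when rewritten against the Connection API (same table, column family and ZooKeeper address as in the example; the HBase 2.x section later in this post uses the same pattern):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutDataWithConnection {
    public static void main(String[] args) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        // Connection is heavyweight and thread-safe: create it once and reuse it
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Table table = conn.getTable(TableName.valueOf("mytb"))) {
            Put put = new Put(Bytes.toBytes("20385"));
            put.addColumn(Bytes.toBytes("mycf"), Bytes.toBytes("2010-05-04:reg_ip"),
                    Bytes.toBytes("124.64.242.30"));
            table.put(put);
        }
    }
}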
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class GetData {
    public static void main(String[] args) throws IOException {
        String tableName = "mytb";
        get(tableName, "20386");
    }

    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    public static void get(String tableName, String rowkey) throws IOException {
        HTable table = new HTable(getConfiguration(), tableName);
        Get get = new Get(Bytes.toBytes(rowkey));
        Result result = table.get(get);
        byte[] value1 = result.getValue("mycf".getBytes(), "2010-05-05:reg_ip".getBytes());
        byte[] value2 = result.getValue("mycf".getBytes(), "2010-05-05:buyer_status".getBytes());
        System.err.println("line1:SUCCESS");
        System.err.println("line2:" + new String(value1) + "\t" + new String(value2));
    }
}
All of the code above is compiled and run like this:
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac GetData.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/java GetData
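Note that the HBase client jars have to be on the classpath for both javac and java to resolve the HBase classes. A hedged example of how this is often done on a node where the hbase launcher script is on the PATH (the exact jar locations depend on your installation):

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac -cp "$(hbase classpath):." GetData.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/java -cp "$(hbase classpath):." GetData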
import java.io.IOException; import java.util.Iterator; import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.util.Bytes; public class HBaseTest2 { // 声明静态配置 static Configuration conf = null; static { conf = HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum", "192.168.205.153"); } /* * 创建表 * @tableName 表名 * @family 列族列表 */ public static void creatTable(String tableName, String[] family) throws Exception { HBaseAdmin admin = new HBaseAdmin(conf); HTableDescriptor desc = new HTableDescriptor(tableName); for (int i = 0; i < family.length; i++) { desc.addFamily(new HColumnDescriptor(family[i])); } if (admin.tableExists(tableName)) { System.out.println("table Exists!"); System.exit(0); } else { admin.createTable(desc); System.out.println("create table Success!"); } } /* * 为表添加数据(适合知道有多少列族的固定表) * @rowKey rowKey * @tableName 表名 * @column1 第一个列族列表 * @value1 第一个列的值的列表 * @column2 第二个列族列表 * @value2 第二个列的值的列表 */ public static void addData(String rowKey, String tableName, String[] column1, String[] value1, String[] column2, String[] value2) throws IOException { Put put = new Put(Bytes.toBytes(rowKey));// 设置rowkey HTable table = new HTable(conf, tableName);// 获取表 HColumnDescriptor[] columnFamilies = table.getTableDescriptor() // 获取所有的列族 .getColumnFamilies(); for (int i = 0; i < columnFamilies.length; i++) { String familyName = columnFamilies[i].getNameAsString(); // 获取列族名 if (familyName.equals("article")) { // article列族put数据 for (int j = 0; j < column1.length; j++) { put.add(Bytes.toBytes(familyName), Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j])); } } if (familyName.equals("author")) { // author列族put数据 for (int j = 0; j < column2.length; j++) { put.add(Bytes.toBytes(familyName), Bytes.toBytes(column2[j]), Bytes.toBytes(value2[j])); } } } table.put(put); System.out.println("add data Success!"); } /* * 根据rwokey查询 * @rowKey rowKey * @tableName 表名 */ public static Result getResult(String tableName, String rowKey) throws IOException { Get get = new Get(Bytes.toBytes(rowKey)); HTable table = new HTable(conf, tableName);// 获取表 Result result = table.get(get); for (KeyValue kv : result.list()) { System.out.println("family:" + Bytes.toString(kv.getFamily())); System.out .println("qualifier:" + Bytes.toString(kv.getQualifier())); System.out.println("value:" + Bytes.toString(kv.getValue())); System.out.println("Timestamp:" + kv.getTimestamp()); System.out.println("-------------------------------------------"); } return result; } /* * 遍历查询hbase表 * @tableName 表名 */ public static void getResultScann(String tableName) throws IOException { Scan scan = new Scan(); ResultScanner rs = null; HTable table = new HTable(conf, tableName); try { rs = table.getScanner(scan); for (Result r : rs) { for (KeyValue kv : r.list()) { System.out.println("family:" + Bytes.toString(kv.getFamily())); System.out.println("qualifier:" + Bytes.toString(kv.getQualifier())); System.out .println("value:" + 
Bytes.toString(kv.getValue())); System.out.println("timestamp:" + kv.getTimestamp()); System.out .println("-------------------------------------------"); } } } finally { rs.close(); } } /* * 查询表中的某一列 * @tableName 表名 * @rowKey rowKey */ public static void getResultByColumn(String tableName, String rowKey, String familyName, String columnName) throws IOException { HTable table = new HTable(conf, tableName); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // 获取指定列族和列修饰符对应的列 Result result = table.get(get); for (KeyValue kv : result.list()) { System.out.println("family:" + Bytes.toString(kv.getFamily())); System.out .println("qualifier:" + Bytes.toString(kv.getQualifier())); System.out.println("value:" + Bytes.toString(kv.getValue())); System.out.println("Timestamp:" + kv.getTimestamp()); System.out.println("-------------------------------------------"); } } /* * 更新表中的某一列 * @tableName 表名 * @rowKey rowKey * @familyName 列族名 * @columnName 列名 * @value 更新后的值 */ public static void updateTable(String tableName, String rowKey, String familyName, String columnName, String value) throws IOException { HTable table = new HTable(conf, tableName); Put put = new Put(Bytes.toBytes(rowKey)); put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName), Bytes.toBytes(value)); table.put(put); System.out.println("update table Success!"); } /* * 查询某列数据的多个版本 * @tableName 表名 * @rowKey rowKey * @familyName 列族名 * @columnName 列名 */ public static void getResultByVersion(String tableName, String rowKey, String familyName, String columnName) throws IOException { HTable table = new HTable(conf, tableName); Get get = new Get(Bytes.toBytes(rowKey)); get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); get.setMaxVersions(5); Result result = table.get(get); for (KeyValue kv : result.list()) { System.out.println("family:" + Bytes.toString(kv.getFamily())); System.out .println("qualifier:" + Bytes.toString(kv.getQualifier())); System.out.println("value:" + Bytes.toString(kv.getValue())); System.out.println("Timestamp:" + kv.getTimestamp()); System.out.println("-------------------------------------------"); } List<?> results = table.get(get).list(); Iterator<?> it = results.iterator(); while (it.hasNext()) { System.out.println(it.next().toString()); } } /* * 删除指定的列 * @tableName 表名 * @rowKey rowKey * @familyName 列族名 * @columnName 列名 */ public static void deleteColumn(String tableName, String rowKey, String falilyName, String columnName) throws IOException { HTable table = new HTable(conf, tableName); Delete deleteColumn = new Delete(Bytes.toBytes(rowKey)); deleteColumn.deleteColumns(Bytes.toBytes(falilyName), Bytes.toBytes(columnName)); table.delete(deleteColumn); System.out.println(falilyName + ":" + columnName + "is deleted!"); } /* * 删除指定的列 * @tableName 表名 * @rowKey rowKey */ public static void deleteAllColumn(String tableName, String rowKey) throws IOException { HTable table = new HTable(conf, tableName); Delete deleteAll = new Delete(Bytes.toBytes(rowKey)); table.delete(deleteAll); System.out.println("all columns are deleted!"); } /* * 删除表 * @tableName 表名 */ public static void deleteTable(String tableName) throws IOException { HBaseAdmin admin = new HBaseAdmin(conf); admin.disableTable(tableName); admin.deleteTable(tableName); System.out.println(tableName + "is deleted!"); } public static void main(String[] args) throws Exception { // 创建表 // String tableName = "blog2"; String[] family = { "article","author" }; // creatTable(tableName,family); // 
为表添加数据 // String[] column1 = { "title", "content", "tag" }; String[] value1 = {"Head First HBase", // "HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big Data." // , "Hadoop,HBase,NoSQL" }; String[] column2 = { "name", "nickname" }; // String[] value2 = { "nicholas", "lee" }; addData("rowkey1", "blog2", // column1, value1, column2, value2); // 删除一列 // deleteColumn("blog2", "rowkey1", "author", "nickname"); // 删除所有列 // deleteAllColumn("blog2", "rowkey1"); //删除表 // deleteTable("blog2"); // 查询 // getResult("blog2", "rowkey1"); // 查询某一列的值 // getResultByColumn("blog2", "rowkey1", "author", "name"); // updateTable("blog2", "rowkey1", "author", "name","bin"); // getResultByColumn("blog2", "rowkey1", "author", "name"); // 遍历查询 // getResultScann("blog2"); // 查询某列的多版本 getResultByVersion("blog2", "rowkey1", "author", "name"); } }
Note: here, too, constructing an HTable by hand is deprecated. Obtain table instances from a Connection via Connection.getTable(TableName), as in the sketch after the first such note above.
I am using HBase 2.1.2 here, and connect to HBase through a static block. Unlike versions before 2.1.2, there is no need to build an HBase thread pool yourself; the 2.1.2 client already encapsulates that, so you only need to create the connection and use it:
/**
* Static configuration and connection, shared by the examples below
*/
static Configuration conf = null;
static Connection conn = null;
static {
conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");
conf.set("hbase.zookeeper.property.client", "2181");
try{
conn = ConnectionFactory.createConnection(conf);
}catch (Exception e){
e.printStackTrace();
}
}
Tables are created through Admin; the table and its column families are built with TableDescriptorBuilder and ColumnFamilyDescriptorBuilder respectively:
/**
 * Create a table with a single column family
 * @throws Exception
 */
public static void createTable() throws Exception {
    Admin admin = conn.getAdmin();
    if (!admin.tableExists(TableName.valueOf("test"))) {
        TableName tableName = TableName.valueOf("test");
        // Table descriptor builder
        TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(tableName);
        // Column family descriptor builder
        ColumnFamilyDescriptorBuilder cdb =
                ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("user"));
        // Build the column family descriptor
        ColumnFamilyDescriptor cfd = cdb.build();
        // Attach the column family to the table
        tdb.setColumnFamily(cfd);
        // Build the table descriptor
        TableDescriptor td = tdb.build();
        // Create the table
        admin.createTable(td);
    } else {
        System.out.println("Table already exists");
    }
    // Close the connection (note: the later examples reuse the shared conn,
    // so in practice close it only when you are completely done with HBase)
    conn.close();
}
Data is inserted through the Put API:
/**
 * Insert data (multiple row keys, multiple columns)
 * @throws Exception
 */
public static void insertMany() throws Exception {
    Table table = conn.getTable(TableName.valueOf("test"));
    List<Put> puts = new ArrayList<Put>();

    Put put1 = new Put(Bytes.toBytes("rowKey1"));
    put1.addColumn(Bytes.toBytes("user"), Bytes.toBytes("name"), Bytes.toBytes("wd"));
    Put put2 = new Put(Bytes.toBytes("rowKey2"));
    put2.addColumn(Bytes.toBytes("user"), Bytes.toBytes("age"), Bytes.toBytes("25"));
    Put put3 = new Put(Bytes.toBytes("rowKey3"));
    put3.addColumn(Bytes.toBytes("user"), Bytes.toBytes("weight"), Bytes.toBytes("60kg"));
    Put put4 = new Put(Bytes.toBytes("rowKey4"));
    put4.addColumn(Bytes.toBytes("user"), Bytes.toBytes("sex"), Bytes.toBytes("男"));

    puts.add(put1);
    puts.add(put2);
    puts.add(put3);
    puts.add(put4);
    table.put(puts);
    table.close();
}
/**
 * Delete a whole row by rowKey, or only one column family of that row,
 * or only a single column of one column family
 * @param tableName
 * @param rowKey
 * @param columnFamily
 * @param columnName
 * @throws Exception
 */
public static void deleteData(TableName tableName, String rowKey,
        String columnFamily, String columnName) throws Exception {
    Table table = conn.getTable(tableName);
    Delete delete = new Delete(Bytes.toBytes(rowKey));

    // (2) restrict the delete to one column family of the row
    // delete.addFamily(Bytes.toBytes(columnFamily));
    // (3) restrict the delete to a single column of one column family
    // delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));

    // (1) with no family/column added, this deletes the entire row
    table.delete(delete);
    table.close();
}
To update a value, simply write it again with the Put API; the new version logically replaces the old one:
/**
 * Update a value identified by rowKey, column family and column name
 * @param tableName
 * @param rowKey
 * @param columnFamily
 * @param columnName
 * @param columnValue
 * @throws Exception
 */
public static void updateData(TableName tableName, String rowKey, String columnFamily,
        String columnName, String columnValue) throws Exception {
    Table table = conn.getTable(tableName);
    Put put1 = new Put(Bytes.toBytes(rowKey));
    put1.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));
    table.put(put1);
    table.close();
}
HBase reads fall into get, scan, and scan combined with filters. Common filters include RowFilter (row key filter), SingleColumnValueFilter (column value filter), and ColumnPrefixFilter (column name prefix filter).
/**
 * Query a row by rowKey
 * @param tableName
 * @param rowKey
 * @throws Exception
 */
public static void getResult(TableName tableName, String rowKey) throws Exception {
    Table table = conn.getTable(tableName);
    // Fetch a single row
    Get get = new Get(Bytes.toBytes(rowKey));
    Result set = table.get(get);
    Cell[] cells = set.rawCells();
    for (Cell cell : cells) {
        System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())
                + "::"
                + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    }
    table.close();
}

// Comparison operators used by the filters:
//   LESS <   LESS_OR_EQUAL <=   EQUAL =   NOT_EQUAL <>   GREATER_OR_EQUAL >=   GREATER >   NO_OP (exclude everything)

/**
 * Scan a table with different filters
 * @param tableName
 * @throws Exception
 */
public static void scanTable(TableName tableName) throws Exception {
    Table table = conn.getTable(tableName);

    // (1) full table scan
    Scan scan1 = new Scan();
    ResultScanner rscan1 = table.getScanner(scan1);

    // (2) rowKey filter
    Scan scan2 = new Scan();
    // "str$" matches the end of the rowKey (like SQL %str); "^str" matches the beginning (like str%)
    RowFilter filter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("Key1$"));
    scan2.setFilter(filter);
    ResultScanner rscan2 = table.getScanner(scan2);

    // (3) column value filter
    Scan scan3 = new Scan();
    // Arguments: column family, column name, comparison operator, value
    SingleColumnValueFilter filter3 = new SingleColumnValueFilter(Bytes.toBytes("author"),
            Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("spark"));
    scan3.setFilter(filter3);
    ResultScanner rscan3 = table.getScanner(scan3);

    // (4) column name prefix filter
    Scan scan4 = new Scan();
    ColumnPrefixFilter filter4 = new ColumnPrefixFilter(Bytes.toBytes("name"));
    scan4.setFilter(filter4);
    ResultScanner rscan4 = table.getScanner(scan4);

    // (5) filter list: every filter must pass
    Scan scan5 = new Scan();
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    SingleColumnValueFilter filter51 = new SingleColumnValueFilter(Bytes.toBytes("author"),
            Bytes.toBytes("name"), CompareOperator.EQUAL, Bytes.toBytes("spark"));
    ColumnPrefixFilter filter52 = new ColumnPrefixFilter(Bytes.toBytes("name"));
    list.addFilter(filter51);
    list.addFilter(filter52);
    scan5.setFilter(list);
    ResultScanner rscan5 = table.getScanner(scan5);

    // Print the results of one scanner; rscan1 .. rscan5 are all traversed the same way
    for (Result rs : rscan5) {
        String rowKey = Bytes.toString(rs.getRow());
        System.out.println("row key :" + rowKey);
        Cell[] cells = rs.rawCells();
        for (Cell cell : cells) {
            System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength())
                    + "::"
                    + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength())
                    + "::"
                    + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        }
        System.out.println("-------------------------------------------");
    }
    table.close();
}
Listing every table in the cluster goes through Admin.listTableNames():

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HBaseTableLister {
    public static void main(String[] args) throws IOException {
        // Create the configuration object
        Configuration config = HBaseConfiguration.create();
        // HBase cluster connection information
        config.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com");
        config.set("hbase.zookeeper.property.clientPort", "2181");

        // Create the HBase connection
        Connection connection = ConnectionFactory.createConnection(config);
        // Obtain the admin object
        Admin admin = connection.getAdmin();

        // List all tables in HBase and print their names
        TableName[] tableNames = admin.listTableNames();
        for (TableName tableName : tableNames) {
            System.out.println(Bytes.toString(tableName.getName()));
        }

        // Close the connection
        admin.close();
        connection.close();
    }
}
import com.alibaba.fastjson.JSONObject; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.filter.*; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.util.StopWatch; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.VoidFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; import org.apache.spark.sql.RowFactory; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.types.DataTypes; import org.apache.spark.sql.types.StructField; import org.apache.spark.sql.types.StructType; import scala.Tuple2; import scala.Tuple3; import java.io.IOException; import java.util.*; import java.util.concurrent.TimeUnit; import static com.xiaoqiang.utils.DateUtil.dateToTimestamp; /** * @author: xiaoqiang * @version: 1.0 * @description: com.xiaoqiang.utils * @date:2023/8/30 */ public class HBaseUtil { public static List<Put> puts = new ArrayList<Put>(); public static List<Delete> deletes = new ArrayList<>(); /** * @Description: 获取 hbase 客户端连接 * @param properties */ public static Connection getHBaseConnect(Properties properties) { try { // 连接 Hbase Configuration hbConf = HBaseConfiguration.create(); hbConf.set("fs.defaultFS", properties.getProperty("fs.defaultFS")); hbConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); hbConf.set("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum")); hbConf.set("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort")); return ConnectionFactory.createConnection(hbConf); } catch (IOException e) { throw new RuntimeException(e); } } /** * @Description: 获取 hbase 的所有表名 * @param properties */ public static void getAllTablesName(Properties properties) throws IOException { // 创建HBase连接 Connection connection = getHBaseConnect(properties); // 获取HBase管理员对象 Admin admin = connection.getAdmin(); // 查询HBase中的所有表 TableName[] tableNames = admin.listTableNames(); // 输出表名 for (TableName tableName : tableNames) { System.out.println(Bytes.toString(tableName.getName())); } // 关闭连接 admin.close(); connection.close(); } /** * @Description: 获取指定表的 count 数 * @param properties * @param tablename */ public static void rowCountByScanFilter(Properties properties, String tablename) throws IOException { // 创建HBase连接 Connection connection = getHBaseConnect(properties); // 获取查询全部列 long rowCount = 0; try { // 计时 StopWatch stopWatch = new StopWatch(); stopWatch.start(); TableName name=TableName.valueOf(tablename); // connection为类静态变量 Table table = connection.getTable(name); Scan scan = new Scan(); // startRowKey 和 endRowKey scan.withStartRow("student-1_1694307600".getBytes()); scan.withStopRow("student-1_1694311200".getBytes()); FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL); Filter filter1 = new FirstKeyOnlyFilter(); Filter filter2 = new 
RowFilter(CompareOperator.EQUAL, new RegexStringComparator("^tudent-12")); list.addFilter(filter1); // list.addFilter(filter2); // FirstKeyOnlyFilter只会取得每行数据的第一个kv,提高count速度 scan.setFilter(list); ResultScanner rs = table.getScanner(scan); for (Result result : rs) { rowCount += result.size(); // break; if (rowCount == 1000) { System.out.println("rowCount-->"+rowCount); } else if (rowCount == 10000) { System.out.println("rowCount-->"+rowCount); } else if (rowCount == 50000) { System.out.println("rowCount-->"+rowCount); } else if (rowCount == 100000) { System.out.println("rowCount-->"+rowCount); } else if (rowCount == 1000000) { System.out.println("rowCount-->"+rowCount); } } stopWatch.stop(); System.out.println("RowCount: " + rowCount); System.out.println("统计耗时:" + stopWatch.now(TimeUnit.SECONDS)); } catch (Throwable e) { e.printStackTrace(); } } /** * @Description: 根据 rowKey 查询数据 * @param properties * @param tableName * @param rowKey */ public static void getResultByRowKey(Properties properties, String tableName, String rowKey) throws Exception{ // 创建HBase连接 Connection connection = getHBaseConnect(properties); TableName name=TableName.valueOf(tableName); Table table = connection.getTable(name); // 获得一行 Get get = new Get(Bytes.toBytes(rowKey)); Result set = table.get(get); Cell[] cells = set.rawCells(); for (Cell cell: cells){ System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } table.close(); } /** * @Description: 根据列值查询数据(列值过滤器)或者 rowKey 前后缀查询数据 * @param properties * @param tableName */ public static void scanTableByFilter(Properties properties, String tableName) throws Exception{ // 创建HBase连接 Connection connection = getHBaseConnect(properties); TableName name=TableName.valueOf(tableName); Table table = connection.getTable(name); // rowKey过滤器 Scan scan1 = new Scan(); // str$ 末尾匹配,相当于sql中的 %str ^str开头匹配,相当于sql中的str% RowFilter filter1 = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("^tudent-58")); scan1.setFilter(filter1); ResultScanner rscan1 = table.getScanner(scan1); // 列值过滤器 Scan scan2 = new Scan(); // 下列参数分别为列族,列名,比较符号,值 SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("DATA"), Bytes.toBytes("work_status"), CompareOperator.EQUAL, Bytes.toBytes("1")); scan2.setFilter(filter2); ResultScanner rscan2 = table.getScanner(scan2); FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL); list.addFilter(filter1); list.addFilter(filter2); Scan scan3 = new Scan(); scan3.setFilter(list); ResultScanner rscan3 = table.getScanner(scan3); int i=0; for (Result rs : rscan3){ String rowKey = Bytes.toString(rs.getRow()); System.out.println("row key :" + rowKey); Cell[] cells = rs.rawCells(); // 控制取回的条数 i++; if (i == 100) { break; } for (Cell cell: cells){ System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); } System.out.println("-------------------------------------------"); } } /** * @Description: 根据表名查询数据(只限查数据量不大的数据,数据量大的表慎用!!!!!!!!!) 
* @param properties * @param tableName */ public static void getResultByTableName(Properties properties, String tableName) throws Exception{ // 创建HBase连接 Connection connection = getHBaseConnect(properties); TableName name=TableName.valueOf(tableName); Table table = connection.getTable(name); // 全表扫描 Scan scan = new Scan(); // str$ 末尾匹配,相当于sql中的 %str ^str开头匹配,相当于sql中的str% RowFilter filter1 = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("^tudent-1")); // scan.setFilter(filter1); // startRowKey 和 endRowKey scan.withStartRow("student-46_1694016000".getBytes()); scan.withStopRow("student-46_1694102400".getBytes()); ResultScanner rscan = table.getScanner(scan); int num = 0; for (Result rs : rscan){ String rowKey = Bytes.toString(rs.getRow()); System.out.println("row key :" + rowKey); Cell[] cells = rs.rawCells(); num++; for (Cell cell: cells){ // if (fieldName.equals("Date")) { System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::" + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength())); // } } System.out.println("-------------------------------------------"); if (num == 10) { break; } } table.close(); } /** * @Description: 添加数据(多个rowKey,多个列族) * @param properties * @param tableName */ public static void insertMany(Properties properties, String tableName) { // 创建HBase连接 Connection connection = getHBaseConnect(properties); TableName name=TableName.valueOf(tableName); Table table = null; try { table = connection.getTable(name); } catch (IOException e) { e.printStackTrace(); } List<Put> puts = new ArrayList<Put>(); // Put put1 = new Put(Bytes.toBytes("student-1_1694072961_10")); // put1.addColumn(Bytes.toBytes("DATA"), Bytes.toBytes("work_status"), Bytes.toBytes("0")); Put put2 = new Put(Bytes.toBytes("student-58_1694074491")); put2.addColumn(Bytes.toBytes("DATA"), Bytes.toBytes("work_status"), Bytes.toBytes("0")); // puts.add(put1); puts.add(put2); try { table.put(puts); System.out.println("添加数据成功-->"); table.close(); } catch (IOException e) { e.printStackTrace(); } } /** * @Description: 添加数据(多个rowKey,多个列族) * @param dataDataset * @param properties * @param tableName */ public static void insertDataset(String workType, Dataset<Row> dataDataset, Properties properties, String tableName) { // 创建HBase连接 Connection connection = getHBaseConnect(properties); TableName name = TableName.valueOf(tableName); Table table = null; try { table = connection.getTable(name); } catch (IOException e) { e.printStackTrace(); } JavaRDD<Row> dataRDD = dataDataset.toJavaRDD(); dataRDD.foreachPartition((VoidFunction<Iterator<Row>>) rowIterator -> { while (rowIterator.hasNext()) { Row next = rowIterator.next(); String rowKey = next.getAs("work_id"); String workTime = next.getAs("work_start_time"); System.out.println("rowKey: " + rowKey); if (rowKey == null || rowKey == "") { // System.out.println("rowKey为空"); } else { Put put = new Put(Bytes.toBytes(rowKey)); put.addColumn(Bytes.toBytes("DATA"), Bytes.toBytes("WORK_TIME"), Bytes.toBytes(workTime)); put.addColumn(Bytes.toBytes("DATA"), Bytes.toBytes("DATA_SOURCE"), Bytes.toBytes("offline")); puts.add(put); } } }); try { table.put(puts); table.close(); } catch (IOException e) { e.printStackTrace(); } } /** * @Description: 根据 rowKey 删除一行数据、或者删除某一行的某个列簇,或者某一行某个列簇某列 * @param properties * @param tableName * @param rowKey * @param columnFamily * @param columnName */ public 
static void deleteDataOne(Properties properties, String tableName, String rowKey, String columnFamily, String columnName) throws Exception{ // 创建HBase连接 Connection connection = getHBaseConnect(properties); TableName name=TableName.valueOf(tableName); Table table = connection.getTable(name); Delete delete = new Delete(Bytes.toBytes(rowKey)); // 删除某一行的某一个列簇内容 // delete.addFamily(Bytes.toBytes(columnFamily)); // 删除某一行某个列簇某列的值 // delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName)); // 根据rowKey删除一行数据 table.delete(delete); table.close(); } /** * @Description: 获取 HBASE Dataset数据 * @param properties * @param spark * @param tableName * @param colsName * @return Dataset<Row> * @throws IOException */ public static Dataset<Row> getHbaseDatasetData(Properties properties, SparkSession spark , String tableName, List<String> colsName, String RegexString, String startRowKey, String endRowKey) { Scan scan = new Scan(); // 限制取回的条数 Filter filter = new PageFilter(100000); // rowKey 过滤器(str$ 末尾匹配,相当于sql中的 %str ^str开头匹配,相当于sql中的str%) RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator(RegexString)); FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL); // list.addFilter(filter); list.addFilter(rowFilter); scan.setFilter(list); // startRowKey 和 endRowKey scan.withStartRow(startRowKey.getBytes()); scan.withStopRow(endRowKey.getBytes()); Base64.Encoder base64Encoder = Base64.getEncoder(); String scanToString = null; try { scanToString = base64Encoder.encodeToString(ProtobufUtil.toScan(scan).toByteArray()); } catch (IOException e) { e.printStackTrace(); } Configuration hbConf = HBaseConfiguration.create(); hbConf.set("fs.defaultFS", properties.getProperty("fs.defaultFS")); hbConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); hbConf.set("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum")); hbConf.set("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort")); //指定输入表名 hbConf.set(TableInputFormat.INPUT_TABLE, tableName); //Base-64编码的Scanner hbConf.set(TableInputFormat.SCAN, scanToString); //获取JavaSparkContext JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext()); //newAPIHadoopRDD这个RDD用于读取存储在Hadoop中的数据,文件有新 API 输入格式和额外的配置选项,传递到输入格式 //参数conf会被广播,不可以修改,所以最好一个RDD一个conf JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class); //使用map函数将JavaPairRDD=>JavaRDD,反之使用mapToPair函数 JavaRDD<Row> dataRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() { //序列化表示UID,用于类的版本控制 private static final long serialVersionUID = 1L; //重写call()函数,返回Row类型,这部分需要根据自己需求将result中的数据封装为Object[] @Override public Row call(Tuple2<ImmutableBytesWritable, Result> tuple2) { Result result = tuple2._2; String[] values = new String[colsName.size() + 1]; values[0] = Bytes.toString(result.getRow()); for (int i = 0; i < colsName.size(); i++) { values[i + 1] = Bytes.toString(result.getValue("DATA".getBytes(), colsName.get(i).getBytes())); } //creat()方法参数(object... 
value) return RowFactory.create((Object[]) values); } }); List<StructField> structFields = new ArrayList<>(); //创建StructField,基本等同于list内有几个StructField就是几列,需要和上面的Row的Object[]对应 structFields.add(DataTypes.createStructField("rowKey", DataTypes.StringType, true)); for (String col : colsName) { structFields.add(DataTypes.createStructField(col, DataTypes.StringType, true)); } //构建schema,可以把它理解为架子,结构 StructType schema = DataTypes.createStructType(structFields); //生成DataFrame,把书放入架子,把数据放入结构里就变成了dataframe return spark.createDataFrame(dataRDD, schema); } /** * @Description: 使用Spark时经常需要把数据落入HBase中,如果使用普通的Java API,写入会速度很慢 * Bulk写入优势: * BulkLoad不会写WAL,也不会产生flush以及split。 * 如果我们大量调用PUT接口插入数据,可能会导致大量的GC操作。除了影响性能之外,严重时甚至可能会对HBase节点的稳定性造成影响。但是采用Bulk就不会有这个顾虑。 * 过程中没有大量的接口调用消耗性能 * 优点:1)速度快,2)几乎不会失败 3)对hbase服务几乎无影响 * <p> * * @param joinAllDs * @param properties * @param hbaseTableName * @param path */ public static void saveToHbaseByBulk(Dataset<Row> joinAllDs, Properties properties, String hbaseTableName, String path) { //Hbase集群配置信息 Configuration hconf = HBaseConfiguration.create(); String fsAddress = properties.getProperty("fs.defaultFS"); hconf.set("fs.defaultFS", fsAddress); hconf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem"); hconf.set("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum")); hconf.set("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort")); hconf.set("hbase.mapreduce.hfileoutputformat.table.name", hbaseTableName); //设置hfile最大个数 hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "3200"); //设置hfile的大小 hconf.set("hbase.hregion.max.filesize", "10737418240"); Connection connection = null; Table table = null; Admin admin = null; try { //创建Hbase连接 获取要操作的Hbase表 connection = ConnectionFactory.createConnection(hconf); TableName tableName = TableName.valueOf(hbaseTableName); table = connection.getTable(tableName); admin = connection.getAdmin(); //创建表的Region分布 RegionLocator regionLocator = connection.getRegionLocator(tableName); //将HFile写入Hbase对象 LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(hconf); String[] columns = joinAllDs.columns(); JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileSortRowKeyAndCmRDD = joinAllDs // .dropDuplicates("Time") .javaRDD() .mapToPair(row -> { //组合rowKey 后续通过sortByKey根据rowKey整体排序 String studentId = row.getAs("student_id"); String workId = row.getAs("work_id"); String rowKey = studentId + workId; //对Hbase列名按照字典排序 TreeMap<String, String> treeMap = new TreeMap<>(); //lambda表达式实现List接口sort方法排序 for (String cm : columns) { Object getVal = row.getAs(cm); if (getVal != null) { String replace = getVal.toString().replace("WrappedArray(", "[").replace(")", "]"); treeMap.put(cm, replace); } } return new Tuple2<String, TreeMap>(rowKey, treeMap); }).sortByKey(true) .coalesce(6) .flatMapToPair(tuple2 -> { List<Tuple2<ImmutableBytesWritable, KeyValue>> list = new ArrayList<>(); byte[] rowKey = tuple2._1.getBytes(); // 排序后的rowKey byte[] cmf = "DATA".getBytes(); //列族 TreeMap<String, String> data = tuple2._2; //列名,对应的value for (Map.Entry<String, String> map : data.entrySet()) { //从排序过后的有序map集合里取列名 -> 对应的value // Caused by: java.io.IOException: Added a key not lexically larger than previous. 
KeyValue keyValue = new KeyValue(rowKey, cmf, map.getKey().getBytes(), System.currentTimeMillis(), map.getValue().getBytes()); list.add(new Tuple2<>(new ImmutableBytesWritable(rowKey), keyValue)); } return list.iterator(); }); //HFile在HDFS文件夹地址,该地址不能存在如果存在则会报错 FileSystem fs = FileSystem.get(hconf); if (fs.exists(new Path(path))) { fs.delete(new Path(path), true); //将RDD以HFile文件格式保存HDFS hfileSortRowKeyAndCmRDD.coalesce(6).saveAsNewAPIHadoopFile(path, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, hconf); //将HFile 移动到 Hbase表的Region bulkLoader.doBulkLoad(new Path(path), admin, table, regionLocator); } else { hfileSortRowKeyAndCmRDD.coalesce(6).saveAsNewAPIHadoopFile(path, ImmutableBytesWritable.class, KeyValue.class, HFileOutputFormat2.class, hconf); bulkLoader.doBulkLoad(new Path(path), admin, table, regionLocator); } } catch (IOException e) { e.printStackTrace(); } finally { closeResource(connection, table, admin); } } /** * @Description: 关闭释放资源 * @param connection * @param table * @param admin */ private static void closeResource(Connection connection, Table table, Admin admin) { if (admin != null) { try { admin.close(); } catch (IOException e) { e.printStackTrace(); } } if (table != null) { try { table.close(); } catch (IOException e) { e.printStackTrace(); } } if (connection != null) { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws Exception { Logger.getLogger("org.apache.spark").setLevel(Level.WARN); // pro或dev String runMode = "test"; // 获取配置文件 String configFile = String.format("config-%s.properties", runMode); Properties properties = PropertiesUtil.getProperties(configFile); // 获取所有表名 // getAllTablesName(properties); // 根据 rowKey 查询数据(返回一条数据) // getResultByRowKey(properties, "heheda", "heheawe_1694073276_10"); // 获取指定表的 count 数 rowCountByScanFilter(properties, "heheda"); // 根据表名查询数据 // getResultByTableName(properties, "heheda"); // System.out.println(getResultByFmeterId(properties, "student-1")); // 添加数据 // insertMany(properties, "heheda"); // 根据 rowKey 删除一行数据、或者删除某一行的某个列簇,或者某一行某个列簇某列 // deleteDataOne(properties, "heheda", "fawef_1694066860", "DATA", "student_name"); // 根据 rowkey 或者列值查询数据 // scanTableByFilter(properties, "heheda"); } }
Reading and writing HBase from Spark (Scala), using TableInputFormat / TableOutputFormat:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Auther: huiq
 * @Date: 2021/7/23
 * @Description: HBase connectivity test
 */
object OperateHbaseTest {
  def main(args: Array[String]): Unit = {
    // Initialize Spark
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // Initialize HBase and point it at ZooKeeper
    val config: Configuration = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "node01,node02,node03")  // ZooKeeper quorum of the HBase cluster
    config.set("hbase.zookeeper.property.clientPort", "2181")     // ZooKeeper client port
    config.set("zookeeper.znode.parent", "/hbase-unsecure")

    val sc: SparkContext = spark.sparkContext

    // Table to read
    config.set(TableInputFormat.INPUT_TABLE, "test_schema1:t2")

    // Read the whole table from HBase as an RDD
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] =
      sc.newAPIHadoopRDD(config, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])
    val count = hbaseRDD.count()
    println("Students RDD Count--->" + count)

    // Print every row
    hbaseRDD.foreach({ case (_, result) =>
      val key = Bytes.toString(result.getRow)
      val a = Bytes.toString(result.getValue("F".getBytes, "a".getBytes))
      val b = Bytes.toString(result.getValue("F".getBytes, "b".getBytes))
      println("Row key:" + key + " a:" + a + " b:" + b)
    })

    // Write to HBase
    config.set(TableOutputFormat.OUTPUT_TABLE, "test_schema1:t2")
    val job = new Job(config)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val indataRDD = sc.makeRDD(Array("3,26,M", "4,27,M")) // two records to insert
    val rdd = indataRDD.map(_.split(',')).map { arr =>
      val put = new Put(Bytes.toBytes(arr(0))) // row key
      put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("a"), Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("b"), Bytes.toBytes(arr(2)))
      // Some posts online use put.add(Bytes.toBytes("F"), Bytes.toBytes("a"), Bytes.toBytes(arr(1))),
      // but that does not compile here -- probably a client version difference
      (new ImmutableBytesWritable, put)
    }
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())

    // Build an RDD of Rows from the data read earlier
    val rowRDD = hbaseRDD.map(p => {
      val name = Bytes.toString(p._2.getValue(Bytes.toBytes("F"), Bytes.toBytes("a")))
      val age = Bytes.toString(p._2.getValue(Bytes.toBytes("F"), Bytes.toBytes("b")))
      Row(name, age)
    })

    // DataFrame schema
    val schema = StructType(List(
      StructField("a", StringType, true),
      StructField("b", StringType, true)
    ))

    // Build the DataFrame, register it as a temp view and query it with SQL
    val dataFrame = spark.createDataFrame(rowRDD, schema)
    dataFrame.createTempView("t2")
    val result: DataFrame = spark.sql("select * from t2")
    result.show()
  }
}
Note: I am using Ambari 2.7.4 + HDP 3.1.4. On a properly integrated cluster the connection also succeeds without those three config.set lines, but at first my program failed with: java.util.concurrent.ExecutionException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/hbaseid
Cause: hbase-site.xml contains the following setting:
<property>
<name>zookeeper.znode.parent</name>
<value>/hbase-unsecure</value>
</property>
Option 1: change the value to <value>/hbase</value> and restart HBase.
Note: the value of zookeeper.znode.parent is the parent znode (directory) that HBase creates in ZooKeeper.
Option 2: add config.set("zookeeper.znode.parent", "/hbase-unsecure") in the code.
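A quick way to confirm which parent znode the cluster actually uses (a hedged example, assuming zkCli.sh is available and node01 runs ZooKeeper) is to list the ZooKeeper root: an HDP cluster typically shows /hbase-unsecure, while a stock Apache HBase install shows /hbase.

zkCli.sh -server node01:2181
ls /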
References:
Spark入门:读写HBase数据 (Getting started with Spark: reading and writing HBase data)
[HBase 基础]-- HBaseConfiguration类,参数说明 ([HBase basics] The HBaseConfiguration class and its parameters)
Addendum: after creating the HBase-mapped table in the Hive ODS layer, I tried to generate the corresponding DWD-layer table with a create table as select ... statement, but the statement failed with an error.
Fix 1: pass the parameter when starting the client: beeline -hiveconf zookeeper.znode.parent=/hbase-unsecure
or hive -hiveconf zookeeper.znode.parent=/hbase-unsecure
Fix 2: on my Ambari HDP 3.1.4 cluster, adding the following configuration and then running the beeline command also works.
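The screenshot of that configuration is not reproduced here; presumably it is the same property added to the Hive/HiveServer2 configuration (e.g. a Custom hive-site entry in Ambari), shown below only as an assumed sketch:

<property>
  <name>zookeeper.znode.parent</name>
  <value>/hbase-unsecure</value>
</property>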
import java.util import com.rongrong.bigdata.utils.{KafkaZkUtils, UMSUtils} import kafka.utils.ZkUtils import org.apache.hadoop.hbase.client.{ConnectionFactory, Put} import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.common.serialization.StringDeserializer import org.apache.log4j.Logger import org.apache.spark.streaming.{Durations, StreamingContext} import scala.util.Try object StandardOnlie { private val logger: Logger = Logger.getLogger(this.getClass) def main(args: Array[String]): Unit = { val spark = InitializeSpark.createSparkSession("StandardOnlie", "local") val streamingContext = new StreamingContext(spark.sparkContext, Durations.seconds(30)) val kafkaParams = Map[String, Object]( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:6667,node02:6667,node03:6667", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer], ConsumerConfig.GROUP_ID_CONFIG -> "group-02", ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest", ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean) ) val topic: String = "djt_db.test_schema1.result" val zkUrl = "node01:2181,node02:2181,node03:2181" val sessionTimeout = 1000 val connectionTimeout = 1000 val zkClient = ZkUtils.createZkClient(zkUrl, sessionTimeout, connectionTimeout) val kafkaStream = KafkaZkUtils.createDirectStream(zkClient, streamingContext, kafkaParams, topic) // 开始处理批次消息 kafkaStream.foreachRDD(rdd => { // 处理从获取 kafka 中的数据 logger.info("=============== Total " + rdd.count() + " events in this batch ..") rdd.foreach(x => { val configuration = HBaseConfiguration.create() configuration.set("zookeeper.znode.parent", "/hbase-unsecure") val connection = ConnectionFactory.createConnection(configuration) // 获取kafka中真正的数据 var usmString = x.value() val flag: Boolean = UMSUtils.isHeartbeatUms(usmString) if (!flag) { // 过滤掉心跳数据 val usmActiontype = UMSUtils.getActionType(usmString) println(s"该条数据的类型为--->${usmActiontype}") println("读取kafka数据,解析正文数据:" + x.value()) val data: util.Map[String, String] = UMSUtils.getDataFromUms(usmString) //获取表连接 val table = connection.getTable(TableName.valueOf("test_schema1:t2")) val rowkey: String = 123456 + "_" + data.get("a") val put = new Put(Bytes.toBytes(rowkey)) put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("ums_active_"), Bytes.toBytes(data.get("ums_active_"))) put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("ums_id_"), Bytes.toBytes(data.get("ums_id_"))) put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("ums_ts_"), Bytes.toBytes(data.get("ums_ts_"))) put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("a"), Bytes.toBytes(data.get("a"))) put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("b"), Bytes.toBytes(data.get("b"))) table.put(put) table.close() //将数据写入HBase,若出错关闭table Try(table.put(put)).getOrElse(table.close()) //分区数据写入HBase后关闭连接 table.close() println(s"解析到的数据为--->${data}") } }) // 更新offset到zookeeper中 KafkaZkUtils.saveOffsets(zkClient, topic, KafkaZkUtils.getZkPath(kafkaParams, topic), rdd) }) }) streamingContext.start() streamingContext.awaitTermination() streamingContext.stop() } }
import kafka.utils.{ZKGroupTopicDirs, ZkUtils} import org.I0Itec.zkclient.ZkClient import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} import org.apache.kafka.common.TopicPartition import org.apache.log4j.Logger import org.apache.spark.rdd.RDD import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils} object KafkaZkUtils { private val logger: Logger = Logger.getLogger(this.getClass) /** * 获取 consumer 在zk上的路径 * @param kafkaParams * @param topic * @return */ def getZkPath(kafkaParams: Map[String, Object], topic: String): String ={ val topicDirs = new ZKGroupTopicDirs(kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).toString, topic) s"${topicDirs.consumerOffsetDir}" } /** * 创建 DirectStream * @param zkClient * @param streamingContext * @param kafkaParams * @param topic * @return */ def createDirectStream(zkClient: ZkClient,streamingContext: StreamingContext, kafkaParams: Map[String, Object], topic: String): InputDStream[ConsumerRecord[String, String]] = { val zkPath = getZkPath(kafkaParams,topic) //读取 topic 的 offset val storedOffsets = readOffsets(zkClient, topic, zkPath) val kafkaStream: InputDStream[ConsumerRecord[String, String]] = storedOffsets match { //上次未保存offsets case None => KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams) ) case Some(fromOffsets) => { KafkaUtils.createDirectStream[String, String]( streamingContext, PreferConsistent, // 指定分区消费,无法动态感知分区变化 // ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets) ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams, fromOffsets) ) } } kafkaStream } /** * 保存 offset * @param zkClient * @param topic * @param zkPath * @param rdd */ def saveOffsets(zkClient: ZkClient,topic: String, zkPath: String, rdd: RDD[_]): Unit = { logger.info("Saving offsets to zookeeper") val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges offsetsRanges.foreach(offsetRange => logger.debug(s"Using ${offsetRange}")) val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",") logger.info(s"Writing offsets to Zookeeper: ${offsetsRangesStr}") ZkUtils(zkClient, false).updatePersistentPath(zkPath, offsetsRangesStr) } /** * 读取 offset * @param zkClient * @param topic * @param zkPath * @return */ def readOffsets(zkClient: ZkClient, topic: String, zkPath: String): Option[Map[TopicPartition, Long]] = { logger.info("Reading offsets from zookeeper") val (offsetsRangesStrOpt, _) = ZkUtils(zkClient, false).readDataMaybeNull(zkPath) offsetsRangesStrOpt match { case Some(offsetsRangesStr) => { logger.debug(s"Read offset ranges: ${ offsetsRangesStr }") val offsets: Map[TopicPartition, Long] = offsetsRangesStr.split(",").map(s => s.split(":")) .map({ case Array(partitionStr, offsetStr) => (new TopicPartition(topic, partitionStr.toInt) -> offsetStr.toLong) // 这里可以指定offset的位置读取,注意:还需要把上面createDirectStream方法的ConsumerStrategies.Assign代码打开 // (new TopicPartition(topic, partitionStr.toInt) -> "20229".toLong) }).toMap Some(offsets) } case None => logger.info("No offsets found in Zookeeper") None } } }
I originally wanted to use foreachPartition, but could not get it to work. See the post "sparkstreaming写入hbase" (writing to HBase from Spark Streaming): there, creating the connection inside foreachPartition works, and the author notes "get the HBase connection once per partition; a partition does not span nodes, so no serialization is needed". In my case, however, the same pattern throws an error:
The failing code:
rdd.foreachPartition(partitionRecords => {
val configuration = HBaseConfiguration.create()
configuration.set("zookeeper.znode.parent", "/hbase-unsecure")
val connection = ConnectionFactory.createConnection(configuration) // get the HBase connection: one per partition; a partition does not span nodes, so no serialization is needed
partitionRecords.foreach(x => {
// the actual payload read from Kafka
var usmString = x.value()
I looked through several resources -- "关于scala:通过Spark写入HBase:任务不可序列化" (writing to HBase via Spark in Scala: Task not serializable), the ZooKeeper error org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.EOFException, "Spark 序列化问题全解" (a full rundown of Spark serialization issues), and "HBase连接池" (HBase connection pools) -- but have not found a solution yet; if you know one, feel free to discuss.
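One pattern that is commonly used for this kind of problem (offered here only as a hedged sketch, not as a verified fix for the exact error above) is to keep the Connection in a lazily initialized singleton object, so it is created once per executor JVM and nothing non-serializable is captured by the closure:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory}

// Lazily created once per executor JVM; the object itself is not shipped with the
// closure, which is what avoids the "Task not serializable" trap.
object HBaseConnectionHolder extends Serializable {
  @transient lazy val connection: Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01,node02,node03") // assumed quorum, same as above
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    ConnectionFactory.createConnection(conf)
  }
}

// Inside the streaming job it would be used roughly like this:
// rdd.foreachPartition { partitionRecords =>
//   val table = HBaseConnectionHolder.connection.getTable(TableName.valueOf("test_schema1:t2"))
//   partitionRecords.foreach { record =>
//     // build a Put from record.value() and call table.put(...) as in the code above
//   }
//   table.close() // close the table, keep the shared connection alive
// }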