
Basic HBase operations with Java and Scala (reading and writing HBase)

I. Overview of ways to connect to HBase

The main approaches are:

  • Reading and writing HBase with the plain Java API;
  • Reading and writing HBase from Spark;
  • Reading and writing HBase from Flink;
  • Reading and writing HBase through Phoenix;

  The first is the relatively low-level but efficient API that HBase itself provides; the second and third are the ways Spark and Flink integrate with HBase; the last goes through the third-party Phoenix plugin over JDBC, and that Phoenix JDBC approach can also be called from Spark or Flink.
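  As a quick illustration of the Phoenix route, here is a minimal JDBC sketch (the ZooKeeper quorum and the query are placeholders, not taken from this article; it assumes the Phoenix client jar is on the classpath):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixJdbcDemo {
    public static void main(String[] args) throws Exception {
        // JDBC URL format: jdbc:phoenix:<zookeeper quorum>[:<port>[:<znode>]]
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:zk1,zk2,zk3:2181");
             Statement stmt = conn.createStatement();
             // SYSTEM.CATALOG is a Phoenix system table, so this query works on any Phoenix install
             ResultSet rs = stmt.executeQuery("SELECT TABLE_NAME FROM SYSTEM.CATALOG LIMIT 10")) {
            while (rs.next()) {
                System.out.println(rs.getString("TABLE_NAME"));
            }
        }
    }
}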

II. Java

1. Older HBase client API:
(1) Creating a table:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTableTest {
    public static void main(String[] args) throws IOException  {
        //Set up the HBase connection configuration
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");  // ZooKeeper quorum address
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        String tableName = "emp";
        String[] family = { "basicinfo","deptinfo"};
        HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
        //Build the table descriptor
        HTableDescriptor hbaseTableDesc = new HTableDescriptor(TableName.valueOf(tableName));
        for(int i = 0; i < family.length; i++) {
            //Add each column family
            hbaseTableDesc.addFamily(new HColumnDescriptor(family[i]));
        }
        //Create the table if it does not exist; if it already exists, print a message and exit
        if(hbaseAdmin.tableExists(TableName.valueOf(tableName))) {
            System.out.println("TableExists!");
            /**
             * System.exit(status) shuts down the whole JVM: 0 means a normal exit and any
             * non-zero status means an abnormal exit. Unlike return, it does not go back to
             * the caller; the entire application stops here and all JVM memory is released.
             */
            System.exit(0);
        } else{
            hbaseAdmin.createTable(hbaseTableDesc);
            System.out.println("Create table Success!");
        }
    }
}
(2) Dropping a table:
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
public class DeleteMyTable {
 
	public static void main(String[] args) throws IOException {
		String tableName = "mytb";
		delete(tableName);
	}
	
	public static Configuration getConfiguration() {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
		conf.set("hbase.zookeeper.quorum", "192.168.8.71");
		return conf;
	}
	
	public static void delete(String tableName) throws IOException {
		HBaseAdmin hAdmin = new HBaseAdmin(getConfiguration());
		if(hAdmin.tableExists(tableName)){
			try {
				hAdmin.disableTable(tableName);
				hAdmin.deleteTable(tableName);
				System.err.println("Delete table Success");
			} catch (IOException e) {
				System.err.println("Delete table Failed ");
			}
		} else {
			System.err.println("table not exists");
		}
	}
}
(3) Writing data:

  Suppose an e-commerce site keeps a buyer table in its backend; every time a new user registers, the backend produces a log record and writes it into HBase.

  Each record contains: user ID (buyer_id), registration date (reg_date), registration IP (reg_ip), and buyer status (buyer_status, where 0 means frozen and 1 means normal), with the fields separated as shown below:

buyer_id  reg_date    reg_ip          buyer_status
20385,2010-05-04,124.64.242.30,1
20386,2010-05-05,117.136.0.172,1
20387,2010-05-06,114.94.44.230,1
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutData {
    public static void main(String[] args) throws MasterNotRunningException,
            ZooKeeperConnectionException, IOException {
        String tableName = "mytb";
        String columnFamily = "mycf";

        put(tableName, "20385", columnFamily, "2010-05-04:reg_ip", "124.64.242.30");
        put(tableName, "20385", columnFamily, "2010-05-04:buyer_status", "1");

        put(tableName, "20386", columnFamily, "2010-05-05:reg_ip", "117.136.0.172");
        put(tableName, "20386", columnFamily, "2010-05-05:buyer_status", "1");

        put(tableName, "20387", columnFamily, "2010-05-06:reg_ip", "114.94.44.230");
        put(tableName, "20387", columnFamily, "2010-05-06:buyer_status", "1");

    }

    public static Configuration getConfiguration() {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
        conf.set("hbase.zookeeper.quorum", "192.168.8.71");
        return conf;
    }

    public static void put(String tableName, String row, String columnFamily,
                           String column, String data) throws IOException {
        HTable table = new HTable(getConfiguration(), tableName);
        Put put = new Put(Bytes.toBytes(row));
        put.add(Bytes.toBytes(columnFamily),
                Bytes.toBytes(column),
                Bytes.toBytes(data));
        table.put(put);
        System.err.println("SUCCESS");
    }
}

Note: constructing an HTable by hand is deprecated. Obtain a Table through a Connection instead, via Connection.getTable(TableName).
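
For reference, a minimal sketch of the same put() helper written against the Connection API (not part of the original article; it reuses the getConfiguration() helper from the class above and assumes the extra imports org.apache.hadoop.hbase.TableName and org.apache.hadoop.hbase.client.Connection/ConnectionFactory/Table):

public static void put(String tableName, String row, String columnFamily,
                       String column, String data) throws IOException {
    // Connection and Table are both Closeable, so try-with-resources cleans them up
    try (Connection connection = ConnectionFactory.createConnection(getConfiguration());
         Table table = connection.getTable(TableName.valueOf(tableName))) {
        Put put = new Put(Bytes.toBytes(row));
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
        table.put(put);
    }
}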

(4) Querying:
import java.io.IOException;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
 
public class GetData {
	public static void main(String[] args) throws IOException {
		String tableName = "mytb";
		get(tableName, "20386");
 
	}
 
	public static Configuration getConfiguration() {
		Configuration conf = HBaseConfiguration.create();
		conf.set("hbase.rootdir", "hdfs://192.168.8.71:9000/hbase");
		conf.set("hbase.zookeeper.quorum", "192.168.8.71");
		return conf;
	}
 
	public static void get(String tableName, String rowkey) throws IOException {
		HTable table = new HTable(getConfiguration(), tableName);
		Get get = new Get(Bytes.toBytes(rowkey));
		Result result = table.get(get);
		byte[] value1 = result.getValue("mycf".getBytes(), "2010-05-05:reg_ip".getBytes());
		byte[] value2 = result.getValue("mycf".getBytes(), "2010-05-05:buyer_status".getBytes());
		System.err.println("line1:SUCCESS");
		System.err.println("line2:" 
		        + new String(value1) + "\t"
				+ new String(value2));
	}
}

  All of the examples above are compiled and run like this (with the HBase client jars on the classpath):

[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/javac GetData.java
[hadoop@h71 q1]$ /usr/jdk1.7.0_25/bin/java GetData 
(5) A consolidated set of common operations against HBase through the Java API:
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
 
public class HBaseTest2 {
 
    // Declare the static configuration
    static Configuration conf = null;
    static {
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.205.153");
    }
 
    /*
     * Create a table
     * @tableName table name
     * @family list of column families
     */
    public static void creatTable(String tableName, String[] family) throws Exception {
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor desc = new HTableDescriptor(tableName);
        for (int i = 0; i < family.length; i++) {
            desc.addFamily(new HColumnDescriptor(family[i]));
        }
        if (admin.tableExists(tableName)) {
            System.out.println("table Exists!");
            System.exit(0);
        } else {
            admin.createTable(desc);
            System.out.println("create table Success!");
        }
    }
 
    /*
     * Add data to a table (suited to fixed tables whose column families are known in advance)
     * @rowKey rowKey
     * @tableName table name
     * @column1 columns of the first column family
     * @value1 values for the first column family
     * @column2 columns of the second column family
     * @value2 values for the second column family
     */
    public static void addData(String rowKey, String tableName,
            String[] column1, String[] value1, String[] column2, String[] value2)
            throws IOException {
        Put put = new Put(Bytes.toBytes(rowKey));// set the rowkey
        HTable table = new HTable(conf, tableName);// get the table
        HColumnDescriptor[] columnFamilies = table.getTableDescriptor() // get all column families
                .getColumnFamilies();
 
        for (int i = 0; i < columnFamilies.length; i++) {
            String familyName = columnFamilies[i].getNameAsString(); // column family name
            if (familyName.equals("article")) { // put data into the "article" column family
                for (int j = 0; j < column1.length; j++) {
                    put.add(Bytes.toBytes(familyName),
                            Bytes.toBytes(column1[j]), Bytes.toBytes(value1[j]));
                }
            }
            if (familyName.equals("author")) { // author列族put数据
                for (int j = 0; j < column2.length; j++) {
                    put.add(Bytes.toBytes(familyName),
                            Bytes.toBytes(column2[j]), Bytes.toBytes(value2[j]));
                }
            }
        }
        table.put(put);
        System.out.println("add data Success!");
    }
 
    /*
     * Query by rowKey
     * @rowKey rowKey
     * @tableName table name
     */
    public static Result getResult(String tableName, String rowKey) throws IOException {
        Get get = new Get(Bytes.toBytes(rowKey));
        HTable table = new HTable(conf, tableName);// 获取表
        Result result = table.get(get);
        for (KeyValue kv : result.list()) {
            System.out.println("family:" + Bytes.toString(kv.getFamily()));
            System.out
                    .println("qualifier:" + Bytes.toString(kv.getQualifier()));
            System.out.println("value:" + Bytes.toString(kv.getValue()));
            System.out.println("Timestamp:" + kv.getTimestamp());
            System.out.println("-------------------------------------------");
        }
        return result;
    }
 
    /*
     * Scan the whole table
     * @tableName table name
     */
    public static void getResultScann(String tableName) throws IOException {
        Scan scan = new Scan();
        ResultScanner rs = null;
        HTable table = new HTable(conf, tableName);
        try {
            rs = table.getScanner(scan);
            for (Result r : rs) {
                for (KeyValue kv : r.list()) {
                    System.out.println("family:"
                            + Bytes.toString(kv.getFamily()));
                    System.out.println("qualifier:"
                            + Bytes.toString(kv.getQualifier()));
                    System.out
                            .println("value:" + Bytes.toString(kv.getValue()));
                    System.out.println("timestamp:" + kv.getTimestamp());
                    System.out
                            .println("-------------------------------------------");
                }
            }
        } finally {
            rs.close();
        }
    }
 
    /*
     * Query one column of a row
     * @tableName table name
     * @rowKey rowKey
     */
    public static void getResultByColumn(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
    	HTable table = new HTable(conf, tableName);
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName)); // fetch the cell for the given column family and qualifier
        Result result = table.get(get);
        for (KeyValue kv : result.list()) {
            System.out.println("family:" + Bytes.toString(kv.getFamily()));
            System.out
                    .println("qualifier:" + Bytes.toString(kv.getQualifier()));
            System.out.println("value:" + Bytes.toString(kv.getValue()));
            System.out.println("Timestamp:" + kv.getTimestamp());
            System.out.println("-------------------------------------------");
        }
    }
 
    /*
     * Update one column of a row
     * @tableName table name
     * @rowKey rowKey
     * @familyName column family name
     * @columnName column name
     * @value new value
     */
    public static void updateTable(String tableName, String rowKey,
            String familyName, String columnName, String value)
            throws IOException {
    	HTable table = new HTable(conf, tableName);
        Put put = new Put(Bytes.toBytes(rowKey));
        put.add(Bytes.toBytes(familyName), Bytes.toBytes(columnName),
                Bytes.toBytes(value));
        table.put(put);
        System.out.println("update table Success!");
    }
 
    /*
     * Query multiple versions of a column
     * @tableName table name
     * @rowKey rowKey
     * @familyName column family name
     * @columnName column name
     */
    public static void getResultByVersion(String tableName, String rowKey,
            String familyName, String columnName) throws IOException {
    	HTable table = new HTable(conf, tableName);
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName));
        get.setMaxVersions(5);
        Result result = table.get(get);
        for (KeyValue kv : result.list()) {
            System.out.println("family:" + Bytes.toString(kv.getFamily()));
            System.out
                    .println("qualifier:" + Bytes.toString(kv.getQualifier()));
            System.out.println("value:" + Bytes.toString(kv.getValue()));
            System.out.println("Timestamp:" + kv.getTimestamp());
            System.out.println("-------------------------------------------");
        }
        List<?> results = table.get(get).list();
        Iterator<?> it = results.iterator();
        while (it.hasNext()) {
            System.out.println(it.next().toString());
        }
    }
 
    /*
     * Delete a specific column
     * @tableName table name
     * @rowKey rowKey
     * @familyName column family name
     * @columnName column name
     */
    public static void deleteColumn(String tableName, String rowKey,
            String falilyName, String columnName) throws IOException {
    	HTable table = new HTable(conf, tableName);
        Delete deleteColumn = new Delete(Bytes.toBytes(rowKey));
        deleteColumn.deleteColumns(Bytes.toBytes(falilyName),
                Bytes.toBytes(columnName));
        table.delete(deleteColumn);
        System.out.println(falilyName + ":" + columnName + " is deleted!");
    }
 
    /*
     * Delete an entire row (all of its columns)
     * @tableName table name
     * @rowKey rowKey
     */
    public static void deleteAllColumn(String tableName, String rowKey)
            throws IOException {
    	HTable table = new HTable(conf, tableName);
        Delete deleteAll = new Delete(Bytes.toBytes(rowKey));
        table.delete(deleteAll);
        System.out.println("all columns are deleted!");
    }
 
    /*
     * Drop a table
     * @tableName table name
     */
    public static void deleteTable(String tableName) throws IOException {
        HBaseAdmin admin = new HBaseAdmin(conf);
        admin.disableTable(tableName);
        admin.deleteTable(tableName);
        System.out.println(tableName + " is deleted!");
    }
 
    public static void main(String[] args) throws Exception {
 
        // Create a table
//        String tableName = "blog2"; String[] family = { "article","author" };
//        creatTable(tableName,family);
 
        // Add data to the table
//        String[] column1 = { "title", "content", "tag" }; String[] value1 = {"Head First HBase",
//        "HBase is the Hadoop database. Use it when you need random, realtime read/write access to your Big Data."
//        , "Hadoop,HBase,NoSQL" }; String[] column2 = { "name", "nickname" };
//        String[] value2 = { "nicholas", "lee" }; addData("rowkey1", "blog2",
//        column1, value1, column2, value2);
 
        // Delete one column
//         deleteColumn("blog2", "rowkey1", "author", "nickname");
 
        // Delete all columns of a row
//        deleteAllColumn("blog2", "rowkey1");
        
        // Drop the table
//        deleteTable("blog2");
 
        // Query a row
//         getResult("blog2", "rowkey1");
 
        // Query the value of a single column
//         getResultByColumn("blog2", "rowkey1", "author", "name");
//         updateTable("blog2", "rowkey1", "author", "name","bin");
//         getResultByColumn("blog2", "rowkey1", "author", "name");
 
        // Scan the whole table
//         getResultScann("blog2");
 
        // Query multiple versions of a column
         getResultByVersion("blog2", "rowkey1", "author", "name");
    }
}

Note: constructing an HTable by hand is deprecated. Obtain a Table through a Connection instead, via Connection.getTable(TableName).
 

2. HBase 2.x:

Reference: 《HBase读写的几种方式(一)java篇》

(1) Connecting to HBase:

  This article uses HBase 2.1.2. The connection is created statically here; unlike versions before 2.1.2 there is no need to build an HBase thread pool yourself, because the 2.1.2 client already wraps that up, so the connection only needs to be created and then reused:

/**
  * Declare the static configuration
  */
static Configuration conf = null;
static Connection conn = null;
static {
       conf = HBaseConfiguration.create();
       conf.set("hbase.zookeeper.quorum", "hadoop01,hadoop02,hadoop03");
       conf.set("hbase.zookeeper.property.client", "2181");
       try{
           conn = ConnectionFactory.createConnection(conf);
       }catch (Exception e){
           e.printStackTrace();
       }
}
(2) Creating an HBase table:

  Tables are created through Admin; the table and its column families are built with TableDescriptorBuilder and ColumnFamilyDescriptorBuilder respectively:

/**
 * Create a table with a single column family
 * @throws Exception
 */
public static void createTable() throws Exception{
    Admin admin = conn.getAdmin();
    if (!admin.tableExists(TableName.valueOf("test"))){
        TableName tableName = TableName.valueOf("test");
        //Table descriptor builder
        TableDescriptorBuilder tdb = TableDescriptorBuilder.newBuilder(tableName);
        //Column family descriptor builder
        ColumnFamilyDescriptorBuilder cdb = ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("user"));
        //Build the column family descriptor
        ColumnFamilyDescriptor cfd = cdb.build();
        //Add the column family
        tdb.setColumnFamily(cfd);
        //Build the table descriptor
        TableDescriptor td = tdb.build();
        //Create the table
        admin.createTable(td);
    }else {
        System.out.println("Table already exists");
    }
    //Close the connection
    conn.close();
}
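
  If a column family should keep more than one version of each cell (as the getResultByVersion example in the old-API section assumes), the builder can set that before build(). A small sketch, reusing the tdb builder and the "user" family from the code above; keeping 5 versions is just an illustration:

// Keep up to 5 versions per cell in the "user" column family
ColumnFamilyDescriptor versionedCfd = ColumnFamilyDescriptorBuilder
        .newBuilder(Bytes.toBytes("user"))
        .setMaxVersions(5)
        .build();
tdb.setColumnFamily(versionedCfd);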
(3) Adding data to an HBase table:

  Data is added through the Put API:

/**
 * Add data (multiple rowKeys, multiple column families)
 * @throws Exception
 */
public static void insertMany() throws Exception{
    Table table = conn.getTable(TableName.valueOf("test"));
    List<Put> puts = new ArrayList<Put>();
    Put put1 = new Put(Bytes.toBytes("rowKey1"));
    put1.addColumn(Bytes.toBytes("user"), Bytes.toBytes("name"), Bytes.toBytes("wd"));

    Put put2 = new Put(Bytes.toBytes("rowKey2"));
    put2.addColumn(Bytes.toBytes("user"), Bytes.toBytes("age"), Bytes.toBytes("25"));

    Put put3 = new Put(Bytes.toBytes("rowKey3"));
    put3.addColumn(Bytes.toBytes("user"), Bytes.toBytes("weight"), Bytes.toBytes("60kg"));

    Put put4 = new Put(Bytes.toBytes("rowKey4"));
    put4.addColumn(Bytes.toBytes("user"), Bytes.toBytes("sex"), Bytes.toBytes("男"));

    puts.add(put1);
    puts.add(put2);
    puts.add(put3);
    puts.add(put4);
    table.put(puts);
    table.close();
}
(4) Deleting a column family or column from HBase:
/**
 * Delete a whole row by rowKey, or one column family of that row, or a single column of one column family
 * @param tableName
 * @param rowKey
 * @throws Exception
 */
public static void deleteData(TableName tableName, String rowKey, String columnFamily, String columnName) throws Exception{
    Table table = conn.getTable(tableName);
    Delete delete = new Delete(Bytes.toBytes(rowKey));

    //(2) To delete only one column family of the row, add it to the Delete first:
    //delete.addFamily(Bytes.toBytes(columnFamily));

    //(3) To delete only a single column of one column family, add that instead:
    //delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));

    //(1) With neither of the above, the whole row is deleted
    table.delete(delete);
    table.close();
}
(5) Updating a column in an HBase table:

  Simply overwrite the existing value with another Put:

/**
 * Modify a value by rowKey, column family and column name
 * @param tableName
 * @param rowKey
 * @param columnFamily
 * @param columnName
 * @param columnValue
 * @throws Exception
 */
public static void updateData(TableName tableName, String rowKey, String columnFamily, String columnName, String columnValue) throws Exception{
    Table table = conn.getTable(tableName);
    Put put1 = new Put(Bytes.toBytes(rowKey));
    put1.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName), Bytes.toBytes(columnValue));
    table.put(put1);
    table.close();
}
(6) Querying HBase:

  HBase reads fall into get, scan, and scan combined with filters. Common filters include RowFilter (rowKey filter), SingleColumnValueFilter (column value filter) and ColumnPrefixFilter (column name prefix filter).

/**
 * Query data by rowKey
 * @param tableName
 * @param rowKey
 * @throws Exception
 */
public static void getResult(TableName tableName, String rowKey) throws Exception{
    Table table = conn.getTable(tableName);
    //Get one row
    Get get = new Get(Bytes.toBytes(rowKey));
    Result set = table.get(get);
    Cell[] cells = set.rawCells();
    for (Cell cell: cells){
        System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" +
        Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
    }
    table.close();
}

// Compare operators: LESS <, LESS_OR_EQUAL <=, EQUAL =, NOT_EQUAL <>, GREATER_OR_EQUAL >=, GREATER >, NO_OP (matches nothing)

/**
 * Scan a table with various filters
 * @param tableName
 * @throws Exception
 */
public static void scanTable(TableName tableName) throws Exception{
    Table table = conn.getTable(tableName);
    
    //(1) Full table scan
    Scan scan1 = new Scan();
    ResultScanner rscan1 = table.getScanner(scan1);
    
    //(2) RowKey filter
    Scan scan2 = new Scan();
    //"str$" matches the end of the rowKey (like %str in SQL); "^str" matches the start (like str% in SQL)
    RowFilter filter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("Key1$"));
    scan2.setFilter(filter);
    ResultScanner rscan2 = table.getScanner(scan2);
    
    //(3) Column value filter
    Scan scan3 = new Scan();
    //Arguments: column family, column name, compare operator, value
    SingleColumnValueFilter filter3 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),
               CompareOperator.EQUAL, Bytes.toBytes("spark"));
    scan3.setFilter(filter3);
    ResultScanner rscan3 = table.getScanner(scan3);
    
    //Column name prefix filter
    Scan scan4 = new Scan();
    ColumnPrefixFilter filter4 = new ColumnPrefixFilter(Bytes.toBytes("name"));
    scan4.setFilter(filter4);
    ResultScanner rscan4 = table.getScanner(scan4);
    
    //Filter list (combine several filters)
    Scan scan5 = new Scan();
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    SingleColumnValueFilter filter51 = new SingleColumnValueFilter(Bytes.toBytes("author"), Bytes.toBytes("name"),
              CompareOperator.EQUAL, Bytes.toBytes("spark"));
    ColumnPrefixFilter filter52 = new ColumnPrefixFilter(Bytes.toBytes("name"));
    list.addFilter(filter51);
    list.addFilter(filter52);
    scan5.setFilter(list);
    ResultScanner rscan5 = table.getScanner(scan5);
    
    //Iterate over one of the scanners above (here the combined-filter scan rscan5)
    for (Result rs : rscan5){
        String rowKey = Bytes.toString(rs.getRow());
        System.out.println("row key :" + rowKey);
        Cell[] cells = rs.rawCells();
        for (Cell cell: cells){
            System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::"
                    + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::"
                    + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        }
        System.out.println("-------------------------------------------");
    }
}
(7) A quick demo to test HBase connectivity (listing all table names):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class HBaseTableLister {

    public static void main(String[] args) throws IOException {
        // Create the configuration object
        Configuration config = HBaseConfiguration.create();

        // Set the connection info for the HBase cluster
        config.set("hbase.zookeeper.quorum", "zk1.example.com,zk2.example.com,zk3.example.com");
        config.set("hbase.zookeeper.property.clientPort", "2181");

        // Create the HBase connection
        Connection connection = ConnectionFactory.createConnection(config);

        // Get the HBase Admin object
        Admin admin = connection.getAdmin();

        // List all tables in HBase
        TableName[] tableNames = admin.listTableNames();

        // Print the table names
        for (TableName tableName : tableNames) {
            System.out.println(Bytes.toString(tableName.getName()));
        }

        // Close the connections
        admin.close();
        connection.close();
    }
}
(8) A utility class in practice:
import com.alibaba.fastjson.JSONObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StopWatch;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;
import scala.Tuple3;

import java.io.IOException;
import java.util.*;
import java.util.concurrent.TimeUnit;

import static com.xiaoqiang.utils.DateUtil.dateToTimestamp;

/**
 * @author: xiaoqiang
 * @version: 1.0
 * @description: com.xiaoqiang.utils
 * @date:2023/8/30
 */
public class HBaseUtil {

    public static List<Put> puts = new ArrayList<Put>();
    public static List<Delete> deletes = new ArrayList<>();

    /**
     * @Description: Get an HBase client connection
     * @param properties
     */
    public static Connection getHBaseConnect(Properties properties) {
        try {
            // 连接 Hbase
            Configuration hbConf = HBaseConfiguration.create();
            hbConf.set("fs.defaultFS", properties.getProperty("fs.defaultFS"));
            hbConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
            hbConf.set("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum"));
            hbConf.set("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort"));

            return ConnectionFactory.createConnection(hbConf);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * @Description: Get all table names in HBase
     * @param properties
     */
    public static void getAllTablesName(Properties properties) throws IOException {
        // 创建HBase连接
        Connection connection = getHBaseConnect(properties);
        // 获取HBase管理员对象
        Admin admin = connection.getAdmin();
        // 查询HBase中的所有表
        TableName[] tableNames = admin.listTableNames();

        // 输出表名
        for (TableName tableName : tableNames) {
            System.out.println(Bytes.toString(tableName.getName()));
        }

        // 关闭连接
        admin.close();
        connection.close();
    }

    /**
     * @Description: Get the row count of a given table
     * @param properties
     * @param tablename
     */
    public static void rowCountByScanFilter(Properties properties, String tablename) throws IOException {
        // 创建HBase连接
        Connection connection = getHBaseConnect(properties);
        // 获取查询全部列
        long rowCount = 0;
        try {
            // 计时
            StopWatch stopWatch = new StopWatch();
            stopWatch.start();

            TableName name=TableName.valueOf(tablename);
            // connection为类静态变量
            Table table = connection.getTable(name);
            Scan scan = new Scan();

            // startRowKey 和 endRowKey
            scan.withStartRow("student-1_1694307600".getBytes());
            scan.withStopRow("student-1_1694311200".getBytes());

            FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
            Filter filter1 = new FirstKeyOnlyFilter();
            Filter filter2 = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("^tudent-12"));
            list.addFilter(filter1);
//            list.addFilter(filter2);

            // FirstKeyOnlyFilter only returns the first KeyValue of each row, which speeds up counting
            scan.setFilter(list);

            ResultScanner rs = table.getScanner(scan);
            for (Result result : rs) {
                rowCount += result.size();
//                break;
                if (rowCount == 1000) {
                    System.out.println("rowCount-->"+rowCount);
                } else if (rowCount == 10000) {
                    System.out.println("rowCount-->"+rowCount);
                } else if (rowCount == 50000) {
                    System.out.println("rowCount-->"+rowCount);
                } else if (rowCount == 100000) {
                    System.out.println("rowCount-->"+rowCount);
                } else if (rowCount == 1000000) {
                    System.out.println("rowCount-->"+rowCount);
                }
            }

            stopWatch.stop();
            System.out.println("RowCount: " + rowCount);
            System.out.println("统计耗时:" + stopWatch.now(TimeUnit.SECONDS));
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }

    /**
     * @Description: Query data by rowKey
     * @param properties
     * @param tableName
     * @param rowKey
     */
    public static void getResultByRowKey(Properties properties, String tableName, String rowKey) throws Exception{
        // 创建HBase连接
        Connection connection = getHBaseConnect(properties);
        TableName name=TableName.valueOf(tableName);
        Table table = connection.getTable(name);

        // 获得一行
        Get get = new Get(Bytes.toBytes(rowKey));
        Result set = table.get(get);
        Cell[] cells = set.rawCells();
        for (Cell cell: cells){
            System.out.println(Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::" +
                    Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
        }
        table.close();
    }

    /**
     * @Description: Query by column value (column value filter) or by rowKey prefix/suffix
     * @param properties
     * @param tableName
     */
    public static void scanTableByFilter(Properties properties, String tableName) throws Exception{
        // 创建HBase连接
        Connection connection = getHBaseConnect(properties);
        TableName name=TableName.valueOf(tableName);
        Table table = connection.getTable(name);

        // rowKey过滤器
        Scan scan1 = new Scan();
        // str$ 末尾匹配,相当于sql中的 %str  ^str开头匹配,相当于sql中的str%
        RowFilter filter1 = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("^student-58"));
        scan1.setFilter(filter1);
        ResultScanner rscan1 = table.getScanner(scan1);

        // 列值过滤器
        Scan scan2 = new Scan();
        // 下列参数分别为列族,列名,比较符号,值
        SingleColumnValueFilter filter2 = new SingleColumnValueFilter(Bytes.toBytes("DATA"), Bytes.toBytes("work_status"),
                CompareOperator.EQUAL, Bytes.toBytes("1"));
        scan2.setFilter(filter2);
        ResultScanner rscan2 = table.getScanner(scan2);

        FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        list.addFilter(filter1);
        list.addFilter(filter2);
        Scan scan3 = new Scan();
        scan3.setFilter(list);
        ResultScanner rscan3 = table.getScanner(scan3);

        int i=0;
        for (Result rs : rscan3){
            String rowKey = Bytes.toString(rs.getRow());
            System.out.println("row key :" + rowKey);
            Cell[] cells = rs.rawCells();

            // Limit how many rows are returned
            i++;
            if (i == 100) {
                break;
            }
            for (Cell cell: cells){
                System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::"
                        + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::"
                        + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
            }
            System.out.println("-------------------------------------------");
        }
    }

    /**
     * @Description: Query data by table name (only for tables with a small amount of data; be careful with large tables!)
     * @param properties
     * @param tableName
     */
    public static void getResultByTableName(Properties properties, String tableName) throws Exception{
        // 创建HBase连接
        Connection connection = getHBaseConnect(properties);
        TableName name=TableName.valueOf(tableName);
        Table table = connection.getTable(name);
        // 全表扫描
        Scan scan = new Scan();

        // str$ 末尾匹配,相当于sql中的 %str  ^str开头匹配,相当于sql中的str%
        RowFilter filter1 = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator("^tudent-1"));
//        scan.setFilter(filter1);

        // startRowKey 和 endRowKey
        scan.withStartRow("student-46_1694016000".getBytes());
        scan.withStopRow("student-46_1694102400".getBytes());

        ResultScanner rscan = table.getScanner(scan);
        int num = 0;
        for (Result rs : rscan){
            String rowKey = Bytes.toString(rs.getRow());
            System.out.println("row key :" + rowKey);
            Cell[] cells = rs.rawCells();
            num++;
            for (Cell cell: cells){
//                if (fieldName.equals("Date")) {
                    System.out.println(Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()) + "::"
                            + Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()) + "::"
                            + Bytes.toString(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength()));
//                }
            }
            System.out.println("-------------------------------------------");
            if (num == 10) {
                break;
            }
        }
        table.close();
    }

    /**
     * @Description: Add data (multiple rowKeys, multiple column families)
     * @param properties
     * @param tableName
     */
    public static void insertMany(Properties properties, String tableName) {
        // 创建HBase连接
        Connection connection = getHBaseConnect(properties);
        TableName name=TableName.valueOf(tableName);
        Table table = null;
        try {
            table = connection.getTable(name);
        } catch (IOException e) {
            e.printStackTrace();
        }

        List<Put> puts = new ArrayList<Put>();
//        Put put1 = new Put(Bytes.toBytes("student-1_1694072961_10"));
//        put1.addColumn(Bytes.toBytes("DATA"), Bytes.toBytes("work_status"), Bytes.toBytes("0"));

        Put put2 = new Put(Bytes.toBytes("student-58_1694074491"));
        put2.addColumn(Bytes.toBytes("DATA"), Bytes.toBytes("work_status"), Bytes.toBytes("0"));

//        puts.add(put1);
        puts.add(put2);
        try {
            table.put(puts);
            System.out.println("添加数据成功-->");
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * @Description: Add data (multiple rowKeys, multiple column families)
     * @param dataDataset
     * @param properties
     * @param tableName
     */
    public static void insertDataset(String workType, Dataset<Row> dataDataset, Properties properties, String tableName) {
        // 创建HBase连接
        Connection connection = getHBaseConnect(properties);
        TableName name = TableName.valueOf(tableName);
        Table table = null;
        try {
            table = connection.getTable(name);
        } catch (IOException e) {
            e.printStackTrace();
        }

        JavaRDD<Row> dataRDD = dataDataset.toJavaRDD();
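        // Note: these Puts are collected into the static, driver-side "puts" list from inside
        // foreachPartition, which only works when Spark runs in local mode; on a cluster the
        // executors' additions never reach the driver, so table.put(puts) below would write nothing.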
        dataRDD.foreachPartition((VoidFunction<Iterator<Row>>) rowIterator -> {
            while (rowIterator.hasNext()) {
                Row next = rowIterator.next();
                String rowKey = next.getAs("work_id");
                String workTime = next.getAs("work_start_time");
                System.out.println("rowKey: " + rowKey);
                if (rowKey == null || rowKey == "") {
//                    System.out.println("rowKey为空");
                } else {
                    Put put = new Put(Bytes.toBytes(rowKey));
                    put.addColumn(Bytes.toBytes("DATA"), Bytes.toBytes("WORK_TIME"), Bytes.toBytes(workTime));
                    put.addColumn(Bytes.toBytes("DATA"), Bytes.toBytes("DATA_SOURCE"), Bytes.toBytes("offline"));
                    puts.add(put);
                }
            }
        });

        try {
            table.put(puts);
            table.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * @Description: Delete a whole row by rowKey, or one of its column families, or a single column of a column family
     * @param properties
     * @param tableName
     * @param rowKey
     * @param columnFamily
     * @param columnName
     */
    public static void deleteDataOne(Properties properties, String tableName, String rowKey, String columnFamily, String columnName) throws Exception{
        // 创建HBase连接
        Connection connection = getHBaseConnect(properties);
        TableName name=TableName.valueOf(tableName);
        Table table = connection.getTable(name);

        Delete delete = new Delete(Bytes.toBytes(rowKey));

        // 删除某一行的某一个列簇内容
//        delete.addFamily(Bytes.toBytes(columnFamily));

        // 删除某一行某个列簇某列的值
//        delete.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(columnName));

        // 根据rowKey删除一行数据
        table.delete(delete);
        table.close();
    }

    /**
     * @Description: Get HBase data as a Dataset
     * @param properties
     * @param spark
     * @param tableName
     * @param colsName
     * @return Dataset<Row>
     * @throws IOException
     */
    public static Dataset<Row> getHbaseDatasetData(Properties properties, SparkSession spark
            , String tableName, List<String> colsName, String RegexString, String startRowKey, String endRowKey) {

        Scan scan = new Scan();

        // 限制取回的条数
        Filter filter = new PageFilter(100000);
        // rowKey 过滤器(str$ 末尾匹配,相当于sql中的 %str  ^str开头匹配,相当于sql中的str%)
        RowFilter rowFilter = new RowFilter(CompareOperator.EQUAL, new RegexStringComparator(RegexString));

        FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
//        list.addFilter(filter);
        list.addFilter(rowFilter);
        scan.setFilter(list);

        // startRowKey 和 endRowKey
        scan.withStartRow(startRowKey.getBytes());
        scan.withStopRow(endRowKey.getBytes());

        Base64.Encoder base64Encoder = Base64.getEncoder();
        String scanToString = null;
        try {
            scanToString = base64Encoder.encodeToString(ProtobufUtil.toScan(scan).toByteArray());
        } catch (IOException e) {
            e.printStackTrace();
        }

        Configuration hbConf = HBaseConfiguration.create();
        hbConf.set("fs.defaultFS", properties.getProperty("fs.defaultFS"));
        hbConf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        hbConf.set("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum"));
        hbConf.set("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort"));
        //指定输入表名
        hbConf.set(TableInputFormat.INPUT_TABLE, tableName);
        //Base-64编码的Scanner
        hbConf.set(TableInputFormat.SCAN, scanToString);

        //获取JavaSparkContext
        JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        //newAPIHadoopRDD这个RDD用于读取存储在Hadoop中的数据,文件有新 API 输入格式和额外的配置选项,传递到输入格式
        //参数conf会被广播,不可以修改,所以最好一个RDD一个conf
        JavaPairRDD<ImmutableBytesWritable, Result> hbaseRDD = jsc.newAPIHadoopRDD(hbConf, TableInputFormat.class, ImmutableBytesWritable.class, Result.class);
        //使用map函数将JavaPairRDD=>JavaRDD,反之使用mapToPair函数
        JavaRDD<Row> dataRDD = hbaseRDD.map(new Function<Tuple2<ImmutableBytesWritable, Result>, Row>() {
            //序列化表示UID,用于类的版本控制
            private static final long serialVersionUID = 1L;

            //重写call()函数,返回Row类型,这部分需要根据自己需求将result中的数据封装为Object[]
            @Override
            public Row call(Tuple2<ImmutableBytesWritable, Result> tuple2) {
                Result result = tuple2._2;
                String[] values = new String[colsName.size() + 1];
                values[0] = Bytes.toString(result.getRow());
                for (int i = 0; i < colsName.size(); i++) {
                    values[i + 1] = Bytes.toString(result.getValue("DATA".getBytes(), colsName.get(i).getBytes()));
                }
                //creat()方法参数(object... value)
                return RowFactory.create((Object[]) values);
            }
        });
        List<StructField> structFields = new ArrayList<>();
        //创建StructField,基本等同于list内有几个StructField就是几列,需要和上面的Row的Object[]对应
        structFields.add(DataTypes.createStructField("rowKey", DataTypes.StringType, true));
        for (String col : colsName) {
            structFields.add(DataTypes.createStructField(col, DataTypes.StringType, true));
        }
        //构建schema,可以把它理解为架子,结构
        StructType schema = DataTypes.createStructType(structFields);
        //生成DataFrame,把书放入架子,把数据放入结构里就变成了dataframe
        return spark.createDataFrame(dataRDD, schema);
    }

    /**
     * @Description: Spark jobs often need to write results into HBase; doing that through the plain Java put API is slow.
     * Advantages of bulk loading:
     * BulkLoad does not write the WAL and does not trigger flushes or splits.
     * Inserting large amounts of data through the put API can cause heavy GC activity, which hurts performance
     * and, in severe cases, can even affect the stability of the HBase nodes; bulk loading avoids that concern,
     * and there is no per-record API call overhead.
     * In short: 1) it is fast, 2) it almost never fails, 3) it has almost no impact on the HBase service.
     * <p>
     *
     * @param joinAllDs
     * @param properties
     * @param hbaseTableName
     * @param path
     */
    public static void saveToHbaseByBulk(Dataset<Row> joinAllDs, Properties properties, String hbaseTableName, String path) {
        //Hbase集群配置信息
        Configuration hconf = HBaseConfiguration.create();
        String fsAddress = properties.getProperty("fs.defaultFS");
        hconf.set("fs.defaultFS", fsAddress);
        hconf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        hconf.set("hbase.zookeeper.quorum", properties.getProperty("hbase.zookeeper.quorum"));
        hconf.set("hbase.zookeeper.property.clientPort", properties.getProperty("hbase.zookeeper.property.clientPort"));
        hconf.set("hbase.mapreduce.hfileoutputformat.table.name", hbaseTableName);
        //设置hfile最大个数
        hconf.set("hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily", "3200");
        //设置hfile的大小
        hconf.set("hbase.hregion.max.filesize", "10737418240");
        Connection connection = null;
        Table table = null;
        Admin admin = null;
        try {
            //创建Hbase连接 获取要操作的Hbase表
            connection = ConnectionFactory.createConnection(hconf);
            TableName tableName = TableName.valueOf(hbaseTableName);
            table = connection.getTable(tableName);
            admin = connection.getAdmin();
            //创建表的Region分布
            RegionLocator regionLocator = connection.getRegionLocator(tableName);
            //将HFile写入Hbase对象
            LoadIncrementalHFiles bulkLoader = new LoadIncrementalHFiles(hconf);
            String[] columns = joinAllDs.columns();
            JavaPairRDD<ImmutableBytesWritable, KeyValue> hfileSortRowKeyAndCmRDD = joinAllDs
//                    .dropDuplicates("Time")
                    .javaRDD()
                    .mapToPair(row -> {
                        //组合rowKey 后续通过sortByKey根据rowKey整体排序
                        String studentId = row.getAs("student_id");
                        String workId = row.getAs("work_id");
                        String rowKey = studentId + workId;
                        //对Hbase列名按照字典排序
                        TreeMap<String, String> treeMap = new TreeMap<>();
                        //lambda表达式实现List接口sort方法排序
                        for (String cm : columns) {
                            Object getVal = row.getAs(cm);
                            if (getVal != null) {
                                String replace = getVal.toString().replace("WrappedArray(", "[").replace(")", "]");
                                treeMap.put(cm, replace);
                            }
                        }
                        return new Tuple2<String, TreeMap>(rowKey, treeMap);
                    }).sortByKey(true)
                    .coalesce(6)
                    .flatMapToPair(tuple2 -> {
                        List<Tuple2<ImmutableBytesWritable, KeyValue>> list = new ArrayList<>();
                        byte[] rowKey = tuple2._1.getBytes();  // 排序后的rowKey
                        byte[] cmf = "DATA".getBytes(); //列族
                        TreeMap<String, String> data = tuple2._2; //列名,对应的value
                        for (Map.Entry<String, String> map : data.entrySet()) {  //从排序过后的有序map集合里取列名 -> 对应的value
                            // Caused by: java.io.IOException: Added a key not lexically larger than previous.
                            KeyValue keyValue = new KeyValue(rowKey, cmf, map.getKey().getBytes(), System.currentTimeMillis(), map.getValue().getBytes());
                            list.add(new Tuple2<>(new ImmutableBytesWritable(rowKey), keyValue));
                        }
                        return list.iterator();
                    });

            //HFile在HDFS文件夹地址,该地址不能存在如果存在则会报错
            FileSystem fs = FileSystem.get(hconf);
            if (fs.exists(new Path(path))) {
                fs.delete(new Path(path), true);
                //将RDD以HFile文件格式保存HDFS
                hfileSortRowKeyAndCmRDD.coalesce(6).saveAsNewAPIHadoopFile(path,
                        ImmutableBytesWritable.class,
                        KeyValue.class,
                        HFileOutputFormat2.class, hconf);
                //将HFile 移动到 Hbase表的Region
                bulkLoader.doBulkLoad(new Path(path), admin, table, regionLocator);
            } else {
                hfileSortRowKeyAndCmRDD.coalesce(6).saveAsNewAPIHadoopFile(path,
                        ImmutableBytesWritable.class,
                        KeyValue.class,
                        HFileOutputFormat2.class, hconf);
                bulkLoader.doBulkLoad(new Path(path), admin, table, regionLocator);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            closeResource(connection, table, admin);
        }
    }

    /**
     * @Description: Close and release resources
     * @param connection
     * @param table
     * @param admin
     */
    private static void closeResource(Connection connection, Table table, Admin admin) {
        if (admin != null) {
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (table != null) {
            try {
                table.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Logger.getLogger("org.apache.spark").setLevel(Level.WARN);

        // pro or dev
        String runMode = "test";

        // Load the configuration file
        String configFile = String.format("config-%s.properties", runMode);
        Properties properties = PropertiesUtil.getProperties(configFile);

        // Get all table names
//        getAllTablesName(properties);
        // Query by rowKey (returns a single row)
//        getResultByRowKey(properties, "heheda", "heheawe_1694073276_10");
        // Get the row count of a given table
        rowCountByScanFilter(properties, "heheda");
        // Query data by table name
//        getResultByTableName(properties, "heheda");
//        System.out.println(getResultByFmeterId(properties, "student-1"));
        // Add data
//        insertMany(properties, "heheda");
        // Delete a whole row by rowKey, or one of its column families, or a single column of a column family
//        deleteDataOne(properties, "heheda", "fawef_1694066860", "DATA", "student_name");
        // Query by rowKey or by column value
//        scanTableByFilter(properties, "heheda");
    }
}

三、Scala

1. Read and write HBase, and query the HBase table with Spark SQL:
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{Put, Result}
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableOutputFormat}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * @Author: huiq
 * @Date: 2021/7/23
 * @Description: HBase connection test
 */
object OperateHbaseTest {

  def main(args: Array[String]): Unit = {

    // Initialize Spark
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName(this.getClass.getSimpleName)
    val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

    // Initialize HBase: point the client at ZooKeeper
    val config: Configuration = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", "node01,node02,node03") // ZooKeeper quorum of the HBase cluster
    config.set("hbase.zookeeper.property.clientPort", "2181") // ZooKeeper client port
    config.set("zookeeper.znode.parent", "/hbase-unsecure")

    val sc: SparkContext = spark.sparkContext


    // Table to read from
    config.set(TableInputFormat.INPUT_TABLE,"test_schema1:t2")

    // Load the whole table from HBase as an RDD
    val hbaseRDD: RDD[(ImmutableBytesWritable, Result)] = sc.newAPIHadoopRDD(config,classOf[TableInputFormat],classOf[ImmutableBytesWritable],classOf[Result])

    val count = hbaseRDD.count()
    println("Students RDD Count--->" + count)

    // Iterate and print every row
    hbaseRDD.foreach({ case (_,result) =>
      val key = Bytes.toString(result.getRow)
      val a = Bytes.toString(result.getValue("F".getBytes,"a".getBytes))
      val b = Bytes.toString(result.getValue("F".getBytes,"b".getBytes))
      println("Row key:"+key+" a:"+a+" b:"+b)
    })


    // Write to HBase
    val tablename = "test_schema1:t2"
    config.set(TableOutputFormat.OUTPUT_TABLE, tablename)

    val job = Job.getInstance(config)
    job.setOutputKeyClass(classOf[ImmutableBytesWritable])
    job.setOutputValueClass(classOf[Result])
    job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])

    val indataRDD = sc.makeRDD(Array("3,26,M","4,27,M")) // two records to insert
    val rdd = indataRDD.map(_.split(',')).map{arr=>{
      val put = new Put(Bytes.toBytes(arr(0))) // row key
      put.addColumn(Bytes.toBytes("F"),Bytes.toBytes("a"),Bytes.toBytes(arr(1)))
      put.addColumn(Bytes.toBytes("F"),Bytes.toBytes("b"),Bytes.toBytes(arr(2)))
//      put.add(Bytes.toBytes("F"),Bytes.toBytes("a"),Bytes.toBytes(arr(1))) // some examples online use put.add, but it failed for me; probably a version difference
      (new ImmutableBytesWritable, put)
    }}
    rdd.saveAsNewAPIHadoopDataset(job.getConfiguration())


    // Build an RDD of Rows
    val rowRDD = hbaseRDD.map(p => {
      val name = Bytes.toString(p._2.getValue(Bytes.toBytes("F"),Bytes.toBytes("a")))
      val age = Bytes.toString(p._2.getValue(Bytes.toBytes("F"),Bytes.toBytes("b")))
      Row(name,age)
    })
    // Define the schema for the DataFrame
    val schema = StructType(List(
      StructField("a",StringType,true),
      StructField("b",StringType,true)
    ))

    // Build the DataFrame
    val dataFrame = spark.createDataFrame(rowRDD,schema)

    // Register as a temporary view for SQL queries
    dataFrame.createTempView("t2")
    val result: DataFrame = spark.sql("select * from t2")
    result.show()
  }
}

Note: I'm using Ambari 2.7.4 + HDP 3.1.4. Once the cluster is properly integrated, these three config.set(...) lines can be omitted and the connection still succeeds. At first, though, the program failed with: java.util.concurrent.ExecutionException: org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/hbaseid
Cause: hbase-site.xml contains the following configuration:

    <property>
      <name>zookeeper.znode.parent</name>
      <value>/hbase-unsecure</value>
    </property>

Option 1: change the value to <value>/hbase</value> and restart HBase.
Note: the value of zookeeper.znode.parent is the znode directory created in ZooKeeper.
Option 2: add config.set("zookeeper.znode.parent", "/hbase-unsecure") in the code. A minimal standalone connectivity check is sketched below.
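
For reference, here is a minimal sketch of such a connectivity check (the object name HBaseZnodeCheck is made up for this example; the quorum hosts and table name are just the ones used above). It fails fast with the same NoNode error when zookeeper.znode.parent does not match the cluster:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.ConnectionFactory

object HBaseZnodeCheck {
  def main(args: Array[String]): Unit = {
    val conf = HBaseConfiguration.create()
    conf.set("hbase.zookeeper.quorum", "node01,node02,node03")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    // Must match the znode the cluster actually uses (/hbase or /hbase-unsecure)
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")

    // createConnection reads cluster metadata under the configured znode,
    // so a wrong znode.parent surfaces right here as a NoNode error
    val connection = ConnectionFactory.createConnection(conf)
    try {
      val admin = connection.getAdmin
      println("test_schema1:t2 exists? " + admin.tableExists(TableName.valueOf("test_schema1:t2")))
      admin.close()
    } finally {
      connection.close()
    }
  }
}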

References:
Spark入门:读写HBase数据
[HBase 基础]-- HBaseConfiguration类,参数说明

One more note: after creating the HBase-mapped table in the Hive ODS layer, I tried to generate the corresponding DWD-layer table with a create table as select ... statement, but the statement failed.

Fix 1: pass the parameter at startup: beeline -hiveconf zookeeper.znode.parent=/hbase-unsecure or hive -hiveconf zookeeper.znode.parent=/hbase-unsecure

Fix 2: since I'm on Ambari HDP 3.1.4, adding the corresponding zookeeper.znode.parent setting to the Hive configuration in Ambari and then running beeline also works.

2. Spark Streaming: read from Kafka and write the data to HBase:
import java.util

import com.rongrong.bigdata.utils.{KafkaZkUtils, UMSUtils}
import kafka.utils.ZkUtils
import org.apache.hadoop.hbase.client.{ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.Logger
import org.apache.spark.streaming.{Durations, StreamingContext}

import scala.util.Try

object StandardOnlie {

  private val logger: Logger = Logger.getLogger(this.getClass)

  def main(args: Array[String]): Unit = {
    val spark = InitializeSpark.createSparkSession("StandardOnlie", "local")
    val streamingContext = new StreamingContext(spark.sparkContext, Durations.seconds(30))
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "node01:6667,node02:6667,node03:6667",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.GROUP_ID_CONFIG -> "group-02",
      ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )

    val topic: String = "djt_db.test_schema1.result"
    val zkUrl = "node01:2181,node02:2181,node03:2181"
    val sessionTimeout = 1000
    val connectionTimeout = 1000

    val zkClient = ZkUtils.createZkClient(zkUrl, sessionTimeout, connectionTimeout)

    val kafkaStream = KafkaZkUtils.createDirectStream(zkClient, streamingContext, kafkaParams, topic)

    // Process each micro-batch
    kafkaStream.foreachRDD(rdd => {
      // Handle the data pulled from Kafka in this batch
      logger.info("=============== Total " + rdd.count() + " events in this batch ..")

      rdd.foreach(x => {
        val configuration = HBaseConfiguration.create()
        configuration.set("zookeeper.znode.parent", "/hbase-unsecure")
        val connection = ConnectionFactory.createConnection(configuration)

        // Extract the actual payload from the Kafka record
        val usmString = x.value()
        val flag: Boolean = UMSUtils.isHeartbeatUms(usmString)
        if (!flag) { // filter out heartbeat messages
          val usmActiontype = UMSUtils.getActionType(usmString)
          println(s"Action type of this record ---> ${usmActiontype}")
          println("Raw Kafka message: " + x.value())
          val data: util.Map[String, String] = UMSUtils.getDataFromUms(usmString)
          // Get the table handle
          val table = connection.getTable(TableName.valueOf("test_schema1:t2"))
          val rowkey: String = 123456 + "_" + data.get("a")
          val put = new Put(Bytes.toBytes(rowkey))
          put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("ums_active_"), Bytes.toBytes(data.get("ums_active_")))
          put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("ums_id_"), Bytes.toBytes(data.get("ums_id_")))
          put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("ums_ts_"), Bytes.toBytes(data.get("ums_ts_")))
          put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("a"), Bytes.toBytes(data.get("a")))
          put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("b"), Bytes.toBytes(data.get("b")))
          // Write to HBase and close the table even if the put fails
          try {
            table.put(put)
          } finally {
            table.close()
          }
          println(s"Parsed data ---> ${data}")
        }
        connection.close()
      })

      // Save the offsets to ZooKeeper after the batch has been processed
      KafkaZkUtils.saveOffsets(zkClient, topic, KafkaZkUtils.getZkPath(kafkaParams, topic), rdd)
    })
    streamingContext.start()
    streamingContext.awaitTermination()
    streamingContext.stop()
  }
}
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.log4j.Logger
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils}

object KafkaZkUtils {
  private val logger: Logger = Logger.getLogger(this.getClass)

  /**
   * Get the consumer's offset path in ZooKeeper
   * @param kafkaParams
   * @param topic
   * @return
   */
  def getZkPath(kafkaParams: Map[String, Object], topic: String): String ={
    val topicDirs = new ZKGroupTopicDirs(kafkaParams(ConsumerConfig.GROUP_ID_CONFIG).toString, topic) // use apply() rather than get(): get() returns an Option and would put "Some(...)" into the path
    s"${topicDirs.consumerOffsetDir}"
  }

  /**
   * Create the DirectStream
   * @param zkClient
   * @param streamingContext
   * @param kafkaParams
   * @param topic
   * @return
   */
  def createDirectStream(zkClient: ZkClient,streamingContext: StreamingContext, kafkaParams: Map[String, Object], topic: String): InputDStream[ConsumerRecord[String, String]] = {


    val zkPath = getZkPath(kafkaParams,topic)

    // Read the stored offsets for the topic
    val storedOffsets = readOffsets(zkClient, topic, zkPath)

    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = storedOffsets match {
      // No offsets were saved previously
      case None =>
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Array(topic), kafkaParams)
        )
      case Some(fromOffsets) => {
        KafkaUtils.createDirectStream[String, String](
          streamingContext,
          PreferConsistent,
          // Assign pins consumption to fixed partitions and cannot react to partition changes
          //          ConsumerStrategies.Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
          ConsumerStrategies.Subscribe[String, String](List(topic), kafkaParams, fromOffsets)
        )
      }
    }
    kafkaStream
  }

  /**
   * Save the offsets
   * @param zkClient
   * @param topic
   * @param zkPath
   * @param rdd
   */
  def saveOffsets(zkClient: ZkClient,topic: String, zkPath: String, rdd: RDD[_]): Unit = {

    logger.info("Saving offsets to zookeeper")

    val offsetsRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    offsetsRanges.foreach(offsetRange => logger.debug(s"Using ${offsetRange}"))

    val offsetsRangesStr = offsetsRanges.map(offsetRange => s"${offsetRange.partition}:${offsetRange.untilOffset}").mkString(",")

    logger.info(s"Writing offsets to Zookeeper: ${offsetsRangesStr}")

    ZkUtils(zkClient, false).updatePersistentPath(zkPath, offsetsRangesStr)
  }

  /**
   * Read the offsets
   * @param zkClient
   * @param topic
   * @param zkPath
   * @return
   */
  def readOffsets(zkClient: ZkClient, topic: String, zkPath: String): Option[Map[TopicPartition, Long]] = {
    logger.info("Reading offsets from zookeeper")

    val (offsetsRangesStrOpt, _) = ZkUtils(zkClient, false).readDataMaybeNull(zkPath)
    offsetsRangesStrOpt match {
      case Some(offsetsRangesStr) => {
        logger.debug(s"Read offset ranges: ${
          offsetsRangesStr
        }")
        val offsets: Map[TopicPartition, Long] = offsetsRangesStr.split(",").map(s => s.split(":"))
          .map({
            case Array(partitionStr, offsetStr) =>
              (new TopicPartition(topic, partitionStr.toInt) -> offsetStr.toLong)
                // A fixed offset can be specified here instead; note that this also requires enabling the ConsumerStrategies.Assign line in createDirectStream above
//              (new TopicPartition(topic, partitionStr.toInt) -> "20229".toLong)
          }).toMap
        Some(offsets)
      }
      case None =>
        logger.info("No offsets found in Zookeeper")
        None
    }
  }
}

  I originally wanted to use foreachPartition, but could not get it to work. See the article "sparkstreaming写入hbase": there the connection is created inside foreachPartition, and the author notes that "one connection is created per partition; a partition does not span nodes, so no serialization is needed". Written the same way, however, my job fails with a serialization error.
The failing code:

      rdd.foreachPartition(partitionRecords => {
        val configuration = HBaseConfiguration.create()
        configuration.set("zookeeper.znode.parent", "/hbase-unsecure")
        val connection = ConnectionFactory.createConnection(configuration) // one HBase connection per partition; a partition does not span nodes, so no serialization should be needed

        partitionRecords.foreach(x => {

        // Extract the actual payload from the Kafka record
          var usmString = x.value()

I went through some material (关于scala:通过Spark写入HBase:任务不可序列化, zookeeper报错: org.I0Itec.zkclient.exception.ZkMarshallingError: java.io.EOFException, Spark 序列化问题全解, HBase连接池) but have not found a fix yet; if you know one, feel free to discuss. A commonly suggested workaround is sketched below.
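
One pattern that is often suggested for this task-not-serializable problem is to keep the HBase connection in a singleton object, so it is created lazily once per executor JVM and is never captured by the closure. I have not verified it in my environment, so treat it strictly as a sketch: the names HBaseConnectionHolder, PartitionWriteSketch and writeToHBase are made up, and the Put below just stores the raw Kafka value instead of going through UMSUtils:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{Connection, ConnectionFactory, Put}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.dstream.InputDStream

// One HBase connection per executor JVM: created lazily on first use,
// so nothing connection-related travels with the serialized task
object HBaseConnectionHolder {
  lazy val connection: Connection = {
    val conf = HBaseConfiguration.create()
    conf.set("zookeeper.znode.parent", "/hbase-unsecure")
    ConnectionFactory.createConnection(conf)
  }
}

object PartitionWriteSketch {
  def writeToHBase(kafkaStream: InputDStream[ConsumerRecord[String, String]]): Unit = {
    kafkaStream.foreachRDD { rdd =>
      rdd.foreachPartition { partitionRecords =>
        // One Table handle per partition, reusing the executor-wide connection
        val table = HBaseConnectionHolder.connection.getTable(TableName.valueOf("test_schema1:t2"))
        try {
          partitionRecords.foreach { record =>
            // Illustrative only: real code would parse record.value() with UMSUtils as in StandardOnlie
            val rowkey = Option(record.key()).getOrElse(record.value())
            val put = new Put(Bytes.toBytes(rowkey))
            put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("raw"), Bytes.toBytes(record.value()))
            table.put(put)
          }
        } finally {
          table.close()
        }
      }
      // Offset bookkeeping (KafkaZkUtils.saveOffsets) stays here, on the driver, as before
    }
  }
}

It is also worth checking what the failing closure actually captures: the task-not-serializable exception prints a serialization stack naming the offending object, and in code like StandardOnlie the usual suspects are fields of the enclosing object (for example the ZkClient or the Logger) rather than the HBase connection itself.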
