当前位置:   article > 正文

HBase 使用_hbasescan 'student',{startrow => '1001'}

hbasescan 'student',{startrow => '1001'}

1、 简单使用

1、 基本操作
1 进入 HBase 客户端命令行
bin/hbase shell

2 查看帮助命令
hbase(main)> help

3 查看当前数据库中有哪些表
hbase(main)> list

2、 表的操作
1 创建表
hbase(main)> create 'student','info'

2 插入数据到表

hbase(main) > put 'student','1001','info:name','Thomas'
hbase(main) > put 'student','1001','info:sex','male'
hbase(main) > put 'student','1001','info:age','18'
hbase(main) > put 'student','1002','info:name','Janna'
hbase(main) > put 'student','1002','info:sex','female'
hbase(main) > put 'student','1002','info:age','20'
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

3 扫描查看表数据
hbase(main) > scan 'student'
hbase(main) > scan 'student',{STARTROW => '1001', STOPROW => '1001'}
hbase(main) > scan 'student',{STARTROW => '1001'}

4 查看表结构
hbase(main) > describe ‘student’

5 更新指定字段的数据
hbase(main) > put 'student','1001','info:name','Nick'
hbase(main) > put 'student','1001','info:age','100'

6 查看“指定行”或“指定列族:列” 的数据
hbase(main) > get 'student','1001'
hbase(main) > get 'student','1001','info:name'

7 删除数据
删除某 rowkey 的全部数据:
hbase(main) > deleteall 'student','1001'
删除某 rowkey 的某一列数据:
hbase(main) > delete 'student','1002','info:sex'

8 清空表数据
hbase(main) > truncate 'student'
尖叫提示: 清空表的操作顺序为先 disable,然后再 truncating。

9 删除表
首先需要先让该表为 disable 状态:
hbase(main) > disable 'student'
然后才能 drop 这个表:
hbase(main) > drop 'student'
尖叫提示: 如果直接 drop 表,会报错: Drop the named table. Table must first be disabled
ERROR: Table student is enabled. Disable it first.

10 统计表数据行数
hbase(main) > count 'student'

11 变更表信息
将 info 列族中的数据存放 3 个版本:
hbase(main) > alter 'student',{NAME=>'info',VERSIONS=>3}

2、读写流程

1、 HBase 读数据流程
1 HRegionServer 保存着 meta 表以及表数据,要访问表数据,首先 Client 先去访问zookeeper,从 zookeeper 里面获取 meta 表所在的位置信息,即找到这个 meta 表在哪个HRegionServer 上保存着。

2 接着 Client 通过刚才获取到的 HRegionServer 的 IP 来访问 Meta 表所在的HRegionServer,从而读取到 Meta,进而获取到 Meta 表中存放的元数据。

3 Client 通过元数据中存储的信息,访问对应的 HRegionServer,然后扫描所在HRegionServer 的 Memstore 和 Storefile 来查询数据。

4 最后 HRegionServer 把查询到的数据响应给 Client。

2、 HBase 写数据流程
1 Client 也是先访问 zookeeper,找到 Meta 表,并获取 Meta 表信息。

2 确定当前将要写入的数据所对应的 RegionServer 服务器和 Region。

3 Client 向该 RegionServer 服务器发起写入数据请求,然后 RegionServer 收到请求并响应。

4 Client 先把数据写入到 HLog,以防止数据丢失。

5 然后将数据写入到 Memstore。

6 如果 Hlog 和 Memstore 均写入成功,则这条数据写入成功。在此过程中,如果 Memstore达到阈值,会把 Memstore 中的数据 flush 到 StoreFile 中。

7 当 Storefile 越来越多,会触发 Compact 合并操作,把过多的 Storefile 合并成一个大的Storefile。当 Storefile 越来越大, Region 也会越来越大,达到阈值后,会触发 Split 操作,将 Region 一分为二。

尖叫提示: 因为内存空间是有限的,所以说溢写过程必定伴随着大量的小文件产生。

3、 JavaAPI

1、 新建 Maven Project
新建项目后在 pom.xml 中添加依赖:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.da</groupId>
    <artifactId>hbase</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>1.3.1</version>
        </dependency>
    </dependencies>
</project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19

2、 编写 HBaseAPI
注意,这部分的学习内容,我们先学习使用老版本的 API,接着再写出新版本的 API 调用方式。 因为在企业中,有些时候我们需要一些过时的 API 来提供更好的兼容性。
1 首先需要获取 Configuration 对象:

    public static Configuration conf;
    static {
        // 使用 HBaseConfiguration 的单例方法实例化
        conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "192.168.25.102");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2 判断表是否存在:

    // 判断表是否存在
    public static boolean isTableExist(String tableName)
            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        // 在 HBase 中管理、访问表需要先创建 HBaseAdmin 对象
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        // HBaseAdmin admin = new HBaseAdmin(conf);
        return admin.tableExists(tableName);
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

3 创建表

    // 创建表
    public static void createTable(String tableName, String... columnFamily)
            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        // 判断表是否存在
        if (isTableExist(tableName)) {
            System.out.println("表" + tableName + "已存在");
            // System.exit(0);
        } else {
            // 创建表属性对象,表名需要转字节
            HTableDescriptor descriptor = new HTableDescriptor(TableName.valueOf(tableName));
            // 创建多个列族
            for (String cf : columnFamily) {
                descriptor.addFamily(new HColumnDescriptor(cf));
            }
            // 根据对表的配置,创建表
            admin.createTable(descriptor);
            System.out.println("表" + tableName + "创建成功! ");
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

4 删除表

    // 删除表
    public static void dropTable(String tableName)
            throws MasterNotRunningException, ZooKeeperConnectionException, IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HBaseAdmin admin = (HBaseAdmin) connection.getAdmin();
        if (isTableExist(tableName)) {
            if (!admin.isTableDisabled(tableName))
                admin.disableTable(tableName);
            admin.deleteTable(tableName);
            System.out.println("表" + tableName + "删除成功! ");
        } else {
            System.out.println("表" + tableName + "不存在! ");
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

5 向表中插入数据

    // 向表中插入数据
    public static void addRowData(String tableName, String rowKey, String columnFamily, String column, String value)
            throws IOException {
        // 创建 HTable 对象
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        // 向表中插入数据
        Put put = new Put(Bytes.toBytes(rowKey));
        // 向 Put 对象中组装数据
        put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(value));
        hTable.put(put);
        hTable.close();
        System.out.println("插入数据成功");
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

6 删除一行数据

    // 删除一行数据
    public static void deleteOneRow(String tableName, String row) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        Delete delete = new Delete(Bytes.toBytes(row));
        hTable.delete(delete);
        hTable.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

7 删除多行数据

    // 删除多行数据
    public static void deleteMultiRow(String tableName, String... rows) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        List<Delete> deleteList = new ArrayList<Delete>();
        for (String row : rows) {
            Delete delete = new Delete(Bytes.toBytes(row));
            deleteList.add(delete);
        }
        hTable.delete(deleteList);
        hTable.close();
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

8 得到所有数据

    // 得到所有数据
    public static void getAllRows(String tableName) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        // 得到用于扫描 region 的对象
        Scan scan = new Scan();
        // 使用 HTable 得到 resultcanner 实现类的对象
        ResultScanner resultScanner = hTable.getScanner(scan);
        for (Result result : resultScanner) {
            Cell[] cells = result.rawCells();
            for (Cell cell : cells) {
                // 得到 rowkey
                System.out.println("行键:" + Bytes.toString(CellUtil.cloneRow(cell)));
                // 得到列族
                System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
                System.out.println("-------------------------------------------------");
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

9 得到某一行所有数据

    // 得到某一行所有数据
    public static void getRow(String tableName, String rowKey) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        // get.setMaxVersions();显示所有版本
        // get.setTimeStamp();显示指定时间戳的版本
        Result result = hTable.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println("行键:" + Bytes.toString(result.getRow()));
            System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
            System.out.println("时间戳:" + cell.getTimestamp());
            System.out.println("-------------------------------------------------");
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

10 得到某多行所有数据

    // 得到某多行所有数据
    public static void getRows(String tableName, String... rowKeys) throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        List<Get> gets = new ArrayList<>();
        for (String rowKey : rowKeys) {
            Get get = new Get(Bytes.toBytes(rowKey));
            gets.add(get);
        }
        // get.setMaxVersions();显示所有版本
        // get.setTimeStamp();显示指定时间戳的版本
        Result[] results = hTable.get(gets);
        for (Result result : results) {
            for (Cell cell : result.rawCells()) {
                System.out.println("行键:" + Bytes.toString(result.getRow()));
                System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
                System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
                System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
                System.out.println("时间戳:" + cell.getTimestamp());
                System.out.println("-------------------------------------------------");
            }
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

11 获取某一行指定“列族:列” 的数据

    // 获取某一行指定“列族:列” 的数据
    public static void getRowQualifier(String tableName, String rowKey, String family, String qualifier)
            throws IOException {
        Connection connection = ConnectionFactory.createConnection(conf);
        HTable hTable = (HTable) connection.getTable(TableName.valueOf(tableName));
        Get get = new Get(Bytes.toBytes(rowKey));
        get.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
        Result result = hTable.get(get);
        for (Cell cell : result.rawCells()) {
            System.out.println("行键:" + Bytes.toString(result.getRow()));
            System.out.println("列族" + Bytes.toString(CellUtil.cloneFamily(cell)));
            System.out.println("列:" + Bytes.toString(CellUtil.cloneQualifier(cell)));
            System.out.println("值:" + Bytes.toString(CellUtil.cloneValue(cell)));
        }
    }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

4、MapReduce

通过 HBase 的相关 JavaAPI,我们可以实现伴随 HBase 操作的 MapReduce 过程,比如使用MapReduce 将数据从本地文件系统导入到 HBase 的表中,比如我们从 HBase 中读取一些原始数据后使用 MapReduce 做数据分析。

1、 官方 HBase-MapReduce
1 查看 HBase 的 MapReduce 任务的执行
bin/hbase mapredcp

2 执行环境变量的导入

export HBASE_HOME=/opt/module/hbase-1.3.1
export HADOOP_HOME=/opt/module/hadoop-2.7.2
export HADOOP_CLASSPATH=`${HBASE_HOME}/bin/hbase mapredcp`
  • 1
  • 2
  • 3

3 运行官方的 MapReduce 任务
案例一:统计 Student 表中有多少行数据
/opt/module/hadoop-2.7.2/bin/yarn jar /opt/module/hbase-1.3.1/lib/hbase-server-1.3.1.jar rowcounter student

案例二: 使用 MapReduce 将本地数据导入到 HBase
1) 在本地创建一个 tsv 格式的文件: fruit.tsv

1001    Apple   Red
1002    Pear    Yellow
1003    Pineapple   Yellow
  • 1
  • 2
  • 3

2) 创建 HBase 表
bin/hbase shell
hbase(main):001:0> create 'fruit','info'

3) 在 HDFS 中创建 input_fruit 文件夹并上传 fruit.tsv 文件
hadoop fs -mkdir /input_fruit/
hadoop fs -put fruit.tsv /input_fruit/

4) 执行 MapReduce 到 HBase 的 fruit 表中

/opt/module/hadoop-2.7.2/bin/yarn jar \
/opt/module/hbase-1.3.1/lib/hbase-server-1.3.1.jar importtsv \
-Dimporttsv.columns=HBASE_ROW_KEY,info:name,info:color fruit \
hdfs://hadoop102:9000/input_fruit
  • 1
  • 2
  • 3
  • 4

5) 使用 scan 命令查看导入后的结果
hbase(main):001:0> scan ‘fruit’

2、 自定义 HBase-MapReduce1
目标: 将 fruit 表中的一部分数据,通过 MR 迁入到 fruit_mr 表中。

分步实现:
1 构建 ReadFruitMapper 类,用于读取 fruit 表中的数据

package com.da.mr1;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadFruitMapper extends TableMapper<ImmutableBytesWritable, Put> {
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // 将 fruit 的 name 和 color 提取出来,相当于将每一行数据读取出来放入到 Put 对象中。
        Put put = new Put(key.get());
        // 遍历添加 column 行
        for (Cell cell : value.rawCells()) {
            // 添加/克隆列族:info
            if ("info".equals(Bytes.toString(CellUtil.cloneFamily(cell)))) {
                // 添加/克隆列: name
                if ("name".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // 将该列 cell 加入到 put 对象中
                    put.add(cell);
                    // 添加/克隆列:color
                } else if ("color".equals(Bytes.toString(CellUtil.cloneQualifier(cell)))) {
                    // 向该列 cell 加入到 put 对象中
                    put.add(cell);
                }
            }
        }
        // 将从 fruit 读取到的每行数据写入到 context 中作为 map 的输出
        context.write(key, put);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

2 构建 WriteFruitMRReducer 类,用于将读取到的 fruit 表中的数据写入到 fruit_mr 表中

package com.da.mr1;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class WriteFruitMRReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // 读出来的每一行数据写入到 fruit_mr 表中
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

3 构建 Fruit2FruitMRRunner extends Configured implements Tool 用于组装运行 Job任务

package com.da.mr1;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class Fruit2FruitMRRunner implements Tool {
    private Configuration conf;

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public void setConf(Configuration conf) {
        // 得到 Configuration
        this.conf = HBaseConfiguration.create(conf);
    }

    // 组装 Job
    public int run(String[] args) throws Exception {
        // 创建 Job 任务
        Job job = Job.getInstance(conf);
        job.setJarByClass(Fruit2FruitMRRunner.class);
        // 配置 Job
        Scan scan = new Scan();
        scan.setCacheBlocks(false);
        scan.setCaching(500);
        // 设置 Mapper,注意导入的是 mapreduce 包下的,不是 mapred 包下的,后者是老版本
        TableMapReduceUtil.initTableMapperJob("fruit", // 数据源的表名
                scan, // scan 扫描控制器
                ReadFruitMapper.class, // 设置 Mapper 类
                ImmutableBytesWritable.class, // 设置 Mapper 输出 key 类型
                Put.class, // 设置 Mapper 输出 value 值类型
                job// 设置给哪个 JOB
        );
        // 设置 Reducer
        TableMapReduceUtil.initTableReducerJob("fruit_mr", WriteFruitMRReducer.class, job);
        // 设置 Reduce 数量,最少 1 个
        job.setNumReduceTasks(1);
        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int status = ToolRunner.run(conf, new Fruit2FruitMRRunner(), args);
        System.exit(status);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63

4 打包运行任务

/opt/module/hadoop-2.7.2/bin/yarn jar \
/opt/module/jars/hbase1.jar \
com.da.mr1.Fruit2FruitMRRunner
  • 1
  • 2
  • 3

尖叫提示: 运行任务前,如果待数据导入的表不存在,则需要提前创建之。
尖叫提示: maven 打包命令: -P local clean package 或-P dev clean package install(将第三方jar 包一同打包,需要插件: maven-shade-plugin)

3、自定义 HBase-MapReduce2
目标: 实现将 HDFS 中的数据写入到 HBase 表中。

分步实现:
1 构建 ReadFruitFromHDFSMapper 于读取 HDFS 中的文件数据

package com.da.mr2;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class ReadFruitFromHDFSMapper extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 从 HDFS 中读取的数据
        String lineValue = value.toString();
        // 读取出来的每行数据使用\t 进行分割,存于 String 数组
        String[] values = lineValue.split("\t");
        // 根据数据中值的含义取值
        String rowKey = values[0];
        String name = values[1];
        String color = values[2];
        // 初始化 rowKey
        ImmutableBytesWritable rowKeyWritable = new ImmutableBytesWritable(Bytes.toBytes(rowKey));
        // 初始化 put 对象
        Put put = new Put(Bytes.toBytes(rowKey));
        // 参数分别:列族、列、值
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name));
        put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("color"), Bytes.toBytes(color));
        context.write(rowKeyWritable, put);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

2 构建 Write2HbaseReducer 类

package com.da.mr2;

import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.io.NullWritable;

public class Write2HbaseReducer extends TableReducer<ImmutableBytesWritable, Put, NullWritable> {
    @Override
    protected void reduce(ImmutableBytesWritable key, Iterable<Put> values, Context context)
            throws IOException, InterruptedException {
        // 读出来的每一行数据写入到 fruit_hdfs 表中
        for (Put put : values) {
            context.write(NullWritable.get(), put);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18

3 创建 HDFS2HbaseRunner 组装 Job

package com.da.mr2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class HDFS2HbaseRunner implements Tool {
    private Configuration conf;

    @Override
    public Configuration getConf() {
        return this.conf;
    }

    @Override
    public void setConf(Configuration conf) {
        // 得到 Configuration
        this.conf = HBaseConfiguration.create(conf);
    }

    public int run(String[] args) throws Exception {
        // 得到 Configuration
        Configuration conf = this.getConf();
        // 创建 Job 任务
        Job job = Job.getInstance(conf);
        job.setJarByClass(HDFS2HbaseRunner.class);
        Path inPath = new Path("hdfs://hadoop102:9000/input_fruit/fruit.tsv");
        FileInputFormat.addInputPath(job, inPath);
        // 设置 Mapper
        job.setMapperClass(ReadFruitFromHDFSMapper.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(Put.class);
        // 设置 Reducer
        TableMapReduceUtil.initTableReducerJob("fruit", Write2HbaseReducer.class, job);
        // 设置 Reduce 数量,最少 1 个
        job.setNumReduceTasks(1);
        boolean isSuccess = job.waitForCompletion(true);
        if (!isSuccess) {
            throw new IOException("Job running with error");
        }
        return isSuccess ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int status = ToolRunner.run(new HDFS2HbaseRunner(), args);
        System.exit(status);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59

4 打包运行

/opt/module/hadoop-2.7.2/bin/yarn jar \
/opt/module/jars/hbase2.jar \
com.da.mr2.HDFS2HbaseRunner
  • 1
  • 2
  • 3

尖叫提示: 运行任务前,如果待数据导入的表不存在,则需要提前创建之。
尖叫提示: maven 打包命令: -P local clean package 或-P dev clean package install(将第三方jar 包一同打包,需要插件: maven-shade-plugin)

5、与 Hive 的集成

1、 HBase 与 Hive 的对比
1 Hive
1) 数据仓库
Hive 的本质其实就相当于将 HDFS 中已经存储的文件在 Mysql 中做了一个双射关系,以方便使用 HQL 去管理查询。

2) 用于数据分析、清洗
Hive 适用于离线的数据分析和清洗,延迟较高。

3) 基于 HDFS、 MapReduce
Hive 存储的数据依旧在 DataNode 上,编写的 HQL 语句终将是转换为 MapReduce 代码执行。

2 HBase
1) 数据库
是一种面向列存储的非关系型数据库。

2) 用于存储结构化和非结构话的数据
适用于单表非关系型数据的存储,不适合做关联查询,类似 JOIN 等操作。

3) 基于 HDFS
数据持久化存储的体现形式是 Hfile,存放于 DataNode 中,被 ResionServer 以 region 的形式进行管理。

4) 延迟较低,接入在线业务使用
面对大量的企业数据, HBase 可以直线单表大量数据的存储,同时提供了高效的数据访问速度。

2、 HBase 与 Hive 集成使用
尖叫提示: HBase 与 Hive 的集成在最新的两个版本中无法兼容。 所以,我们只能含着泪勇敢的重新编译: hive-hbase-handler-1.2.2.jar!! 好气!!

环境准备
因为我们后续可能会在操作 Hive 的同时对 HBase 也会产生影响,所以 Hive 需要持有操作HBase 的 Jar,那么接下来拷贝 Hive 所依赖的 Jar 包(或者使用软连接的形式)。

$ export HBASE_HOME=/opt/module/hbase-1.3.1
$ export HIVE_HOME=/opt/module/hive

$ ln -s $HBASE_HOME/lib/hbase-common-1.3.1.jar
$HIVE_HOME/lib/hbase-common-1.3.1.jar
$ ln -s $HBASE_HOME/lib/hbase-server-1.3.1.jar $HIVE_HOME/lib/hbase-server-1.3.1.jar
$ ln -s $HBASE_HOME/lib/hbase-client-1.3.1.jar $HIVE_HOME/lib/hbase-client-1.3.1.jar
$ ln -s $HBASE_HOME/lib/hbase-protocol-1.3.1.jar $HIVE_HOME/lib/hbase-protocol-1.3.1.jar
$ ln -s $HBASE_HOME/lib/hbase-it-1.3.1.jar $HIVE_HOME/lib/hbase-it-1.3.1.jar
$ ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar
$HIVE_HOME/lib/htrace-core-3.1.0-incubating.jar
$ ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
$HIVE_HOME/lib/hbase-hadoop2-compat-1.3.1.jar
$ ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
$HIVE_HOME/lib/hbase-hadoop-compat-1.3.1.jar
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

同时在 hive-site.xml 中修改 zookeeper 的属性,如下:

<property>
    <name>hive.zookeeper.quorum</name>
    <value>hadoop102,hadoop103,hadoop104</value>
    <description>The list of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
<property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
    <description>The port of ZooKeeper servers to talk to. This is only needed for read/write locks.</description>
</property>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

1 案例一
目标: 建立 Hive 表,关联 HBase 表,插入数据到 Hive 表的同时能够影响 HBase表。
分步实现:
1) 在 Hive 中创建表同时关联 HBase

CREATE TABLE hive_hbase_emp_table(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

尖叫提示: 完成之后,可以分别进入 Hive 和 HBase 查看,都生成了对应的表

2) 在 Hive 中创建临时中间表,用于 load 文件中的数据
尖叫提示: 不能将数据直接 load 进 Hive 所关联 HBase 的那张表中

CREATE TABLE emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
row format delimited fields terminated by '\t';
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3) 向 Hive 中间表中 load 数据
hive> load data local inpath '/opt/module/data/emp.txt' into table emp;

4) 通过 insert 命令将中间表中的数据导入到 Hive 关联 HBase 的那张表中
hive> insert into table hive_hbase_emp_table select * from emp;

5) 查看 Hive 以及关联的 HBase 表中是否已经成功的同步插入了数据
Hive:
hive> select * from hive_hbase_emp_table;
HBase:
hbase> scan 'hbase_emp_table'

2 案例二
目标: 在 HBase 中已经存储了某一张表 hbase_emp_table,然后在 Hive 中创建一个外部表来关联 HBase 中的 hbase_emp_table 这张表,使之可以借助 Hive 来分析 HBase 这张表中的数据。
注: 该案例 2 紧跟案例 1 的脚步,所以完成此案例前,请先完成案例 1。

分步实现:
1) 在 Hive 中创建外部表

CREATE EXTERNAL TABLE relevance_hbase_emp(
empno int,
ename string,
job string,
mgr int,
hiredate string,
sal double,
comm double,
deptno int)
STORED BY
'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" =
":key,info:ename,info:job,info:mgr,info:hiredate,info:sal,info:comm,info:deptno")
TBLPROPERTIES ("hbase.table.name" = "hbase_emp_table");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

2) 关联后就可以使用 Hive 函数进行一些分析操作了
hive (default)> select * from relevance_hbase_emp;

6、与 Sqoop 的集成

之前我们已经学习过如何使用 Sqoop在 Hadoop集群和关系型数据库中进行数据的导入导出工作,接下来我们学习一下利用 Sqoop 在 HBase 和 RDBMS 中进行数据的转储。

相关参数:

参数描述
–column-family familySets the target column family for the import 设置导入的目标列族。
–hbase-create-tableIf specified, create missing HBase tables是否自动创建不存在的 HBase 表(这就意味着,不需要手动提前在 HBase 中先建立表)
–hbase-row-key colSpecifies which input column to use as the rowkey.Incase, if input table contains composite key, then col must be in the form of a comma-separated list of composite key attributes.mysql 中哪一列的值作为 HBase 的 rowkey,如果 rowkey是个组合键,则以逗号分隔。(注:避免 rowkey 的重复)
–hbase-table table-nameSpecifies an HBase table to use as the target instead of HDFS.指定数据将要导入到 HBase 中的哪张表中。
–hbase-bulkload Enablesbulk loading.是否允许 bulk 形式的导入。

1 案例
目标: 将 RDBMS 中的数据抽取到 HBase 中
分步实现:
1) 配置 sqoop-env.sh,添加如下内容:
export HBASE_HOME=/opt/module/hbase-1.3.1

2) 在 Mysql 中新建一个数据库 db_library,一张表 book

CREATE DATABASE db_library;
CREATE TABLE db_library.book(
id int(4) PRIMARY KEY NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
price VARCHAR(255) NOT NULL);
  • 1
  • 2
  • 3
  • 4
  • 5

3) 向表中插入一些数据

INSERT INTO db_library.book (name, price) VALUES('Lie Sporting', '30');
INSERT INTO db_library.book (name, price) VALUES('Pride & Prejudice', '70');
INSERT INTO db_library.book (name, price) VALUES('Fall of Giants', '50');
  • 1
  • 2
  • 3

4) 执行 Sqoop 导入数据的操作

$ bin/sqoop import \
--connect jdbc:mysql://linux01:3306/db_library \
--username root \
--password 123456 \
--table book \
--columns "id,name,price" \
--column-family "info" \
--hbase-create-table \
--hbase-row-key "id" \
--hbase-table "hbase_book" \
--num-mappers 1 \
--split-by id
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

尖叫提示: sqoop1.4.6 只支持 HBase1.0.1 之前的版本的自动创建 HBase 表的功能

解决方案:手动创建 HBase 表
hbase> create 'hbase_book','info'

5) 在 HBase 中 scan 这张表得到如下内容
hbase> scan ‘hbase_book’

7、 常用的 Shell 操作

1 satus
例如:显示服务器状态
hbase> status 'hadoop102'

2 whoami
显示 HBase 当前用户,例如:
hbase> whoami

3 list
显示当前所有的表
hbase> list

4 count
统计指定表的记录数,例如:
hbase> count 'hbase_book'

5 describe
展示表结构信息
hbase> describe 'hbase_book'

6 exist
检查表是否存在,适用于表量特别多的情况
hbase> exist 'hbase_book'

7 is_enabled/is_disabled
检查表是否启用或禁用
hbase> is_enabled 'hbase_book'
hbase> is_disabled 'hbase_book'

8 alter
该命令可以改变表和列族的模式,例如:
为当前表增加列族:
hbase> alter 'hbase_book', NAME => 'CF2', VERSIONS => 2
为当前表删除列族:
hbase> alter 'hbase_book', 'delete' => ’CF2’

9 disable
禁用一张表
hbase> disable 'hbase_book'

10 drop
删除一张表,记得在删除表之前必须先禁用
hbase> drop 'hbase_book'

11 delete
删除一行中一个单元格的值,例如:
hbase> delete 'hbase_book', 'rowKey', 'CF:C'

12 truncate
清空表数据,即禁用表-删除表-创建表
hbase> truncate 'hbase_book'

13 create
创建表,例如:
hbase> create 'table', 'cf'
创建多个列族:
hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}

8、数据的备份与恢复

1、 备份
停止 HBase 服务后,使用 distcp 命令运行 MapReduce 任务进行备份,将数据备份到另一个地方,可以是同一个集群,也可以是专用的备份集群。
即,把数据转移到当前集群的其他目录下(也可以不在同一个集群中) :

$ bin/hadoop distcp \
hdfs://hadoop102:9000/hbase \
hdfs://hadoop103:9000/HbaseBackup/backup20180719
  • 1
  • 2
  • 3

尖叫提示:执行该操作, 一定要开启 Yarn 服务

2、恢复
非常简单, 与备份方法一样,将数据整个移动回来即可。

$ bin/hadoop distcp \
hdfs://hadoop103:9000/HbaseBackup/backup20180719 \
hdfs://hadoop102:9000/hbase
  • 1
  • 2
  • 3

9、节点的管理

1、服役(commissioning)
当启动 regionserver 时, regionserver 会向 HMaster 注册并开始接收本地数据,开始的时候,新加入的节点不会有任何数据,平衡器开启的情况下,将会有新的 region 移动到开启的RegionServer 上。如果启动和停止进程是使用 ssh 和 HBase 脚本,那么会将新添加的节点的主机名加入到 conf/regionservers 文件中。

2、退役(decommissioning)
顾名思义,就是从当前 HBase 集群中删除某个 RegionServer,这个过程分为如下几个过程:

1 停止负载平衡器
hbase> balance_switch false
2 在退役节点上停止 RegionServer
hbase> hbase-daemon.sh stop regionserver
3 RegionServer 一旦停止,会关闭维护的所有 region
4 Zookeeper 上的该 RegionServer 节点消失
5 Master 节点检测到该 RegionServer 下线
6 RegionServer 的 region 服务得到重新分配

该关闭方法比较传统,需要花费一定的时间,而且会造成部分 region 短暂的不可用。

另一种方案:
1 RegionServer 先卸载所管理的 region
$ bin/graceful_stop.sh <RegionServer-hostname>
2 自动平衡数据
3 和之前的 2~6 步是一样的

10、 版本的确界

1 版本的下界
默认的版本下界是 0,即禁用。 row 版本使用的最小数目是与生存时间(TTL Time To Live)相结合的,并且我们根据实际需求可以有 0 或更多的版本,使用 0,即只有 1 个版本的值写入 cell。

2 版本的上界
之前默认的版本上界是 3,也就是一个 row 保留 3 个副本(基于时间戳的插入)。该值不要设计的过大,一般的业务不会超过 100。如果 cell 中存储的数据版本号超过了 3 个,再次插入数据时,最新的值会将最老的值覆盖。 (现版本已默认为 1)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/weixin_40725706/article/detail/568453
推荐阅读
相关标签
  

闽ICP备14008679号