当前位置:   article > 正文

Kerberos安全认证-连载11-HBase Kerberos安全配置及访问_kerberos hbase

kerberos hbase

目录

1. Zookeeper Kerberos配置

2. HBase配置Kerberos

3. HBase启动及访问验证

4. HBase Shell操作HBase

​​​​​​​5. Java API操作HBase


技术连载系列,前面内容请参考前面连载10内容:​​​​​​​​​​​​​​Kerberos安全认证-连载10-Hive Kerberos 安全配置及访问_IT贫道的博客-CSDN博客

大数据组件HBase也可以通过Kerberos进行安全认证,由于HBase中需要zookeeper进行元数据管理、主节点选举、故障恢复,所以这里对HBase进行Kerberos安全认证时,建议也对Zookeeper进行安全认证。

1. Zookeeper Kerberos配置

zookeeper集群节点分布如下:

节点IP

节点名称

Zookeeper

192.168.179.4

node1

192.168.179.5

node2

192.168.179.6

node3

192.168.179.7

node4

192.168.179.8

node5

目前启动zookeeper集群后,可以通过任意Zookeeper客户端操作Zookeeper,可以通过Kerberos对Zookeeper进行认证避免非认证用户操作Zookeeper。按照如下步骤进行zookeeper kerberos认证。

1) 创建zookeeper Princial服务主体

在node1节点执行如下命令创建zookeeper Princial服务主体。

  1. [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node3"
  2. [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node4"
  3. [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node5"

​​​​​​​2) 将zookeeper服务主体写入到keytab文件

在node1节点执行如下命令,将zookeeper服务主体写入到keytab文件:

  1. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node3@EXAMPLE.COM"
  2. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node4@EXAMPLE.COM"
  3. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node5@EXAMPLE.COM"

以上命令执行完成后,可以在node1节点/home/keytabs目录下生成zookeeper.service.keytab文件。

3) 将keytab文件分发到其他节点并修改权限

这里将node1节点生成的zookeeper.service.keytab文件分发到node2~node5节点,这里也可以直接分发到zookeeper服务所在节点,为了保持集群各个节点keytab文件一致,这里将该keytab文件分发到集群各个节点中。

  1. #在node1节点中执行如下命令进行分发
  2. [root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node2:/home/keytabs/
  3. [root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node3:/home/keytabs/
  4. [root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node4:/home/keytabs/
  5. [root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node5:/home/keytabs/

分发到node2~node5各个节点后,执行如下命令对zookeeper.service.keytab修改权限。

  1. #node1~node5所有节点执行如下命令
  2. chown root:hadoop /home/keytabs/zookeeper.service.keytab
  3. chmod 770 /home/keytabs/zookeeper.service.keytab

​​​​​​​4) 配置zoo.cfg文件

在node3~node5 zookeeper节点上配置zoo.cfg配置文件,每个节点在该文件最后追加如下配置:

  1. kerberos.removeHostFromPrincipal=true
  2. kerberos.removeRealmFromPrincipal=true
  3. authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
  4. jaasLoginRenew=3600000
  5. sessionRequireClientSASLAuth=false
  6. skipACL=yes

以上配置项的解释如下:

  • kerberos.removeHostFromPrincipal

kerberos认证时是否从Principal中移除主机名部分。

  • kerberos.removeRealmFromPrincipal

kerberos认证时是否从Principal中移除领域(Realm)部分。

  • authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider

指定ZooKeeper对客户端进行身份验证时使用的身份验证类。

  • jaasLoginRenew

Jaas登录(Java Authentication and Authorization Service)的续约时间,这里是每隔1小时需要续约Jaas登录。

  • sessionRequireClientSASLAuth

是否要求客户端进行SASL认证,SASL(Simple Authentication and Security Layer)认证是一种安全机制,用于在客户端和服务器之间进行身份验证和加密通信。当设置为要求客户端进行SASL认证时,客户端必须提供有效的凭证(如用户名和密码)来证明其身份,并与ZooKeeper服务器进行安全的通信。

这里需要将该值设置为false,否则Hadoop HA 不能正常启动。

  • skipACL=yes

跳过Zookeeper 访问控制列表(ACL)验证,允许连接zookeper后进行读取和写入。这里建议跳过,否则配置HBase 启动后不能向Zookeeper中写入数据。

这里在node3节点进行zoo.cfg文件的配置,配置完成后,将zoo.cfg文件分发到node4、node5 zookeeper节点上。

  1. [root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node4:/software/apache-zookeeper-3.6.3-bin/conf/
  2. [root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node5:/software/apache-zookeeper-3.6.3-bin/conf/

​​​​​​​​​​​​​​5) 配置jaas.conf文件

在node3~node5各个zookeeper节点中ZOOKEEPER_HOME/conf/目录下创建jaas.conf文件,该文件用于zookeeper服务端与客户端进行认证。配置内容如下:

  1. Server {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. keyTab="/home/keytabs/zookeeper.service.keytab"
  5. storeKey=true
  6. useTicketCache=false
  7. principal="zookeeper/node3@EXAMPLE.COM";
  8. };
  9. Client {
  10. com.sun.security.auth.module.Krb5LoginModule required
  11. useKeyTab=true
  12. keyTab="/home/keytabs/zookeeper.service.keytab"
  13. storeKey=true
  14. useTicketCache=false
  15. principal="zookeeper/node3@EXAMPLE.COM";
  16. };

以上配置文件是在node3 zookeeper节点配置,将以上文件分发到node4、node5节点后,需要将principal修改成对应节点的信息。

node4 zookeeper节点jaas.conf配置如下:

  1. Server {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. keyTab="/home/keytabs/zookeeper.service.keytab"
  5. storeKey=true
  6. useTicketCache=false
  7. principal="zookeeper/node4@EXAMPLE.COM";
  8. };
  9. Client {
  10. com.sun.security.auth.module.Krb5LoginModule required
  11. useKeyTab=true
  12. keyTab="/home/keytabs/zookeeper.service.keytab"
  13. storeKey=true
  14. useTicketCache=false
  15. principal="zookeeper/node4@EXAMPLE.COM";
  16. };

node5 zookeeper节点jaas.conf配置如下:

  1. Server {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. keyTab="/home/keytabs/zookeeper.service.keytab"
  5. storeKey=true
  6. useTicketCache=false
  7. principal="zookeeper/node5@EXAMPLE.COM";
  8. };
  9. Client {
  10. com.sun.security.auth.module.Krb5LoginModule required
  11. useKeyTab=true
  12. keyTab="/home/keytabs/zookeeper.service.keytab"
  13. storeKey=true
  14. useTicketCache=false
  15. principal="zookeeper/node5@EXAMPLE.COM";
  16. };

​​​​​​​6) 配置java.env文件

在zookeeper各个节点ZOOKEEPER_HOME/conf目录下创建java.env文件,写入如下内容,这里在node3~node5各个节点都需配置。

  1. #指定jaas.conf文件路径
  2. export JVMFLAGS="-Djava.security.auth.login.config=/software/apache-zookeeper-3.6.3-bin/conf/jaas.conf"

以上可以先在node3节点进行配置,然后分发到node4~node5节点上。如下:

  1. [root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node4:/software/apache-zookeeper-3.6.3-bin/conf/
  2. [root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node5:/software/apache-zookeeper-3.6.3-bin/conf/

​​​​​​​​​​​​​​7) 启动zookeeper

在node3~node5各个节点执行如下命令,启动zookeeper。

  1. #启动zookeeper
  2. zkServer.sh start
  3. #检查zookeeper状态
  4. zkServer.sh status

​​​​​​​2. HBase配置Kerberos

HBase也支持Kerberos安全认证,经过测试,HBase2.5.x版本在经过Kerberos认证后与Hadoop通信认证有异常,具体报错如下:

hdfs.DataStreamer: Exception in createBlockOutputStream java.io.IOException: Invalid token in javax.security.sasl.qop: DI

这里建议使用HBase2.5.x以下版本与kerberos进行整合,但HBase版本的选择也需要参考Hadoop的版本,可以在Hbase官网找到Hadoop版本匹配的HBase版本,地址为:https://hbase.apache.org/book.html#hadoop

我们这里使用的Hadoop版本为3.3.4,HBase版本选择2.2.6版本。HBase Kerberos安全认证配置步骤如下:

1) 创建HBase服务用户并两两节点进行免密

在Hadoop体系中操作HBase时通常使用hbase用户,这里在集群各个节点创建hbase用户并设置在HBase用户下,节点之间的免密。

在node1~node5节点创建hbase用户,所属hadoop组。

  1. #这里设置用户密码为123456
  2. useradd hbase -g hadoop
  3. passwd hbase

各个节点切换到hbase用户,并设置各个节点两两免密。

  1. #所有节点切换成hbase用户
  2. su hbase
  3. cd ~
  4. #node1~node5所有节点生成SSH密钥
  5. ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
  6. #node1~node5所有节点公钥复制到node1节点上,这个过程需要输入yes和密码
  7. ssh-copy-id node1
  8. #将node1 authorized_keys文件分发到node1~node5各节点,这个过程需要输入密码
  9. [hbase@node1 ~]$ cd ~/.ssh/
  10. [hbase@node1 .ssh]$ scp authorized_keys node2:`pwd`
  11. [hbase@node1 .ssh]$ scp authorized_keys node3:`pwd`
  12. [hbase@node1 .ssh]$ scp authorized_keys node4:`pwd`
  13. [hbase@node1 .ssh]$ scp authorized_keys node5:`pwd`
  14. #两两节点进行ssh测试,这一步骤必须做,然后node1~node5节点退出当前hbase用户
  15. exit

​​​​​​​2) 创建HBase服务Princial主体并写入到keytab文件

在kerberos服务端node1节点执行如下命令创建HBase服务主体:

  1. [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node3"
  2. [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node4"
  3. [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node5"

在kerberos服务端node1节点执行如下命令将hbase主体写入到keytab文件:

  1. #node1节点执行命令,将主体写入到keytab
  2. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node3@EXAMPLE.COM"
  3. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node4@EXAMPLE.COM"
  4. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node5@EXAMPLE.COM"

以上命令执行完成后,在node1节点/home/keytabs目录下生成hbase.service.keytab文件,将该文件分发到各个节点并赋权,这里可以只发送到node3~node5 Hbase所在节点,为了保证各个大数据集群节点的keytabs一致,这里分发到所有节点。

  1. [root@node1 ~]# scp /home/keytabs/hbase.service.keytab node2:/home/keytabs/
  2. [root@node1 ~]# scp /home/keytabs/hbase.service.keytab node3:/home/keytabs/
  3. [root@node1 ~]# scp /home/keytabs/hbase.service.keytab node4:/home/keytabs/
  4. [root@node1 ~]# scp /home/keytabs/hbase.service.keytab node5:/home/keytabs/

分发完成后,在集群各个节点上执行如下命令,修改hbase.service.keytab密钥文件所属用户和组信息:

  1. chown root:hadoop /home/keytabs/hbase.service.keytab
  2. chmod 770 /home/keytabs/hbase.service.keytab

​​​​​​​3) 修改hbase-site.xml配置文件

在所有hbase节点中修改HBASE_HOME/conf/hbase-site.xml文件内容,配置支持kerberos,向hbase-site.xml文件中追加如下内容:

  1. <!-- hbase开启安全认证 -->
  2. <property>
  3. <name>hbase.security.authorization</name>
  4. <value>true</value>
  5. </property>
  6. <!-- hbase配置kerberos安全认证 -->
  7. <property>
  8. <name>hbase.security.authentication</name>
  9. <value>kerberos</value>
  10. </property>
  11. <!-- HMaster配置kerberos安全凭据认证 -->
  12. <property>
  13. <name>hbase.master.kerberos.principal</name>
  14. <value>hbase/_HOST@EXAMPLE.COM</value>
  15. </property>
  16. <!-- HMaster配置kerberos安全证书keytab文件位置 -->
  17. <property>
  18. <name>hbase.master.keytab.file</name>
  19. <value>/home/keytabs/hbase.service.keytab</value>
  20. </property>
  21. <!-- Regionserver配置kerberos安全凭据认证 -->
  22. <property>
  23. <name>hbase.regionserver.kerberos.principal</name>
  24. <value>hbase/_HOST@EXAMPLE.COM</value>
  25. </property>
  26. <!-- Regionserver配置kerberos安全证书keytab文件位置 -->
  27. <property>
  28. <name>hbase.regionserver.keytab.file</name>
  29. <value>/home/keytabs/hbase.service.keytab</value>
  30. </property>

这里可以先在node3节点进行配置,配置完成后分发到node4、node5节点:

  1. [root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node4:/software/hbase-2.2.6/conf/
  2. [root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node5:/software/hbase-2.2.6/conf/

​​​​​​​4) 准备hdfs-site.xml和core-site.xml文件

HBase底层存储使用HDFS,这里需要将Hadoop中hdfs-site.xml和core-site.xml文件发送到hbase各个节点HBASE_HOME/conf/目录中。在node3~node5各个HBase节点中执行如下命令:

  1. #node3~node5各个节点执行
  2. cp /software/hadoop-3.3.4/etc/hadoop/hdfs-site.xml /software/hbase-2.2.6/conf/
  3. cp /software/hadoop-3.3.4/etc/hadoop/core-site.xml /software/hbase-2.2.6/conf/

​​​​​​​​​​​​​​5) 准备zk-jaas.conf文件

HBase中会使用zookeeper管理元数据和选主,这里由于Zookeeper中已经开启了Kerberos认证,所以需要准备zk-jaas.conf文件连接zookeeper时进行认证。在HBase各个节点HBASE_HOME/conf目录下创建zk-jaas.conf文件,写入如下内容,不同的节点设置的principal是对应HBase主机节点。

node3节点HBASE_HOME/conf/zk-jaas.conf文件内容:

  1. Client {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. keyTab="/home/keytabs/hbase.service.keytab"
  5. useTicketCache=false
  6. principal="hbase/node3@EXAMPLE.COM";
  7. };

node4节点HBASE_HOME/conf/zk-jaas.conf文件内容:

  1. Client {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. keyTab="/home/keytabs/hbase.service.keytab"
  5. useTicketCache=false
  6. principal="hbase/node4@EXAMPLE.COM";
  7. };

node5节点HBASE_HOME/conf/zk-jaas.conf文件内容:

  1. Client {
  2. com.sun.security.auth.module.Krb5LoginModule required
  3. useKeyTab=true
  4. keyTab="/home/keytabs/hbase.service.keytab"
  5. useTicketCache=false
  6. principal="hbase/node5@EXAMPLE.COM";
  7. };

​​​​​​​​​​​​​​6) 修改hbase-env.sh配置

在各个hbase节点上配置HBASE_HOME/conf/hbase-env.sh,将HBASE_OPTS选项修改为如下:

  1. #node3~node5各个节点修改hbase-env.sh中的HBASE_OPTS配置
  2. export HBASE_OPTS="$HBASE_OPTS -XX:+UseConcMarkSweepGC -Djava.security.auth.login.config=/software/hbase-2.2.6/conf/zk-jaas.conf"

​​​​​​​​​​​​​​7) 修改HBase安装目录权限

在HBase各个节点创建HBASE_HOME/logs目录,如果该目录不存在需要提前创建,存在即可忽略,该目录为HBase日志目录,需要将该目录权限修改为root:hadoop、访问权限为770,否则hbase用户在启动Hbase集群时没有向该目录写日志权限。

  1. #在node3~node5 HBase节点执行命令创建HBASE_HOME/logs
  2. mkdir -p /software/hbase-2.2.6/logs

将各个节点的hbase安装目录权限修改为root:hadoop,logs目录访问权限为770:

  1. #在node3~node5 HBase节点执行命令
  2. chown -R root:hadoop /software/hbase-2.2.6
  3. chmod -R 770 /software/hbase-2.2.6/logs

​​​​​​​3. HBase启动及访问验证

在启动HBase集群之前,需要将Zookeeper和HDFS集群重启,Zookeeper和HDFS启动命令如下:

  1. #停止hadoop集群
  2. [root@node1 ~]# stop-all.sh
  3. #node3~node5节点,重启zookeeper
  4. [root@node3 ~]# zkServer.sh restart
  5. [root@node3 ~]# zkServer.sh restart
  6. [root@node3 ~]# zkServer.sh restart
  7. #启动HDFS集群
  8. [root@node1 ~]# start-all.sh

如果没有配置Kerberos认证时HBase之前启动过,在HDFS中将会有/hbase 目录,需要使用hdfs超级管理员用户将该目录的用户和所属组修改成hbase:hadoop。

  1. #在node5节点执行
  2. [root@node5 ~]# su hdfs
  3. [hdfs@node5 ~]$ kinit hdfs
  4. Password for hdfs@EXAMPLE.COM: 123456
  5. [hdfs@node5 logs]$ hdfs dfs -ls /
  6. Found 6 items
  7. drwxr-xr-x - root supergroup 0 2023-05-23 10:50 /hbase
  8. #修改用户和所属组
  9. [hdfs@node5 logs]$ hadoop dfs -chown -R hbase:hadoop /hbase

在HBase主节点node4上执行如下命令启动HBase集群:

  1. #无需切换hbase用户,可以直接在node4节点执行如下命令启动hbase集群
  2. [root@node4 ~]# sudo -i -u hbase start-hbase.sh

注意:以上启动Hbase集群命令中没有切换到hbase用户,sudo为使用超级用户权限执行命令,-i表示交互模式执行命令,-u指定切换的用户。

HBase启动后,可以通过http://node4:16010进行访问:

 ​​​​​​​4. HBase Shell操作HBase

在node3~node5任意节点登录hbase shell,可以看到如果节点没有进行用户认证,读取HBase数据时没有权限:

  1. #假设在node4登录hbase客户端
  2. [root@node4 ~]# klist
  3. klist: No credentials cache found (filename: /tmp/krb5cc_0)
  4. [root@node4 ~]# hbase shell
  5. hbase(main):001:0> list
  6. TABLE
  7. ERROR: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)

如果想要正常读取HBase中的数据需要进行客户端认证,这里以zhangsan用户为例,进行Kerberos主体认证,认证后操作HBase,操作如下:

  1. #进行用户认证
  2. [root@node4 ~]# kinit zhangsan
  3. Password for zhangsan@EXAMPLE.COM: 123456
  4. [root@node4 ~]# klist
  5. Ticket cache: FILE:/tmp/krb5cc_0
  6. Default principal: zhangsan@EXAMPLE.COM
  7. #登录hbase shell
  8. [root@node4 ~]# hbase shell
  9. hbase(main):001:0> create 'person','cf1','cf2'
  10. Created table person
  11. hbase(main):002:0> list
  12. person
  13. hbase(main):003:0> put 'person','row1','cf1:id','1'
  14. hbase(main):004:0> put 'person','row1','cf1:name','zs'
  15. hbase(main):005:0> put 'person','row1','cf1:age',18
  16. hbase(main):006:0> scan 'person'
  17. ROW COLUMN+CELL
  18. row1 column=cf1:age, timestamp=1684317908851, value=18
  19. row1 column=cf1:id, timestamp=1684317879179, value=1
  20. row1 column=cf1:name, timestamp=1684317894358, value=zs

​​​​​​​5. Java API操作HBase

java API操作经过Kerberos安全认证的HBase按照如下步骤实现即可。

1) 准备krb5.conf文件

将node1 kerberos服务端/etc/krb5.conf文件存放在IDEA项目中的resources资源目录中或者本地Window固定的某个目录中,用于编写代码时指定访问Kerberos的Realm。

2) 生成用户keytab文件

在kerberos服务端node1节点上,执行如下命令,对zhangsan用户主体生成keytab密钥文件。

  1. #在node1 kerberos服务端执行
  2. [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /root/zhangsan.keytab zhangsan@EXAMPLE.COM"

以上命令执行之后,在/root目录下会生成zhangsan.keytab文件,将该文件复制到IDEA项目中的resources资源目录中或者本地window固定的某个目录中,该文件用于编写代码时认证kerberos。

3) 准备访问HDFS需要的资源文件

将HDFS中的core-site.xml 、hdfs-site.xml 、yarn-site.xml、hbase-site.xml文件上传到项目resources资源目录中。

4) 编写代码操作HBase

项目代码中引入如下maven依赖:

  1. <!-- hbase依赖包 -->
  2. <dependency>
  3. <groupId>org.apache.hbase</groupId>
  4. <artifactId>hbase-server</artifactId>
  5. <version>2.2.6</version>
  6. </dependency>
  7. <dependency>
  8. <groupId>org.apache.hbase</groupId>
  9. <artifactId>hbase-client</artifactId>
  10. <version>2.2.6</version>
  11. </dependency>

编写Java代码操作Kerberos认证的HBase代码如下:

  1. /**
  2. * Java API 操作Kerberos 认证的HBase
  3. */
  4. public class OperateAuthHBase {
  5. private static Configuration conf;
  6. private static Connection connection;
  7. private static Admin admin;
  8. /**
  9. * 初始化:进行kerberos认证并设置hbase configuration
  10. */
  11. static {
  12. try {
  13. String principal = "zhangsan@EXAMPLE.COM";
  14. String keytabPath = "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\zhangsan.keytab";
  15. conf = HBaseConfiguration.create();
  16. // 设置Kerberos 认证
  17. System.setProperty("java.security.krb5.conf", "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\krb5.conf");
  18. conf.set("hadoop.security.authentication", "kerberos");
  19. UserGroupInformation.setConfiguration(conf);
  20. UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
  21. //设置zookeeper集群地址及端口号
  22. conf.set("hbase.zookeeper.quorum","node3,node4,node5");
  23. conf.set("hbase.zookeeper.property.clientPort","2181");
  24. //创建连接
  25. connection = ConnectionFactory.createConnection(conf);
  26. } catch (IOException e) {
  27. throw new RuntimeException(e);
  28. }
  29. }
  30. public static void main(String[] args) throws IOException {
  31. try{
  32. //获取Admin对象
  33. admin = connection.getAdmin();
  34. //创建表
  35. createHBaseTable("tbl1", "cf1");
  36. System.out.println("===========================");
  37. //查看Hbase中所有表
  38. listHBaseTables();
  39. System.out.println("===========================");
  40. //向表中插入数据
  41. putTableData("tbl1","row1", "cf1", "id", "100");
  42. putTableData("tbl1","row1", "cf1", "name", "zs");
  43. putTableData("tbl1","row1", "cf1", "age", "18");
  44. putTableData("tbl1","row2", "cf1", "id", "200");
  45. putTableData("tbl1","row2", "cf1", "name", "ls");
  46. putTableData("tbl1","row2", "cf1", "age", "19");
  47. System.out.println("===========================");
  48. //查询表中的数据
  49. scanTableData("tbl1");
  50. System.out.println("===========================");
  51. //查询指定row数据
  52. getRowData("tbl1", "row1","cf1","name");
  53. System.out.println("===========================");
  54. //删除指定row数据
  55. deleteRowData("tbl1", "row1","cf1","name");
  56. System.out.println("===========================");
  57. //查询表中的数据
  58. scanTableData("tbl1");
  59. System.out.println("===========================");
  60. //删除表
  61. deleteHBaseTable("tbl1");
  62. //查看Hbase中所有表
  63. listHBaseTables();
  64. System.out.println("===========================");
  65. }catch (Exception e) {
  66. e.printStackTrace();
  67. }finally {
  68. if(admin != null) {
  69. admin.close();
  70. }
  71. if(connection != null) {
  72. connection.close();
  73. }
  74. }
  75. }
  76. //判断表是否存在
  77. private static boolean isTableExist(String tableName) {
  78. try {
  79. return admin.tableExists(TableName.valueOf(tableName));
  80. } catch (IOException e) {
  81. e.printStackTrace();
  82. }
  83. return false;
  84. }
  85. private static void createHBaseTable(String tableName, String... cfs) {
  86. if(isTableExist(tableName)) {
  87. System.out.println("表已经存在!");
  88. return;
  89. }
  90. if(cfs == null || cfs.length == 0){
  91. System.out.println("列族不能为空!");
  92. return;
  93. }
  94. TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
  95. List<ColumnFamilyDescriptor> cFDBList = new ArrayList<>();
  96. for (String cf : cfs) {
  97. cFDBList.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
  98. }
  99. tableDescriptorBuilder.setColumnFamilies(cFDBList);
  100. try {
  101. admin.createTable(tableDescriptorBuilder.build());
  102. System.out.println("创建表" + tableName + "成功!");
  103. } catch (IOException e) {
  104. System.out.println("创建失败!");
  105. e.printStackTrace();
  106. }
  107. }
  108. //查看Hbase中所有表
  109. private static void listHBaseTables() {
  110. try {
  111. TableName[] tableNames = admin.listTableNames();
  112. System.out.println("打印所有命名空间表名:");
  113. for (TableName tableName : tableNames) {
  114. System.out.println(tableName);
  115. }
  116. } catch (IOException e) {
  117. e.printStackTrace();
  118. }
  119. }
  120. //向表中插入数据
  121. private static void putTableData(String tableName, String rowKey, String cf, String column, String value) {
  122. Table table = null;
  123. try {
  124. table = connection.getTable(TableName.valueOf(tableName));
  125. Put p = new Put(Bytes.toBytes(rowKey));
  126. p.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
  127. table.put(p);
  128. System.out.println("插入数据成功!");
  129. } catch (IOException e) {
  130. e.printStackTrace();
  131. } finally {
  132. closeTable(table);
  133. }
  134. }
  135. //查询表中的数据
  136. private static void scanTableData(String tableName) {
  137. if(tableName == null || tableName.length() == 0) {
  138. System.out.println("请正确输入表名!");
  139. return;
  140. }
  141. Table table = null;
  142. try {
  143. table = connection.getTable(TableName.valueOf(tableName));
  144. Scan scan = new Scan();
  145. ResultScanner resultScanner = table.getScanner(scan);
  146. Iterator<Result> iterator = resultScanner.iterator();
  147. while(iterator.hasNext()) {
  148. Result result = iterator.next();
  149. Cell[] cells = result.rawCells();
  150. for(Cell cell : cells) {
  151. //得到rowkey
  152. System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
  153. //得到列族
  154. System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
  155. System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
  156. System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
  157. }
  158. }
  159. } catch (IOException e) {
  160. e.printStackTrace();
  161. } finally {
  162. closeTable(table);
  163. }
  164. }
  165. //查询指定row数据
  166. private static void getRowData(String tableName, String rowkey,String colFamily, String col) {
  167. Table table = null;
  168. try {
  169. table = connection.getTable(TableName.valueOf(tableName));
  170. Get g = new Get(Bytes.toBytes(rowkey));
  171. // 获取指定列族数据
  172. if(col == null && colFamily != null) {
  173. g.addFamily(Bytes.toBytes(colFamily));
  174. } else if(col != null && colFamily != null) {
  175. // 获取指定列数据
  176. g.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
  177. }
  178. Result result = table.get(g);
  179. System.out.println("查询指定rowkey 数据如下");
  180. for(Cell cell : result.rawCells()) {
  181. //得到rowkey
  182. System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
  183. //得到列族
  184. System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
  185. System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
  186. System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
  187. }
  188. } catch (IOException e) {
  189. e.printStackTrace();
  190. } finally {
  191. closeTable(table);
  192. }
  193. }
  194. //删除指定row数据
  195. private static void deleteRowData(String tableName , String rowKey, String colFamily, String ... cols) {
  196. if(tableName == null || tableName.length() == 0) {
  197. System.out.println("请正确输入表名!");
  198. return;
  199. }
  200. Table table = null;
  201. try {
  202. table = connection.getTable(TableName.valueOf(tableName));
  203. Delete del = new Delete(Bytes.toBytes(rowKey));
  204. // 删除指定列族
  205. del.addFamily(Bytes.toBytes(colFamily));
  206. // 删除指定列
  207. for (String col : cols) {
  208. del.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
  209. }
  210. table.delete(del);
  211. System.out.println("删除数据成功!");
  212. } catch (IOException e) {
  213. e.printStackTrace();
  214. } finally {
  215. closeTable(table);
  216. }
  217. }
  218. //删除表
  219. private static void deleteHBaseTable(String tableName) {
  220. try {
  221. if(!isTableExist(tableName)) {
  222. System.out.println("要删除的表不存在!");
  223. return;
  224. }
  225. admin.disableTable(TableName.valueOf(tableName));
  226. admin.deleteTable(TableName.valueOf(tableName));
  227. } catch (IOException e) {
  228. e.printStackTrace();
  229. }
  230. }
  231. //关闭表
  232. private static void closeTable(Table table) {
  233. if(table != null) {
  234. try {
  235. table.close();
  236. } catch (IOException e) {
  237. e.printStackTrace();
  238. }
  239. }
  240. }
  241. }

欢迎点赞、评论、收藏,关注IT贫道,获取IT技术知识!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/611938
推荐阅读
相关标签
  

闽ICP备14008679号