赞
踩
目录
技术连载系列,前面内容请参考前面连载10内容:Kerberos安全认证-连载10-Hive Kerberos 安全配置及访问_IT贫道的博客-CSDN博客
大数据组件HBase也可以通过Kerberos进行安全认证,由于HBase中需要zookeeper进行元数据管理、主节点选举、故障恢复,所以这里对HBase进行Kerberos安全认证时,建议也对Zookeeper进行安全认证。
zookeeper集群节点分布如下:
节点IP | 节点名称 | Zookeeper |
192.168.179.4 | node1 | |
192.168.179.5 | node2 | |
192.168.179.6 | node3 | ★ |
192.168.179.7 | node4 | ★ |
192.168.179.8 | node5 | ★ |
目前启动zookeeper集群后,可以通过任意Zookeeper客户端操作Zookeeper,可以通过Kerberos对Zookeeper进行认证避免非认证用户操作Zookeeper。按照如下步骤进行zookeeper kerberos认证。
1) 创建zookeeper Princial服务主体
在node1节点执行如下命令创建zookeeper Princial服务主体。
- [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node3"
- [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node4"
- [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 zookeeper/node5"
2) 将zookeeper服务主体写入到keytab文件
在node1节点执行如下命令,将zookeeper服务主体写入到keytab文件:
- [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node3@EXAMPLE.COM"
- [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node4@EXAMPLE.COM"
- [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/zookeeper.service.keytab zookeeper/node5@EXAMPLE.COM"
以上命令执行完成后,可以在node1节点/home/keytabs目录下生成zookeeper.service.keytab文件。
3) 将keytab文件分发到其他节点并修改权限
这里将node1节点生成的zookeeper.service.keytab文件分发到node2~node5节点,这里也可以直接分发到zookeeper服务所在节点,为了保持集群各个节点keytab文件一致,这里将该keytab文件分发到集群各个节点中。
- #在node1节点中执行如下命令进行分发
- [root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node2:/home/keytabs/
- [root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node3:/home/keytabs/
- [root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node4:/home/keytabs/
- [root@node1 ~]# scp /home/keytabs/zookeeper.service.keytab node5:/home/keytabs/
分发到node2~node5各个节点后,执行如下命令对zookeeper.service.keytab修改权限。
- #node1~node5所有节点执行如下命令
- chown root:hadoop /home/keytabs/zookeeper.service.keytab
- chmod 770 /home/keytabs/zookeeper.service.keytab
4) 配置zoo.cfg文件
在node3~node5 zookeeper节点上配置zoo.cfg配置文件,每个节点在该文件最后追加如下配置:
- kerberos.removeHostFromPrincipal=true
- kerberos.removeRealmFromPrincipal=true
- authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider
- jaasLoginRenew=3600000
- sessionRequireClientSASLAuth=false
- skipACL=yes
以上配置项的解释如下:
kerberos认证时是否从Principal中移除主机名部分。
kerberos认证时是否从Principal中移除领域(Realm)部分。
指定ZooKeeper对客户端进行身份验证时使用的身份验证类。
Jaas登录(Java Authentication and Authorization Service)的续约时间,这里是每隔1小时需要续约Jaas登录。
是否要求客户端进行SASL认证,SASL(Simple Authentication and Security Layer)认证是一种安全机制,用于在客户端和服务器之间进行身份验证和加密通信。当设置为要求客户端进行SASL认证时,客户端必须提供有效的凭证(如用户名和密码)来证明其身份,并与ZooKeeper服务器进行安全的通信。
这里需要将该值设置为false,否则Hadoop HA 不能正常启动。
跳过Zookeeper 访问控制列表(ACL)验证,允许连接zookeper后进行读取和写入。这里建议跳过,否则配置HBase 启动后不能向Zookeeper中写入数据。
这里在node3节点进行zoo.cfg文件的配置,配置完成后,将zoo.cfg文件分发到node4、node5 zookeeper节点上。
- [root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node4:/software/apache-zookeeper-3.6.3-bin/conf/
- [root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/zoo.cfg node5:/software/apache-zookeeper-3.6.3-bin/conf/
5) 配置jaas.conf文件
在node3~node5各个zookeeper节点中ZOOKEEPER_HOME/conf/目录下创建jaas.conf文件,该文件用于zookeeper服务端与客户端进行认证。配置内容如下:
- Server {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/home/keytabs/zookeeper.service.keytab"
- storeKey=true
- useTicketCache=false
- principal="zookeeper/node3@EXAMPLE.COM";
- };
-
- Client {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/home/keytabs/zookeeper.service.keytab"
- storeKey=true
- useTicketCache=false
- principal="zookeeper/node3@EXAMPLE.COM";
- };
以上配置文件是在node3 zookeeper节点配置,将以上文件分发到node4、node5节点后,需要将principal修改成对应节点的信息。
node4 zookeeper节点jaas.conf配置如下:
- Server {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/home/keytabs/zookeeper.service.keytab"
- storeKey=true
- useTicketCache=false
- principal="zookeeper/node4@EXAMPLE.COM";
- };
-
- Client {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/home/keytabs/zookeeper.service.keytab"
- storeKey=true
- useTicketCache=false
- principal="zookeeper/node4@EXAMPLE.COM";
- };
node5 zookeeper节点jaas.conf配置如下:
- Server {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/home/keytabs/zookeeper.service.keytab"
- storeKey=true
- useTicketCache=false
- principal="zookeeper/node5@EXAMPLE.COM";
- };
-
- Client {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/home/keytabs/zookeeper.service.keytab"
- storeKey=true
- useTicketCache=false
- principal="zookeeper/node5@EXAMPLE.COM";
- };
6) 配置java.env文件
在zookeeper各个节点ZOOKEEPER_HOME/conf目录下创建java.env文件,写入如下内容,这里在node3~node5各个节点都需配置。
- #指定jaas.conf文件路径
- export JVMFLAGS="-Djava.security.auth.login.config=/software/apache-zookeeper-3.6.3-bin/conf/jaas.conf"
以上可以先在node3节点进行配置,然后分发到node4~node5节点上。如下:
- [root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node4:/software/apache-zookeeper-3.6.3-bin/conf/
- [root@node3 ~]# scp /software/apache-zookeeper-3.6.3-bin/conf/java.env node5:/software/apache-zookeeper-3.6.3-bin/conf/
7) 启动zookeeper
在node3~node5各个节点执行如下命令,启动zookeeper。
- #启动zookeeper
- zkServer.sh start
-
- #检查zookeeper状态
- zkServer.sh status
HBase也支持Kerberos安全认证,经过测试,HBase2.5.x版本在经过Kerberos认证后与Hadoop通信认证有异常,具体报错如下:
hdfs.DataStreamer: Exception in createBlockOutputStream java.io.IOException: Invalid token in javax.security.sasl.qop: DI
这里建议使用HBase2.5.x以下版本与kerberos进行整合,但HBase版本的选择也需要参考Hadoop的版本,可以在Hbase官网找到Hadoop版本匹配的HBase版本,地址为:https://hbase.apache.org/book.html#hadoop
我们这里使用的Hadoop版本为3.3.4,HBase版本选择2.2.6版本。HBase Kerberos安全认证配置步骤如下:
1) 创建HBase服务用户并两两节点进行免密
在Hadoop体系中操作HBase时通常使用hbase用户,这里在集群各个节点创建hbase用户并设置在HBase用户下,节点之间的免密。
在node1~node5节点创建hbase用户,所属hadoop组。
- #这里设置用户密码为123456
- useradd hbase -g hadoop
- passwd hbase
各个节点切换到hbase用户,并设置各个节点两两免密。
- #所有节点切换成hbase用户
- su hbase
- cd ~
-
- #node1~node5所有节点生成SSH密钥
- ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa
-
- #node1~node5所有节点公钥复制到node1节点上,这个过程需要输入yes和密码
- ssh-copy-id node1
-
- #将node1 authorized_keys文件分发到node1~node5各节点,这个过程需要输入密码
- [hbase@node1 ~]$ cd ~/.ssh/
- [hbase@node1 .ssh]$ scp authorized_keys node2:`pwd`
- [hbase@node1 .ssh]$ scp authorized_keys node3:`pwd`
- [hbase@node1 .ssh]$ scp authorized_keys node4:`pwd`
- [hbase@node1 .ssh]$ scp authorized_keys node5:`pwd`
-
- #两两节点进行ssh测试,这一步骤必须做,然后node1~node5节点退出当前hbase用户
- exit
2) 创建HBase服务Princial主体并写入到keytab文件
在kerberos服务端node1节点执行如下命令创建HBase服务主体:
- [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node3"
- [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node4"
- [root@node1 ~]# kadmin.local -q "addprinc -pw 123456 hbase/node5"
在kerberos服务端node1节点执行如下命令将hbase主体写入到keytab文件:
- #node1节点执行命令,将主体写入到keytab
- [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node3@EXAMPLE.COM"
- [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node4@EXAMPLE.COM"
- [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /home/keytabs/hbase.service.keytab hbase/node5@EXAMPLE.COM"
以上命令执行完成后,在node1节点/home/keytabs目录下生成hbase.service.keytab文件,将该文件分发到各个节点并赋权,这里可以只发送到node3~node5 Hbase所在节点,为了保证各个大数据集群节点的keytabs一致,这里分发到所有节点。
- [root@node1 ~]# scp /home/keytabs/hbase.service.keytab node2:/home/keytabs/
- [root@node1 ~]# scp /home/keytabs/hbase.service.keytab node3:/home/keytabs/
- [root@node1 ~]# scp /home/keytabs/hbase.service.keytab node4:/home/keytabs/
- [root@node1 ~]# scp /home/keytabs/hbase.service.keytab node5:/home/keytabs/
分发完成后,在集群各个节点上执行如下命令,修改hbase.service.keytab密钥文件所属用户和组信息:
- chown root:hadoop /home/keytabs/hbase.service.keytab
- chmod 770 /home/keytabs/hbase.service.keytab
3) 修改hbase-site.xml配置文件
在所有hbase节点中修改HBASE_HOME/conf/hbase-site.xml文件内容,配置支持kerberos,向hbase-site.xml文件中追加如下内容:
- <!-- hbase开启安全认证 -->
- <property>
- <name>hbase.security.authorization</name>
- <value>true</value>
- </property>
- <!-- hbase配置kerberos安全认证 -->
- <property>
- <name>hbase.security.authentication</name>
- <value>kerberos</value>
- </property>
- <!-- HMaster配置kerberos安全凭据认证 -->
- <property>
- <name>hbase.master.kerberos.principal</name>
- <value>hbase/_HOST@EXAMPLE.COM</value>
- </property>
- <!-- HMaster配置kerberos安全证书keytab文件位置 -->
- <property>
- <name>hbase.master.keytab.file</name>
- <value>/home/keytabs/hbase.service.keytab</value>
- </property>
- <!-- Regionserver配置kerberos安全凭据认证 -->
- <property>
- <name>hbase.regionserver.kerberos.principal</name>
- <value>hbase/_HOST@EXAMPLE.COM</value>
- </property>
- <!-- Regionserver配置kerberos安全证书keytab文件位置 -->
- <property>
- <name>hbase.regionserver.keytab.file</name>
- <value>/home/keytabs/hbase.service.keytab</value>
- </property>
这里可以先在node3节点进行配置,配置完成后分发到node4、node5节点:
- [root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node4:/software/hbase-2.2.6/conf/
- [root@node3 ~]# scp /software/hbase-2.2.6/conf/hbase-site.xml node5:/software/hbase-2.2.6/conf/
4) 准备hdfs-site.xml和core-site.xml文件
HBase底层存储使用HDFS,这里需要将Hadoop中hdfs-site.xml和core-site.xml文件发送到hbase各个节点HBASE_HOME/conf/目录中。在node3~node5各个HBase节点中执行如下命令:
- #node3~node5各个节点执行
- cp /software/hadoop-3.3.4/etc/hadoop/hdfs-site.xml /software/hbase-2.2.6/conf/
- cp /software/hadoop-3.3.4/etc/hadoop/core-site.xml /software/hbase-2.2.6/conf/
5) 准备zk-jaas.conf文件
HBase中会使用zookeeper管理元数据和选主,这里由于Zookeeper中已经开启了Kerberos认证,所以需要准备zk-jaas.conf文件连接zookeeper时进行认证。在HBase各个节点HBASE_HOME/conf目录下创建zk-jaas.conf文件,写入如下内容,不同的节点设置的principal是对应HBase主机节点。
node3节点HBASE_HOME/conf/zk-jaas.conf文件内容:
- Client {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/home/keytabs/hbase.service.keytab"
- useTicketCache=false
- principal="hbase/node3@EXAMPLE.COM";
- };
node4节点HBASE_HOME/conf/zk-jaas.conf文件内容:
- Client {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/home/keytabs/hbase.service.keytab"
- useTicketCache=false
- principal="hbase/node4@EXAMPLE.COM";
- };
node5节点HBASE_HOME/conf/zk-jaas.conf文件内容:
- Client {
- com.sun.security.auth.module.Krb5LoginModule required
- useKeyTab=true
- keyTab="/home/keytabs/hbase.service.keytab"
- useTicketCache=false
- principal="hbase/node5@EXAMPLE.COM";
- };
6) 修改hbase-env.sh配置
在各个hbase节点上配置HBASE_HOME/conf/hbase-env.sh,将HBASE_OPTS选项修改为如下:
- #node3~node5各个节点修改hbase-env.sh中的HBASE_OPTS配置
- export HBASE_OPTS="$HBASE_OPTS -XX:+UseConcMarkSweepGC -Djava.security.auth.login.config=/software/hbase-2.2.6/conf/zk-jaas.conf"
7) 修改HBase安装目录权限
在HBase各个节点创建HBASE_HOME/logs目录,如果该目录不存在需要提前创建,存在即可忽略,该目录为HBase日志目录,需要将该目录权限修改为root:hadoop、访问权限为770,否则hbase用户在启动Hbase集群时没有向该目录写日志权限。
- #在node3~node5 HBase节点执行命令创建HBASE_HOME/logs
- mkdir -p /software/hbase-2.2.6/logs
将各个节点的hbase安装目录权限修改为root:hadoop,logs目录访问权限为770:
- #在node3~node5 HBase节点执行命令
- chown -R root:hadoop /software/hbase-2.2.6
- chmod -R 770 /software/hbase-2.2.6/logs
在启动HBase集群之前,需要将Zookeeper和HDFS集群重启,Zookeeper和HDFS启动命令如下:
- #停止hadoop集群
- [root@node1 ~]# stop-all.sh
-
- #node3~node5节点,重启zookeeper
- [root@node3 ~]# zkServer.sh restart
- [root@node3 ~]# zkServer.sh restart
- [root@node3 ~]# zkServer.sh restart
-
- #启动HDFS集群
- [root@node1 ~]# start-all.sh
如果没有配置Kerberos认证时HBase之前启动过,在HDFS中将会有/hbase 目录,需要使用hdfs超级管理员用户将该目录的用户和所属组修改成hbase:hadoop。
- #在node5节点执行
- [root@node5 ~]# su hdfs
- [hdfs@node5 ~]$ kinit hdfs
- Password for hdfs@EXAMPLE.COM: 123456
-
- [hdfs@node5 logs]$ hdfs dfs -ls /
- Found 6 items
- drwxr-xr-x - root supergroup 0 2023-05-23 10:50 /hbase
-
- #修改用户和所属组
- [hdfs@node5 logs]$ hadoop dfs -chown -R hbase:hadoop /hbase
在HBase主节点node4上执行如下命令启动HBase集群:
- #无需切换hbase用户,可以直接在node4节点执行如下命令启动hbase集群
- [root@node4 ~]# sudo -i -u hbase start-hbase.sh
注意:以上启动Hbase集群命令中没有切换到hbase用户,sudo为使用超级用户权限执行命令,-i表示交互模式执行命令,-u指定切换的用户。
HBase启动后,可以通过http://node4:16010进行访问:
在node3~node5任意节点登录hbase shell,可以看到如果节点没有进行用户认证,读取HBase数据时没有权限:
- #假设在node4登录hbase客户端
- [root@node4 ~]# klist
- klist: No credentials cache found (filename: /tmp/krb5cc_0)
-
- [root@node4 ~]# hbase shell
- hbase(main):001:0> list
- TABLE
-
- ERROR: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
如果想要正常读取HBase中的数据需要进行客户端认证,这里以zhangsan用户为例,进行Kerberos主体认证,认证后操作HBase,操作如下:
- #进行用户认证
- [root@node4 ~]# kinit zhangsan
- Password for zhangsan@EXAMPLE.COM: 123456
- [root@node4 ~]# klist
- Ticket cache: FILE:/tmp/krb5cc_0
- Default principal: zhangsan@EXAMPLE.COM
-
- #登录hbase shell
- [root@node4 ~]# hbase shell
-
- hbase(main):001:0> create 'person','cf1','cf2'
- Created table person
-
- hbase(main):002:0> list
- person
-
- hbase(main):003:0> put 'person','row1','cf1:id','1'
-
- hbase(main):004:0> put 'person','row1','cf1:name','zs'
-
- hbase(main):005:0> put 'person','row1','cf1:age',18
-
- hbase(main):006:0> scan 'person'
- ROW COLUMN+CELL
- row1 column=cf1:age, timestamp=1684317908851, value=18
- row1 column=cf1:id, timestamp=1684317879179, value=1
- row1 column=cf1:name, timestamp=1684317894358, value=zs
java API操作经过Kerberos安全认证的HBase按照如下步骤实现即可。
1) 准备krb5.conf文件
将node1 kerberos服务端/etc/krb5.conf文件存放在IDEA项目中的resources资源目录中或者本地Window固定的某个目录中,用于编写代码时指定访问Kerberos的Realm。
2) 生成用户keytab文件
在kerberos服务端node1节点上,执行如下命令,对zhangsan用户主体生成keytab密钥文件。
- #在node1 kerberos服务端执行
- [root@node1 ~]# kadmin.local -q "ktadd -norandkey -kt /root/zhangsan.keytab zhangsan@EXAMPLE.COM"
以上命令执行之后,在/root目录下会生成zhangsan.keytab文件,将该文件复制到IDEA项目中的resources资源目录中或者本地window固定的某个目录中,该文件用于编写代码时认证kerberos。
3) 准备访问HDFS需要的资源文件
将HDFS中的core-site.xml 、hdfs-site.xml 、yarn-site.xml、hbase-site.xml文件上传到项目resources资源目录中。
4) 编写代码操作HBase
项目代码中引入如下maven依赖:
- <!-- hbase依赖包 -->
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-server</artifactId>
- <version>2.2.6</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hbase</groupId>
- <artifactId>hbase-client</artifactId>
- <version>2.2.6</version>
- </dependency>
编写Java代码操作Kerberos认证的HBase代码如下:
- /**
- * Java API 操作Kerberos 认证的HBase
- */
- public class OperateAuthHBase {
-
- private static Configuration conf;
- private static Connection connection;
- private static Admin admin;
-
- /**
- * 初始化:进行kerberos认证并设置hbase configuration
- */
- static {
- try {
- String principal = "zhangsan@EXAMPLE.COM";
- String keytabPath = "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\zhangsan.keytab";
-
- conf = HBaseConfiguration.create();
- // 设置Kerberos 认证
- System.setProperty("java.security.krb5.conf", "D:\\idea_space\\KerberosAuth\\KerberosAuthHBase\\src\\main\\resources\\krb5.conf");
- conf.set("hadoop.security.authentication", "kerberos");
- UserGroupInformation.setConfiguration(conf);
- UserGroupInformation.loginUserFromKeytab(principal, keytabPath);
-
- //设置zookeeper集群地址及端口号
- conf.set("hbase.zookeeper.quorum","node3,node4,node5");
- conf.set("hbase.zookeeper.property.clientPort","2181");
-
- //创建连接
- connection = ConnectionFactory.createConnection(conf);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
-
- public static void main(String[] args) throws IOException {
- try{
- //获取Admin对象
- admin = connection.getAdmin();
-
- //创建表
- createHBaseTable("tbl1", "cf1");
- System.out.println("===========================");
-
- //查看Hbase中所有表
- listHBaseTables();
- System.out.println("===========================");
-
- //向表中插入数据
- putTableData("tbl1","row1", "cf1", "id", "100");
- putTableData("tbl1","row1", "cf1", "name", "zs");
- putTableData("tbl1","row1", "cf1", "age", "18");
-
- putTableData("tbl1","row2", "cf1", "id", "200");
- putTableData("tbl1","row2", "cf1", "name", "ls");
- putTableData("tbl1","row2", "cf1", "age", "19");
- System.out.println("===========================");
-
- //查询表中的数据
- scanTableData("tbl1");
- System.out.println("===========================");
-
- //查询指定row数据
- getRowData("tbl1", "row1","cf1","name");
- System.out.println("===========================");
-
- //删除指定row数据
- deleteRowData("tbl1", "row1","cf1","name");
- System.out.println("===========================");
-
- //查询表中的数据
- scanTableData("tbl1");
- System.out.println("===========================");
-
- //删除表
- deleteHBaseTable("tbl1");
-
- //查看Hbase中所有表
- listHBaseTables();
- System.out.println("===========================");
- }catch (Exception e) {
- e.printStackTrace();
- }finally {
- if(admin != null) {
- admin.close();
- }
- if(connection != null) {
- connection.close();
- }
- }
-
- }
-
- //判断表是否存在
- private static boolean isTableExist(String tableName) {
- try {
- return admin.tableExists(TableName.valueOf(tableName));
- } catch (IOException e) {
- e.printStackTrace();
- }
- return false;
- }
-
- private static void createHBaseTable(String tableName, String... cfs) {
- if(isTableExist(tableName)) {
- System.out.println("表已经存在!");
- return;
- }
- if(cfs == null || cfs.length == 0){
- System.out.println("列族不能为空!");
- return;
- }
- TableDescriptorBuilder tableDescriptorBuilder = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName));
- List<ColumnFamilyDescriptor> cFDBList = new ArrayList<>();
- for (String cf : cfs) {
- cFDBList.add(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes(cf)).build());
- }
- tableDescriptorBuilder.setColumnFamilies(cFDBList);
- try {
- admin.createTable(tableDescriptorBuilder.build());
- System.out.println("创建表" + tableName + "成功!");
- } catch (IOException e) {
- System.out.println("创建失败!");
- e.printStackTrace();
- }
-
- }
-
- //查看Hbase中所有表
- private static void listHBaseTables() {
- try {
- TableName[] tableNames = admin.listTableNames();
- System.out.println("打印所有命名空间表名:");
- for (TableName tableName : tableNames) {
- System.out.println(tableName);
- }
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- //向表中插入数据
- private static void putTableData(String tableName, String rowKey, String cf, String column, String value) {
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(tableName));
- Put p = new Put(Bytes.toBytes(rowKey));
- p.addColumn(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
- table.put(p);
- System.out.println("插入数据成功!");
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- closeTable(table);
- }
-
-
- }
-
- //查询表中的数据
- private static void scanTableData(String tableName) {
- if(tableName == null || tableName.length() == 0) {
- System.out.println("请正确输入表名!");
- return;
- }
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(tableName));
- Scan scan = new Scan();
-
- ResultScanner resultScanner = table.getScanner(scan);
- Iterator<Result> iterator = resultScanner.iterator();
- while(iterator.hasNext()) {
- Result result = iterator.next();
- Cell[] cells = result.rawCells();
- for(Cell cell : cells) {
- //得到rowkey
- System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
- //得到列族
- System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
- System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
- System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
- }
- }
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- closeTable(table);
- }
- }
-
- //查询指定row数据
- private static void getRowData(String tableName, String rowkey,String colFamily, String col) {
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(tableName));
- Get g = new Get(Bytes.toBytes(rowkey));
- // 获取指定列族数据
- if(col == null && colFamily != null) {
- g.addFamily(Bytes.toBytes(colFamily));
- } else if(col != null && colFamily != null) {
- // 获取指定列数据
- g.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
- }
- Result result = table.get(g);
- System.out.println("查询指定rowkey 数据如下");
-
- for(Cell cell : result.rawCells()) {
- //得到rowkey
- System.out.println("rowkey: " + Bytes.toString(CellUtil.cloneRow(cell)));
- //得到列族
- System.out.println("列族: " + Bytes.toString(CellUtil.cloneFamily(cell)));
- System.out.println("列: " + Bytes.toString(CellUtil.cloneQualifier(cell)));
- System.out.println("值: " + Bytes.toString(CellUtil.cloneValue(cell)));
- }
-
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- closeTable(table);
- }
-
- }
-
- //删除指定row数据
- private static void deleteRowData(String tableName , String rowKey, String colFamily, String ... cols) {
- if(tableName == null || tableName.length() == 0) {
- System.out.println("请正确输入表名!");
- return;
- }
- Table table = null;
- try {
- table = connection.getTable(TableName.valueOf(tableName));
- Delete del = new Delete(Bytes.toBytes(rowKey));
- // 删除指定列族
- del.addFamily(Bytes.toBytes(colFamily));
- // 删除指定列
- for (String col : cols) {
- del.addColumn(Bytes.toBytes(colFamily), Bytes.toBytes(col));
- }
- table.delete(del);
- System.out.println("删除数据成功!");
- } catch (IOException e) {
- e.printStackTrace();
- } finally {
- closeTable(table);
- }
-
- }
-
- //删除表
- private static void deleteHBaseTable(String tableName) {
- try {
- if(!isTableExist(tableName)) {
- System.out.println("要删除的表不存在!");
- return;
- }
- admin.disableTable(TableName.valueOf(tableName));
- admin.deleteTable(TableName.valueOf(tableName));
-
- } catch (IOException e) {
- e.printStackTrace();
- }
-
- }
-
- //关闭表
- private static void closeTable(Table table) {
- if(table != null) {
- try {
- table.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
- }
欢迎点赞、评论、收藏,关注IT贫道,获取IT技术知识!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。