赞
踩
最近在做一个项目,需要先拉取hbase上面的数据,然后进行指标计算。
但是集群Hadoop生态圈都设置了Kerberos认证,故记录此文,解决Kerberos认证过程中的问题。
代码需要先获取Hbase连接,而连接需要先通过Kerberos认证。(详细关注kerberos()这个方法)。然后拿着连接去遍历Hbase表,表名是根据当前时间生成的。然后根据rowkey和column添加过滤器,遍历拿出相关数据。
注意://cluster模式下测试连接HBase时一定要使用UGI的doAs方法,把连接写在里面,这样就可以避免连接不上报GSS错误。(详见代码最后面)
-
- package com.bigdata.process;
-
- /**
- * author: DahongZhou
- * Date:
- */
-
-
- import com.bigdata.bean.HbaseBean;
- import com.bigdata.utils.db.GenerateTableName;
- import com.bigdata.utils.Logger;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.hbase.Cell;
- import org.apache.hadoop.hbase.CellUtil;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.*;
- import org.apache.hadoop.hbase.filter.*;
- import org.apache.hadoop.hbase.util.Bytes;
- import org.apache.hadoop.security.UserGroupInformation;
- import org.apache.spark.api.java.JavaRDD;
- import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.sql.DataFrame;
- import org.apache.spark.sql.SQLContext;
-
- import java.io.IOException;
- import java.security.PrivilegedAction;
- import java.text.ParseException;
- import java.util.ArrayList;
- import java.util.Iterator;
- import java.util.List;
- import java.util.concurrent.Executors;
-
- /**
- * scan hbase中的数据,并封装成dataframe
- */
- public class HbaseProcessed {
- static Logger logger = Logger.getLogger(HbaseProcessed.class);
- private static Connection connection;
- static Configuration conf = null;
- static String tableName1 = "AMR:DWD_AMR_CQ_METER-E-DAY_201806";
- static String tableName_pre = "AMR:DWD_AMR_CQ_METER-E-DAY_";
- static String hbaseStart = "201701";
- static DataFrame dfs = null;
- static JavaSparkContext javaSparkContext = null;
- static SQLContext sqlContext = null;
- static DataFrame temp_df = null;
-
-
- /**
- * 将hbase数据union为一张总dataframe
- *
- * @param javaSparkContext
- * @param sqlContext
- * @return hbase表数据的dataFrame
- * @throws ParseException
- * @throws IOException
- */
- public static DataFrame getDfs(JavaSparkContext javaSparkContext, SQLContext sqlContext,
- String rowKey, String colunm) throws ParseException, IOException {
- //创建空的dataframe用来union全部hbase表的dataframe
- ArrayList<String> tempList = new ArrayList<>();
- JavaRDD<String> temp_rdd = javaSparkContext.parallelize(tempList, 1);
- temp_df = sqlContext.createDataFrame(temp_rdd, HbaseBean.class);
-
- //将空的带有schema的dataframe赋值给dfs,不然在unionAll的时候会出错
- dfs = temp_df;
-
- //获取全部表名
- ArrayList<String> tables = GenerateTableName.getTables();
-
- //获取每个表,将每个hbase表union得到一个大的dataframe
- for (int i = 0; i < tables.size(); i++) {
- DataFrame df = generateDataFrame(javaSparkContext, sqlContext, tables.get(i), rowKey, colunm);
- dfs = dfs.unionAll(df);
- }
-
- return dfs;
- }
-
-
- /**
- *
- * @param javaSparkContext
- * @param sqlContext
- * @param tableName
- * @return
- * @throws IOException
- */
- // public static DataFrame getHbaseDataFrame(JavaSparkContext javaSparkContext, SQLContext sqlContext, String tableName) throws IOException {
- // return generateDataFrame(javaSparkContext, sqlContext, tableName);
- // }
-
- /**
- * 利用Scan对Hasee表进行筛选并放入到RDD里面然后生成dataFrame
- *
- * @param javaSparkContext
- * @param sqlContext
- * @param tableName
- * @return hbase表数据筛选后的dataFrame
- * @throws IOException
- */
- public static DataFrame generateDataFrame(JavaSparkContext javaSparkContext, SQLContext sqlContext,
- String tableName, String rowKey, String colunm) throws IOException {
- DataFrame df = null;
- // 1)创建一个hbase的connection
- //默认会读取配置文件hbase-site.xml
-
- connection = getConnection();
- //String tableName = "AMR:DWD_AMR_CQ_METER-E-DAY_201912";
- String split = ","; //读取hbase的数据,并用该分隔符来分割,包括rowkey、列名(电量表id)、值(电量数)
-
-
- /**
- * 并不知道rowkey的具体值,获取全表的数据。如果数据量巨大,这个方法非常消耗性能。
- * 一般配合,选取范围或filter使用
- */
- logger.warn("-----------扫描全表的数据-------------");
- Scan scan = new Scan();
-
- //添加数据筛选的范围,为拼接的rowkey范围
- String[] rowkeyDateArr = gennerateRowkeyDateArr(tableName);
- scan.setStartRow(Bytes.toBytes(rowKey + "." + rowkeyDateArr[0]));
- scan.setStopRow(Bytes.toBytes(rowKey + "." + rowkeyDateArr[1]));
-
- //设置scan的过滤器:只scan得到rowkey中以“PAP_E”结尾的数据
- Filter suffix_filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
- new RegexStringComparator(".*PAP_E$"));
-
-
- //rowkey以包含传入的用户编号,其对应的台区编码和供电单位编号的数据
- ArrayList<Filter> listForFilters = new ArrayList<Filter>();
-
- listForFilters.add(suffix_filter); //添加rowkey后缀过滤器
-
-
- Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKey));
- listForFilters.add(filter); //遍历添加‘台区编码和供电单位编号的数据’
-
- SubstringComparator substringComparator = new SubstringComparator(colunm);
- Filter columnFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
- listForFilters.add(columnFilter);
-
- Filter filters = new FilterList(FilterList.Operator.MUST_PASS_ALL, listForFilters);
- Scan scans = scan.setFilter(filters);
-
- scans.setMaxResultSize(50);
- scans.setCaching(100);
- scans.setBatch(5);
- scan.setCacheBlocks(false);// no read cache
- scan.addFamily(Bytes.toBytes("DATA"));
-
-
- logger.warn("开始获取表。。。");
- //获取表
- Table table = connection.getTable(TableName.valueOf(tableName));
- logger.warn("connection.toString() is :==>" + connection.toString());
- logger.warn("table.toString() is :==>" + table.toString());
-
- ResultScanner scanner = null;
- boolean tableExist = isTableExist(tableName, connection);
- if (tableExist) { //判断表是否存在
- logger.warn(tableName + " isTableExist: " + tableExist);
- scanner = table.getScanner(scans);
- logger.warn(" scanner 对象: " + scanner);
-
- Result result = null;
- ArrayList<String> list = new ArrayList<>();
- while ((result = scanner.next()) != null) {
- logger.warn(" result 对象: " + result);
-
- String rowkey = "";
- // String family = "";
- String column = "";
- String value = "";
- List<Cell> cells = result.listCells(); //获取到hbase表的Cells
- for (Cell cell : cells) { //遍历Cells,并封装到List<String>集合中
- // 列簇、列名、值、rowkey
- // 打印rowkey,family,qualifier,value
- rowkey = Bytes.toString(CellUtil.cloneRow(cell));
- //family = Bytes.toString(CellUtil.cloneFamily(cell));
- column = Bytes.toString(CellUtil.cloneQualifier(cell));
- value = Bytes.toString(CellUtil.cloneValue(cell));
- if (column.isEmpty() || column.equals("null")) {
- column = "0";//空值设置为"0"
- }
- if (value.isEmpty() || value.equals("null")) {
- value = "0";//空值设置为"0"
- }
- list.add(rowkey + split + column + split + value);
-
- //System.out.println(rowkey + split + family + split + column + split + value);
- }
- }
-
- logger.warn("hbase表" + tableName + "数据:" + list);
-
- //通过list集合的大小来设置rdd的分区数(分区的个数尽量等于集群中的CPU核心(core)数目)
- int list_size = list.size();
- int partitions = 1; //分区数,默认为1
- int per_partitions = 1000; //每个分区的数据条数
- if (list_size > per_partitions) {
- partitions = (int) (list_size / per_partitions) + 1;
- }
- // System.out.println(tableName + "表,共有:" + list_size + "条数据");
- // System.out.println("rdd的分区数为:" + partitions);
- // 通过list,生成rdd
- JavaRDD<String> rdd = javaSparkContext.parallelize(list, partitions);
- // System.out.println("rdd count "+rdd.count());
-
- //将rdd封装成HbaseBean类型,此处调用mapPartitions(在每个分区中处理,能提高性能)
- JavaRDD<HbaseBean> rowRDD = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, HbaseBean>() {
- @Override
- public Iterable<HbaseBean> call(Iterator<String> stringIterator) throws Exception {
- ArrayList<HbaseBean> arrayList = new ArrayList<HbaseBean>();
-
- while (stringIterator.hasNext()) { //遍历rdd每一行,并封装成HbaseBean,然后添加到ArrayList<HbaseBean>
- String next = stringIterator.next();
- String parts[] = next.split(",");
- HbaseBean bean = new HbaseBean();
- bean.setRowkey(parts[0]);
- // bean.setFamily(parts[1]);
- bean.setDlb_id(parts[1]);
- bean.setDlb_dl(parts[2]);
- arrayList.add(bean);
- }
- return arrayList;
- }
- });
- // System.out.println("rowRDD count "+rowRDD.count());
-
- // System.out.println(tableName+"---------out tableName---------");
- // if (!GenerateTableName.nowMonthTable(tableName)) {
- // //如果hbase表是上个月,则将hbase的数据rdd checkpoint起来,方便后面调用
- // // System.out.println(tableName+"---------inner tableName---------");
- // // 在checkpoint之前,需要先cache,可以提高性能。因为checkpoint会用一个新的线程去计算rdd,然后再存储到磁盘中
- // rowRDD.cache();
- // rowRDD.checkpoint();
- // rowRDD.unpersist();
- // }
-
- //创建hbase表的DataFrame
- df = sqlContext.createDataFrame(rowRDD, HbaseBean.class);
- if (df == null) {
- df = temp_df; //将空的带有schema的dataframe赋值给dfs,不然在unionAll的时候会出错
- }
- } else df = temp_df;
-
-
- // df.cache();
- // connection.close();
- // df.show(100,false);
- // scanner.close();
- // table.close();
- return df;
- }
-
-
- /**
- * 判断表是否存在
- *
- * @param tableName
- * @return 各表是否存在
- * @throws IOException
- */
-
- public static boolean isTableExist(String tableName, Connection connection) throws IOException {
- connection = getConnection();
- Admin admin = connection.getAdmin();
-
- return admin.tableExists(TableName.valueOf(tableName));
- }
-
- /**
- * kerberos认证,用户为USER_SYX,密钥文件为服务器上的/app/keytab/USER_SYX.keytab,配置文件为服务器上的/etc/krb5.conf
- *
- * @return
- */
- public static Configuration kerberos() throws IOException {
- Configuration HBASE_CONFIG = new Configuration();
- HBASE_CONFIG.set("hbase.zookeeper.quorum", "node01,node02,node03");
- HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
- HBASE_CONFIG.set("hbase.rpc.timeout", "1800000");
- HBASE_CONFIG.set("hbase.master.kerberos.principal", "hbase/_HOST@tsq.com");
- HBASE_CONFIG.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@tsq.com");
- HBASE_CONFIG.set("hbase.security.authentication", "kerberos");
- HBASE_CONFIG.set("hadoop.security.authentication", "kerberos");
- HBASE_CONFIG.set("zookeeper.znode.parent", "/hbase");
- conf = HBaseConfiguration.create(HBASE_CONFIG);
-
- //添加hbase-site配置文件,此方式不行,因为也需要先通过hdfs的kerberos认证
- // hbase_site_file = "hdfs://node01:8020/user/hbase/hbase-site.xml";
- // krb5Path = "hdfs://node01:8020/user/hbase/krb5.conf";
- // keytabPath = "hdfs://node01:8020/user/hbase/USER_SYX.keytab";
-
- // String principal = "USER_xxx@tsq.com";
- // String principal = "hbase/USER_xxx@tsq.com";
- String principal = "USER_xxx";
- // String principal = "_HOST@tsq.com";
-
-
- //--files会把文件上传到hdfs的.sparkStagin/applicationId目录下,System.getenv("SPARK_YARN_STAGING_DIR")能获取到这个目录
-
- // krb5.conf这个文件是从远程服务器上copy下来的
- System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
-
- // 这个hbase.keytab也是从远程服务器上copy下来的, 里面存储的是密码相关信息
- // 这样我们就不需要交互式输入密码了
- // conf.set("keytab.file", "/app/keytab/USER_SYX.keytab");
- // 这个可以理解成用户名信息,也就是Principal
- // conf.set("kerberos.principal", "hbase/USER_SYX@tsq.com");
- UserGroupInformation.setConfiguration(conf);
- logger.warn("-------------kerberos conf添加完成");
-
- // SparkFiles.get获取的目录是driver node下的本地目录
- // String xmlFile = SparkFiles.get("/app/test/config/hbase-site.xml");
- // String confFile = SparkFiles.get("/app/test/config/krb5.conf");
- // String keytabFile = SparkFiles.get("/app/test/config/USER_SYX.keytab");
- // logger.warn(xmlFile + "=====" + confFile + "=====" + keytabFile);
-
-
- try {
- UserGroupInformation.loginUserFromKeytab(principal, "/app/keytab/USER_SYX.keytab");
- logger.warn("-------------loginUserFromKeytab完成");
-
- } catch (IOException e) {
- e.printStackTrace();
- System.exit(1);
- }
- logger.warn("********** HBase Succeeded in authenticating through Kerberos! **********");
-
- return conf;
- }
-
- /**
- * 与hbase建立连接
- *
- * @return
- * @throws IOException
- */
- public static Connection getConnection() throws IOException {
- if (connection == null) {
- // 集群启用了kerberos认证
- conf = kerberos();//集群启用了kerberos认证,没有认证的话,将这行注释掉即可
-
- // kerberos认证集群必须传递已经认证过的conf
- // conf.set("hbase.zookeeper.quorum", "node01,node02,node03");
- // conf.set("hbase.zookeeper.property.clientPort", "2181");
-
-
- //cluster模式下测试连接HBase时一定要使用UGI的doAs方法,把连接写在里面,这样就可以避免连接不上报GSS错误。
- UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI("USER_xxx", "/app/keytab/USER_SYX.keytab");
-
- connection = ugi.doAs(new PrivilegedAction<Connection>() {
- @Override
- public Connection run() {
- try {
- HbaseProcessed.connection = ConnectionFactory.createConnection(conf, Executors.newFixedThreadPool(4));
- } catch (IOException e) {
- logger.warn("PrivilegedAction!! ===>" + e);
- }
- return HbaseProcessed.connection;
- }
- }
- );
-
- }
- return connection;
- }
-
-
- /**
- * 根据hbase的表名,来拼接scan的起始rowkey和结束rowkey
- *
- * @param tableName hbase的表名
- * @return 起始rowkey和结束rowkey组成的数组,即{起始rowkey,结束rowkey}
- */
- private static String[] gennerateRowkeyDateArr(String tableName) {
- // AMR:DWD_AMR_CQ_METER-E-DAY_201911
- String[] tableNameArr = tableName.split("_");
- String year_month = tableNameArr[tableNameArr.length - 1];
- String year = year_month.substring(0, 5);
- String month = year_month.substring(5);
-
- System.out.println("year month is " + year + month);
- //NA.0410354369.01202019000000.PAP_E
- //startRowkey返回时间格式为month day year 000000
- //stopRowkey返回时间格式为month day year 999999
- String startRow = month + "00" + year + "000000" + ".PAP_E";
- String stopRow = month + "31" + year + "999999" + ".PAP_E";
- String[] rowkeyRangeArr = {startRow, stopRow};
- System.out.println(rowkeyRangeArr);
- return rowkeyRangeArr;
- }
-
- }
-
-
-

Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。