
How to Read Data from HBase with Kerberos Authentication (HBase 2.0 + Kerberos)


I have recently been working on a project that first pulls data from HBase and then computes metrics on it.

The whole Hadoop ecosystem on the cluster, however, is secured with Kerberos, so this post records how the problems that came up during Kerberos authentication were solved.

The code first needs an HBase connection, and getting that connection requires passing Kerberos authentication (pay particular attention to the kerberos() method). With the connection in hand, it iterates over the HBase tables, whose names are generated from the current date, adds filters on the rowkey and column, and scans out the relevant data.

Note: when connecting to HBase in cluster mode, always use UGI's doAs method and create the connection inside it; this avoids GSS errors when the connection fails to authenticate (see the end of the code).
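The core of that pattern is small: log in from the keytab, obtain a UserGroupInformation instance, and create the HBase connection inside ugi.doAs(). The following is a minimal sketch of just that step, using the same placeholder principal, keytab path and ZooKeeper quorum as the full listing below; adjust them to your own cluster.

import java.io.IOException;
import java.security.PrivilegedAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosHbaseConnectSketch {

    public static Connection connect() throws IOException {
        final Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "node01,node02,node03");      // placeholder quorum
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hadoop.security.authentication", "kerberos");
        conf.set("hbase.security.authentication", "kerberos");
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf"); // local krb5.conf

        // Log in from the keytab and obtain a UGI for that principal (placeholder values).
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
                "USER_xxx", "/app/keytab/USER_SYX.keytab");

        // Create the connection inside doAs so every HBase RPC runs as the authenticated user.
        return ugi.doAs(new PrivilegedAction<Connection>() {
            @Override
            public Connection run() {
                try {
                    return ConnectionFactory.createConnection(conf);
                } catch (IOException e) {
                    throw new RuntimeException("Failed to create HBase connection", e);
                }
            }
        });
    }
}

The full class follows; getConnection() near the end applies exactly this pattern.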

package com.bigdata.process;

/**
 * author: DahongZhou
 * Date:
 */
import com.bigdata.bean.HbaseBean;
import com.bigdata.utils.db.GenerateTableName;
import com.bigdata.utils.Logger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
/**
 * Scan data from HBase and wrap it into a DataFrame.
 */
public class HbaseProcessed {

    static Logger logger = Logger.getLogger(HbaseProcessed.class);
    private static Connection connection;
    static Configuration conf = null;
    static String tableName1 = "AMR:DWD_AMR_CQ_METER-E-DAY_201806";
    static String tableName_pre = "AMR:DWD_AMR_CQ_METER-E-DAY_";
    static String hbaseStart = "201701";
    static DataFrame dfs = null;
    static JavaSparkContext javaSparkContext = null;
    static SQLContext sqlContext = null;
    static DataFrame temp_df = null;

    /**
     * Union the data of all HBase tables into one overall DataFrame.
     *
     * @param javaSparkContext
     * @param sqlContext
     * @return DataFrame holding the data of all HBase tables
     * @throws ParseException
     * @throws IOException
     */
    public static DataFrame getDfs(JavaSparkContext javaSparkContext, SQLContext sqlContext,
                                   String rowKey, String colunm) throws ParseException, IOException {
        // Create an empty DataFrame used to union the DataFrames of all HBase tables
        ArrayList<String> tempList = new ArrayList<>();
        JavaRDD<String> temp_rdd = javaSparkContext.parallelize(tempList, 1);
        temp_df = sqlContext.createDataFrame(temp_rdd, HbaseBean.class);
        // Assign this empty DataFrame (which carries the schema) to dfs, otherwise unionAll fails later
        dfs = temp_df;
        // Get all table names
        ArrayList<String> tables = GenerateTableName.getTables();
        // Scan every table and union its DataFrame into one big DataFrame
        for (int i = 0; i < tables.size(); i++) {
            DataFrame df = generateDataFrame(javaSparkContext, sqlContext, tables.get(i), rowKey, colunm);
            dfs = dfs.unionAll(df);
        }
        return dfs;
    }
    /**
     *
     * @param javaSparkContext
     * @param sqlContext
     * @param tableName
     * @return
     * @throws IOException
     */
    // public static DataFrame getHbaseDataFrame(JavaSparkContext javaSparkContext, SQLContext sqlContext, String tableName) throws IOException {
    //     return generateDataFrame(javaSparkContext, sqlContext, tableName);
    // }

    /**
     * Filter an HBase table with a Scan, load the rows into an RDD and build a DataFrame.
     *
     * @param javaSparkContext
     * @param sqlContext
     * @param tableName
     * @return DataFrame holding the filtered HBase table data
     * @throws IOException
     */
    public static DataFrame generateDataFrame(JavaSparkContext javaSparkContext, SQLContext sqlContext,
                                              String tableName, String rowKey, String colunm) throws IOException {
        DataFrame df = null;
        // 1) Create an HBase connection (hbase-site.xml is read by default)
        connection = getConnection();
        // String tableName = "AMR:DWD_AMR_CQ_METER-E-DAY_201912";
        String split = ","; // separator joining the fields read from HBase: rowkey, column qualifier (meter id), value (energy reading)
        /*
         * The exact rowkeys are unknown, so the whole table is scanned. With a large data volume a
         * full scan is very expensive, so it is normally combined with a row range or filters.
         */
        logger.warn("----------- scanning the full table -------------");
        Scan scan = new Scan();
        // Restrict the scan to a rowkey range built from the table name
        String[] rowkeyDateArr = gennerateRowkeyDateArr(tableName);
        scan.setStartRow(Bytes.toBytes(rowKey + "." + rowkeyDateArr[0]));
        scan.setStopRow(Bytes.toBytes(rowKey + "." + rowkeyDateArr[1]));
        // Row filter: only keep rows whose rowkey ends with "PAP_E"
        Filter suffix_filter = new RowFilter(CompareFilter.CompareOp.EQUAL,
                new RegexStringComparator(".*PAP_E$"));
        // Keep rows whose rowkey contains the given user id (station area code / power supply unit code)
        ArrayList<Filter> listForFilters = new ArrayList<Filter>();
        listForFilters.add(suffix_filter); // rowkey suffix filter
        Filter filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(rowKey));
        listForFilters.add(filter); // rowkey substring filter
        SubstringComparator substringComparator = new SubstringComparator(colunm);
        Filter columnFilter = new QualifierFilter(CompareFilter.CompareOp.EQUAL, substringComparator);
        listForFilters.add(columnFilter);
        Filter filters = new FilterList(FilterList.Operator.MUST_PASS_ALL, listForFilters);
        Scan scans = scan.setFilter(filters);
        scans.setMaxResultSize(50);
        scans.setCaching(100);
        scans.setBatch(5);
        scan.setCacheBlocks(false); // no read cache
        scan.addFamily(Bytes.toBytes("DATA"));
        logger.warn("Getting the table...");
        // Get the table
        Table table = connection.getTable(TableName.valueOf(tableName));
        logger.warn("connection.toString() is :==>" + connection.toString());
        logger.warn("table.toString() is :==>" + table.toString());
        ResultScanner scanner = null;
        boolean tableExist = isTableExist(tableName, connection);
        if (tableExist) { // only scan if the table exists
            logger.warn(tableName + " isTableExist: " + tableExist);
            scanner = table.getScanner(scans);
            logger.warn(" scanner object: " + scanner);
            Result result = null;
            ArrayList<String> list = new ArrayList<>();
            while ((result = scanner.next()) != null) {
                logger.warn(" result object: " + result);
                String rowkey = "";
                // String family = "";
                String column = "";
                String value = "";
                List<Cell> cells = result.listCells(); // cells of the current row
                for (Cell cell : cells) { // iterate over the cells and collect them into a List<String>
                    // rowkey, column family, qualifier, value
                    rowkey = Bytes.toString(CellUtil.cloneRow(cell));
                    // family = Bytes.toString(CellUtil.cloneFamily(cell));
                    column = Bytes.toString(CellUtil.cloneQualifier(cell));
                    value = Bytes.toString(CellUtil.cloneValue(cell));
                    if (column.isEmpty() || column.equals("null")) {
                        column = "0"; // replace empty values with "0"
                    }
                    if (value.isEmpty() || value.equals("null")) {
                        value = "0"; // replace empty values with "0"
                    }
                    list.add(rowkey + split + column + split + value);
                    // System.out.println(rowkey + split + family + split + column + split + value);
                }
            }
            logger.warn("HBase table " + tableName + " data: " + list);
            // Derive the number of RDD partitions from the list size
            // (ideally the partition count matches the number of CPU cores in the cluster)
            int list_size = list.size();
            int partitions = 1; // number of partitions, default 1
            int per_partitions = 1000; // rows per partition
            if (list_size > per_partitions) {
                partitions = (int) (list_size / per_partitions) + 1;
            }
            // System.out.println(tableName + " has " + list_size + " rows");
            // System.out.println("number of RDD partitions: " + partitions);
            // Build an RDD from the list
            JavaRDD<String> rdd = javaSparkContext.parallelize(list, partitions);
            // System.out.println("rdd count " + rdd.count());
            // Map the RDD to HbaseBean; mapPartitions processes a whole partition at a time, which is faster
            JavaRDD<HbaseBean> rowRDD = rdd.mapPartitions(new FlatMapFunction<Iterator<String>, HbaseBean>() {
                @Override
                public Iterable<HbaseBean> call(Iterator<String> stringIterator) throws Exception {
                    ArrayList<HbaseBean> arrayList = new ArrayList<HbaseBean>();
                    while (stringIterator.hasNext()) { // wrap every line into an HbaseBean and add it to the list
                        String next = stringIterator.next();
                        String parts[] = next.split(",");
                        HbaseBean bean = new HbaseBean();
                        bean.setRowkey(parts[0]);
                        // bean.setFamily(parts[1]);
                        bean.setDlb_id(parts[1]);
                        bean.setDlb_dl(parts[2]);
                        arrayList.add(bean);
                    }
                    return arrayList;
                }
            });
            // System.out.println("rowRDD count " + rowRDD.count());
            // System.out.println(tableName + "---------out tableName---------");
            // if (!GenerateTableName.nowMonthTable(tableName)) {
            //     // If the table belongs to a previous month, checkpoint its RDD for later reuse.
            //     // Cache before checkpointing: checkpoint recomputes the RDD in a separate job before writing it to disk.
            //     rowRDD.cache();
            //     rowRDD.checkpoint();
            //     rowRDD.unpersist();
            // }
            // Create the DataFrame of the HBase table
            df = sqlContext.createDataFrame(rowRDD, HbaseBean.class);
            if (df == null) {
                df = temp_df; // fall back to the empty DataFrame with schema, otherwise unionAll fails later
            }
        } else df = temp_df;
        // df.cache();
        // connection.close();
        // df.show(100, false);
        // scanner.close();
        // table.close();
        return df;
    }
    /**
     * Check whether the table exists.
     *
     * @param tableName
     * @return whether the table exists
     * @throws IOException
     */
    public static boolean isTableExist(String tableName, Connection connection) throws IOException {
        connection = getConnection();
        Admin admin = connection.getAdmin();
        return admin.tableExists(TableName.valueOf(tableName));
    }
    /**
     * Kerberos authentication. The user is USER_SYX, the keytab is /app/keytab/USER_SYX.keytab on the
     * server, and the Kerberos configuration file is /etc/krb5.conf on the server.
     *
     * @return
     */
    public static Configuration kerberos() throws IOException {
        Configuration HBASE_CONFIG = new Configuration();
        HBASE_CONFIG.set("hbase.zookeeper.quorum", "node01,node02,node03");
        HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");
        HBASE_CONFIG.set("hbase.rpc.timeout", "1800000");
        HBASE_CONFIG.set("hbase.master.kerberos.principal", "hbase/_HOST@tsq.com");
        HBASE_CONFIG.set("hbase.regionserver.kerberos.principal", "hbase/_HOST@tsq.com");
        HBASE_CONFIG.set("hbase.security.authentication", "kerberos");
        HBASE_CONFIG.set("hadoop.security.authentication", "kerberos");
        HBASE_CONFIG.set("zookeeper.znode.parent", "/hbase");
        conf = HBaseConfiguration.create(HBASE_CONFIG);
        // Loading hbase-site.xml from HDFS does not work here, because accessing HDFS itself
        // already requires passing Kerberos authentication.
        // hbase_site_file = "hdfs://node01:8020/user/hbase/hbase-site.xml";
        // krb5Path = "hdfs://node01:8020/user/hbase/krb5.conf";
        // keytabPath = "hdfs://node01:8020/user/hbase/USER_SYX.keytab";
        // String principal = "USER_xxx@tsq.com";
        // String principal = "hbase/USER_xxx@tsq.com";
        String principal = "USER_xxx";
        // String principal = "_HOST@tsq.com";
        // --files uploads the files to the .sparkStaging/applicationId directory on HDFS;
        // System.getenv("SPARK_YARN_STAGING_DIR") returns that directory.
        // krb5.conf was copied down from the remote server.
        System.setProperty("java.security.krb5.conf", "/etc/krb5.conf");
        // The keytab was also copied from the remote server; it holds the password material,
        // so no interactive password entry is needed.
        // conf.set("keytab.file", "/app/keytab/USER_SYX.keytab");
        // This can be thought of as the user name, i.e. the principal.
        // conf.set("kerberos.principal", "hbase/USER_SYX@tsq.com");
        UserGroupInformation.setConfiguration(conf);
        logger.warn("------------- kerberos conf configured");
        // SparkFiles.get returns a local path on the driver node
        // String xmlFile = SparkFiles.get("/app/test/config/hbase-site.xml");
        // String confFile = SparkFiles.get("/app/test/config/krb5.conf");
        // String keytabFile = SparkFiles.get("/app/test/config/USER_SYX.keytab");
        // logger.warn(xmlFile + "=====" + confFile + "=====" + keytabFile);
        try {
            UserGroupInformation.loginUserFromKeytab(principal, "/app/keytab/USER_SYX.keytab");
            logger.warn("------------- loginUserFromKeytab done");
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        logger.warn("********** HBase Succeeded in authenticating through Kerberos! **********");
        return conf;
    }
    /**
     * Establish the HBase connection.
     *
     * @return
     * @throws IOException
     */
    public static Connection getConnection() throws IOException {
        if (connection == null) {
            // The cluster has Kerberos enabled; comment this line out if it does not.
            conf = kerberos();
            // A Kerberos-secured cluster must be given an already-authenticated conf.
            // conf.set("hbase.zookeeper.quorum", "node01,node02,node03");
            // conf.set("hbase.zookeeper.property.clientPort", "2181");
            // In cluster mode, always create the HBase connection inside UGI.doAs;
            // this avoids GSS errors when the connection fails to authenticate.
            UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI("USER_xxx", "/app/keytab/USER_SYX.keytab");
            connection = ugi.doAs(new PrivilegedAction<Connection>() {
                @Override
                public Connection run() {
                    try {
                        HbaseProcessed.connection = ConnectionFactory.createConnection(conf, Executors.newFixedThreadPool(4));
                    } catch (IOException e) {
                        logger.warn("PrivilegedAction!! ===>" + e);
                    }
                    return HbaseProcessed.connection;
                }
            });
        }
        return connection;
    }
    /**
     * Build the scan's start rowkey and stop rowkey from the HBase table name.
     *
     * @param tableName HBase table name
     * @return array of {start rowkey, stop rowkey}
     */
    private static String[] gennerateRowkeyDateArr(String tableName) {
        // e.g. AMR:DWD_AMR_CQ_METER-E-DAY_201911
        String[] tableNameArr = tableName.split("_");
        String year_month = tableNameArr[tableNameArr.length - 1];
        String year = year_month.substring(0, 4); // e.g. "2019"
        String month = year_month.substring(4);   // e.g. "11"
        System.out.println("year month is " + year + month);
        // Rowkey sample: NA.0410354369.01202019000000.PAP_E
        // startRowkey time part is: month day year 000000
        // stopRowkey  time part is: month day year 999999
        String startRow = month + "00" + year + "000000" + ".PAP_E";
        String stopRow = month + "31" + year + "999999" + ".PAP_E";
        String[] rowkeyRangeArr = {startRow, stopRow};
        System.out.println(startRow + " , " + stopRow);
        return rowkeyRangeArr;
    }
}
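For completeness, here is a minimal sketch of how this class might be driven from a Spark 1.x application. The application name and the rowKey/column arguments are sample values made up for illustration; HbaseBean and GenerateTableName are the project's own helper classes referenced above.

import com.bigdata.process.HbaseProcessed;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

public class HbaseProcessedDriver {
    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("HbaseKerberosRead"); // sample app name
        JavaSparkContext jsc = new JavaSparkContext(sparkConf);
        SQLContext sqlContext = new SQLContext(jsc);

        // rowKey: user id prefix to match in the rowkey; colunm: substring to match in the column qualifier.
        // Both values below are samples, not real ids.
        DataFrame df = HbaseProcessed.getDfs(jsc, sqlContext, "0410354369", "DLB001");
        df.show(20, false);

        jsc.stop();
    }
}

When running in yarn-cluster mode, the krb5.conf and keytab files must exist at the paths the code expects on the driver and executor nodes, or be shipped with --files as the comments in kerberos() suggest.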

 
