
Speeding up slow loads of large data volumes into Hive partitioned tables

Loading computed result data from HDFS into a Hive table was very slow.

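Original loading code

The reducer first writes its output to HDFS as one file per day, named `yyyy-MM-dd.txt`. A driver program then connects to the cluster over SSH and runs one `hive -e "load data ..."` command per file. Each `hive -e` call starts a new Hive CLI session, so the loads run one after another and pay the startup overhead once per file.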

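```java
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import net.neoremind.sshxcute.core.ConnBean;
import net.neoremind.sshxcute.core.Result;
import net.neoremind.sshxcute.core.SSHExec;
import net.neoremind.sshxcute.exception.TaskExecFailException;
import net.neoremind.sshxcute.task.impl.ExecCommand;

public static void main(String[] args) throws URISyntaxException, IOException, TaskExecFailException {
    // Server to connect to over SSH
    ConnBean connBean = new ConnBean("node01", "root", "123456");
    // Connect to the server
    SSHExec sshExec = SSHExec.getInstance(connBean);
    sshExec.connect();
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), conf);
    // Reducer output: one file per day, named yyyy-MM-dd.txt
    FileStatus[] stats = fs.listStatus(new Path("/rua/networkqualityinfo"));
    for (FileStatus stat : stats) {
        Path path = stat.getPath();
        String fileName = path.getName().replace(".txt", "");
        String[] sps = fileName.split("-");
        String year = sps[0];
        String month = sps[1];
        String day = sps[2];
        // Build the command; every file starts its own hive CLI session
        ExecCommand execCommand = new ExecCommand("hive -e \"load data inpath '/rua/networkqualityinfo/" + path.getName()
                + "' into table rua.networkqualityinfo partition(year='" + year + "',month='" + month + "',day='" + day + "')\"");
        // Execute the command
        Result exec = sshExec.exec(execCommand);
    }
    fs.close();
    // Close the SSH connection
    sshExec.disconnect();
}
```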
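Each `hive -e` call above launches a fresh Hive CLI session, so N files cost N startups. An intermediate fix is to build one script containing every `LOAD DATA` statement and submit it with a single `hive -e` call. This minimal sketch assumes the same `yyyy-MM-dd.txt` file naming and the `rua.networkqualityinfo` table. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class BatchLoadScript {

    // Hypothetical helper: one LOAD DATA statement for a file named yyyy-MM-dd.txt
    static String loadStatement(String dir, String fileName, String table) {
        String[] sps = fileName.replace(".txt", "").split("-");
        return "load data inpath '" + dir + "/" + fileName + "' into table " + table
                + " partition(year='" + sps[0] + "',month='" + sps[1] + "',day='" + sps[2] + "');";
    }

    // Joins all statements so that one hive -e call (one CLI session) loads every file
    static String batchScript(String dir, List<String> fileNames, String table) {
        StringBuilder sb = new StringBuilder();
        for (String name : fileNames) {
            sb.append(loadStatement(dir, name, table)).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String script = batchScript("/rua/networkqualityinfo",
                Arrays.asList("2021-03-14.txt", "2021-03-15.txt"), "rua.networkqualityinfo");
        System.out.print(script);
    }
}
```

The returned script could be wrapped once as `new ExecCommand("hive -e \"" + script + "\"")` in place of the per-file commands. Every file still goes through `LOAD DATA`, though. The optimized version that follows takes Hive out of the write path entirely.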

Optimized version

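The reducer can write the data directly into the table's directory under the Hive warehouse on HDFS, using Hive's `year=.../month=.../day=...` partition layout. A single `msck repair table` afterwards then registers the new partitions in the metastore, so no per-file `LOAD DATA` is needed.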

```java
import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
    // The key is a date string of the form yyyy-MM-dd
    String[] sps = key.toString().split("-");
    String year = sps[0];
    String month = sps[1];
    String day = sps[2];
    // Write straight into the partition directory of table rua.app_traffic
    Path path = new Path("/user/hive/warehouse/rua.db/app_traffic/year=" + year + "/month=" + month + "/day=" + day + "/" + (year + "-" + month + "-" + day));
    // Old target: Path path = new Path("/rua/app_traffic/" + (year + "-" + month + "-" + day) + ".txt");
    // FileSystem.get returns a cached, shared instance, so it is deliberately not closed here
    FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.100.101:8020"), context.getConfiguration());
    try (FSDataOutputStream out = fs.create(path)) {
        for (Text value : values) {
            // Text holds UTF-8 bytes; write only the valid range, then a newline
            out.write(value.getBytes(), 0, value.getLength());
            out.write('\n');
        }
    }
}
```
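The path built in the reducer follows Hive's default layout: `<warehouse>/<db>.db/<table>/<col>=<value>/...`. `msck repair table` only discovers directories whose names use the table's partition column names (`year`, `month`, `day` here). Below is a minimal sketch that factors this out into a hypothetical helper. It assumes the default warehouse location `/user/hive/warehouse`; a table created with a custom `LOCATION` would need a different base path.

```java
public class PartitionPath {

    // Hypothetical helper: <warehouse>/<db>.db/<table>/year=YYYY/month=MM/day=DD for a yyyy-MM-dd key
    static String partitionDir(String warehouse, String db, String table, String dateKey) {
        String[] sps = dateKey.split("-");
        if (sps.length != 3) {
            throw new IllegalArgumentException("expected yyyy-MM-dd, got " + dateKey);
        }
        return warehouse + "/" + db + ".db/" + table
                + "/year=" + sps[0] + "/month=" + sps[1] + "/day=" + sps[2];
    }

    public static void main(String[] args) {
        // Prints /user/hive/warehouse/rua.db/app_traffic/year=2021/month=03/day=14
        System.out.println(partitionDir("/user/hive/warehouse", "rua", "app_traffic", "2021-03-14"));
    }
}
```

Partition values are kept as strings, so `03` and `3` would end up as two different partitions. The key format should stay consistent across runs.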
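Once the job has finished, run the following once in Hive, with `rua` as the current database, so the metastore picks up the new partition directories:

```sql
msck repair table app_traffic;
```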
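`msck repair table` compares the table's whole directory tree with the metastore, which can itself take a while on tables with many partitions. If the job knows exactly which dates it wrote, it can instead register just those partitions with `ALTER TABLE ... ADD IF NOT EXISTS PARTITION`, which Hive accepts with several partition specs in one statement. Without a `LOCATION` clause, each partition resolves to the default `year=.../month=.../day=...` directory under the table, which is where the reducer writes. The helper below is a hypothetical sketch assuming the same `yyyy-MM-dd` keys.

```java
import java.util.Arrays;
import java.util.List;

public class AddPartitions {

    // Hypothetical helper: one ALTER TABLE statement registering each yyyy-MM-dd date as a partition
    static String addPartitions(String table, List<String> dateKeys) {
        StringBuilder sb = new StringBuilder("alter table " + table + " add if not exists");
        for (String key : dateKeys) {
            String[] sps = key.split("-");
            sb.append(" partition(year='").append(sps[0])
              .append("',month='").append(sps[1])
              .append("',day='").append(sps[2]).append("')");
        }
        return sb.append(';').toString();
    }

    public static void main(String[] args) {
        System.out.println(addPartitions("rua.app_traffic", Arrays.asList("2021-03-14", "2021-03-15")));
    }
}
```

The generated statement can be submitted the same way as the `msck` command. `IF NOT EXISTS` makes it safe to re-run for dates that are already registered.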
Source: https://www.wpsshop.cn/w/小丑西瓜9/article/detail/532278 (wpsshop blog; copyright belongs to the original author)