The reducer first writes the data to HDFS, and the files are then loaded into the Hive table with load data inpath:

    public static void main(String[] args) throws URISyntaxException, IOException, TaskExecFailException {
        // Connection details of the server that has the Hive client installed
        ConnBean connBean = new ConnBean("node01", "root", "123456");
        // Open the SSH connection
        SSHExec sshExec = SSHExec.getInstance(connBean);
        sshExec.connect();

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://node01:8020"), conf);
        // Each staging file is named year-month-day.txt; the date becomes the partition
        FileStatus[] stats = fs.listStatus(new Path("/rua/networkqualityinfo"));
        for (FileStatus stat : stats) {
            Path path = stat.getPath();
            String fileName = path.getName().replace(".txt", "");
            String[] sps = fileName.split("-");
            String year = sps[0];
            String month = sps[1];
            String day = sps[2];

            // Build the Hive command that loads this file into the matching partition
            ExecCommand execCommand = new ExecCommand("hive -e \"load data inpath '/rua/networkqualityinfo/" + path.getName() + "' into table rua.networkqualityinfo partition(year='" + year + "',month='" + month + "',day='" + day + "')\"");
            // Run the command on the remote server
            Result exec = sshExec.exec(execCommand);
        }
        fs.close();
        // Close the SSH connection
        sshExec.disconnect();
    }
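
The driver above assumes that a MapReduce job has already written one file per day under /rua/networkqualityinfo, named year-month-day.txt. That job is not shown here, so the reducer below is only an illustrative sketch of what it could look like: the class name and output types are made up, while the staging path and file naming are taken from the driver above.

    import java.io.IOException;
    import java.net.URI;
    import java.net.URISyntaxException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    // Illustrative reducer: key = "year-month-day", values = the records for that day
    public class NetworkQualityReducer extends Reducer<Text, Text, NullWritable, NullWritable> {

        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            // One file per day, named year-month-day.txt, in the staging directory
            // that the load-data driver scans afterwards
            Path path = new Path("/rua/networkqualityinfo/" + key.toString() + ".txt");

            FileSystem fs;
            try {
                fs = FileSystem.get(new URI("hdfs://node01:8020"), new Configuration());
            } catch (URISyntaxException e) {
                throw new IOException(e);
            }

            // FileSystem.get returns a shared, cached instance, so it is not closed here
            FSDataOutputStream out = fs.create(path);
            for (Text value : values) {
                byte[] bytes = (value.toString() + "\n").getBytes();
                out.write(bytes, 0, bytes.length);
            }
            out.close();
        }
    }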
Alternatively, we can write the data in the correct format directly into the Hive warehouse directory on HDFS and then repair the table's partition metadata afterwards:

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // The key is the date in the form year-month-day
        String[] sps = key.toString().split("-");
        String year = sps[0];
        String month = sps[1];
        String day = sps[2];
        // Write straight into the partition directory inside the Hive warehouse
        Path path = new Path("/user/hive/warehouse/rua.db/app_traffic/year=" + year + "/month=" + month + "/day=" + day + "/" + (year + "-" + month + "-" + day));
        // Path path = new Path("/rua/app_traffic/" + (year + "-" + month + "-" + day) + ".txt");

        FileSystem fs;
        try {
            fs = FileSystem.get(new URI("hdfs://192.168.100.101:8020"), new Configuration());
        } catch (URISyntaxException e) {
            throw new IOException(e);
        }

        FSDataOutputStream fsDataOutputStream = fs.create(path);

        // One line per record in this day's partition file
        for (Text value : values) {
            byte[] bytes = (value.toString() + "\n").getBytes();
            fsDataOutputStream.write(bytes, 0, bytes.length);
        }

        fsDataOutputStream.close();
        fs.close();
    }
After the data has been written, register the new partition directories in the metastore by running the repair command in Hive (in the rua database):

    msck repair table app_traffic;
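
If the repair step should be triggered from Java as well, the same sshxcute pattern used in the first example can run it over SSH. This is only a minimal sketch under that assumption: the host and credentials are the illustrative values from above, the class name is made up, and the import paths follow the usual sshxcute package layout.

    import net.neoremind.sshxcute.core.ConnBean;
    import net.neoremind.sshxcute.core.Result;
    import net.neoremind.sshxcute.core.SSHExec;
    import net.neoremind.sshxcute.exception.TaskExecFailException;
    import net.neoremind.sshxcute.task.impl.ExecCommand;

    public class RepairAppTrafficTable {

        public static void main(String[] args) throws TaskExecFailException {
            // Same server and credentials as in the load-data example above
            ConnBean connBean = new ConnBean("node01", "root", "123456");
            SSHExec sshExec = SSHExec.getInstance(connBean);
            sshExec.connect();

            // msck repair scans the table location and registers any
            // year=/month=/day= directories the metastore does not know yet
            ExecCommand execCommand = new ExecCommand("hive -e \"msck repair table rua.app_traffic\"");
            Result result = sshExec.exec(execCommand);

            sshExec.disconnect();
        }
    }

This only works if the reducer writes exactly into the year=/month=/day= directory layout shown above, because msck repair adds partitions by matching those directory names against the table's partition columns.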