当前位置:   article > 正文

头歌大数据——MapReduce综合应用案例 — 电信数据清洗 答案 无解析_第1关:数据清洗

第1关:数据清洗

第1关:数据清洗

编程要求

根据提示,在右侧编辑器补充代码,对数据按照一定规则进行清洗。

Tips:本关需要补充三个文件的代码~如下图所示,点击小三角切换文件~

 

运行前记得先启动 HDFS(代码还会通过 JDBC 读取 MySQL 中 mydb 库的 userphone、allregion 两张表,请确保 MySQL 服务也已启动)~~

start-dfs.sh

代码如下: 

  1. //LogMR.java
  2. package com;
  3. import java.io.IOException;
  4. import java.sql.Connection;
  5. import java.sql.ResultSet;
  6. import java.sql.SQLException;
  7. import java.sql.Statement;
  8. import java.text.SimpleDateFormat;
  9. import java.util.ArrayList;
  10. import java.util.HashMap;
  11. import java.util.Iterator;
  12. import java.util.List;
  13. import java.util.Map;
  14. import org.apache.hadoop.conf.Configuration;
  15. import org.apache.hadoop.fs.FileSystem;
  16. import org.apache.hadoop.fs.Path;
  17. import org.apache.hadoop.io.LongWritable;
  18. import org.apache.hadoop.io.NullWritable;
  19. import org.apache.hadoop.io.Text;
  20. import org.apache.hadoop.mapreduce.Job;
  21. import org.apache.hadoop.mapreduce.Mapper;
  22. import org.apache.hadoop.mapreduce.Reducer;
  23. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  24. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  25. public class LogMR {
  26. /********** begin **********/
  27. static class MyMapper extends Mapper<LongWritable, Text, PhoneLog, NullWritable> {
  28. Map<String, String> userMap = new HashMap<>();
  29. Map<String, String> addressMap = new HashMap<>();
  30. SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  31. PhoneLog pl = new PhoneLog();
  32. Text text = new Text();
  33. @Override
  34. protected void setup(Context context) throws IOException, InterruptedException {
  35. Connection connection = DBHelper.getConnection();
  36. try {
  37. Statement statement = connection.createStatement();
  38. String sql = "select * from userphone";
  39. ResultSet resultSet = statement.executeQuery(sql);
  40. while (resultSet.next()) {
  41. String phone = resultSet.getString(2);
  42. String trueName = resultSet.getString(3);
  43. userMap.put(phone, trueName);
  44. }
  45. String sql2 = "select * from allregion";
  46. ResultSet resultSetA = statement.executeQuery(sql2);
  47. while (resultSetA.next()) {
  48. String phone = resultSetA.getString(2);
  49. String trueName = resultSetA.getString(3);
  50. addressMap.put(phone, trueName);
  51. }
  52. } catch (SQLException e) {
  53. e.printStackTrace();
  54. }
  55. }
  56. @Override
  57. protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
  58. String str = value.toString();
  59. String[] split = str.split(",");
  60. if (split.length == 6) {
  61. String trueName1 = userMap.get(split[0]);
  62. String trueName2 = userMap.get(split[1]);
  63. String address1 = addressMap.get(split[4]);
  64. String address2 = addressMap.get(split[5]);
  65. long startTimestamp = Long.parseLong(split[2]);
  66. String startTime = sdf.format(startTimestamp * 1000);
  67. long endTimestamp = Long.parseLong(split[3]);
  68. String endTime = sdf.format(endTimestamp * 1000);
  69. long timeLen = endTimestamp - startTimestamp;
  70. pl.SetPhoneLog(trueName1, trueName2, split[0], split[1], startTime, endTime, timeLen, address1,
  71. address2);
  72. context.write(pl, NullWritable.get());
  73. }
  74. }
  75. }
  76. public static void main(String[] args) throws Exception {
  77. Configuration conf = new Configuration();
  78. Job job = Job.getInstance(conf);
  79. job.setJarByClass(LogMR.class);
  80. job.setMapperClass(MyMapper.class);
  81. job.setMapOutputKeyClass(PhoneLog.class);
  82. job.setMapOutputValueClass(NullWritable.class);
  83. job.setNumReduceTasks(0);
  84. Path inPath = new Path("/user/test/input/a.txt");
  85. Path out = new Path("/user/test/output");
  86. FileInputFormat.setInputPaths(job, inPath);
  87. FileOutputFormat.setOutputPath(job, out);
  88. job.waitForCompletion(true);
  89. }
  90. /********** end **********/
  91. }
  1. //DBHelper.java
  2. package com;
  3. import java.sql.Connection;
  4. import java.sql.DriverManager;
  5. import java.sql.SQLException;
  /**
   * Hands out a single, lazily created JDBC connection to the MySQL database
   * that holds the lookup tables.
   *
   * <p>The connection is cached in a static field and shared by all callers. No
   * synchronization is performed here.
   */
  public class DBHelper {
      /********** begin **********/
      private static final String driver = "com.mysql.jdbc.Driver";
      private static final String url = "jdbc:mysql://localhost:3306/mydb?useUnicode=true&characterEncoding=UTF-8";
      private static final String username = "root"; // database user name
      private static final String password = "123123"; // database password: set when the database was installed, differs per machine
      private static Connection conn = null; // cached database connection

      static {
          try {
              Class.forName(driver);
          } catch (ClassNotFoundException ex) {
              // Best effort only: getConnection() reports the real failure if no
              // driver ends up registered for the URL.
              ex.printStackTrace();
          }
      }

      /**
       * Returns the shared connection, opening it on first use and re-opening it
       * when the cached one has been closed.
       *
       * @return an open connection; never {@code null}
       * @throws IllegalStateException if the connection cannot be established; the
       *     underlying {@link SQLException} is attached as the cause
       */
      public static Connection getConnection() {
          try {
              // Re-check isClosed() so a handle closed by an earlier caller is not
              // handed out again forever.
              if (conn == null || conn.isClosed()) {
                  conn = DriverManager.getConnection(url, username, password);
              }
              return conn;
          } catch (SQLException e) {
              conn = null;
              throw new IllegalStateException("Unable to connect to database at " + url, e);
          }
      }
      /********** end **********/
  }
  1. //PhoneLog.java
  2. package com;
  3. import java.io.DataInput;
  4. import java.io.DataOutput;
  5. import java.io.IOException;
  6. import org.apache.hadoop.io.Writable;
  7. import org.apache.hadoop.io.WritableComparable;
  8. public class PhoneLog implements WritableComparable<PhoneLog> {
  9. private String userA;
  10. private String userB;
  11. private String userA_Phone;
  12. private String userB_Phone;
  13. private String startTime;
  14. private String endTime;
  15. private Long timeLen;
  16. private String userA_Address;
  17. private String userB_Address;
  18. public PhoneLog() {
  19. }
  20. public void SetPhoneLog(String userA, String userB, String userA_Phone, String userB_Phone, String startTime,
  21. String endTime, Long timeLen, String userA_Address, String userB_Address) {
  22. this.userA = userA;
  23. this.userB = userB;
  24. this.userA_Phone = userA_Phone;
  25. this.userB_Phone = userB_Phone;
  26. this.startTime = startTime;
  27. this.endTime = endTime;
  28. this.timeLen = timeLen;
  29. this.userA_Address = userA_Address;
  30. this.userB_Address = userB_Address;
  31. }
  32. public String getUserA_Phone() {
  33. return userA_Phone;
  34. }
  35. public void setUserA_Phone(String userA_Phone) {
  36. this.userA_Phone = userA_Phone;
  37. }
  38. public String getUserB_Phone() {
  39. return userB_Phone;
  40. }
  41. public void setUserB_Phone(String userB_Phone) {
  42. this.userB_Phone = userB_Phone;
  43. }
  44. public String getUserA() {
  45. return userA;
  46. }
  47. public void setUserA(String userA) {
  48. this.userA = userA;
  49. }
  50. public String getUserB() {
  51. return userB;
  52. }
  53. public void setUserB(String userB) {
  54. this.userB = userB;
  55. }
  56. public String getStartTime() {
  57. return startTime;
  58. }
  59. public void setStartTime(String startTime) {
  60. this.startTime = startTime;
  61. }
  62. public String getEndTime() {
  63. return endTime;
  64. }
  65. public void setEndTime(String endTime) {
  66. this.endTime = endTime;
  67. }
  68. public Long getTimeLen() {
  69. return timeLen;
  70. }
  71. public void setTimeLen(Long timeLen) {
  72. this.timeLen = timeLen;
  73. }
  74. public String getUserA_Address() {
  75. return userA_Address;
  76. }
  77. public void setUserA_Address(String userA_Address) {
  78. this.userA_Address = userA_Address;
  79. }
  80. public String getUserB_Address() {
  81. return userB_Address;
  82. }
  83. public void setUserB_Address(String userB_Address) {
  84. this.userB_Address = userB_Address;
  85. }
  86. @Override
  87. public void write(DataOutput out) throws IOException {
  88. out.writeUTF(userA);
  89. out.writeUTF(userB);
  90. out.writeUTF(userA_Phone);
  91. out.writeUTF(userB_Phone);
  92. out.writeUTF(startTime);
  93. out.writeUTF(endTime);
  94. out.writeLong(timeLen);
  95. out.writeUTF(userA_Address);
  96. out.writeUTF(userB_Address);
  97. }
  98. @Override
  99. public void readFields(DataInput in) throws IOException {
  100. userA = in.readUTF();
  101. userB = in.readUTF();
  102. userA_Phone = in.readUTF();
  103. userB_Phone = in.readUTF();
  104. startTime = in.readUTF();
  105. endTime = in.readUTF();
  106. timeLen = in.readLong();
  107. userA_Address = in.readUTF();
  108. userB_Address = in.readUTF();
  109. }
  110. @Override
  111. public String toString() {
  112. return userA + "," + userB + "," + userA_Phone + "," + userB_Phone + "," + startTime + "," + endTime + ","
  113. + timeLen + "," + userA_Address + "," + userB_Address;
  114. }
  115. @Override
  116. public int compareTo(PhoneLog pl) {
  117. if(this.hashCode() == pl.hashCode()) {
  118. return 0;
  119. }
  120. return -1;
  121. }
  122. }

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/不正经/article/detail/631336
推荐阅读
相关标签
  

闽ICP备14008679号