赞
踩
简单基于MapReduce实现KMeans算法。
KMeans算法作为一种划分式的聚类算法,利用MapReduce进行实现的主要难点在于满足KMeans每次迭代划分过程的中间结果保存。
因此利用HDFS进行中心点的存储,以实现各节点间的数据共享。
1、随机分配簇,初始化中心点,存入HDFS。
2、Mapper中读取数据文件中的每条数据并与中心点进行距离计算,输出key为最近的中心点序号。
3、Reducer中进行归并,计算新的中心点,存入新的中心文件。
4、判断停机条件,不满足则复制新的中心文件到原中心文件,重复2,3步骤。
5、输出聚类结果,包括数据点信息与对应簇序号。
实验数据选取自美国zillow房地产评估2017年房产数据,选取其中的经纬度信息进行聚类操作,方便可视化。数据经处理转移到txt格式,并存入HDFS中进行实验。
根据经纬度信息对美国zillow房地产进行聚类操作。
在porn.xml中导入如下的依赖:
<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-jobclient</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-common</artifactId> <version>2.10.1</version> </dependency> <dependency> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> </dependency>
如图所示:
按住ctrl+c保存,出现如下界面,等待依赖包下载:
在CalUtil类中写入如下代码:
package com.njupt.kmeans.util; import java.util.ArrayList; // 计算工具类,两值距离,选择最近中心点等 public class CalUtil { // 计算两向量距离,欧式 private static double calDistance(ArrayList<Double> element1, ArrayList<Double> element2){ double disSum = 0; for(int i=0;i<element1.size();i++){ disSum += (element1.get(i) - element2.get(i)) * (element1.get(i) - element2.get(i)); } return Math.sqrt(disSum); } // 选择最近中心点,返回序号 public static int selectNearestCenter(ArrayList<Double> element, ArrayList<ArrayList<Double>> centers){ double minDis = 100000; int nearstIndex = 0; for(int i=0;i<centers.size();i++){ ArrayList<Double> center = centers.get(i); double dis = calDistance(element, center); if(dis < minDis){ minDis = dis; nearstIndex = i; } } return nearstIndex; } // 元素相加 public static void addElement(ArrayList<Double> element1, ArrayList<Double> element2){ for(int i=0;i<element1.size();i++) { element1.set(i, element1.get(i) + element2.get(i)); } } // 计算新中心点 public static void calCenter(int num, ArrayList<Double> element){ for(int i=0;i<element.size();i++){ element.set(i, element.get(i) / num); } } // 计算两次迭代的中心是否有变化,返回距离 public static double calDistanceBetweenCenters(ArrayList<ArrayList<Double>>oldCenter, ArrayList<ArrayList<Double>>newCenter){ // 因为data的读入顺序相同,所以最终收敛时聚类中心的顺序也相同 // 只要遍历计算距离即可,不用考虑中心点本身顺序 if(oldCenter.size() > newCenter.size()) return 1000; double sum = 0; for(int i=0;i<oldCenter.size();i++){ double singleDistance = calDistance(oldCenter.get(i), newCenter.get(i)); sum += singleDistance; } return sum; } }
如图所示:
在DataUtil中写入以下代码:
package com.njupt.kmeans.util; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.io.Text; import org.apache.hadoop.util.LineReader; import java.io.IOException; import java.util.ArrayList; // 实现hdfs中中心点文件的读取和删除 // 中心点文件放在output中 public class DataUtil { public static final String HDFS_INPUT = "hdfs://node1:8020/kmeans/input"; // input地址 public static final String HDFS_OUTPUT = "hdfs://node1:8020//kmeans/output"; // output地址 // 切割字符串成double数组集合 public static ArrayList<Double> splitStringIntoArray(String line){ ArrayList<Double> center = new ArrayList<Double>(); String[] lineContextArry = line.split(","); for(String s:lineContextArry){ Double c = Double.parseDouble(s); center.add(c); } return center; } // 将数组转为字符串形式 public static String convertArrayIntoString(ArrayList<Double> element){ StringBuffer sb = new StringBuffer(); for(Double d:element){ sb.append(d.toString()).append(","); } return sb.substring(0, sb.length() - 1); } // 获取文件对应的hdfs系统下的linereader private static LineReader getLineReader(String filePath) throws IOException { Path path = new Path(filePath); Configuration conf = new Configuration(); FileSystem fileSystem = path.getFileSystem(conf); FSDataInputStream fsdis = fileSystem.open(path); LineReader lineReader = new LineReader(fsdis, conf); return lineReader; } // 读入center private static void readCenterLines(LineReader lineReader, ArrayList<ArrayList<Double>> centers) throws IOException { Text line = new Text(); // 每一行进行一次读取 while (lineReader.readLine(line) > 0) { ArrayList<Double> center = splitStringIntoArray(line.toString().trim()); centers.add(center); } lineReader.close(); } // 读取中心点 // 可能是文件夹,遍历读取 public static ArrayList<ArrayList<Double>> readCenter(String centerPath) throws IOException { ArrayList<ArrayList<Double>> centers = new ArrayList<ArrayList<Double>>(); Path path = new Path(centerPath); Configuration conf = new Configuration(); FileSystem fileSystem = path.getFileSystem(conf); if(fileSystem.isDirectory(path)){ // 文件夹,遍历读取 FileStatus[] listFile = fileSystem.listStatus(path); for (FileStatus fileStatus : listFile) { LineReader lineReader = getLineReader(fileStatus.getPath().toString()); readCenterLines(lineReader, centers); } }else { // 普通文件,直接读取 LineReader lineReader = getLineReader(centerPath); readCenterLines(lineReader, centers); } return centers; } // 删除中心点 public static boolean deleteCenters(String centerPath) throws IOException { Configuration conf = new Configuration(); Path path = new Path(centerPath); FileSystem fileSystem = path.getFileSystem(conf); return fileSystem.delete(path, true); } // 利用new文件覆盖原文件 public static void changeCenters(String centerPath, String newCenterPath, Configuration configuration) throws IOException { // 删除原center文件 Path cPath = new Path(centerPath); Path ncPath = new Path(newCenterPath); ncPath.getFileSystem(configuration).delete(cPath, true); FileSystem fileSystem = FileSystem.get(configuration); RemoteIterator<LocatedFileStatus> sourceFiles = fileSystem.listFiles(ncPath, true); if(sourceFiles != null) { while(sourceFiles.hasNext()){ FileUtil.copy(fileSystem, sourceFiles.next().getPath(), fileSystem, cPath, false, configuration); } } } }
如图所示:
注意!!!:
此处路径中的节点名与端口号要根据实际情况而定
一种查看方式:启动hadoop在浏览器中打开
随机赋值的mapper类。
在CenterRandomMapper类中输入如下代码:
package com.njupt.kmeans.center; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; // 随机为data分配簇 // 输出簇编号方便reduce计算 // setup中读取k大小 public class CenterRandomMapper extends Mapper<Object, Text, Text, Text> { private int k; @Override protected void setup(Context context) throws IOException, InterruptedException { // 读取k值 Configuration configuration = context.getConfiguration(); k = configuration.getInt("cluster.k", 3); System.out.println("k:" + k); } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { // 随机分配簇 int index = (int) (Math.random() * k); // System.out.println(index); context.write(new Text(Integer.toString(index)), value); } }
如图所示:
初始化中心点的任务配置类,实现一个static方法。
在CenterRandomAdapter类中输入如下代码:
package com.njupt.kmeans.center; import com.njupt.kmeans.KmeansRun; import com.njupt.kmeans.cluster.KmeansReducer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; // 设置随机中心点的任务 public class CenterRandomAdapter { public static void createRandomCenter(String dataPath, String centerPath, int k){ Configuration hadoopConfig = new Configuration(); hadoopConfig.setInt("cluster.k", k); try { Job job = Job.getInstance(hadoopConfig, "random center task"); job.setJarByClass(KmeansRun.class); job.setMapperClass(CenterRandomMapper.class); job.setReducerClass(KmeansReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 输出为新计算得到的center,已存在则删除 Path outPath = new Path(centerPath); outPath.getFileSystem(hadoopConfig).delete(outPath, true); //job执行作业时输入和输出文件的路径 FileInputFormat.addInputPath(job, new Path(dataPath)); FileOutputFormat.setOutputPath(job, new Path(centerPath)); //执行job,直到完成 job.waitForCompletion(true); System.out.println("random center task"); } catch (IOException | InterruptedException | ClassNotFoundException e) { e.printStackTrace(); } } }
如图所示:
此时有报错是因为有的类还没有创建。
写入如下代码:
package com.njupt.kmeans.cluster; import com.njupt.kmeans.util.CalUtil; import com.njupt.kmeans.util.DataUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; import java.util.ArrayList; // map-reduce体系中map类 // 主要思路是读取中心点文件,将元素进行中心点归属判别,输出的key设置为中心点序号方便后续计算新中心点 public class KmeansMapper extends Mapper<Object, Text, Text, Text> { private ArrayList<ArrayList<Double>> centers = null; @Override protected void setup(Context context) throws IOException, InterruptedException { // 读一下centers // 地址从配置中拿好了 Configuration configuration = context.getConfiguration(); String centerPath = configuration.get("cluster.center_path"); centers = DataUtil.readCenter(centerPath); } @Override protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { ArrayList<Double> element = DataUtil.splitStringIntoArray(value.toString()); // 选择最近中心点,将其作为key int index = CalUtil.selectNearestCenter(element, centers); context.write(new Text(Integer.toString(index)), value); } }
如图所示:
写入如下代码:
package com.njupt.kmeans.cluster; import com.njupt.kmeans.util.CalUtil; import com.njupt.kmeans.util.DataUtil; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.ArrayList; // reduce,主要思路是把同一key,也就是同index的元素相加算新的中心点 public class KmeansReducer extends Reducer<Text, Text, Text, Text> { @Override protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { ArrayList<Double> sumElement = new ArrayList<Double>(); int num = 0; // 遍历values相加,求新中心点 for(Text t:values){ num += 1; ArrayList<Double> element = DataUtil.splitStringIntoArray(t.toString()); if(sumElement.size() <= 0){ sumElement = new ArrayList<Double>(element); continue; } CalUtil.addElement(sumElement, element); } CalUtil.calCenter(num, sumElement); // 存放新中心点 context.write(new Text(""), new Text(DataUtil.convertArrayIntoString(sumElement))); } }
如图所示:
该类中实现停机检查方法chekStop();
该类中实现单次迭代任务设置方法start();
该类中实现聚类结果输出任务设置方法createClusterResult()。
package com.njupt.kmeans.cluster; import com.njupt.kmeans.KmeansRun; import com.njupt.kmeans.util.CalUtil; import com.njupt.kmeans.util.DataUtil; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; import java.util.ArrayList; // 主要是把map和reduce的流程组装起来,顺便写个main public class KmeansAdapter { // 计算是否能停机 // 检查中心点相同情况,停止 // 如果不相同,利用新center去覆盖旧center(未完成) public static boolean checkStop(String centerPath, String newCenterPath){ // 读取新旧center文件 try { ArrayList<ArrayList<Double>> newCenters = DataUtil.readCenter(newCenterPath); ArrayList<ArrayList<Double>> centers = DataUtil.readCenter(centerPath); // 获取距离信息 double distanceSum = CalUtil.calDistanceBetweenCenters(centers, newCenters); if(distanceSum == 0){ // 停机,不做修改 return true; }else{ // 覆盖原中心文件 System.out.println("distanceSum=" + distanceSum); DataUtil.changeCenters(centerPath, newCenterPath, new Configuration()); return false; } } catch (IOException e) { System.out.println(centerPath + " ! file wrong"); e.printStackTrace(); } return true; } // 一次迭代流程 // map读取中心,分类,reduce计算新中心,存储 // 比较两次中心差距,存储新中心点 public static void start(String dataPath, String centerPath, String newCenterPath){ // 设置原中心点 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("cluster.center_path", centerPath); try { Job job = Job.getInstance(hadoopConfig, "one round cluster task"); job.setJarByClass(KmeansRun.class); job.setMapperClass(KmeansMapper.class); job.setReducerClass(KmeansReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 输出为新计算得到的center,已存在则删除 Path outPath = new Path(newCenterPath); outPath.getFileSystem(hadoopConfig).delete(outPath, true); //job执行作业时输入和输出文件的路径 FileInputFormat.addInputPath(job, new Path(dataPath)); FileOutputFormat.setOutputPath(job, new Path(newCenterPath)); //执行job,直到完成 job.waitForCompletion(true); System.out.println("finish one round cluster task"); } catch (IOException | InterruptedException | ClassNotFoundException e) { e.printStackTrace(); } } // 根据center文件,生成聚类结果 // 利用mapper即可 public static void createClusterResult(String dataPath, String centerPath, String clusterResultPath){ // 设置原中心点 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("cluster.center_path", centerPath); try { Job job = Job.getInstance(hadoopConfig, "cluster result task"); job.setJarByClass(KmeansRun.class); // 无reducer job.setMapperClass(KmeansMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 输出为新计算得到的center,已存在则删除 Path outPath = new Path(clusterResultPath); outPath.getFileSystem(hadoopConfig).delete(outPath, true); //job执行作业时输入和输出文件的路径 FileInputFormat.addInputPath(job, new Path(dataPath)); FileOutputFormat.setOutputPath(job, new Path(clusterResultPath)); //执行job,直到完成 job.waitForCompletion(true); System.out.println("cluster result task finished"); } catch (IOException | InterruptedException | ClassNotFoundException e) { e.printStackTrace(); } } }
package com.njupt.kmeans; import com.njupt.kmeans.center.CenterRandomAdapter; import com.njupt.kmeans.cluster.KmeansAdapter; import com.njupt.kmeans.util.DataUtil; // kmeans主方法 // 主要就是循环调用直到次数或者停机状态 // 每次循环判断下停机 // data为逗号连接 public class KmeansRun { public static void main(String[] args){ // 命令行参数为数据集名称与聚类数 String dataName = args[0]; int k = Integer.parseInt(args[1]); System.out.println(k); String centerPath = DataUtil.HDFS_OUTPUT + "/centers.txt"; String newCenterPath = DataUtil.HDFS_OUTPUT + "/new_centers.txt"; String dataPath = DataUtil.HDFS_INPUT + "/" + dataName; String clusterResultPath = DataUtil.HDFS_OUTPUT + "/kmeans_cluster_result.txt"; // 初始化随机中心点 CenterRandomAdapter.createRandomCenter(dataPath, centerPath, k); // 默认1000次,中途停退出 for(int i=0;i<30;i++){ System.out.println("round " + i); KmeansAdapter.start(dataPath, centerPath, newCenterPath); if(KmeansAdapter.checkStop(centerPath, newCenterPath)) break; } KmeansAdapter.createClusterResult(dataPath, centerPath, clusterResultPath); } }
此时在target包中即会出现jar包,如图所示:
复制到一个找得到的地方,重命名为kmeans.jar
网盘下载:链接:https://pan.baidu.com/s/1cbXEmjess1xGddM9FQ4GJw
提取码:gao5
输入以下命令:
start-dfs.sh
start-yarn.sh
如图所示:
http://192.168.198.130:9870 (其中192.168.198.130换成自己的node1的IP地址)
hdfs dfs -mkdir -p /kmeans/input
如图所示:
可以在浏览器查看创建情况
hdfs dfs -put kdata.txt /kmeans/input
如图所示:
可以在浏览器查看上传情况:
hadoop jar kmeans.jar com.njupt.kmeans.KmeansRun kdata.txt 3
其中最后一个参数为聚类数,可以自行设置,这里设置为3。(可以设置为2,运行时间短一点)
hdfs dfs -text /kmeans/output/kmeans_cluster_result.txt/part-r-00000
如图:
也可以在浏览器端下载结果文件
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。