赞
踩
使用Java + MapRedcue实现 K-means 聚类算法,使用python实现基于散点图的可视化。
代码库Github链接为:https://github.com/Resulte/K-means-with-MapReduce
(希望您去Github的时候可以顺手给个Star)
聚类是针对给定的样本,依据它们特征的相似度或者距离,将其归并到若干个“类” 或“簇”的问题。使用聚类算法能够挖掘数据中潜在的分布规律。K-means 聚类算法是最常用的聚类算法。
本项目使用Java + MapRedcue 计算框架,实现 K-means 聚类算法并处理给定的数据集(未使用第三方库,例如 MLlib)。同时,用python以散点图的方式对结果进行可视化展示。
输入数据集为当前目录下的kmeans_input文件夹。
随机分配簇,初始化中心点,存入HDFS。
利用Mapper读取每一个元素的向量信息,随机赋值,在Reducer中计算中心点信息。由于中心点的计算与迭代时的计算相同,与迭代计算共用一个reducer类。
开启一个Job,初始化中心点的任务配置类,实现一个static方法:
public static void createRandomCenter(String dataPath, String centerPath, int k){ Configuration hadoopConfig = new Configuration(); hadoopConfig.setInt("cluster.k", k); try { Job job = Job.getInstance(hadoopConfig, "random center task"); job.setJarByClass(KmeansRun.class); job.setMapperClass(CenterRandomMapper.class); job.setReducerClass(KmeansReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 输出为新计算得到的center,已存在则删除 Path outPath = new Path(centerPath); outPath.getFileSystem(hadoopConfig).delete(outPath, true); //job执行作业时输入和输出文件的路径 FileInputFormat.addInputPath(job, new Path(dataPath)); FileOutputFormat.setOutputPath(job, new Path(centerPath)); //执行job,直到完成 job.waitForCompletion(true); System.out.println("random center task"); } }
map方法中,根据聚类簇数量,对每个元素赋予随机的类簇序号,作为输出的key。value为元素向量,保持不变。
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
// 随机分配簇
int index = (int) (Math.random() * k);
System.out.println(index);
context.write(new Text(Integer.toString(index)), value);
}
利用Mapper将所有元素与中心点进行对比,分配到最近的簇中。利用Reducer进行求和并计算新的中心点信息。
判断地址属性,对文件夹进行遍历,利用LineReader进行所有文件信息的读取,最终返回二维double集合形式的中心点信息。
private ArrayList<ArrayList<Double>> centers = null; @Override protected void setup(Context context) throws IOException, InterruptedException { // 读一下centers // 地址从配置中拿好了 Configuration configuration = context.getConfiguration(); String centerPath = configuration.get("cluster.center_path"); centers = DataUtil.readCenter(centerPath);} 在mapper执行前,利用setup方法进行中心点的读取。DataUtil为工具类,readCenter()实现对HDFS中center文件夹中所有文件信息的读取。 public static ArrayList<ArrayList<Double>> readCenter(String centerPath) throws IOException { ArrayList<ArrayList<Double>> centers = new ArrayList<ArrayList<Double>>(); Path path = new Path(centerPath); Configuration conf = new Configuration(); FileSystem fileSystem = path.getFileSystem(conf); if(fileSystem.isDirectory(path)){ // 文件夹,遍历读取 FileStatus[] listFile = fileSystem.listStatus(path); for (FileStatus fileStatus : listFile) { LineReader lineReader = getLineReader(fileStatus.getPath().toString()); readCenterLines(lineReader, centers); } }else { // 普通文件,直接读取 LineReader lineReader = getLineReader(centerPath); readCenterLines(lineReader, centers); } return centers; } protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { ArrayList<Double> element = DataUtil.splitStringIntoArray(value.toString()); // 选择最近中心点,将其作为key int index = CalUtil.selectNearestCenter(element, centers); context.write(new Text(Integer.toString(index)), value); }
// map读取中心,分类,reduce计算新中心,存储 // 比较两次中心差距,存储新中心点 public static void start(String dataPath, String centerPath, String newCenterPath){ // 设置原中心点 Configuration hadoopConfig = new Configuration(); hadoopConfig.set("cluster.center_path", centerPath); try { Job job = Job.getInstance(hadoopConfig, "one round cluster task"); job.setJarByClass(KmeansRun.class); job.setMapperClass(KmeansMapper.class); job.setReducerClass(KmeansReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); // 输出为新计算得到的center,已存在则删除 Path outPath = new Path(newCenterPath); outPath.getFileSystem(hadoopConfig).delete(outPath, true); //job执行作业时输入和输出文件的路径 FileInputFormat.addInputPath(job, new Path(dataPath)); FileOutputFormat.setOutputPath(job, new Path(newCenterPath)); //执行job,直到完成 job.waitForCompletion(true); System.out.println("finish one round cluster task"); } catch (IOException | InterruptedException | ClassNotFoundException e) { e.printStackTrace(); } }
输出聚类结果,包括数据点信息与对应簇序号。
开启另一个Job用于生成聚类结果,该方法包括三个参数,分别为数据集地址,中心点地址以及聚类结果输出地址:
public static void main(String[] args){ // 命令行参数为数据集名称与聚类数 String dataName = args[0]; int k = Integer.parseInt(args[1]); String centerPath = DataUtil.HDFS_OUTPUT + "/centers.txt"; String newCenterPath = DataUtil.HDFS_OUTPUT + "/new_centers.txt"; String dataPath = DataUtil.HDFS_INPUT + "/" + dataName; String clusterResultPath = DataUtil.HDFS_OUTPUT + "/kmeans_cluster_result.txt"; // 初始化随机中心点 CenterRandomAdapter.createRandomCenter(dataPath, centerPath, k); // 默认1000次,中途停退出 for(int i=0;i<1000;i++){ System.out.println("round " + i); KmeansAdapter.start(dataPath, centerPath, newCenterPath); if(KmeansAdapter.checkStop(centerPath, newCenterPath)) break; } KmeansAdapter.createClusterResult(dataPath, centerPath, clusterResultPath); }
可视化时,首先要读取文件的每一行,并将聚类编号和x y坐标分开,用不同的颜色标识不同的聚类,最后画出来即可,代码如下:
import matplotlib.pyplot as plt import re label = [] x = [] y = [] with open('kmeans_cluster_result.txt') as file: lines = file.readlines() for line in lines: arr = re.split(' |\t',line) label.append(int(arr[0])) x.append(float(arr[1])) y.append(float(arr[2])) mark = ['or', 'ob', 'og', 'ok', '^r', '+r', 'sr', 'dr', '<r', 'pr'] for i in range(0, len(label)): plt.plot(x[i], y[i], mark[label[i]], markersize = 5 ) plt.show()
经过实验发现,聚类数K=4是最好的。
首先,将输入文本放入HDFS中:
./bin/hdfs dfs -put ./input /input
然后,执行Jar包,其中第一个参数是输入路径,第二个是输出路径,第三个是聚类数:
./bin/hadoop jar ./hadoop_kmeans-1.0-SNAPSHOT.jar /input /output 4
可视化结果如下:
四个质心的坐标位置如下:
581.2483735304816 -930.0778680990376
149.21612547138778 -642.3408090744012
386.7612212774783 203.07996261722337
-1033.107048894814 146.3141243409019
部分聚类的结果如下,其中第一列是聚类编号,后两列是坐标:
0 201.95839333926213 -610.1614888687657 0 32.01462887825652 -647.6412547780841 0 54.83729999667125 -904.3811344485978 0 242.28608650620268 -377.1755098864822 ...... 1 -953.0052501621777 32.26020342111498 1 -1073.8591238331912 27.82613875738177 1 -1023.053861943868 157.36024019224365 1 -1016.3805864102071 -55.27811303612242 ...... 2 492.5590001813256 256.39009060636425 2 89.38694734881466 3.9521397392150845 2 556.8900791907337 83.89276370678998 2 -217.58267720495377 285.0702044786078 ...... 3 776.397640388748 -1146.8323356276164 3 389.51109535472244 -1167.5703732509387 3 574.3763506986619 -936.2546652885289 3 586.0914857048108 -943.63646399339 ......
全部的输出结果请看output文件夹。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。