赞
踩
外部排序是一种把大量无序的数据,按一定的顺序排序的算法。它是一种非常适合处理海量数据的算法,其原理主要有以下几点:
首先,将所有要排序的数据分割成若干个较小的子文件,每个子文件只有少量的内存可以容纳,然后分别在每个子文件中进行内部排序,最后再把每个排好序的子文件合并起来即可完成排序。
合并的过程如下:
原理图:
首先我们需要写一个随机数生成器,生成我们想要的随机数的数量写入文件中,然后才能对我们的随机数文件进行外部排序。
import java.io.*; import java.util.UUID; public class NumGenerater { public static void getNumber(String destName, int target) throws IOException { long start = System.currentTimeMillis(); File file = new File(destName); BufferedWriter bwriter = null; try { bwriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file))); StringBuffer stringBuffer = new StringBuffer(); for(int i = 1; i <= target; i ++) { stringBuffer.append(UUID.randomUUID()+ "\n"); // 每隔一段时间生成10000个UUID 写入文件中 if(i == target || i % 10000 == 0) { bwriter.write(String.valueOf(stringBuffer)); bwriter.flush(); stringBuffer = new StringBuffer(); } } } catch (Exception e) { e.printStackTrace(); } finally { bwriter.close(); // 关闭文件 } System.out.printf("生成%d个UUID需要的时间是%dms\n", target, (System.currentTimeMillis() - start)); } public static void main(String[] args) throws IOException { final int target = 1000000; long start = System.currentTimeMillis(); getNumber("D:\\MyProject\\coder\\src\\sort\\test\\test.txt", target); System.out.printf("生成%d个UUID需要的时间是%dms\n", target, (System.currentTimeMillis() - start)); } }
程序结果如图:
这个文件工具类是帮助读入文件然后将大文件的内容按照指定行数拆分多个小文件,方便后面进行小文件排序。
import java.io.*; import java.util.ArrayList; import java.util.List; public class FileUtils { public static List<String> readStringFromtxt(String txtpath, int targetLine) throws IOException { System.out.println("正在进行大文件拆分小文件:"); long startTime = System.currentTimeMillis(); List<String> fileList = new ArrayList<>(); File file = new File(txtpath); try { BufferedReader br = new BufferedReader(new FileReader(file)); String fileName = spiltName(file.getName()); int i = 0; while(++ i != 0){ String s = null; int count = 0; StringBuilder result = new StringBuilder(); while ((s = br.readLine()) != null) { result.append(s + "\n"); count ++; if(count == targetLine) break; } String destFileName = file.getParent() + getName(fileName, i); fileList.add(destFileName); writeStringToTxt(destFileName, String.valueOf(result)); if(s == null) break; } br.close(); } catch (Exception e) { e.printStackTrace(); } System.out.println("拆分小文件耗时:" + (System.currentTimeMillis() - startTime) + "ms"); return fileList; } private static String spiltName(String str) { return str.substring(0, str.lastIndexOf('.')); } private static String getName(String fileName,int i) { return "\\"+ fileName + "-" + i + ".txt"; } public static int writeStringToTxt(String targetTxt, String str) throws IOException { File file = new File(targetTxt); BufferedWriter bwriter = null; try { bwriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(file))); bwriter.write(str); } catch (Exception e) { e.printStackTrace(); } finally { bwriter.close(); } return 0; } public static void main(String[] args) throws IOException { List<String> list = readStringFromtxt("D:\\MyProject\\coder\\src\\sort\\test\\test.txt", 200000); for(String str: list) { System.out.println(str); } } }
程序结果如下:
这个文件排序类主要是 将文件列表里的小文件里面的内容读取出来然后进行排序重新写入到各自的文件中。
import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; public class FileSort { static void sortFile(String filePath) throws IOException { File file = new File(filePath); List<String> list = new ArrayList<>(); BufferedReader br = new BufferedReader(new FileReader(file)); try { String s = null; while ((s = br.readLine()) != null) { //读取文件的内容 list.add(s); } } catch (Exception e) { e.printStackTrace(); } finally { br.close(); } list = (List<String>) list.stream().sorted().collect(Collectors.toList()); // 小文件用原生的内部排序就行 FileUtils.writeStringToTxt(filePath, getListString(list)); // 将String用FileUtils写入文件中 } public static String getListString(List<String> list) { StringBuffer stringBuffer = new StringBuffer(); for(String str: list) { stringBuffer.append(str).append("\n"); } return String.valueOf(stringBuffer); } public static void sortListFile(List<String> list) throws IOException { Long startTime = System.currentTimeMillis(); System.out.println("正在进行小文件排序:"); for(String str: list) { sortFile(str); } System.out.println("小文件排序耗时:" + (System.currentTimeMillis() - startTime) + "ms"); } }
最后是多路归并的类,负责多路归并小文件。这里把多个有序的输入流归并成一个有序的输出流,这个类可以实现多个排序比较结果的汇总。
import java.io.*; import java.util.List; import java.util.PriorityQueue; public class Merger { public static void kMerge(List<String> fileList) throws IOException { Long startTime = System.currentTimeMillis(); System.out.println("正在进行多路归并小文件:"); PriorityQueue<Node> priorityQueue = new PriorityQueue<>(); String destFileName = new File(fileList.get(0)).getParent() + "\\sort.txt"; File destFile = new File(destFileName); for(String str: fileList) { Node node = new Node(str); priorityQueue.add(node); } BufferedWriter bWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(destFile))); while(!priorityQueue.isEmpty()) { Node node = priorityQueue.poll(); bWriter.write(node.getValue() + "\n"); bWriter.flush(); if(node.readNewValue()) { priorityQueue.add(node); //如果有后续的值继续添加回去 } else { node.closeBufferedReader(); } } bWriter.close(); System.out.println("多路归并小文件耗时:" + (System.currentTimeMillis() - startTime) + "ms"); } // 文件字符流流节点,可以记录当前文件字符流到哪了,读取的当前行的内容 public static class Node implements Comparable<Node> { public Node(String fileName) throws IOException { this.fileName = fileName; File file = new File(fileName); br = new BufferedReader(new FileReader(file)); readNewValue(); } BufferedReader br; String fileName; String value; public void closeBufferedReader() throws IOException { br.close(); } public boolean readNewValue() throws IOException { value = br.readLine(); return value != null; } public String getValue() { return value; } @Override public int compareTo(Node o) { return value.compareTo(o.value); } } }
从排序后的文件中检查排序后的代码是否正确。
import java.io.*; import java.util.ArrayList; import java.util.List; public class Checker { public static boolean checkFileisSorted(String filePath) throws IOException { File file = new File(filePath); boolean flag = true; BufferedReader br = null; try { br = new BufferedReader(new FileReader(file)); String s = null; String ans = ""; while ((s = br.readLine()) != null) { if (ans.compareTo(s) <= 0) { ans = s; } else { flag = false; break; } } } catch (Exception e) { e.printStackTrace(); } finally { br.close(); } return flag; } public static void main(String[] args) throws IOException { System.out.println(checkFileisSorted("D:\\MyProject\\coder\\src\\sort\\test\\test.txt")); } }
本次测试使用的是 MagicBook pro 16GB RAM, 512GB SSD, 4800H CPU, 开启高能模式(35w性能释放)
import java.io.File; import java.io.IOException; import java.util.List; public class SortTester { public static void main(String[] args) throws IOException { String fileName = "D:\\MyProject\\coder\\src\\sort\\test\\test.txt"; NumGenerater.getNumber(fileName, 1000000); //10000000 Long startTime = System.currentTimeMillis(); List<String> fileList = FileUtils.readStringFromtxt(fileName, 50000); FileSort.sortListFile(fileList); Merger.kMerge(fileList); System.out.println("file sort cost:" + (System.currentTimeMillis() - startTime) + "ms"); System.out.println("checker:" + Checker.checkFileisSorted("D:\\MyProject\\coder\\src\\sort\\test\\sort.txt")); for(String str: fileList) { File file = new File(str); if(file.exists()) { file.delete(); } } } }
一百万行UUID排序时间 (37MB的数据量)
一千万行UUID排序时间 (370MB的数据量)
一亿UUID排序时间(3.7GB的数据量)
10亿行UUID排序时间(37GB的数据量)
从大文件拆分成小文件可以吗?
这个我还没很好的思路,多线程拆分不太会,目前只能单线程拆分成多个小文件。
从小文件读取排序后输出小文件中?
这个是可以的,每个小文件的内容是独立的,我们可以用多个线程去执行其中排序,加快了小文件排序的时间。
将小文件和并成大文件?
要将小文件有序地合成大文件,目前只能单线程执行,但是我们读取小文件可以开启读线程,将读取的内容放到放到优先队列中,然后取出优先队列的值再放到阻塞队列中,写线程从阻塞队列获取内容写到文件中。
设置一个线程池,防止消耗过多的线程,在线程池里取线程读取单文件进行排序的操作
SortTask.java
import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.stream.Collectors; public class SortTask implements Runnable{ private String filePath; private CountDownLatch countDownLatch; public SortTask(String filePath, CountDownLatch countDownLatch) { this.filePath = filePath; this.countDownLatch = countDownLatch; } private void sortFile(String filePath) throws IOException { File file = new File(filePath); List<String> list = new ArrayList<>(); BufferedReader br = null; try { br = new BufferedReader(new FileReader(file)); String s = null; while ((s = br.readLine()) != null) { list.add(s); } } catch (Exception e) { e.printStackTrace(); } finally { br.close(); } list = (List<String>) list.stream().sorted().collect(Collectors.toList()); FileUtils.writeStringToTxt(filePath, getListString(list)); } private static String getListString(List<String> list) { StringBuffer stringBuffer = new StringBuffer(); for(String str: list) { stringBuffer.append(str).append("\n"); } return String.valueOf(stringBuffer); } @Override public void run() { try { sortFile(filePath); } catch (IOException e) { throw new RuntimeException(e); } countDownLatch.countDown(); } }
FileSort.java
import java.io.FileNotFoundException; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class FileSort { public static void sortListFile(List<String> list) throws FileNotFoundException { Long startTime = System.currentTimeMillis(); System.out.println("正在进行多线程小文件排序:"); ExecutorService executorService = Executors.newFixedThreadPool(10); CountDownLatch countDownLatch = new CountDownLatch(list.size()); for(String str: list) { executorService.execute(new SortTask(str,countDownLatch)); } try { countDownLatch.await(); executorService.shutdown(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("多线程小文件排序耗时:" + (System.currentTimeMillis() - startTime) + "ms"); } public static void main(String[] args) throws FileNotFoundException { } }
读写线程分开执行归并操作,大大节约时间。(外部排序的时间主要是Merge操作)
Merger.java
import java.io.*; import java.util.List; import java.util.PriorityQueue; import java.util.concurrent.ArrayBlockingQueue; public class Merger { private static final int MAX_QUEUE_LENGTH = 1000000; // 存放进队列中的随机数 private static ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(MAX_QUEUE_LENGTH); public static void kMerge(List<String> fileList) throws IOException, InterruptedException { long startTime = System.currentTimeMillis(); System.out.println("正在进行读写双线程多路归并小文件:"); PriorityQueue<Node> priorityQueue = new PriorityQueue<>(); String destFileName = new File(fileList.get(0)).getParent() + "\\sort.txt"; File destFile = new File(destFileName); for(String str: fileList) { Node node = new Node(str); priorityQueue.add(node); } BufferedWriter bWriter = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(destFile))); final boolean[] flag = {false}; Thread thread1 = new Thread(new Runnable() { @Override public void run() { while(!priorityQueue.isEmpty()) { Node node = priorityQueue.poll(); blockingQueue.add(node.getValue() + "\n"); try { if(node.readNewValue()) { priorityQueue.add(node); //如果有后续的值继续添加回去 } else { // 记得关闭流 try { node.closeBufferedReader(); } catch (IOException e) { throw new RuntimeException(e); } } } catch (IOException e) { throw new RuntimeException(e); } } flag[0] = true; } }); Thread thread2 = new Thread(new Runnable() { @Override public void run() { StringBuffer stringBuffer = new StringBuffer(); int count = 0; while(true) { String str = null; try { str = blockingQueue.take(); } catch (InterruptedException e) { throw new RuntimeException(e); } count ++; stringBuffer.append(str); if(count == 10000) { try { bWriter.write(String.valueOf(stringBuffer)); bWriter.flush(); } catch (IOException e) { throw new RuntimeException(e); } stringBuffer = new StringBuffer(); count = 0; } if(blockingQueue.size() == 0 && flag[0] == true) { try { bWriter.write(String.valueOf(stringBuffer)); bWriter.flush(); } catch (IOException e) { throw new RuntimeException(e); } try { bWriter.close(); } catch (IOException e) { throw new RuntimeException(e); } System.out.println("读写分离双线程多路归并小文件耗时:" + (System.currentTimeMillis() - startTime) + "ms"); break; } } } }); thread1.start(); thread2.start(); thread1.join(); thread2.join(); } public static class Node implements Comparable<Node> { public Node(String fileName) throws IOException { this.fileName = fileName; File file = new File(fileName); br = new BufferedReader(new FileReader(file)); readNewValue(); } BufferedReader br; String fileName; String value; public void closeBufferedReader() throws IOException { br.close(); } public boolean readNewValue() throws IOException { value = br.readLine(); return value != null; } public String getValue() { return value; } @Override public int compareTo(Node o) { return value.compareTo(o.value); } } }
一百万行UUID (37MB的数据量)
一千万行UUID(370MB的数据量)
一亿行UUID(3.7GB的数据量)
10亿行UUID(37GB的数据量)
外部排序是提供大规模数据排序的一种有效的技术,它的主要思想是通过不断的读写文件来分割和排序数据集。
要学习如何正确使用外部排序,可以采用以下几个方法:
最后,要多加练习,完善自己的技能,不断熟悉和掌握外部排序的各个知识点,为以后的项目提供有效的支持。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。