赞
踩
如果要排序的数据太多了,就不能一次性加载到内存中进行排序,只能分而治之,然后再合并。
从第一次接触算法开始,本人就写过归并排序的算法。 只不过当年写的归并排序算法,数据是可以一次性加载到内存中完成排序的。
甚至在此后的许多年里,工作中也始终没有遇到过内存装不下但又需要排序的场景。
不过最近闲来无事,所以想弥补一下大数据量场景下的排序经历,遂得此文。
1. TestFileGenerator.java 生成测试文件,该文件体积较大,每一行是一个非负的32位整数(由 random.nextInt(Integer.MAX_VALUE) 生成)。稍后将会对文件中的数字进行排序。
2. FileSpliter.java 该类可以将上一个类(TestFileGenerator.java)生成的大文件拆分成多个小文件。各个小文件的内容总和与大文件一致
3. FileSorter.java 读取每一个小文件中的所有整数, 并进行升序排序,然后再将排序后的数字重新写回文件
4. IndexMinPQ.java 索引优先队列(基于最小堆)的实现类,每次取出队列中的最小元素,并支持为每个元素关联一个索引(归并时即小文件的编号)。该类是支撑多路归并排序的核心类。
5. LineReader.java 每调用一次该类的readLine()方法,可以从文件中读取一行
6. FileMergeSort.java 对所有小文件中的数值进行多路归并,最终得到一个包含全部数字、且按升序排列的大文件。
运行代码之前,建议事先在E盘建立目录E:/java_wps/,或者修改代码中的磁盘路径。该示例是在Windows中运行的,读者也可以改为Linux路径后再运行。另外,代码依赖Apache Commons IO(org.apache.commons.io)。
public class Test { public static void main(String[] args) throws IOException { String originalFile = "E:/java_wps/test.txt"; //生成1000个整数,保存到文件E:/java_wps/test.txt中 TestFileGenerator.generateFile(originalFile, 1000); //每100个数字拆分到一个单独的文件中 FileSpliter fs = new FileSpliter(new File(originalFile), new FileSpliter.WriteListener(100)); fs.readLines(); //找出上一个步骤生成的小文件 File[] files = FileSorter.find("E:/java_wps"); //将每一个小文件进行单独的排序 Arrays.stream(files).forEach(f -> FileSorter.doSort(f)); //归并排序每一个小文件中的内容,得到最终有序的大文件E:/java_wps/merge.txt FileMergeSort.mergeFiles(files, "E:/java_wps/merge.txt"); } }
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Random;

/**
 * Generates the input of the external sort: a UTF-8 text file with one random non-negative
 * {@code int} (in {@code [0, Integer.MAX_VALUE)}) per line, lines separated by {@code '\n'} and
 * no line break after the last number.
 */
public class TestFileGenerator {

    /**
     * Writes {@code lineSize} random numbers to {@code filePath}, replacing any existing file.
     *
     * @param filePath target file; missing parent directories are created
     * @param lineSize number of lines to generate; {@code <= 0} produces an empty file
     * @throws IOException if the directory or the file cannot be created or written
     */
    public static void generateFile(String filePath, long lineSize) throws IOException {
        // new Random() is seeded from nanoTime plus a uniquifier, so back-to-back calls differ
        // (a millisecond-based seed repeats within the same millisecond).
        generateFile(filePath, lineSize, new Random());
    }

    /**
     * Same as {@link #generateFile(String, long)} but draws the numbers from {@code random},
     * which makes the output reproducible when a seeded generator is passed.
     *
     * @param filePath target file; missing parent directories are created
     * @param lineSize number of lines to generate; {@code <= 0} produces an empty file
     * @param random   source of the numbers
     * @throws IOException if the directory or the file cannot be created or written
     */
    public static void generateFile(String filePath, long lineSize, Random random) throws IOException {
        Objects.requireNonNull(random, "random");
        File file = new File(filePath);
        File parent = file.getAbsoluteFile().getParentFile();
        if (parent != null && !parent.mkdirs() && !parent.isDirectory()) {
            throw new IOException("Cannot create directory " + parent);
        }
        // append=false truncates an existing file; the writer is buffered so lines are not written one syscall each
        try (Writer out = new BufferedWriter(
                new OutputStreamWriter(new FileOutputStream(file, false), StandardCharsets.UTF_8))) {
            for (long i = 1; i <= lineSize; i++) {
                out.write(Integer.toString(random.nextInt(Integer.MAX_VALUE)));
                if (i < lineSize) {
                    out.write('\n'); // no trailing line break after the last number
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        TestFileGenerator.generateFile("E:/java_wps/test.txt", 1000);
    }
}
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Splits a large UTF-8 text file into smaller chunk files "sub-1.txt", "sub-2.txt", ... (see
 * {@link WriteListener}); the concatenation of the chunks holds the same lines as the input.
 * {@code "\n"}, {@code "\r\n"} and a lone {@code "\r"} are recognised as line terminators.
 * Close the splitter when done to release the input file.
 */
public class FileSpliter implements Closeable {
    private static final String RAF_MODE = "r";
    /** Value returned by {@link RandomAccessFile#read(byte[])} at end of file. */
    private static final int EOF = -1;
    private final byte[] inbuf = new byte[8192];
    private final RandomAccessFile reader;
    private final Charset charset = StandardCharsets.UTF_8;
    private final WriteListener listener;

    /**
     * @param file     the file to split
     * @param listener receives every line and writes the chunk files
     * @throws IOException if the file cannot be opened
     */
    public FileSpliter(File file, WriteListener listener) throws IOException {
        this.listener = Objects.requireNonNull(listener, "listener");
        this.reader = new RandomAccessFile(file, RAF_MODE);
        this.reader.seek(0L);
    }

    /**
     * Reads the file to EOF, handing every line (without its terminator) to the listener, then
     * flushes the listener. A final line without terminator is handed over as well.
     *
     * @return the offset just after the last line terminator, i.e. the start of the final
     *         unterminated line if there is one; the file pointer is left there
     * @throws IOException on read or chunk-write failure
     */
    public long readLines() throws IOException {
        try (ByteArrayOutputStream lineBuf = new ByteArrayOutputStream(64)) {
            long pos = reader.getFilePointer(); // file offset of inbuf[0]
            long rePos = pos; // position to re-read
            int num;
            boolean seenCR = false;
            while ((num = reader.read(inbuf)) != EOF) {
                for (int i = 0; i < num; i++) {
                    final byte ch = inbuf[i];
                    switch (ch) {
                        case '\n':
                            seenCR = false; // swallow CR before LF
                            listener.handle(new String(lineBuf.toByteArray(), charset));
                            lineBuf.reset();
                            rePos = pos + i + 1; // next line starts after the LF
                            break;
                        case '\r':
                            if (seenCR) {
                                lineBuf.write('\r');
                            }
                            seenCR = true;
                            break;
                        default:
                            if (seenCR) {
                                seenCR = false; // swallow final CR
                                listener.handle(new String(lineBuf.toByteArray(), charset));
                                lineBuf.reset();
                                // A lone CR ended the previous line: the current byte is the
                                // first byte of the next line, so that line starts at pos + i.
                                rePos = pos + i;
                            }
                            lineBuf.write(ch);
                    }
                }
                pos = reader.getFilePointer();
            }
            reader.seek(rePos); // Ensure we can re-read if necessary
            byte[] bytes = lineBuf.toByteArray();
            if (bytes.length > 0) {
                listener.handle(new String(bytes, charset));
            }
            listener.write();
            return rePos;
        }
    }

    /** Closes the input file. */
    @Override
    public void close() throws IOException {
        reader.close();
    }

    /**
     * Collects lines and writes every {@code threshold} of them to a new chunk file
     * "sub-N.txt" (N = 1, 2, ...) in the output directory, UTF-8 encoded, each line followed by
     * {@link System#lineSeparator()}.
     */
    public static class WriteListener {
        private static final String DEFAULT_OUTPUT_DIR = "E:/java_wps/";

        int threshold;
        List<String> lines = new ArrayList<>();
        AtomicInteger atomicInteger = new AtomicInteger(1);
        private final File outputDir;

        /** Writes chunks of {@code threshold} lines to "E:/java_wps/". */
        public WriteListener(int threshold) {
            this(threshold, new File(DEFAULT_OUTPUT_DIR));
        }

        /**
         * @param threshold maximum number of lines per chunk file, must be positive
         * @param outputDir directory that receives the chunk files
         * @throws IllegalArgumentException if {@code threshold <= 0}
         */
        public WriteListener(int threshold, File outputDir) {
            if (threshold <= 0) {
                throw new IllegalArgumentException("threshold must be positive: " + threshold);
            }
            this.threshold = threshold;
            this.outputDir = Objects.requireNonNull(outputDir, "outputDir");
        }

        /** Buffers {@code line} (ignored when null) and writes a chunk once the threshold is reached. */
        public void handle(final String line) throws IOException {
            if (line != null) {
                lines.add(line);
            }
            if (lines.size() >= threshold) {
                write();
            }
        }

        /** Writes the buffered lines (if any) to the next chunk file and clears the buffer. */
        public void write() throws IOException {
            if (lines.isEmpty()) {
                return;
            }
            File file = new File(outputDir, "sub-" + atomicInteger.getAndIncrement() + ".txt");
            // Truncate rather than append: a chunk left over from an earlier run must not keep its old lines.
            try (Writer out = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(file, false), StandardCharsets.UTF_8))) {
                for (String line : lines) {
                    out.write(line);
                    out.write(System.lineSeparator());
                }
            }
            // Clear only after a successful write so a repeated write() cannot emit the same lines twice.
            lines.clear();
        }
    }

    public static void main(String[] args) throws IOException {
        File file = new File("E:/java_wps/test.txt");
        // Put every 100 lines into a separate file
        try (FileSpliter fs = new FileSpliter(file, new WriteListener(100))) {
            fs.readLines();
        }
    }
}
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sorts each "sub-*.txt" chunk file in place: the file's integers are loaded into memory, sorted
 * ascending and written back one per line (UTF-8, {@link System#lineSeparator()} after each).
 */
public class FileSorter {

    public static void main(String[] args) throws IOException {
        File[] files = find("E:/java_wps");
        Arrays.stream(files).forEach(f -> doSort(f));
    }

    /**
     * Lists the regular files in {@code path} whose name starts with "sub-" and ends with ".txt".
     *
     * @return the matching files in no particular order; an empty array (never {@code null})
     *         when {@code path} is not a readable directory
     */
    public static File[] find(String path) {
        File[] files = new File(path).listFiles(
                (File f) -> f.isFile() && f.getName().startsWith("sub-") && f.getName().endsWith(".txt"));
        // File.listFiles returns null for a missing/unreadable directory
        return files == null ? new File[0] : files;
    }

    /**
     * Writes each int as decimal text followed by {@code lineEnding}.
     *
     * @param lines      the numbers; nothing is written when {@code null}
     * @param lineEnding line terminator; {@link System#lineSeparator()} when {@code null}
     * @param output     destination stream (not closed)
     * @param charset    encoding; the platform default when {@code null}
     * @throws IOException if writing fails
     */
    public static void writeLines(final int[] lines, String lineEnding, final OutputStream output,
                                  final Charset charset) throws IOException {
        if (lines == null) {
            return;
        }
        final Charset cs = charset == null ? Charset.defaultCharset() : charset;
        final byte[] eol = (lineEnding == null ? System.lineSeparator() : lineEnding).getBytes(cs);
        for (final int line : lines) {
            output.write(Integer.toString(line).getBytes(cs));
            output.write(eol);
        }
    }

    /**
     * Sorts {@code file} in place; an {@link IOException} is reported on stderr instead of being
     * thrown, so the caller can continue with the other files.
     */
    public static void doSort(File file) {
        try {
            sort(file);
        } catch (IOException e) {
            // Best effort as before, but say which file failed: an unsorted chunk would make a
            // later merge of the chunks produce unsorted output.
            System.err.println("Failed to sort " + file + ": " + e);
            e.printStackTrace();
        }
    }

    /** Reads every line of {@code file} that parses as an int after trimming; other lines are skipped. */
    private static List<Integer> readFile(File file) throws IOException {
        List<Integer> integers = new ArrayList<>();
        try (BufferedReader in = new BufferedReader(
                new InputStreamReader(new FileInputStream(file), StandardCharsets.UTF_8))) {
            String item;
            while ((item = in.readLine()) != null) {
                item = item.trim();
                if (item.isEmpty()) {
                    continue;
                }
                try {
                    integers.add(Integer.parseInt(item));
                } catch (NumberFormatException ignored) {
                    // Deliberately lenient: a line that is not a 32-bit integer is dropped.
                }
            }
        }
        return integers;
    }

    /** Loads, sorts and rewrites {@code file}. */
    private static void sort(File file) throws IOException {
        // Unbox in one pass (the former remove(0) loop was O(n^2) on an ArrayList)
        int[] ts = readFile(file).stream().mapToInt(Integer::intValue).toArray();
        Arrays.sort(ts);
        try (OutputStream out = new BufferedOutputStream(new FileOutputStream(file))) {
            writeLines(ts, null, out, StandardCharsets.UTF_8);
        }
    }
}
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * The {@code IndexMinPQ} class represents an indexed priority queue of generic keys.
 * It supports the usual <em>insert</em> and <em>delete-the-minimum</em>
 * operations, along with <em>delete</em> and <em>change-the-key</em>
 * methods. In order to let the client refer to keys on the priority queue,
 * an integer between {@code 0} and {@code maxN - 1} is associated with each key; the client
 * uses this integer to specify which key to delete or change.
 * It also supports methods for peeking at the minimum key,
 * testing if the priority queue is empty, and iterating through
 * the keys.
 * <p>
 * This implementation uses a binary heap along with an array to associate
 * keys with integers in the given range.
 * The <em>insert</em>, <em>delete-the-minimum</em>, <em>delete</em>,
 * <em>change-key</em>, <em>decrease-key</em>, and <em>increase-key</em>
 * operations take logarithmic time.
 * The <em>is-empty</em>, <em>size</em>, <em>min-index</em>, <em>min-key</em>, and <em>key-of</em>
 * operations take constant time.
 * Construction takes time proportional to the specified capacity.
 * <p>
 * For additional documentation, see <a href="https://algs4.cs.princeton.edu/24pq">Section 2.4</a> of
 * <i>Algorithms, 4th Edition</i> by Robert Sedgewick and Kevin Wayne.
 *
 * @author Robert Sedgewick
 * @author Kevin Wayne
 *
 * @param <Key> the generic type of key on this priority queue
 */
public class IndexMinPQ<Key extends Comparable<Key>> implements Iterable<Integer> {
    private final int maxN;   // maximum number of elements on PQ
    private int N;            // number of elements on PQ
    private final int[] pq;   // binary heap using 1-based indexing
    private final int[] qp;   // inverse of pq - qp[pq[i]] = pq[qp[i]] = i
    private final Key[] keys; // keys[i] = priority of i

    /**
     * Initializes an empty indexed priority queue with indices between {@code 0}
     * and {@code maxN - 1}.
     *
     * @param maxN the keys on this priority queue are indexed from {@code 0} to {@code maxN - 1}
     * @throws IllegalArgumentException if {@code maxN < 0}
     */
    @SuppressWarnings("unchecked") // generic array creation; only Key instances are ever stored in keys
    public IndexMinPQ(int maxN) {
        if (maxN < 0) {
            throw new IllegalArgumentException("maxN must not be negative: " + maxN);
        }
        this.maxN = maxN;
        keys = (Key[]) new Comparable[maxN + 1];
        pq = new int[maxN + 1];
        qp = new int[maxN + 1];
        for (int i = 0; i <= maxN; i++) {
            qp[i] = -1; // -1 marks "index not on the queue"
        }
    }

    /**
     * Returns true if this priority queue is empty.
     *
     * @return {@code true} if this priority queue is empty; {@code false} otherwise
     */
    public boolean isEmpty() {
        return N == 0;
    }

    /**
     * Is {@code i} an index on this priority queue?
     *
     * @param i an index
     * @return {@code true} if {@code i} is an index on this priority queue; {@code false} otherwise
     * @throws IndexOutOfBoundsException unless {@code 0 <= i < maxN}
     */
    public boolean contains(int i) {
        validateIndex(i);
        return qp[i] != -1;
    }

    /**
     * Returns the number of keys on this priority queue.
     *
     * @return the number of keys on this priority queue
     */
    public int size() {
        return N;
    }

    /**
     * Associates key with index {@code i}.
     *
     * @param i an index
     * @param key the key to associate with index {@code i}
     * @throws IndexOutOfBoundsException unless {@code 0 <= i < maxN}
     * @throws IllegalArgumentException if there already is an item associated with index {@code i}
     */
    public void insert(int i, Key key) {
        validateIndex(i);
        if (contains(i)) {
            throw new IllegalArgumentException("index is already in the priority queue");
        }
        N++;
        qp[i] = N;
        pq[N] = i;
        keys[i] = key;
        swim(N);
    }

    /**
     * Returns an index associated with a minimum key.
     *
     * @return an index associated with a minimum key
     * @throws NoSuchElementException if this priority queue is empty
     */
    public int minIndex() {
        if (N == 0) {
            throw new NoSuchElementException("Priority queue underflow");
        }
        return pq[1];
    }

    /**
     * Returns a minimum key.
     *
     * @return a minimum key
     * @throws NoSuchElementException if this priority queue is empty
     */
    public Key minKey() {
        if (N == 0) {
            throw new NoSuchElementException("Priority queue underflow");
        }
        return keys[pq[1]];
    }

    /**
     * Removes a minimum key and returns its associated index.
     *
     * @return an index associated with a minimum key
     * @throws NoSuchElementException if this priority queue is empty
     */
    public int delMin() {
        if (N == 0) {
            throw new NoSuchElementException("Priority queue underflow");
        }
        int min = pq[1];
        exch(1, N--);
        sink(1);
        assert min == pq[N + 1];
        qp[min] = -1;        // delete
        keys[min] = null;    // to help with garbage collection
        pq[N + 1] = -1;      // not needed
        return min;
    }

    /**
     * Returns the key associated with index {@code i}.
     *
     * @param i the index of the key to return
     * @return the key associated with index {@code i}
     * @throws IndexOutOfBoundsException unless {@code 0 <= i < maxN}
     * @throws NoSuchElementException no key is associated with index {@code i}
     */
    public Key keyOf(int i) {
        validateIndex(i);
        if (!contains(i)) {
            throw new NoSuchElementException("index is not in the priority queue");
        }
        return keys[i];
    }

    /**
     * Change the key associated with index {@code i} to the specified value.
     *
     * @param i the index of the key to change
     * @param key change the key associated with index {@code i} to this key
     * @throws IndexOutOfBoundsException unless {@code 0 <= i < maxN}
     * @throws NoSuchElementException no key is associated with index {@code i}
     */
    public void changeKey(int i, Key key) {
        validateIndex(i);
        if (!contains(i)) {
            throw new NoSuchElementException("index is not in the priority queue");
        }
        keys[i] = key;
        swim(qp[i]);
        sink(qp[i]);
    }

    /**
     * Change the key associated with index {@code i} to the specified value.
     *
     * @param i the index of the key to change
     * @param key change the key associated with index {@code i} to this key
     * @throws IndexOutOfBoundsException unless {@code 0 <= i < maxN}
     * @deprecated Replaced by {@link #changeKey(int, Comparable)}.
     */
    @Deprecated
    public void change(int i, Key key) {
        changeKey(i, key);
    }

    /**
     * Decrease the key associated with index {@code i} to the specified value.
     *
     * @param i the index of the key to decrease
     * @param key decrease the key associated with index {@code i} to this key
     * @throws IndexOutOfBoundsException unless {@code 0 <= i < maxN}
     * @throws IllegalArgumentException if {@code key >=} the key associated with index {@code i}
     * @throws NoSuchElementException no key is associated with index {@code i}
     */
    public void decreaseKey(int i, Key key) {
        validateIndex(i);
        if (!contains(i)) {
            throw new NoSuchElementException("index is not in the priority queue");
        }
        if (keys[i].compareTo(key) <= 0) {
            throw new IllegalArgumentException("Calling decreaseKey() with given argument would not strictly decrease the key");
        }
        keys[i] = key;
        swim(qp[i]);
    }

    /**
     * Increase the key associated with index {@code i} to the specified value.
     *
     * @param i the index of the key to increase
     * @param key increase the key associated with index {@code i} to this key
     * @throws IndexOutOfBoundsException unless {@code 0 <= i < maxN}
     * @throws IllegalArgumentException if {@code key <=} the key associated with index {@code i}
     * @throws NoSuchElementException no key is associated with index {@code i}
     */
    public void increaseKey(int i, Key key) {
        validateIndex(i);
        if (!contains(i)) {
            throw new NoSuchElementException("index is not in the priority queue");
        }
        if (keys[i].compareTo(key) >= 0) {
            throw new IllegalArgumentException("Calling increaseKey() with given argument would not strictly increase the key");
        }
        keys[i] = key;
        sink(qp[i]);
    }

    /**
     * Remove the key associated with index {@code i}.
     *
     * @param i the index of the key to remove
     * @throws IndexOutOfBoundsException unless {@code 0 <= i < maxN}
     * @throws NoSuchElementException no key is associated with index {@code i}
     */
    public void delete(int i) {
        validateIndex(i);
        if (!contains(i)) {
            throw new NoSuchElementException("index is not in the priority queue");
        }
        int index = qp[i];
        exch(index, N--);
        // When i was the last heap slot there is nothing to restore; otherwise the element moved
        // into its slot may need to go either up or down.
        if (index <= N) {
            swim(index);
            sink(index);
        }
        keys[i] = null;
        qp[i] = -1;
        pq[N + 1] = -1; // same bookkeeping as delMin
    }

    /***************************************************************************
     * General helper functions.
     ***************************************************************************/

    /** Throws unless {@code 0 <= i < maxN}. */
    private void validateIndex(int i) {
        if (i < 0 || i >= maxN) {
            throw new IndexOutOfBoundsException("index " + i + " is outside [0, " + maxN + ")");
        }
    }

    private boolean greater(int i, int j) {
        return keys[pq[i]].compareTo(keys[pq[j]]) > 0;
    }

    private void exch(int i, int j) {
        int swap = pq[i];
        pq[i] = pq[j];
        pq[j] = swap;
        qp[pq[i]] = i;
        qp[pq[j]] = j;
    }

    /***************************************************************************
     * Heap helper functions.
     ***************************************************************************/

    private void swim(int k) {
        while (k > 1 && greater(k / 2, k)) {
            exch(k, k / 2);
            k = k / 2;
        }
    }

    private void sink(int k) {
        while (2 * k <= N) {
            int j = 2 * k;
            if (j < N && greater(j, j + 1)) {
                j++;
            }
            if (!greater(k, j)) {
                break;
            }
            exch(k, j);
            k = j;
        }
    }

    /***************************************************************************
     * Iterators.
     ***************************************************************************/

    /**
     * Returns an iterator that iterates over the keys on the
     * priority queue in ascending order.
     * The iterator doesn't implement {@code remove()} since it's optional.
     *
     * @return an iterator that iterates over the keys in ascending order
     */
    @Override
    public Iterator<Integer> iterator() {
        return new HeapIterator();
    }

    private class HeapIterator implements Iterator<Integer> {
        // create a new pq
        private final IndexMinPQ<Key> copy;

        // add all elements to copy of heap
        // takes linear time since already in heap order so no keys move
        public HeapIterator() {
            copy = new IndexMinPQ<Key>(maxN);
            for (int i = 1; i <= N; i++) {
                copy.insert(pq[i], keys[pq[i]]);
            }
        }

        @Override
        public boolean hasNext() {
            return !copy.isEmpty();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Integer next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            return copy.delMin();
        }
    }

    /**
     * Unit tests the {@code IndexMinPQ} data type.
     */
    public static void main(String[] args) {
        // insert a bunch of strings
        String[] strings = { "it", "was", "the", "best", "of", "times", "it", "was", "the", "worst" };

        IndexMinPQ<String> pq = new IndexMinPQ<String>(strings.length);
        for (int i = 0; i < strings.length; i++) {
            pq.insert(i, strings[i]);
        }

        // delete and print each key
        while (!pq.isEmpty()) {
            int i = pq.delMin();
            System.out.println(i + " " + strings[i]);
        }
        System.out.println();

        // reinsert the same strings
        for (int i = 0; i < strings.length; i++) {
            pq.insert(i, strings[i]);
        }

        // print each key using the iterator
        for (int i : pq) {
            System.out.println(i + " " + strings[i]);
        }
        while (!pq.isEmpty()) {
            pq.delMin();
        }
    }
}
import org.apache.commons.io.IOUtils; import java.io.*; import java.util.ArrayList; import java.util.Arrays; import java.util.List; /** * <p>类说明:</p> * * @author lihong10 2021/5/21 14:31 * @version v1.0 * @modificationHistory=========================逻辑或功能性重大变更记录 * @modify by user: {修改人} 2021/5/21 14:31 * @modify by reason:{方法名}:{原因} */ public class LineReader { private static final int EOF = -1; private ByteArrayOutputStream lineBuf = new ByteArrayOutputStream(64); private boolean end = false; private RandomAccessFile reader; public LineReader(RandomAccessFile reader) { this.reader = reader; } public LineReader(File f) throws FileNotFoundException { this.reader = new RandomAccessFile(f, "r"); } public LineReader(File f, long pos) throws IOException { this.reader = new RandomAccessFile(f, "r"); reader.seek(pos); } public LineReader(RandomAccessFile reader, long pos) throws IOException { this.reader = reader; reader.seek(pos); } public List<String> readLines() throws IOException { int i = 0; List<String> list = new ArrayList<>(16); long pos = reader.getFilePointer(); String line = readLine(); while (line != null) { list.add(line); pos = reader.getFilePointer(); line = readLine(); } reader.seek(pos); // Ensure we can re-read if necessary return list; } /** * 该方法只有这样实现,才能与 IOUtils.readLines(fis, "UTF-8");返回的结果一致 * Version of readline() that returns null on EOF rather than a partial line. * * @return the line, or null if EOF reached before '\n' is seen. * @throws IOException if an error occurs. 
*/ public String readLine() throws IOException { int ch; while ((ch = reader.read()) != -1) { switch (ch) { case '\n': return bufferToString(true, "UTF8", false); case '\r': int next = reader.read(); //预读取下一位 if (next == EOF) {//如果下一位是EOF,也即ch刚好是最后一个字符,返回 end = true; return bufferToString(true, "UTF8", false); } long filePointer = reader.getFilePointer(); if (filePointer > 0) { reader.seek(filePointer - 1); //之前预读取过,所以要复位指针 } if (next == '\n') { //如果当前字符是\r, 下一位是\n, 则继续读取,待读取到\n时再返回 break; } else { //如果当前字符是\r, 下一位不是\n, 则返回 return bufferToString(true, "UTF8", false); } default: lineBuf.write(ch); // add character, not its ascii value } } if (ch == EOF) { //目的是读取上一次换行到EOF之间的,最后一行数据 if (!end) { end = true; return bufferToString(true, "UTF-8", true); } } return null; } private String bufferToString(boolean reset, String charset, boolean nullWhenEmpty) throws UnsupportedEncodingException { byte[] bytes = lineBuf.toByteArray(); if (reset) { lineBuf.reset(); } if (nullWhenEmpty && (bytes.length == 0)) { return null; } else { return new String(bytes, charset); } } public static byte[] getBytes(String src, String charset) { try { return src.getBytes(charset); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return null; } public static void main(String[] args) throws IOException { String path = "E:/java_wps/num.txt"; File f = new File(path); FileOutputStream fos = new FileOutputStream(f, false); String content = "1234\n\r5678\r\r\r4\r44444\r\n5\r\r\r7\r\n\n\n\r\r999\n\r\r\r我"; byte[] bytes = getBytes(content, "UTF-8"); System.out.println(Arrays.toString(bytes)); IOUtils.write(content, fos); IOUtils.closeQuietly(fos); RandomAccessFile reader = new RandomAccessFile(f, "r"); reader.seek(0); LineReader lineReader = new LineReader(reader); List<String> lines = lineReader.readLines(); System.out.println("使用LineReader读取, 大小是:" + lines.size()); System.out.println(lines); FileInputStream fis = new FileInputStream(new File(path)); List<String> strings = 
IOUtils.readLines(fis, "UTF-8"); IOUtils.closeQuietly(fis); System.out.println("使用IOUtils读取,大小是:" + strings.size()); System.out.println(strings); } }
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * K-way merge of sorted chunk files: the first number of every file goes into an
 * {@link IndexMinPQ} keyed by file index; the minimum is repeatedly written out and replaced by the
 * next number of the same file. The output is UTF-8, one number per line, each followed by '\n'.
 */
public class FileMergeSort {

    /**
     * Lists the regular files in {@code path} named "sub-*.txt".
     *
     * @return the matching files; an empty array (never {@code null}) if {@code path} is not a readable directory
     */
    public static File[] find(String path) {
        File[] files = new File(path).listFiles(
                (File f) -> f.isFile() && f.getName().startsWith("sub-") && f.getName().endsWith(".txt"));
        return files == null ? new File[0] : files;
    }

    /** Reads ints line by line from a {@link LineReader}, skipping lines that are not ints. */
    public static class LineReaderWrapper {
        private final LineReader lineReader;
        private boolean end = false;

        public LineReaderWrapper(LineReader lineReader) {
            this.lineReader = lineReader;
        }

        /** @return {@code true} once the underlying reader has reached the end of its file */
        public boolean isEnd() {
            return end;
        }

        /**
         * Reads one line and parses it as an int (surrounding whitespace ignored, as in FileSorter).
         *
         * @return the number, or {@code null} at end of file or when the line is not an int
         */
        public Integer readInt() throws IOException {
            String line = lineReader.readLine();
            if (line == null) {
                end = true;
                return null;
            }
            try {
                return Integer.parseInt(line.trim());
            } catch (NumberFormatException ignored) {
                // Not a number (e.g. a blank line): skip it.
                return null;
            }
        }

        /** @return the next int of the file, or {@code null} when the file is exhausted */
        public Integer readIfNotEnd() throws IOException {
            while (!end) {
                Integer integer = readInt();
                if (integer != null) {
                    return integer;
                }
            }
            return null;
        }
    }

    /**
     * Merges the (individually sorted) {@code files} into {@code dist}, replacing it.
     *
     * @param files sorted input files
     * @param dist  path of the output file
     * @throws IOException if a file cannot be read or the output cannot be written
     */
    public static void mergeFiles(File[] files, String dist) throws IOException {
        Objects.requireNonNull(files, "files");
        int n = files.length;
        IndexMinPQ<Integer> pq = new IndexMinPQ<>(n);
        try (OpenFiles inputs = new OpenFiles()) {
            LineReaderWrapper[] lineReader = new LineReaderWrapper[n];
            for (int i = 0; i < n; i++) {
                lineReader[i] = new LineReaderWrapper(new LineReader(inputs.open(files[i])));
            }
            // Prime the queue with the first number of every file
            for (int i = 0; i < n; i++) {
                Integer value = lineReader[i].readIfNotEnd();
                if (value != null) {
                    pq.insert(i, value);
                }
            }
            try (Writer out = new BufferedWriter(
                    new OutputStreamWriter(new FileOutputStream(dist), StandardCharsets.UTF_8))) {
                while (!pq.isEmpty()) {
                    int min = pq.minKey();
                    int idx = pq.delMin();
                    // Refill from the file the minimum came from
                    Integer value = lineReader[idx].readIfNotEnd();
                    if (value != null) {
                        pq.insert(idx, value);
                    }
                    out.write(Integer.toString(min));
                    out.write('\n');
                }
            }
        }
    }

    /** Owns the input files opened by mergeFiles so that try-with-resources closes all of them. */
    private static final class OpenFiles implements Closeable {
        private final List<RandomAccessFile> opened = new ArrayList<>();

        RandomAccessFile open(File file) throws IOException {
            RandomAccessFile raf = new RandomAccessFile(file, "r");
            opened.add(raf);
            return raf;
        }

        @Override
        public void close() throws IOException {
            IOException failure = null;
            for (RandomAccessFile raf : opened) {
                try {
                    raf.close();
                } catch (IOException e) {
                    if (failure == null) {
                        failure = e;
                    } else {
                        failure.addSuppressed(e);
                    }
                }
            }
            if (failure != null) {
                throw failure;
            }
        }
    }

    public static void main(String[] args) throws IOException {
        File[] files = find("E:/java_wps");
        mergeFiles(files, "E:/java_wps/merge.txt");
    }
}
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。