赞
踩
hello 大家好,我是爱抄中间件代码的路人丙,今天想跟大家分享一下,在公司文件读写业务场景下引入mmap进行文件写入的调研、优化心得体会以及实践。(这篇文章最开始是ppt,是笔者在部门内部的第一次技术分享的内容!)
前言:该篇文章是该系列的第二篇!
笔者之前就看过一部分RocketMQ的源码,所以了解到RocketMQ在文件读写特别是写入使用了mmap,恰好机缘巧合之下,负责公司的业务部分涉及到了文件写入的场景且存在性能问题,于是笔者就想到了RocketMQ的0拷贝文件写入,然后笔者就去扒了RocketMQ对应0拷贝的代码,然后参考其源码对业务代码进行升级改造,最终的结果就是原业务对应接口单次RT的时间缩短了3~15倍(具体的优化效率取决于文件写入的大小,笔者的测试文件大小仅在200M-1.5G范围),最后的产出就是极大的提升了用户对对应功能的使用体验。
注意:
以下涉及0拷贝的内容,均以mmap代替,即描述到mmap时,即指“零拷贝”
看完本系列文章你也将是高性能文件写入的高手:(此篇文章为系列第二篇)
1、mmap是什么
2、Java中的mmap之MappedByteBuffer 安全使用(参考 RocketMQ源码 MappedFile类)
3、代码实操:传统文件写入 vs mmap文件写入的性能对比
4、mmap的优缺点,以及适用场景(参考 RocketMQ源码对mmap的使用,避免写出内存泄漏的代码)
5、高性能文件断点续传的代码
笔者因为之前看过RocketMQ4.6部分源码,知道RocketMQ中有零拷贝这个概念,所以就去看了RocketMQ对应零拷贝相关的源码了
RocketMQ使用mmap零拷贝的主要业务场景在持久化落盘: 分别是CommitLog以及IndexLog的读写落盘
以下截图源码来自:RocketMQ 4.6分支
RocketMQ对于mmap的使用主要封装在MappedFile类中:
看源码,建议有目标的阅读,这样效率更高(避免其他干扰),所以笔者此次阅读源码的目标是:RocketMQ如何进行mmap使用以及对于mmap生命周期管理的源码实现(说白了就是,mmap怎么初始化、怎么写、怎么销毁)
mmap初始化:MappedFile类构造函数会进行初始化init(),即进行mmap系统调用
以下是其核心代码:
private void init(final String fileName, final int fileSize) throws IOException { // 文件名(全路径) MessageStoreConfig类的storePathCommitLog属性,可以通过setStorePathCommitLog 去修改文件存储的路径 this.fileName = fileName; // 文件大小 MessageStoreConfig类的mappedFileSizeCommitLog属性(默认1G) 所以mmap申请的1g的内存映射 // fileSize 可以看成1g this.fileSize = fileSize; this.file = new File(fileName); // fileFromOffset文件起始地址 大家都知道RockeMQ在设计的时候文件名都是偏移地址 this.fileFromOffset = Long.parseLong(this.file.getName()); boolean ok = false; // 检查文件存储路径的父目录是否创建 ,没创建会创建 ensureDirOK(this.file.getParent()); try { // 熟悉的代码,熟悉的配方 this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); // 可以看到这里 直接申请了 1个g fileSize的大小已经在前面说明过了,如果你要改,可以自己改 //重要: mmap申请 this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize); // 看到这里可以初步推测TOTAL_MAPPED_VIRTUAL_MEMORY属性:记录mmap映射大小 TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize); // 计数做了一个+1 初步推测应该就是计数一下,但是不知道有什么用 TOTAL_MAPPED_FILES.incrementAndGet(); ok = true; } catch (FileNotFoundException e) { log.error("Failed to create file " + this.fileName, e); throw e; } catch (IOException e) { log.error("Failed to map file " + this.fileName, e); throw e; } finally { // 如果这个方法出异常了,那么就会把fileChannel关掉 // 不知道大家有不有跟我一样的疑问:这个mappedByteBuffer不用管么? if (!ok && this.fileChannel != null) { this.fileChannel.close(); } } // 看到这里,我们大概知道RocketMQ是怎么去初始化MappedByteBuffer以及怎么去结合自己的业务场景设计 // 所以如果你没用过MappedByteBuffer,那么看到这里应该也会照猫画虎了,比如这个init()方法代码就可以抄哇(我真是人如其名!) // ok ,到这里,我们知道怎么初始化的了。但是还不知道RocketMQ怎么用这个MappedByteBuffer的,怎么办,ctr + 鼠标左键 锁定mappedByteBuffer属性 } public static void ensureDirOK(final String dirName) { if (dirName != null) { File f = new File(dirName); if (!f.exists()) { boolean result = f.mkdirs(); log.info(dirName + " mkdir " + (result ? "OK" : "Failed")); } } }
上面代码注释中,“重要”关键词
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
// 可以看到这里 直接申请了 1个g fileSize的大小已经在前面说明过了,如果你要改,可以自己改
//重要: mmap申请
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
我们看一下JDK1.8中对mmap申请方法的说明:
笔者用翻译工具翻译了一下,大概意思如下:(以下翻译来自JDK1.8源码注释(windows10有道词典10.2.6翻译))
“将该通道文件的一个区域直接映射到内存中。 文件的一个区域可以通过以下三种方式之一映射到内存中: 只读:任何修改结果缓冲区的尝试都会导致抛出java.nio.ReadOnlyBufferException异常。(MapMode.READ_ONLY) 读/写:对结果缓冲区所做的更改最终将传播到文件中;它们可能对映射同一文件的其他程序可见,也可能不可见。(MapMode.READ_WRITE) Private:对结果缓冲区所做的更改不会传播到文件中,并且对映射同一文件的其他程序不可见;相反,它们将导致创建缓冲区修改部分的私有副本。(MapMode.PRIVATE) 对于只读映射,该通道必须已打开以供读取;对于读/写映射或私有映射,该通道必须同时为读和写打开。 此方法返回的映射字节缓冲区的位置为0,限制和容量为size;它的标记将是未定义的。缓冲区及其所表示的映射将保持有效,直到缓冲区本身被垃圾收集。 映射一旦建立,就不依赖于用于创建它的文件通道。特别是,关闭通道对映射的有效性没有影响。 内存映射文件的许多细节本质上依赖于底层操作系统,因此不明确。当请求的区域未完全包含在此通道的文件中时,此方法的行为未指定。这个程序或其他程序对基础文件的内容或大小所做的更改是否传播到缓冲区是未指定的。将对缓冲区的更改传播到文件的速率未指定。 对于大多数操作系统,将文件映射到内存比通过通常的读写方法读取或写入几十kb的数据要昂贵得多。从性能的角度来看,通常只值得将相对较大的文件映射到内存中。 参数: mode - FileChannel中定义的常量READ_ONLY、READ_WRITE或PRIVATE之一。MapMode类,根据文件是要被映射为只读、读/写还是私有(写时复制),分别position -映射区域在文件中的起始位置;必须是非负的size -要映射的区域的大小;必须非负且不大于整数。MAX_VALUE 返回: 映射的字节缓冲区 抛出: NonReadableChannelException -如果模式为READ_ONLY,但该通道未打开以进行读取 NonWritableChannelException -如果模式是READ_WRITE或PRIVATE,但该通道没有为读写打开 IllegalArgumentException -如果参数的前提条件不成立 IOException -如果发生其他I/O错误 参见: FileChannel。MapMode, MappedByteBuffer ”
注意:对于大多数操作系统,将文件映射到内存比通过通常的读写方法读取或写入几十kb的数据要昂贵得多。从性能的角度来看,通常只值得将相对较大的文件映射到内存中。
接下来,我们看看RocketMQ如何写入的:
简单来说,写入就一行代码:
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
以下截图是映射销毁的核心逻辑:
public static void clean(final ByteBuffer buffer) {
if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
return;
// 不满足以上条件
invoke(invoke(viewed(buffer), "cleaner"), "clean");
}
至此,我们知道了RocketMQ源码中是如何管理mmap文件读写的生命周期的,相信小伙伴们看到这里应该知道如何在Java语言体系中使用零拷贝mmap了。
至此我们已经知道如何在Java中使用高性能的mmap了以及安全的管理MappedByteBuffer生命周期了,接下来,我们就看一下引入mmap的效果以及存在的问题。
参考RocketMQ对于mmap生命周期的源码实现,我们接入自己的业务场景:文件断点续传场景
注意:本次测试环境仅统计文件写入的时间,不考虑网络IO以及带宽的影响
此处笔者省略了具体代码实现…
// 缓存MappedFile实列,其实就是缓存MappedByteBuffer的引用 final Map<String, MappedFile> tempMMap = new ConcurrentHashMap<>(); private cn.novelweb.tool.http.Result checkFileMd5WithMMap(String fileMd5, String fileName, String confFilePath, String tmpFilePath, int fileSize) throws Exception { boolean isParamEmpty = StringUtils.isBlank(fileMd5) || StringUtils.isBlank(fileName) || StringUtils.isBlank(confFilePath) || StringUtils.isBlank(tmpFilePath); if (isParamEmpty) { throw new Exception("参数值为空"); } // 构建分片配置文件对象 File confFile = new File(confFilePath + File.separatorChar + fileName + ".conf"); // 布尔值:上传的文件缓存对象是否存在 boolean isTmpFileEmpty = new File(tmpFilePath \+ File.separatorChar + fileName + "_tmp").exists(); // 分片记录文件 和 文件缓存文件 同时存在 则 状态码定义为 206 if (confFile.exists() && isTmpFileEmpty) { byte[] completeList = FileUtils.readFileToByteArray(confFile); List<String> missChunkList = new LinkedList<String>(); for (int i = 0; i < completeList.length; i++) { if (completeList[i] != Byte.MAX_VALUE) { missChunkList.add(Integer.toString(i)); } } JSONArray jsonArray = JSON.parseArray(JSONObject.toJSONString(missChunkList)); return cn.novelweb.tool.http.Result.ok(HttpStatus.PARTIAL_CONTENT.value(), "文件已经上传了一部分", jsonArray); } // 布尔值:上传的文件对象是否存在 boolean isFileEmpty = new File(tmpFilePath + File.separatorChar + fileName) .exists(); // 上传的文件 和 配置文件 同时存在 则 当前状态码为 200 if (isFileEmpty && confFile.exists()) { return cn.novelweb.tool.http.Result.ok(HttpStatus.OK.value(), "文件已上传成功"); } // 不存在则 构建mmap file tempMMap.put(fileName,new MappedFile(tmpFilePath \+ File.separatorChar + fileName + "_tmp",fileSize)); // 构造 mmap return cn.novelweb.tool.http.Result.ok(HttpStatus.NOT_FOUND.value(), "文件不存在"); }
该方法可以发现,统计其实只统计了fragmentFileUploaderMmap方法的耗时
public Result<Object> breakpointResumeUploadMmap(UploadFileParam param, HttpServletRequest request) { try { // 这里的 chunkSize(分片大小) 要与前端传过来的大小一致 5242880L = 5 * 1024 * 1024 5M long start = System.currentTimeMillis(); // cn.novelweb.tool.http.Result result = LocalUpload.fragmentFileUploader(param, confFilePath, savePath, 5242880L, request); // nomap cn.novelweb.tool.http.Result result = fragmentFileUploaderMmap(param, confFilePath, savePath, 5242880L, request); // mmap long end = System.currentTimeMillis() - start; counter.add(end); log.info("文件:{},块号:{} , 数组长度:{} 总耗时:{}",param.getName(),param.getChunk(),counter.size(),counter.stream().mapToLong(e->e).sum()); return NovelWebUtils.forReturn(result); } catch (Exception e) { log.error(e.getMessage(), e); } return Results.newFailResult(ErrorCode.FILE_UPLOAD, "上传失败"); }
大家会发现,下面加锁的方式是直接在方法上加了synchronized的,其实这里还可以优化(我们只需要在初始化mmap的时候给每一个文件配一把锁即可,这样我们就可以把并发都分摊到不同的文件了而不是集中在下面这个方法,这个方案理论上针对有并发要求是有优化效果的),但是由于本篇文章主要探究的是mmap的写性能,所以这些可以进一步优化的细节,感兴趣的小伙伴可以自己尝试优化测试一下
private synchronized cn.novelweb.tool.http.Result<?> fragmentFileUploaderMmap(UploadFileParam param, String confFilePath, String filePath, long chunkSize, HttpServletRequest request) throws Exception { boolean isParamEmpty = StringUtils.isBlank(filePath) || StringUtils.isBlank(confFilePath) && param.getFile() == null; if (isParamEmpty) { throw new Exception("参数值为空"); } // 判断enctype属性是否为multipart/form-data boolean isMultipart = ServletFileUpload.isMultipartContent(request); if (!isMultipart) { throw new IllegalArgumentException("上传内容不是有效的multipart/form-data类型."); } try { // 分片配置文件 File confFile = FileUtil.file(FileUtil.mkdir(confFilePath), String.format("%s.conf", param.getName())); RandomAccessFile accessConfFile = new RandomAccessFile(confFile, "rw"); // 把该分段标记为 true 表示完成 accessConfFile.setLength(param.getChunks()); accessConfFile.seek(param.getChunk()); accessConfFile.write(Byte.MAX_VALUE); accessConfFile.close(); / mmap的写入方式 /// // _tmp的缓存文件对象 File tmpFile = FileUtil.file(FileUtil.mkdir(filePath), String.format("%s_tmp", param.getName())); // 随机位置写入文件 long offset = chunkSize * param.getChunk(); // 定位到该分片的偏移量、写入该分片数据、释放 MappedFile mappedFile = tempMMap.get(param.getName()); if (null == mappedFile){ mappedFile = new MappedFile(param.getName(), param.getSize()); tempMMap.put(param.getName(),mappedFile); } mappedFile.put(param.getFile().getBytes(),offset); // 检查是否全部分片都成功上传 byte[] completeList = FileUtils.readFileToByteArray(confFile); byte isComplete = Byte.MAX_VALUE; for (int i = 0; i < completeList.length && isComplete == Byte.MAX_VALUE; i++) { // 与运算, 如果有部分没有完成则 isComplete 不是 Byte.MAX_VALUE isComplete = (byte) (isComplete & completeList[i]); } if (isComplete != Byte.MAX_VALUE) { return cn.novelweb.tool.http.Result.ok(HttpStatus.OK.value(), "文件上传成功"); } // 文件的所有分块全部上传成功后,清除mmap内存映射 mappedFile.shutdown(); boolean isSuccess = renameFile(tmpFile, param.getName()); if (!isSuccess) { throw new Exception("文件重命名时失败"); } // 全部上传成功后构建文件对象 FileInfo fileInfo = FileInfo.builder() .hash(param.getMd5()) .name(param.getName()) .type(param.getFile().getContentType()) .path(tmpFile.getParent() + File.separatorChar + param.getName()) .createTime(System.currentTimeMillis()) .build(); return cn.novelweb.tool.http.Result.ok(HttpStatus.CREATED.value(), "文件上传完成", fileInfo); } catch (IOException e) { e.printStackTrace(); return cn.novelweb.tool.http.Result.error("文件上传失败"); } }
package com.vip.file.mmp; import lombok.extern.slf4j.Slf4j; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; import java.io.RandomAccessFile; import java.lang.reflect.Method; import java.nio.ByteBuffer; import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.security.AccessController; import java.security.PrivilegedAction; /** * @title: MappedFile * @Description: mmp * @Author Lmm * @Date: 2024/4/22 14:48 * @Version 1.0 */ @Slf4j public class MappedFile { private MappedByteBuffer mappedByteBuffer; private File file; private String fileName; // 这个FileChannel很熟悉了吧,可以理解为 mappedByteBuffer获取的入口 protected FileChannel fileChannel; protected long fileSize; public MappedFile(final String fileName, final long fileSize) throws IOException { init(fileName, fileSize); } /** \* 初始化 */ private void init(final String fileName, final long fileSize) throws IOException { // 文件名(全路径) MessageStoreConfig类的storePathCommitLog属性,可以通过setStorePathCommitLog 去修改文件存储的路径 this.fileName = fileName; // 文件大小 MessageStoreConfig类的mappedFileSizeCommitLog属性(默认1G) 所以mmap申请的1g的内存映射 // fileSize 可以看成1g this.fileSize = fileSize; this.file = new File(fileName); // fileFromOffset文件起始地址 大家都知道RockeMQ在设计的时候文件名都是偏移地址 boolean ok = false; // 检查文件存储路径的父目录是否创建 ,没创建会创建 ensureDirOK(this.file.getParent()); try { // 熟悉的代码,熟悉的配方 this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel(); // 可以看到这里 直接申请了 1个g fileSize的大小已经在前面说明过了,如果你要改,可以自己改 this.mappedByteBuffer = this.fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize); ok = true; } catch (FileNotFoundException e) { log.error("Failed to create file " + this.fileName, e); throw e; } catch (IOException e) { log.error("Failed to map file " + this.fileName, e); throw e; } finally { // 如果这个方法出异常了,那么就会把fileChannel关掉 // 不知道大家有不有跟我一样的疑问:这个mappedByteBuffer不用管么? if (!ok && this.fileChannel != null) { this.fileChannel.close(); } } // 看到这里,我们大概知道RocketMQ是怎么去初始化MappedByteBuffer以及怎么去结合自己的业务场景设计 // 所以如果你没用过MappedByteBuffer,那么看到这里应该也会照猫画虎了,比如这个init()方法代码就可以抄哇(我真是人如其名!) } /** \* @param data 待写入的字节数组 */ public synchronized void put(byte[] data,long position){ mappedByteBuffer.position((int) position); mappedByteBuffer.put(data, 0, data.length); } public static void ensureDirOK(final String dirName) { if (dirName != null) { File f = new File(dirName); if (!f.exists()) { boolean result = f.mkdirs(); log.info(dirName + " mkdir " + (result ? "OK" : "Failed")); } } } /** \* 清除byte buffer */ // jvm钩子函数可以搞一个回调 public synchronized void shutdown() { // unmap mappedByteBuffer 清除mappedByteBuffer 内存映射 MappedFile.clean(this.mappedByteBuffer); try { this.fileChannel.close(); } catch (IOException e) { log.error("Failed to properly close the channel", e); } } // 以下的代码来自RocketMQ 4.6 public static void clean(final ByteBuffer buffer) { if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) return; // 不满足以上条件 invoke(invoke(viewed(buffer), "cleaner"), "clean"); } private static Object invoke(final Object target, final String methodName, final Class<?>... args) { return AccessController.doPrivileged(new PrivilegedAction<Object>() { public Object run() { try { Method method = method(target, methodName, args); method.setAccessible(true); return method.invoke(target); } catch (Exception e) { throw new IllegalStateException(e); } } }); } private static Method method(Object target, String methodName, Class<?>[] args) throws NoSuchMethodException { try { return target.getClass().getMethod(methodName, args); } catch (NoSuchMethodException e) { return target.getClass().getDeclaredMethod(methodName, args); } } private static ByteBuffer viewed(ByteBuffer buffer) { String methodName = "viewedBuffer"; Method[] methods = buffer.getClass().getMethods(); for (int i = 0; i < methods.length; i++) { if (methods[i].getName().equals("attachment")) { methodName = "attachment"; break; } } ByteBuffer viewedBuffer = (ByteBuffer) invoke(buffer, methodName); if (viewedBuffer == null) return buffer; else return viewed(viewedBuffer); } /** * 用于上传成功后重命名文件 * * @param toFileNewName 文件新的名字 * @return 返回重命名是否成功 */ private boolean renameFile(String toFileNewName) { File toBeRenamed = this.file; // 检查要重命名的文件是否存在,是否是文件 if (!toBeRenamed.exists() || toBeRenamed.isDirectory()) { return false; } File newFile = new File(toBeRenamed.getParent() \+ File.separatorChar + toFileNewName); // 修改文件名 return toBeRenamed.renameTo(newFile); } }
通过上面,我们可以看到,手动gc之后,堆内存是明显的下降了
通过以上简单2组数据单次请求测试发现,在较大文件持续写入的场景中,mmap文件写入效率相比传统文件写入方式快3.514倍,但是系统负载情况相比传统文件写入方式内存会有%5%10左右的上浮。
当然以上测试并非压力测试,所以实际应用时,应当以压测数据为准!
通过上面代码验证、RocketMQ的源码以及官方MappedByteBuffer的建议,我们可以初步得出以下结论关于mmap的使用场景以及其优缺点:
适用场景:
大文件多次读写场景(可参考场景比如RocketMQ的CommitLog、IndexLog文件读写,而且CommitLog大小固定1G)
优点:
缺点:
JDK8源码
RocketMQ4.6 MappedFIle、CommitLog类源码
mmap核心原理推荐文章如下
文章1:mmap
文章2:mmap
关于文章2“MappedByteBuffer VS FileChannel:从内核层面对比两者的性能差异”文章,笔者在断点续传场景使用mmap进行文件持续写的测试中得的结论这篇作者的结论很明显不一致,而且笔者得出的结论跟官方JDK1.8的注释说明是一致的,基于mmap实现的MappedByteBuffer 更适合大文件的持续写入(根据RocketMQ的源码以及笔者自己测试的结果也可以得出以下结论:基于mmap实现的MappedByteBuffer 更适合大文件的持续写入)
如果对于MappedByteBuffer ,如果你有不同的见解,也非常欢迎你在评论区留言、讨论;同时,如果你有不同的应用场景,也非常欢迎讨论
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。