当前位置:   article > 正文

用JAVA实现一个多线程HTTP下载服务器,支持暂停、断点续传、重新下载、限速下载、删除、多任务并行下载等功能

用JAVA实现一个多线程HTTP下载服务器,支持暂停、断点续传、重新下载、限速下载、删除、多任务并行下载等功能

项目简介:

项目功能说明:

  1. 该网站包括file页面、transfer页面和settings页面。其中,file页面用于显示指定目录下的所有文件,可根据文件类型、名称或者大小进行排序筛选;transfer页面主要的使用场景是实现提交HTTP地址后下载到服务器上,这个服务器可以作为NAS或者云盘使用;settings页面用于设置下载路径、限速大小、最大并行任务数。
  2. transfer页面能够查看每个任务的下载进度、下载速度和下载剩余时间,用户可自行控制并发线程数。

项目特点:本项目需要定时获取每个下载任务的实时状态,当系统需要大量并发请求时,如果直接在数据库上执行轮询可能会导致数据库连接池耗尽或数据库性能下降。因此我们利用redis消息队列来缓存任务状态,避免了轮询数据库,可以显著提高响应速度。

技术栈:SpringBoot + Spring + Mybatis + Mysql + Redis + SSE

完整项目及代码见gitee:https://gitee.com/Liguangyu1017/HTTP-download-server.git (项目已上线)

WebUI:

项目架构:



如何实现一个分片算法:

HTTP下载本质是发起一个GET的HTTP请求,在发起的GET请求中设置Range字段,比如:Range:bytes = 0 - 499表示请求文件的前 500 个字节。

  1. 确定分片策略:
    本项目采用固定分片大小,即根据文件大小来决定分片数量。
    根据文件大小,文件大小小于等于10MB的按照32KB,10MB~100MB按照1MB,超过100MB按照10MB进行分片。
     
    1. // The shard size is determined by the file size
    2. private int determineChunkSize(long fileSize) {
    3. if (fileSize <= 10 * 1024 * 1024) {
    4. return urlConfig.getMinChunkSize();
    5. } else if (fileSize <= 100 * 1024 * 1024) {
    6. return urlConfig.getMidChunkSize();
    7. } else {
    8. return urlConfig.getMaxChunkSize();
    9. }
    10. }
  2. 计算分片大小和数量,确定分片范围:
     获取分片大小后,计算出分片数量 = 文件大小 / 分片大小 (向上取整)
     确定分片范围: 每个分片的起始位置为片索引乘以分片大小,结束位置为当前分片的起始     位置+分片大小。但在最后一个分片时,结束位置是分片大小减去1
     
    1. // Calculate the size of each shard
    2. long chunkSize = determineChunkSize(fileSize);
    3. // Number of shards = File size/shard size (rounded up)
    4. long chunkNum = (long)Math.ceil((double) fileSize / (double) chunkSize);
  3. 提交分片给下载线程池执行下载:
    1. // Submit the download task to the thread pool
    2. for (int i = 0; i < chunkNum; i++) {
    3. // Gets the number of bytes downloaded by the current shard
    4. long downloadedBytesOneChunk = chunkDownloadBytes[i];
    5. // sp indicates the start location of fragment download
    6. sp[i] = chunkSize * i;
    7. // ep indicates the end location of fragment download
    8. ep[i] = (i < chunkNum - 1) ? (sp[i] + chunkSize) - 1 : fileSize - 1;
    9. chunkDownloadBytesShould[i] = ep[i] - sp[i] + 1;
    10. // LOG.info("正在下载:" + sp[i] + " -> " + ep[i]);
    11. // Start the thread pool to perform the fragment download
    12. executor.submit(new Download(urlString, outputFile.getPath(), sp[i], ep[i],totalDownloadedBytesMap.get(taskDO.getId()),
    13. downloadedBytesOneChunk,i,rateLimiter,taskDO.getId(),redisService));
    14. }

补充:

一个下载任务对应一个redis记分牌(用于标记每个分片是否下载完成,true/false,初始化每个分片全为false),下载过程中会不断更新记分牌,每个分片下载完成后,这个分片就被标记为true,这样暂停下载任务后再次开启下载时就可以跳过已经下载好的分片,从而实现断点续传。

根据任务id初始化redis记分牌:

  1. @Override
  2. public void initializeScoreboard(String taskId,long chunkNum){
  3. Map<String,Boolean> scoreboard=new HashMap<>();
  4. for (long i = 0;i < chunkNum; i++){
  5. scoreboard.put(String.valueOf(i),false);
  6. }
  7. redisTemplate.opsForHash().put(KEY_CHUNK_HASHMAP,taskId,scoreboard);
  8. }

根据任务id更新redis记分牌:

  1. @Override
  2. public void updateScoreboard(String taskId, long chunkId) {
  3. Map<String,Boolean> scoreboard=(Map) redisTemplate.opsForHash().get(KEY_CHUNK_HASHMAP,taskId);
  4. scoreboard.put(String.valueOf(chunkId),true);
  5. redisTemplate.opsForHash().put(KEY_CHUNK_HASHMAP,taskId,scoreboard);
  6. }

注意事项:

每个分片下载完成之后,存储到分片的起始位置中,这个过程由于文件句柄只有一个,需要加锁(synchronized)后进行seek,保证存储位置的准确性。

  1. HttpURLConnection connection;
  2. // Establish a connection to the download address and set the download range
  3. connection = (HttpURLConnection) new URL(fileURL).openConnection();
  4. // Configure the range
  5. String byteRange = sp + "-" + ep;
  6. connection.setRequestProperty("Range", "bytes=" + byteRange);
  7. // Create an input stream
  8. BufferedInputStream in = new BufferedInputStream(connection.getInputStream());
  9. RandomAccessFile raf = new RandomAccessFile(new File(outputFile), "rw");
  10. synchronized (lock) {
  11. try {
  12. raf.seek(sp);
  13. }catch (IOException e) {
  14. LOG.error("outputFile定位开始下载位置时出现错误");
  15. e.printStackTrace();
  16. }
  17. }

分片下载的完整代码如下:

  1. package cn.ykd.actualproject.service.impl;
  2. import cn.ykd.actualproject.config.UrlConfig;
  3. import cn.ykd.actualproject.dao.SettingsDAO;
  4. import cn.ykd.actualproject.dao.TaskDAO;
  5. import cn.ykd.actualproject.dataobject.TaskDO;
  6. import cn.ykd.actualproject.model.Settings;
  7. import cn.ykd.actualproject.service.RedisService;
  8. import cn.ykd.actualproject.service.TaskManagerService;
  9. import cn.ykd.actualproject.service.DownloadService;
  10. import cn.ykd.actualproject.utils.SharedVariables;
  11. import com.google.common.util.concurrent.RateLimiter;
  12. import org.slf4j.Logger;
  13. import org.slf4j.LoggerFactory;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.stereotype.Service;
  16. import java.io.*;
  17. import java.net.HttpURLConnection;
  18. import java.net.URL;
  19. import java.util.concurrent.ConcurrentHashMap;
  20. import java.util.concurrent.ExecutorService;
  21. import java.util.concurrent.Executors;
  22. import java.util.concurrent.ThreadPoolExecutor;
  23. import java.util.concurrent.atomic.AtomicLong;
  24. import java.util.regex.Matcher;
  25. import java.util.regex.Pattern;
  26. @Service
  27. public class DownloadServiceImpl implements DownloadService {
  28. @Autowired
  29. private TaskManagerService taskManagerService;
  30. @Autowired
  31. private RedisService redisService;
  32. @Autowired
  33. private SettingsDAO settingsDAO;
  34. @Autowired
  35. private UrlConfig urlConfig;
  36. @Autowired
  37. private TaskDAO taskDAO;
  38. private static final Logger LOG= LoggerFactory.getLogger(DownloadServiceImpl.class);
  39. // Secure parallel downloads
  40. private static final ConcurrentHashMap<String,AtomicLong> totalDownloadedBytesMap = new ConcurrentHashMap<>();
  41. private static final Object lock = new Object(); // Synchronous lock
  42. private static long[] chunkDownloadBytes; // Used to record the number of bytes downloaded per shard
  43. private static long[] chunkDownloadBytesShould; // Used to record the number of bytes that each shard should download
  44. @Override
  45. public boolean download(TaskDO taskDO) {
  46. // Obtain and process raw data
  47. int threadNum = taskDO.getThreads();
  48. String urlString = taskDO.getUrl(); // Get the download address
  49. Settings settings = settingsDAO.get().convertToModel(); // Obtain the setting information
  50. double maxDownloadSpeedMBps = settings.getMaxDownloadSpeed(); // Get the maximum download speed
  51. if (maxDownloadSpeedMBps<=0){
  52. maxDownloadSpeedMBps=100;
  53. }
  54. String savePath = settings.getDownloadPath(); // Obtain the download save path
  55. String fileName = taskDO.getName(); // Extract the file name from the download address
  56. RateLimiter rateLimiter = RateLimiter.create(maxDownloadSpeedMBps*1024*1000); // One token corresponds to one byte of data
  57. AtomicLong totalDownloadedBytes = new AtomicLong(0); // Used to record the number of bytes downloaded by all threads
  58. totalDownloadedBytesMap.put(taskDO.getId(),totalDownloadedBytes);
  59. try {
  60. // Establish a connection to the download address and get the file size
  61. URL url = new URL(urlString);
  62. HttpURLConnection connection = (HttpURLConnection) url.openConnection();
  63. // Get the binary data length of the file
  64. long fileSize = connection.getContentLength();
  65. // Calculate file size (Mb)
  66. double size = (double)fileSize/1024/1024;
  67. taskDO.setPath(settings.getDownloadPath());
  68. taskDO.setSize(String.format("%.2f",size) + " MB");
  69. taskDO.setStatus("Downloading");
  70. taskDAO.update(taskDO);
  71. // Create an output stream of the downloaded file
  72. File outputFile = new File(savePath, fileName);
  73. // Create a thread pool for multithreaded sharding downloads
  74. ExecutorService executor = Executors.newFixedThreadPool(threadNum);
  75. // Calculate the size of each shard
  76. long chunkSize = determineChunkSize(fileSize);
  77. // Number of shards = File size/shard size (rounded up)
  78. long chunkNum = (long)Math.ceil((double) fileSize / (double) chunkSize);
  79. String msg = "fileSize = " + fileSize + ",chunkSize = " + chunkSize + ",chunkNum = " + chunkNum;
  80. LOG.info(msg);
  81. // Record the download start time
  82. long startTime = System.currentTimeMillis();
  83. // Record the downloads of each shard
  84. chunkDownloadBytes = new long[(int) chunkNum];
  85. chunkDownloadBytesShould = new long[(int) chunkNum];
  86. // Initialize the state of each shard (True: shards are downloaded False: shards are not downloaded)
  87. redisService.initializeScoreboard(taskDO.getId(),chunkNum);
  88. // Initializes the sp and ep
  89. long[] sp = new long[(int) chunkNum];
  90. long[] ep = new long[(int) chunkNum];
  91. // Submit the download task to the thread pool
  92. for (int i = 0; i < chunkNum; i++) {
  93. // Gets the number of bytes downloaded by the current shard
  94. long downloadedBytesOneChunk = chunkDownloadBytes[i];
  95. // sp indicates the start location of fragment download
  96. sp[i] = chunkSize * i;
  97. // ep indicates the end location of fragment download
  98. ep[i] = (i < chunkNum - 1) ? (sp[i] + chunkSize) - 1 : fileSize - 1;
  99. chunkDownloadBytesShould[i] = ep[i] - sp[i] + 1;
  100. // LOG.info("正在下载:" + sp[i] + " -> " + ep[i]);
  101. // Start the thread pool to perform the fragment download
  102. executor.submit(new Download(urlString, outputFile.getPath(), sp[i], ep[i],totalDownloadedBytesMap.get(taskDO.getId()),
  103. downloadedBytesOneChunk,i,rateLimiter,taskDO.getId(),redisService));
  104. }
  105. // Call task management module to synchronize database and sse in real time
  106. taskManagerService.updateDownloadStatus(executor,totalDownloadedBytesMap.get(taskDO.getId()),fileSize,
  107. taskDO,startTime,outputFile,
  108. chunkNum,chunkSize,chunkDownloadBytes,chunkDownloadBytesShould,rateLimiter,sp,ep);
  109. return true;
  110. }catch (Exception e){
  111. e.printStackTrace();
  112. return false;
  113. }
  114. }
  115. // Inner classes are used to implement the download function
  116. static class Download implements Runnable {
  117. private String fileURL; // File url
  118. private String outputFile; // Output stream file
  119. private long sp; // Download start location
  120. private long ep; // Download end location
  121. private AtomicLong totalDownloadedBytes; // Records the number of bytes downloaded
  122. private long downloadedBytes; // Records the number of bytes actually downloaded by the current fragment
  123. private int chunkId; // Fragment id
  124. private RateLimiter rateLimiter; // Governor
  125. private String taskId; // Task Id
  126. private RedisService redisService; // redis service
  127. public Download(String fileURL, String outputFile, long sp,long ep, AtomicLong totalDownloadedBytes,
  128. long downloadedBytes,int chunkId,RateLimiter rateLimiter,String taskId,RedisService redisService) {
  129. this.fileURL = fileURL;
  130. this.outputFile = outputFile;
  131. this.sp = sp;
  132. this.ep = ep;
  133. this.totalDownloadedBytes = totalDownloadedBytes;
  134. this.downloadedBytes = downloadedBytes;
  135. this.chunkId = chunkId;
  136. this.rateLimiter = rateLimiter;
  137. this.taskId= taskId;
  138. this.redisService = redisService;
  139. }
  140. @Override
  141. public void run() {
  142. try {
  143. HttpURLConnection connection;
  144. // Establish a connection to the download address and set the download range
  145. connection = (HttpURLConnection) new URL(fileURL).openConnection();
  146. // Configure the range
  147. String byteRange = sp + "-" + ep;
  148. connection.setRequestProperty("Range", "bytes=" + byteRange);
  149. // Create an input stream
  150. BufferedInputStream in = new BufferedInputStream(connection.getInputStream());
  151. // Create an output stream
  152. RandomAccessFile raf = new RandomAccessFile(new File(outputFile), "rw");
  153. synchronized (lock) {
  154. try {
  155. raf.seek(sp);
  156. }catch (IOException e) {
  157. LOG.error("outputFile定位开始下载位置时出现错误");
  158. e.printStackTrace();
  159. }
  160. }
  161. // Number of bytes actually read
  162. int bytesRead;
  163. // Read 1024 bytes of cache
  164. byte[] buffer = new byte[1024];
  165. // Start writing to the file
  166. while ((bytesRead = in.read(buffer)) != -1) {
  167. synchronized (lock) {
  168. // Check whether the task is suspended
  169. if (SharedVariables.getIsPaused()) {
  170. raf.close();
  171. in.close();
  172. break;
  173. }
  174. rateLimiter.acquire(bytesRead);
  175. raf.write(buffer, 0, bytesRead);
  176. downloadedBytes += bytesRead;
  177. chunkDownloadBytes[chunkId] = downloadedBytes;
  178. // Update the scoreboard if the current shard is downloaded
  179. if (chunkDownloadBytes[chunkId] == chunkDownloadBytesShould[chunkId]) {
  180. redisService.updateScoreboard(taskId, chunkId);
  181. }
  182. totalDownloadedBytes.addAndGet(bytesRead);// Updates the number of downloaded bytes stored in the atomic class
  183. }
  184. }
  185. raf.close();
  186. in.close();
  187. connection.disconnect();
  188. } catch (IOException e) {
  189. e.printStackTrace();
  190. }
  191. }
  192. }
  193. // The shard size is determined by the file size
  194. private int determineChunkSize(long fileSize) {
  195. if (fileSize <= 10 * 1024 * 1024) {
  196. return urlConfig.getMinChunkSize();
  197. } else if (fileSize <= 100 * 1024 * 1024) {
  198. return urlConfig.getMidChunkSize();
  199. } else {
  200. return urlConfig.getMaxChunkSize();
  201. }
  202. }
  203. }



如何实现断点续传:

什么是断点续传:

断点续传是一种网络传输技术,允许在文件下载或上传过程中,当传输中断或失败时,能够从断点处恢复传输,而不是重新开始传输整个文件。

优点:

  • 节省带宽和时间:断点续传允许在传输中断或失败时恢复传输,避免重新下载整个文件,节省了带宽和时间。
  • 提高用户体验:用户无需重新开始下载,可以在中断的地方恢复下载,提高了用户体验和满意度。

适用场景:

  • 大文件下载:对于大文件下载,断点续传能够有效减少下载时间和带宽消耗。
  • 不稳定的网络环境:在网络连接不稳定的情况下,比如下载过程中网络中断,当恢复网络时不需要从头开始下载,直接从上次下载中断处开始,断点续传能够保证下载的可靠性和稳定性。

具体实现: 

上文中已经提到了redis记分牌,每个任务所对应的每个分片都存储在redis中,在下载过程中不断更新记分牌,当一个分片下载完成时,将其标记为true。

当一个下载任务被终止,此时redis已经记录了该任务每个分片的下载情况,如果要恢复下载,只需读取未下载完的分片(false),确定范围(start_position和end_position),重新提交给线程池执行下载即可。

根据任务id获取记分牌:

  1. @Override
  2. public List<Long> getScoreboard(String taskId){
  3. List<Long> chunkIds=new ArrayList<>();
  4. Map<String,Boolean> scoreboard=(Map) redisTemplate.opsForHash().get(KEY_CHUNK_HASHMAP,taskId);
  5. scoreboard.forEach((key,value)->{
  6. if (!value){
  7. chunkIds.add(Long.valueOf(key));
  8. }
  9. });
  10. return chunkIds;
  11. }

确定范围,然后提交重新提交给线程池下载:

  1. LOG.info("正在执行恢复下载操作");
  2. // Obtain the id of the unfinished fragment
  3. List<Long> unDownloadedChunkId = redisService.getScoreboard(taskDO.getId());
  4. // Submit the unfinished shard to the thread pool
  5. for (int i = 0;i < unDownloadedChunkId.size(); i++) {
  6. // Get the index and ensure a one-to-one correspondence
  7. int index = Math.toIntExact(unDownloadedChunkId.get(i));
  8. long downloadedBytesOneChunk = chunkDownloadBytes[index];
  9. // Obtain the sp and ep of the remaining fragments
  10. long resumeSp = sp[index] + chunkDownloadBytes[index];
  11. long resumeEp = ep[index];
  12. // LOG.info("剩余的第" + index + "个分片正在下载" + resumeSp + "->" + resumeEp);
  13. newExecutor.submit(new DownloadServiceImpl.Download(urlString,outputFile.getPath(),resumeSp,
  14. resumeEp,totalDownloadedBytes,
  15. downloadedBytesOneChunk,index,rateLimiter,taskDO.getId(),redisService));



如何实现限速:

本项目通过使用RateLimiter工具实现限速下载功能。

工作原理:

RateLimiter基于令牌桶算法(Token Bucket)实现。它维护了一个稳定的速率(rate),以及一个令牌桶,令牌桶中的令牌数量代表了当前可用的请求次数。当有请求到来时,RateLimiter会根据当前的令牌桶状态来决定是否立即执行请求,或者需要等待一段时间后再执行。

<!-- Guava限流 -->
<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>18.0</version>
</dependency>

具体实现:

  • 创建RateLimiter对象:通过RateLimiter类的静态工厂方法来创建RateLimiter对象,传入一个速率参数(这里限速的单位是MB/s)。
RateLimiter rateLimiter = RateLimiter.create(maxDownloadSpeedMBps*1024*1000); // One token corresponds to one byte of data
  • 请求令牌:调用RateLimiter的acquire(bytesRead)方法来请求bytesRead个令牌,该方法会阻塞当前线程直到获取到令牌为止。
  1. // Number of bytes actually read
  2. int bytesRead;
  3. // Read 1024 bytes of cache
  4. byte[] buffer = new byte[1024];
  5. // Start writing to the file
  6. while ((bytesRead = in.read(buffer)) != -1) {
  7. synchronized (lock) {
  8. // Check whether the task is suspended
  9. if (SharedVariables.getIsPaused()) {
  10. raf.close();
  11. in.close();
  12. break;
  13. }
  14. rateLimiter.acquire(bytesRead);
  15. raf.write(buffer, 0, bytesRead);
  16. downloadedBytes += bytesRead;
  17. chunkDownloadBytes[chunkId] = downloadedBytes;
  18. // Update the scoreboard if the current shard is downloaded
  19. if (chunkDownloadBytes[chunkId] == chunkDownloadBytesShould[chunkId]) {
  20. redisService.updateScoreboard(taskId, chunkId);
  21. }
  22. totalDownloadedBytes.addAndGet(bytesRead);// Updates the number of downloaded bytes stored in the atomic class
  23. }
  24. }



如何定时获取任务的状态、下载速度、剩余时间、下载进程以及对用户操作(暂停、删除、重新下载)进行处理:

可以创建一个单线程用于定时(500ms)获取任务的信息,并同时将数据传给数据库和前端。

这里特别讲一下暂停操作:

当后端接受到pause参数时,首先会关闭文件的输入流和输出流,即:

  1. if (SharedVariables.getIsPaused()) {
  2. raf.close();
  3. in.close();
  4. break;
  5. }

然后会关闭下载线程池:

  1. // Pause download
  2. private void pauseDownload(ExecutorService executor) {
  3. SharedVariables.setIsPaused(true);
  4. // If downloading for the first time
  5. executor.shutdownNow();
  6. // Until the download thread closes
  7. while (!executor.isTerminated()) {
  8. try {
  9. Thread.sleep(200);
  10. } catch (InterruptedException e) {
  11. throw new RuntimeException(e);
  12. }
  13. }
  14. LOG.info("任务已暂停");
  15. // Reset is Pause
  16. SharedVariables.setIsPaused(false);
  17. }

接下来会一直阻塞更新任务状态的线程,直到有新的任务状态(删除、重新下载、恢复下载)被获取到:

  1. while (status == null) {
  2. try {
  3. Thread.sleep(500);
  4. status = redisSubscribersTaskOptions.getOptionsForId(taskDO.getId());
  5. } catch (InterruptedException | NullPointerException e) {
  6. throw new RuntimeException(e);
  7. }
  8. LOG.info("暂停下载任务,线程阻塞中");
  9. }

完整代码如下:

  1. package cn.ykd.actualproject.service.impl;
  2. import cn.ykd.actualproject.dao.TaskDAO;
  3. import cn.ykd.actualproject.dataobject.TaskDO;
  4. import cn.ykd.actualproject.service.DownloadService;
  5. import cn.ykd.actualproject.service.RedisService;
  6. import cn.ykd.actualproject.service.TaskManagerService;
  7. import cn.ykd.actualproject.utils.SharedVariables;
  8. import com.google.common.util.concurrent.RateLimiter;
  9. import org.slf4j.Logger;
  10. import org.slf4j.LoggerFactory;
  11. import org.springframework.beans.factory.annotation.Autowired;
  12. import org.springframework.data.redis.core.RedisTemplate;
  13. import org.springframework.stereotype.Service;
  14. import java.io.File;
  15. import java.util.List;
  16. import java.util.concurrent.ExecutorService;
  17. import java.util.concurrent.Executors;
  18. import java.util.concurrent.ThreadPoolExecutor;
  19. import java.util.concurrent.atomic.AtomicLong;
  20. @Service
  21. public class TaskManagerServiceImpl implements TaskManagerService {
  22. @Autowired
  23. private TaskDAO taskDAO;
  24. @Autowired
  25. private RedisSubscribersTaskOptions redisSubscribersTaskOptions;
  26. @Autowired
  27. private DownloadService downloadService;
  28. @Autowired
  29. private RedisService redisService;
  30. @Autowired
  31. private RedisTemplate redisTemplate;
  32. @Autowired
  33. private RedisSubscribersThreads redisSubscribersThreads;
  34. private static final String KEY_MESSAGE_PARALLEL_DOWNLOAD = "DOWNLOAD_THREAD_QUEUE";
  35. private static final String KEY_CHUNK_HASHMAP = "CHUNK_HASHMAP";
  36. private static final Logger LOG= LoggerFactory.getLogger(TaskManagerServiceImpl.class);
  37. @Override
  38. public void updateDownloadStatus(ExecutorService executor, AtomicLong totalDownloadedBytes, long fileSize,
  39. TaskDO taskDO, long startTime, File outputFile,long chunkNum,long chunkSize,
  40. long[] chunkDownloadBytes,long[] chunkDownloadBytesShould,RateLimiter rateLimiter,
  41. long[] sp,long[] ep) {
  42. ExecutorService statusUpdater = Executors.newSingleThreadExecutor();
  43. ExecutorService newExecutor = Executors.newFixedThreadPool(taskDO.getThreads());
  44. String urlString = taskDO.getUrl();
  45. statusUpdater.submit(() -> {
  46. while (true) {
  47. long downloadedBytes = totalDownloadedBytes.get();
  48. long currentTime = System.currentTimeMillis();
  49. long elapsedTime = currentTime - startTime;
  50. double downloadSpeedMBps = (downloadedBytes / 1024.0 / 1024.0) / (elapsedTime / 1000.0);
  51. int remainingTimeSeconds = (int) (((fileSize - downloadedBytes) / 1024.0 / 1024.0) / downloadSpeedMBps);
  52. int progressPercent = (int) ((downloadedBytes / (fileSize * 1.0)) * 100);
  53. ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
  54. threadPoolExecutor.setMaximumPoolSize(10);
  55. Integer threadNum = redisSubscribersThreads.getThreadsForId(taskDO.getId());
  56. if (threadNum != null) {
  57. threadPoolExecutor.setCorePoolSize(threadNum); // Dynamically resize thread pools
  58. }
  59. String msg = "id = " + taskDO.getId() + ",剩余时间 = " + remainingTimeSeconds + "s," +
  60. "下载速度 = " + String.format("%.2f",downloadSpeedMBps) + "MB/s,下载进度 = " + progressPercent;
  61. LOG.info(msg);
  62. // Download speed is converted to a string
  63. String downloadSpeedStr = String.format("%.2f MB/s", downloadSpeedMBps);
  64. // Update database and synchronize sse
  65. int isUpdate = taskDAO.updateDownloadProgress(taskDO.getId(), downloadSpeedStr, remainingTimeSeconds,
  66. progressPercent);
  67. /*
  68. Delete: The update fails, that is, isUpdate == 0 is triggered
  69. step1: Close all download threads
  70. step2: Delete the local file
  71. */
  72. if (isUpdate == 0) {
  73. LOG.info("正在执行删除任务操作");
  74. pauseDownload(executor);
  75. deleteFile(outputFile);
  76. deleteRedisInfo(taskDO);
  77. statusUpdater.shutdown();
  78. break;
  79. }
  80. // Get task status
  81. String status = redisSubscribersTaskOptions.getOptionsForId(taskDO.getId());
  82. /**
  83. * PAUSE download: Triggered when the task obtained from redis is in Pause state
  84. * step1: Close all download threads
  85. * step2: Update the scoreboard
  86. */
  87. if (status != null && status.equals("Pause")) {
  88. LOG.info("正在执行暂停下载操作");
  89. pauseDownload(executor);
  90. status = null;
  91. // Do not exit the loop until the task state changes
  92. while (status == null) {
  93. try {
  94. Thread.sleep(500);
  95. status = redisSubscribersTaskOptions.getOptionsForId(taskDO.getId());
  96. } catch (InterruptedException | NullPointerException e) {
  97. throw new RuntimeException(e);
  98. }
  99. LOG.info("暂停下载任务,线程阻塞中");
  100. }
  101. // After a task is suspended, you can delete it, resume downloading it, and download it again
  102. if (status.equals("Delete")) {
  103. LOG.info("正在执行删除任务操作");
  104. deleteFile(outputFile);
  105. deleteRedisInfo(taskDO);
  106. statusUpdater.shutdown();
  107. break;
  108. }else if (status.equals("Resume")) {
  109. /*
  110. RESUME download: triggered when the status of the task obtained from redis is Resume
  111. Step p1: Obtain the undownloaded fragment Id from redis
  112. step2: Enumerate all ids in step1, and recalculate sp and ep.(sp = original sp + number of
  113. bytes downloaded before the current fragment is paused, ep remains unchanged)
  114. step3: Submit to the download thread pool and recurse the updateDownloadStatus method
  115. */
  116. LOG.info("正在执行恢复下载操作");
  117. // Obtain the id of the unfinished fragment
  118. List<Long> unDownloadedChunkId = redisService.getScoreboard(taskDO.getId());
  119. // Submit the unfinished shard to the thread pool
  120. for (int i = 0;i < unDownloadedChunkId.size(); i++) {
  121. // Get the index and ensure a one-to-one correspondence
  122. int index = Math.toIntExact(unDownloadedChunkId.get(i));
  123. long downloadedBytesOneChunk = chunkDownloadBytes[index];
  124. // Obtain the sp and ep of the remaining fragments
  125. long resumeSp = sp[index] + chunkDownloadBytes[index];
  126. long resumeEp = ep[index];
  127. // LOG.info("剩余的第" + index + "个分片正在下载" + resumeSp + "->" + resumeEp);
  128. newExecutor.submit(new DownloadServiceImpl.Download(urlString,outputFile.getPath(),resumeSp,
  129. resumeEp,totalDownloadedBytes,
  130. downloadedBytesOneChunk,index,rateLimiter,taskDO.getId(),redisService));
  131. }
  132. // Get the real-time status of the task
  133. updateDownloadStatus(newExecutor,totalDownloadedBytes,fileSize,taskDO,startTime,outputFile,
  134. chunkNum,chunkSize,chunkDownloadBytes,chunkDownloadBytesShould,rateLimiter,sp,ep);
  135. statusUpdater.shutdown();
  136. break;
  137. }else {
  138. refreshDownload(taskDO,executor,statusUpdater,outputFile);
  139. break;
  140. }
  141. }
  142. /**
  143. * Re-download: triggered when the task status obtained from redis is REFRESH
  144. * step1: Close all download threads
  145. * step2: Delete the local file
  146. * step3: Recursive download method
  147. */
  148. if (status != null && status.equals("Refresh")) {
  149. // Pause download
  150. SharedVariables.setIsPaused(true);
  151. executor.shutdownNow();
  152. while (!executor.isTerminated()) {
  153. try {
  154. Thread.sleep(200);
  155. } catch (InterruptedException e) {
  156. throw new RuntimeException(e);
  157. }
  158. }
  159. // reDownload
  160. refreshDownload(taskDO,executor,statusUpdater,outputFile);
  161. break;
  162. }
  163. // File download complete
  164. if (downloadedBytes == fileSize) {
  165. deleteRedisInfo(taskDO);
  166. LOG.info("文件下载完成. 文件保存为: " + outputFile.getAbsolutePath());
  167. totalDownloadedBytes.set(0);
  168. // Close the fragment download thread pool
  169. executor.shutdown();
  170. // Close the update status thread pool
  171. statusUpdater.shutdown();
  172. LOG.info("所有线程关闭");
  173. break;
  174. }
  175. try {
  176. Thread.sleep(500); // Update every 1 second
  177. } catch (InterruptedException e) {
  178. LOG.info(Thread.currentThread().getName()+"被中断了");
  179. }
  180. }
  181. });
  182. }
  183. // Delete redis information
  184. private void deleteRedisInfo(TaskDO taskDO) {
  185. // Delete task status
  186. if (redisTemplate.opsForHash().delete(RedisProducer.OPTIONS_KEY,taskDO.getId())!=0L){
  187. LOG.info(taskDO.getId()+"任务状态已删除");
  188. }
  189. // Delete thread instance
  190. if(redisTemplate.opsForHash().delete(KEY_MESSAGE_PARALLEL_DOWNLOAD, taskDO.getId())!=0L){
  191. LOG.info(taskDO.getId()+"并行线程已删除");
  192. }
  193. // Delete the task scoreboard
  194. if(redisTemplate.opsForHash().delete(KEY_CHUNK_HASHMAP,taskDO.getId())!=0L){
  195. LOG.info(taskDO.getId()+"任务记分牌已删除");
  196. }
  197. }
  198. // Pause download
  199. private void pauseDownload(ExecutorService executor) {
  200. SharedVariables.setIsPaused(true);
  201. // If downloading for the first time
  202. executor.shutdownNow();
  203. // Until the download thread closes
  204. while (!executor.isTerminated()) {
  205. try {
  206. Thread.sleep(200);
  207. } catch (InterruptedException e) {
  208. throw new RuntimeException(e);
  209. }
  210. }
  211. LOG.info("任务已暂停");
  212. // Reset is Pause
  213. SharedVariables.setIsPaused(false);
  214. }
  215. // reDownload
  216. private void refreshDownload(TaskDO taskDO,ExecutorService executor,ExecutorService statusUpdater,File outputFile) {
  217. LOG.info("正在执行重新下载操作");
  218. LOG.info("线程关闭中");
  219. if (executor.isTerminated()) {
  220. LOG.info("所有线程已关闭");
  221. deleteFile(outputFile);
  222. SharedVariables.setIsPaused(false);
  223. }
  224. downloadService.download(taskDO);
  225. statusUpdater.shutdown();
  226. }
  227. // Delete temporary files
  228. private void deleteFile(File outputFile) {
  229. if (outputFile.delete()) {
  230. LOG.info("文件已删除");
  231. }else {
  232. LOG.info("文件删除失败,请手动删除");
  233. }
  234. }
  235. }

        以上是对java实现分片下载、断点续传、限速等功能具体实现的说明,如果有不足的地方欢迎指正!

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/羊村懒王/article/detail/542211
推荐阅读
相关标签
  

闽ICP备14008679号