赞
踩
项目功能说明:
项目特点:本项目需要定时获取每个下载任务的实时状态,当系统需要大量并发请求时,如果直接在数据库上执行轮询可能会导致数据库连接池耗尽或数据库性能下降。因此我们利用redis消息队列来缓存任务状态,避免了轮询数据库,可以显著提高响应速度。
技术栈:SpringBoot + Spring + Mybatis + Mysql + Redis + SSE
完整项目及代码见gitee:https://gitee.com/Liguangyu1017/HTTP-download-server.git (项目已上线)
HTTP下载本质是发起一个GET的HTTP请求,在发起的GET请求中设置Range字段,比如:Range:bytes = 0 - 499表示请求文件的前 500 个字节。
- // The shard size is determined by the file size
- private int determineChunkSize(long fileSize) {
- if (fileSize <= 10 * 1024 * 1024) {
- return urlConfig.getMinChunkSize();
- } else if (fileSize <= 100 * 1024 * 1024) {
- return urlConfig.getMidChunkSize();
- } else {
- return urlConfig.getMaxChunkSize();
- }
- }
- // Calculate the size of each shard
- long chunkSize = determineChunkSize(fileSize);
- // Number of shards = File size/shard size (rounded up)
- long chunkNum = (long)Math.ceil((double) fileSize / (double) chunkSize);
- // Submit the download task to the thread pool
- for (int i = 0; i < chunkNum; i++) {
- // Gets the number of bytes downloaded by the current shard
- long downloadedBytesOneChunk = chunkDownloadBytes[i];
- // sp indicates the start location of fragment download
- sp[i] = chunkSize * i;
- // ep indicates the end location of fragment download
- ep[i] = (i < chunkNum - 1) ? (sp[i] + chunkSize) - 1 : fileSize - 1;
- chunkDownloadBytesShould[i] = ep[i] - sp[i] + 1;
- // LOG.info("正在下载:" + sp[i] + " -> " + ep[i]);
- // Start the thread pool to perform the fragment download
- executor.submit(new Download(urlString, outputFile.getPath(), sp[i], ep[i],totalDownloadedBytesMap.get(taskDO.getId()),
- downloadedBytesOneChunk,i,rateLimiter,taskDO.getId(),redisService));
- }
一个下载任务对应一个redis记分牌(用于标记每个分片是否下载完成,true/false,初始化每个分片全为false),下载过程中会不断更新记分牌,每个分片下载完成后,这个分片就被标记为true,这样暂停下载任务后再次开启下载时就可以跳过已经下载好的分片,从而实现断点续传。
根据任务id初始化redis记分牌:
- @Override
- public void initializeScoreboard(String taskId,long chunkNum){
- Map<String,Boolean> scoreboard=new HashMap<>();
- for (long i = 0;i < chunkNum; i++){
- scoreboard.put(String.valueOf(i),false);
- }
- redisTemplate.opsForHash().put(KEY_CHUNK_HASHMAP,taskId,scoreboard);
- }
根据任务id更新redis记分牌:
- @Override
- public void updateScoreboard(String taskId, long chunkId) {
- Map<String,Boolean> scoreboard=(Map) redisTemplate.opsForHash().get(KEY_CHUNK_HASHMAP,taskId);
- scoreboard.put(String.valueOf(chunkId),true);
- redisTemplate.opsForHash().put(KEY_CHUNK_HASHMAP,taskId,scoreboard);
- }
每个分片下载完成之后,存储到分片的起始位置中,这个过程由于文件句柄只有一个,需要加锁(synchronized)后进行seek,保证存储位置的准确性。
- HttpURLConnection connection;
- // Establish a connection to the download address and set the download range
- connection = (HttpURLConnection) new URL(fileURL).openConnection();
- // Configure the range
- String byteRange = sp + "-" + ep;
- connection.setRequestProperty("Range", "bytes=" + byteRange);
- // Create an input stream
- BufferedInputStream in = new BufferedInputStream(connection.getInputStream());
- RandomAccessFile raf = new RandomAccessFile(new File(outputFile), "rw");
- synchronized (lock) {
- try {
- raf.seek(sp);
- }catch (IOException e) {
- LOG.error("outputFile定位开始下载位置时出现错误");
- e.printStackTrace();
- }
- }
- package cn.ykd.actualproject.service.impl;
-
- import cn.ykd.actualproject.config.UrlConfig;
- import cn.ykd.actualproject.dao.SettingsDAO;
- import cn.ykd.actualproject.dao.TaskDAO;
- import cn.ykd.actualproject.dataobject.TaskDO;
- import cn.ykd.actualproject.model.Settings;
- import cn.ykd.actualproject.service.RedisService;
- import cn.ykd.actualproject.service.TaskManagerService;
- import cn.ykd.actualproject.service.DownloadService;
-
- import cn.ykd.actualproject.utils.SharedVariables;
- import com.google.common.util.concurrent.RateLimiter;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.stereotype.Service;
-
- import java.io.*;
- import java.net.HttpURLConnection;
- import java.net.URL;
- import java.util.concurrent.ConcurrentHashMap;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.atomic.AtomicLong;
- import java.util.regex.Matcher;
- import java.util.regex.Pattern;
-
- @Service
- public class DownloadServiceImpl implements DownloadService {
- @Autowired
- private TaskManagerService taskManagerService;
- @Autowired
- private RedisService redisService;
- @Autowired
- private SettingsDAO settingsDAO;
- @Autowired
- private UrlConfig urlConfig;
- @Autowired
- private TaskDAO taskDAO;
- private static final Logger LOG= LoggerFactory.getLogger(DownloadServiceImpl.class);
- // Secure parallel downloads
- private static final ConcurrentHashMap<String,AtomicLong> totalDownloadedBytesMap = new ConcurrentHashMap<>();
- private static final Object lock = new Object(); // Synchronous lock
- private static long[] chunkDownloadBytes; // Used to record the number of bytes downloaded per shard
- private static long[] chunkDownloadBytesShould; // Used to record the number of bytes that each shard should download
-
- @Override
- public boolean download(TaskDO taskDO) {
- // Obtain and process raw data
- int threadNum = taskDO.getThreads();
- String urlString = taskDO.getUrl(); // Get the download address
- Settings settings = settingsDAO.get().convertToModel(); // Obtain the setting information
- double maxDownloadSpeedMBps = settings.getMaxDownloadSpeed(); // Get the maximum download speed
- if (maxDownloadSpeedMBps<=0){
- maxDownloadSpeedMBps=100;
- }
- String savePath = settings.getDownloadPath(); // Obtain the download save path
- String fileName = taskDO.getName(); // Extract the file name from the download address
- RateLimiter rateLimiter = RateLimiter.create(maxDownloadSpeedMBps*1024*1000); // One token corresponds to one byte of data
- AtomicLong totalDownloadedBytes = new AtomicLong(0); // Used to record the number of bytes downloaded by all threads
- totalDownloadedBytesMap.put(taskDO.getId(),totalDownloadedBytes);
- try {
- // Establish a connection to the download address and get the file size
- URL url = new URL(urlString);
- HttpURLConnection connection = (HttpURLConnection) url.openConnection();
- // Get the binary data length of the file
- long fileSize = connection.getContentLength();
- // Calculate file size (Mb)
- double size = (double)fileSize/1024/1024;
- taskDO.setPath(settings.getDownloadPath());
- taskDO.setSize(String.format("%.2f",size) + " MB");
- taskDO.setStatus("Downloading");
- taskDAO.update(taskDO);
- // Create an output stream of the downloaded file
- File outputFile = new File(savePath, fileName);
- // Create a thread pool for multithreaded sharding downloads
- ExecutorService executor = Executors.newFixedThreadPool(threadNum);
- // Calculate the size of each shard
- long chunkSize = determineChunkSize(fileSize);
- // Number of shards = File size/shard size (rounded up)
- long chunkNum = (long)Math.ceil((double) fileSize / (double) chunkSize);
- String msg = "fileSize = " + fileSize + ",chunkSize = " + chunkSize + ",chunkNum = " + chunkNum;
- LOG.info(msg);
- // Record the download start time
- long startTime = System.currentTimeMillis();
- // Record the downloads of each shard
- chunkDownloadBytes = new long[(int) chunkNum];
- chunkDownloadBytesShould = new long[(int) chunkNum];
- // Initialize the state of each shard (True: shards are downloaded False: shards are not downloaded)
- redisService.initializeScoreboard(taskDO.getId(),chunkNum);
- // Initializes the sp and ep
- long[] sp = new long[(int) chunkNum];
- long[] ep = new long[(int) chunkNum];
- // Submit the download task to the thread pool
- for (int i = 0; i < chunkNum; i++) {
- // Gets the number of bytes downloaded by the current shard
- long downloadedBytesOneChunk = chunkDownloadBytes[i];
- // sp indicates the start location of fragment download
- sp[i] = chunkSize * i;
- // ep indicates the end location of fragment download
- ep[i] = (i < chunkNum - 1) ? (sp[i] + chunkSize) - 1 : fileSize - 1;
- chunkDownloadBytesShould[i] = ep[i] - sp[i] + 1;
- // LOG.info("正在下载:" + sp[i] + " -> " + ep[i]);
- // Start the thread pool to perform the fragment download
- executor.submit(new Download(urlString, outputFile.getPath(), sp[i], ep[i],totalDownloadedBytesMap.get(taskDO.getId()),
- downloadedBytesOneChunk,i,rateLimiter,taskDO.getId(),redisService));
- }
- // Call task management module to synchronize database and sse in real time
- taskManagerService.updateDownloadStatus(executor,totalDownloadedBytesMap.get(taskDO.getId()),fileSize,
- taskDO,startTime,outputFile,
- chunkNum,chunkSize,chunkDownloadBytes,chunkDownloadBytesShould,rateLimiter,sp,ep);
- return true;
- }catch (Exception e){
- e.printStackTrace();
- return false;
- }
-
-
- }
-
- // Inner classes are used to implement the download function
- static class Download implements Runnable {
- private String fileURL; // File url
- private String outputFile; // Output stream file
- private long sp; // Download start location
- private long ep; // Download end location
- private AtomicLong totalDownloadedBytes; // Records the number of bytes downloaded
- private long downloadedBytes; // Records the number of bytes actually downloaded by the current fragment
- private int chunkId; // Fragment id
- private RateLimiter rateLimiter; // Governor
- private String taskId; // Task Id
- private RedisService redisService; // redis service
-
- public Download(String fileURL, String outputFile, long sp,long ep, AtomicLong totalDownloadedBytes,
- long downloadedBytes,int chunkId,RateLimiter rateLimiter,String taskId,RedisService redisService) {
- this.fileURL = fileURL;
- this.outputFile = outputFile;
- this.sp = sp;
- this.ep = ep;
- this.totalDownloadedBytes = totalDownloadedBytes;
- this.downloadedBytes = downloadedBytes;
- this.chunkId = chunkId;
- this.rateLimiter = rateLimiter;
- this.taskId= taskId;
- this.redisService = redisService;
- }
-
- @Override
- public void run() {
- try {
- HttpURLConnection connection;
- // Establish a connection to the download address and set the download range
- connection = (HttpURLConnection) new URL(fileURL).openConnection();
- // Configure the range
- String byteRange = sp + "-" + ep;
- connection.setRequestProperty("Range", "bytes=" + byteRange);
- // Create an input stream
- BufferedInputStream in = new BufferedInputStream(connection.getInputStream());
- // Create an output stream
- RandomAccessFile raf = new RandomAccessFile(new File(outputFile), "rw");
- synchronized (lock) {
- try {
- raf.seek(sp);
- }catch (IOException e) {
- LOG.error("outputFile定位开始下载位置时出现错误");
- e.printStackTrace();
- }
- }
- // Number of bytes actually read
- int bytesRead;
- // Read 1024 bytes of cache
- byte[] buffer = new byte[1024];
- // Start writing to the file
- while ((bytesRead = in.read(buffer)) != -1) {
- synchronized (lock) {
- // Check whether the task is suspended
- if (SharedVariables.getIsPaused()) {
- raf.close();
- in.close();
- break;
- }
- rateLimiter.acquire(bytesRead);
- raf.write(buffer, 0, bytesRead);
- downloadedBytes += bytesRead;
- chunkDownloadBytes[chunkId] = downloadedBytes;
- // Update the scoreboard if the current shard is downloaded
- if (chunkDownloadBytes[chunkId] == chunkDownloadBytesShould[chunkId]) {
- redisService.updateScoreboard(taskId, chunkId);
- }
- totalDownloadedBytes.addAndGet(bytesRead);// Updates the number of downloaded bytes stored in the atomic class
- }
- }
-
- raf.close();
- in.close();
- connection.disconnect();
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
-
-
- // The shard size is determined by the file size
- private int determineChunkSize(long fileSize) {
- if (fileSize <= 10 * 1024 * 1024) {
- return urlConfig.getMinChunkSize();
- } else if (fileSize <= 100 * 1024 * 1024) {
- return urlConfig.getMidChunkSize();
- } else {
- return urlConfig.getMaxChunkSize();
- }
- }
- }
断点续传是一种网络传输技术,允许在文件下载或上传过程中,当传输中断或失败时,能够从断点处恢复传输,而不是重新开始传输整个文件。
上文中已经提到了redis记分牌,每个任务所对应的每个分片都存储在redis中,在下载过程中不断更新记分牌,当一个分片下载完成时,将其标记为true。
当一个下载任务被终止,此时redis已经记录了该任务每个分片的下载情况,如果要恢复下载,只需读取未下载完的分片(false),确定范围(start_position和end_position),重新提交给线程池执行下载即可。
根据任务id获取记分牌:
- @Override
- public List<Long> getScoreboard(String taskId){
- List<Long> chunkIds=new ArrayList<>();
- Map<String,Boolean> scoreboard=(Map) redisTemplate.opsForHash().get(KEY_CHUNK_HASHMAP,taskId);
- scoreboard.forEach((key,value)->{
- if (!value){
- chunkIds.add(Long.valueOf(key));
- }
- });
- return chunkIds;
- }
确定范围,然后提交重新提交给线程池下载:
- LOG.info("正在执行恢复下载操作");
- // Obtain the id of the unfinished fragment
- List<Long> unDownloadedChunkId = redisService.getScoreboard(taskDO.getId());
- // Submit the unfinished shard to the thread pool
- for (int i = 0;i < unDownloadedChunkId.size(); i++) {
- // Get the index and ensure a one-to-one correspondence
- int index = Math.toIntExact(unDownloadedChunkId.get(i));
- long downloadedBytesOneChunk = chunkDownloadBytes[index];
- // Obtain the sp and ep of the remaining fragments
- long resumeSp = sp[index] + chunkDownloadBytes[index];
- long resumeEp = ep[index];
- // LOG.info("剩余的第" + index + "个分片正在下载" + resumeSp + "->" + resumeEp);
- newExecutor.submit(new DownloadServiceImpl.Download(urlString,outputFile.getPath(),resumeSp,
- resumeEp,totalDownloadedBytes,
- downloadedBytesOneChunk,index,rateLimiter,taskDO.getId(),redisService));
本项目通过使用RateLimiter工具实现限速下载功能。
RateLimiter基于令牌桶算法(Token Bucket)实现。它维护了一个稳定的速率(rate),以及一个令牌桶,令牌桶中的令牌数量代表了当前可用的请求次数。当有请求到来时,RateLimiter会根据当前的令牌桶状态来决定是否立即执行请求,或者需要等待一段时间后再执行。
<!-- Guava限流 --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>18.0</version> </dependency>
RateLimiter rateLimiter = RateLimiter.create(maxDownloadSpeedMBps*1024*1000); // One token corresponds to one byte of data
- // Number of bytes actually read
- int bytesRead;
- // Read 1024 bytes of cache
- byte[] buffer = new byte[1024];
- // Start writing to the file
- while ((bytesRead = in.read(buffer)) != -1) {
- synchronized (lock) {
- // Check whether the task is suspended
- if (SharedVariables.getIsPaused()) {
- raf.close();
- in.close();
- break;
- }
- rateLimiter.acquire(bytesRead);
- raf.write(buffer, 0, bytesRead);
- downloadedBytes += bytesRead;
- chunkDownloadBytes[chunkId] = downloadedBytes;
- // Update the scoreboard if the current shard is downloaded
- if (chunkDownloadBytes[chunkId] == chunkDownloadBytesShould[chunkId]) {
- redisService.updateScoreboard(taskId, chunkId);
- }
- totalDownloadedBytes.addAndGet(bytesRead);// Updates the number of downloaded bytes stored in the atomic class
- }
- }
可以创建一个单线程用于定时(500ms)获取任务的信息,并同时将数据传给数据库和前端。
这里特别讲一下暂停操作:
当后端接受到pause参数时,首先会关闭文件的输入流和输出流,即:
if (SharedVariables.getIsPaused()) { raf.close(); in.close(); break; }然后会关闭下载线程池:
// Pause download private void pauseDownload(ExecutorService executor) { SharedVariables.setIsPaused(true); // If downloading for the first time executor.shutdownNow(); // Until the download thread closes while (!executor.isTerminated()) { try { Thread.sleep(200); } catch (InterruptedException e) { throw new RuntimeException(e); } } LOG.info("任务已暂停"); // Reset is Pause SharedVariables.setIsPaused(false); }接下来会一直阻塞更新任务状态的线程,直到有新的任务状态(删除、重新下载、恢复下载)被获取到:
while (status == null) { try { Thread.sleep(500); status = redisSubscribersTaskOptions.getOptionsForId(taskDO.getId()); } catch (InterruptedException | NullPointerException e) { throw new RuntimeException(e); } LOG.info("暂停下载任务,线程阻塞中"); }
- package cn.ykd.actualproject.service.impl;
-
- import cn.ykd.actualproject.dao.TaskDAO;
- import cn.ykd.actualproject.dataobject.TaskDO;
- import cn.ykd.actualproject.service.DownloadService;
- import cn.ykd.actualproject.service.RedisService;
- import cn.ykd.actualproject.service.TaskManagerService;
- import cn.ykd.actualproject.utils.SharedVariables;
- import com.google.common.util.concurrent.RateLimiter;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.data.redis.core.RedisTemplate;
- import org.springframework.stereotype.Service;
-
- import java.io.File;
- import java.util.List;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.concurrent.ThreadPoolExecutor;
- import java.util.concurrent.atomic.AtomicLong;
-
- @Service
- public class TaskManagerServiceImpl implements TaskManagerService {
- @Autowired
- private TaskDAO taskDAO;
- @Autowired
- private RedisSubscribersTaskOptions redisSubscribersTaskOptions;
- @Autowired
- private DownloadService downloadService;
- @Autowired
- private RedisService redisService;
- @Autowired
- private RedisTemplate redisTemplate;
- @Autowired
- private RedisSubscribersThreads redisSubscribersThreads;
- private static final String KEY_MESSAGE_PARALLEL_DOWNLOAD = "DOWNLOAD_THREAD_QUEUE";
- private static final String KEY_CHUNK_HASHMAP = "CHUNK_HASHMAP";
- private static final Logger LOG= LoggerFactory.getLogger(TaskManagerServiceImpl.class);
- @Override
- public void updateDownloadStatus(ExecutorService executor, AtomicLong totalDownloadedBytes, long fileSize,
- TaskDO taskDO, long startTime, File outputFile,long chunkNum,long chunkSize,
- long[] chunkDownloadBytes,long[] chunkDownloadBytesShould,RateLimiter rateLimiter,
- long[] sp,long[] ep) {
-
- ExecutorService statusUpdater = Executors.newSingleThreadExecutor();
- ExecutorService newExecutor = Executors.newFixedThreadPool(taskDO.getThreads());
- String urlString = taskDO.getUrl();
- statusUpdater.submit(() -> {
- while (true) {
- long downloadedBytes = totalDownloadedBytes.get();
- long currentTime = System.currentTimeMillis();
- long elapsedTime = currentTime - startTime;
- double downloadSpeedMBps = (downloadedBytes / 1024.0 / 1024.0) / (elapsedTime / 1000.0);
- int remainingTimeSeconds = (int) (((fileSize - downloadedBytes) / 1024.0 / 1024.0) / downloadSpeedMBps);
- int progressPercent = (int) ((downloadedBytes / (fileSize * 1.0)) * 100);
- ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executor;
- threadPoolExecutor.setMaximumPoolSize(10);
- Integer threadNum = redisSubscribersThreads.getThreadsForId(taskDO.getId());
- if (threadNum != null) {
- threadPoolExecutor.setCorePoolSize(threadNum); // Dynamically resize thread pools
- }
-
- String msg = "id = " + taskDO.getId() + ",剩余时间 = " + remainingTimeSeconds + "s," +
- "下载速度 = " + String.format("%.2f",downloadSpeedMBps) + "MB/s,下载进度 = " + progressPercent;
- LOG.info(msg);
- // Download speed is converted to a string
- String downloadSpeedStr = String.format("%.2f MB/s", downloadSpeedMBps);
- // Update database and synchronize sse
- int isUpdate = taskDAO.updateDownloadProgress(taskDO.getId(), downloadSpeedStr, remainingTimeSeconds,
- progressPercent);
- /*
- Delete: The update fails, that is, isUpdate == 0 is triggered
- step1: Close all download threads
- step2: Delete the local file
- */
- if (isUpdate == 0) {
- LOG.info("正在执行删除任务操作");
- pauseDownload(executor);
- deleteFile(outputFile);
- deleteRedisInfo(taskDO);
- statusUpdater.shutdown();
- break;
- }
- // Get task status
- String status = redisSubscribersTaskOptions.getOptionsForId(taskDO.getId());
-
- /**
- * PAUSE download: Triggered when the task obtained from redis is in Pause state
- * step1: Close all download threads
- * step2: Update the scoreboard
- */
- if (status != null && status.equals("Pause")) {
- LOG.info("正在执行暂停下载操作");
- pauseDownload(executor);
- status = null;
- // Do not exit the loop until the task state changes
- while (status == null) {
- try {
- Thread.sleep(500);
- status = redisSubscribersTaskOptions.getOptionsForId(taskDO.getId());
- } catch (InterruptedException | NullPointerException e) {
- throw new RuntimeException(e);
- }
- LOG.info("暂停下载任务,线程阻塞中");
- }
-
- // After a task is suspended, you can delete it, resume downloading it, and download it again
- if (status.equals("Delete")) {
- LOG.info("正在执行删除任务操作");
- deleteFile(outputFile);
- deleteRedisInfo(taskDO);
- statusUpdater.shutdown();
- break;
- }else if (status.equals("Resume")) {
- /*
- RESUME download: triggered when the status of the task obtained from redis is Resume
- Step p1: Obtain the undownloaded fragment Id from redis
- step2: Enumerate all ids in step1, and recalculate sp and ep.(sp = original sp + number of
- bytes downloaded before the current fragment is paused, ep remains unchanged)
- step3: Submit to the download thread pool and recurse the updateDownloadStatus method
- */
- LOG.info("正在执行恢复下载操作");
- // Obtain the id of the unfinished fragment
- List<Long> unDownloadedChunkId = redisService.getScoreboard(taskDO.getId());
- // Submit the unfinished shard to the thread pool
- for (int i = 0;i < unDownloadedChunkId.size(); i++) {
- // Get the index and ensure a one-to-one correspondence
- int index = Math.toIntExact(unDownloadedChunkId.get(i));
- long downloadedBytesOneChunk = chunkDownloadBytes[index];
- // Obtain the sp and ep of the remaining fragments
- long resumeSp = sp[index] + chunkDownloadBytes[index];
- long resumeEp = ep[index];
- // LOG.info("剩余的第" + index + "个分片正在下载" + resumeSp + "->" + resumeEp);
- newExecutor.submit(new DownloadServiceImpl.Download(urlString,outputFile.getPath(),resumeSp,
- resumeEp,totalDownloadedBytes,
- downloadedBytesOneChunk,index,rateLimiter,taskDO.getId(),redisService));
- }
- // Get the real-time status of the task
- updateDownloadStatus(newExecutor,totalDownloadedBytes,fileSize,taskDO,startTime,outputFile,
- chunkNum,chunkSize,chunkDownloadBytes,chunkDownloadBytesShould,rateLimiter,sp,ep);
- statusUpdater.shutdown();
- break;
- }else {
- refreshDownload(taskDO,executor,statusUpdater,outputFile);
- break;
- }
- }
-
-
- /**
- * Re-download: triggered when the task status obtained from redis is REFRESH
- * step1: Close all download threads
- * step2: Delete the local file
- * step3: Recursive download method
- */
- if (status != null && status.equals("Refresh")) {
- // Pause download
- SharedVariables.setIsPaused(true);
- executor.shutdownNow();
- while (!executor.isTerminated()) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- // reDownload
- refreshDownload(taskDO,executor,statusUpdater,outputFile);
- break;
- }
-
- // File download complete
- if (downloadedBytes == fileSize) {
- deleteRedisInfo(taskDO);
- LOG.info("文件下载完成. 文件保存为: " + outputFile.getAbsolutePath());
- totalDownloadedBytes.set(0);
- // Close the fragment download thread pool
- executor.shutdown();
- // Close the update status thread pool
- statusUpdater.shutdown();
- LOG.info("所有线程关闭");
- break;
- }
- try {
- Thread.sleep(500); // Update every 1 second
- } catch (InterruptedException e) {
- LOG.info(Thread.currentThread().getName()+"被中断了");
- }
-
- }
- });
-
- }
-
-
-
- // Delete redis information
- private void deleteRedisInfo(TaskDO taskDO) {
- // Delete task status
- if (redisTemplate.opsForHash().delete(RedisProducer.OPTIONS_KEY,taskDO.getId())!=0L){
- LOG.info(taskDO.getId()+"任务状态已删除");
- }
- // Delete thread instance
- if(redisTemplate.opsForHash().delete(KEY_MESSAGE_PARALLEL_DOWNLOAD, taskDO.getId())!=0L){
- LOG.info(taskDO.getId()+"并行线程已删除");
- }
- // Delete the task scoreboard
- if(redisTemplate.opsForHash().delete(KEY_CHUNK_HASHMAP,taskDO.getId())!=0L){
- LOG.info(taskDO.getId()+"任务记分牌已删除");
- }
- }
- // Pause download
- private void pauseDownload(ExecutorService executor) {
- SharedVariables.setIsPaused(true);
- // If downloading for the first time
- executor.shutdownNow();
- // Until the download thread closes
- while (!executor.isTerminated()) {
- try {
- Thread.sleep(200);
- } catch (InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
- LOG.info("任务已暂停");
- // Reset is Pause
- SharedVariables.setIsPaused(false);
- }
- // reDownload
- private void refreshDownload(TaskDO taskDO,ExecutorService executor,ExecutorService statusUpdater,File outputFile) {
- LOG.info("正在执行重新下载操作");
- LOG.info("线程关闭中");
- if (executor.isTerminated()) {
- LOG.info("所有线程已关闭");
- deleteFile(outputFile);
- SharedVariables.setIsPaused(false);
- }
- downloadService.download(taskDO);
- statusUpdater.shutdown();
- }
- // Delete temporary files
- private void deleteFile(File outputFile) {
- if (outputFile.delete()) {
- LOG.info("文件已删除");
- }else {
- LOG.info("文件删除失败,请手动删除");
- }
- }
- }
以上是对java实现分片下载、断点续传、限速等功能具体实现的说明,如果有不足的地方欢迎指正!
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。