当前位置:   article > 正文

Go语言实现多协程文件下载器

Go语言实现多协程文件下载器

前言

你好,我是醉墨居士,最近在开发文件传输相关的项目,然后顺手写了一个多协程文件下载器,代码非常精简,核心代码只有100行左右,适合分享给大家学习使用

流程图

在这里插入图片描述

主函数

func main() {
	fileURL := flag.String("u", "", "downloade url of the file")
	flag.Parse()

	if *fileURL == "" {
		log.Println("Please input a download url")
		flag.Usage()
		return
	}

	fileDir, err := os.Getwd()
	if err != nil {
		log.Println(err)
		return
	}

	// 下载文件保存路径
	filePath := filepath.Join(fileDir, filepath.Base(*fileURL))

	err = downloadFile(*fileURL, filePath)
	if err != nil {
		log.Println(err)
		return
	}

	log.Println("download file success:", filePath)
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

下载文件

// 下载文件
func downloadFile(fileURL string, filePath string) error {
	log.Println("downloading file:", fileURL, "to", filePath)

	taskCh := make(chan [2]int64, runtime.NumCPU())
	wg := new(sync.WaitGroup)

	// 创建执行下载任务的 worker
	err := initWorker(fileURL, filePath, taskCh, wg)
	if err != nil {
		return fmt.Errorf("init worker failed: %v", err)
	}

	// 分发下载任务
	err = dispatchTask(fileURL, taskCh)
	if err != nil {
		return fmt.Errorf("dispacth task failed: %v", err)
	}

	// 等待所有下载任务完成
	wg.Wait()

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24

初始化分片下载worker

// 初始化 下载 worker
func initWorker(url string, filePath string, taskCh chan [2]int64, wg *sync.WaitGroup) error {
	for i := 0; i < runtime.NumCPU(); i++ {
		// 打开文件句柄
		file, err := os.OpenFile(filePath, os.O_CREATE|os.O_RDWR, 0644)
		if err != nil {
			return err
		}

		wg.Add(1)
		go func(file *os.File, taskCh chan [2]int64) {
			defer wg.Done()
			defer file.Close()

			// 循环从 taskCh 中获取下载任务并下载
			for part := range taskCh {
				log.Printf("downloading part, start offset: %d, end offset: %d", part[0], part[1])

				// 重试下载,最大重试次数为 10 次,每次下载失败后等待 1 秒
				err := retryWithWaitTime(10, func() error {
					return downloadPart(url, file, part[0], part[1])
				}, time.Second)
				if err != nil {
					log.Printf("download part %d failed: %v", part, err)
				}
			}
		}(file, taskCh)
	}

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31

分发下载任务

// 分发下载任务
func dispatchTask(url string, taskCh chan [2]int64) error {
	defer close(taskCh)

	fileSize, err := getFileSize(url)
	if err != nil {
		return err
	}
	

	// 分片大小 1MB
	const chunkSize = 1024 * 1024

	parts := fileSize / chunkSize

	log.Println("file size:", fileSize, "parts:", parts, "chunk size:", chunkSize)

	for i := int64(0); i < parts; i++ {
		// 计算分片的起始和结束位置
		startOffset := i * chunkSize
		endOffset := startOffset + chunkSize - 1

		// 发送下载任务
		taskCh <- [2]int64{startOffset, endOffset}
	}

	// 发送最后一个分片的下载任务
	if fileSize % chunkSize != 0 {
		taskCh <- [2]int64{parts * chunkSize, fileSize - 1}
	}

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

获取下载文件的大小

// 获取文件大小
func getFileSize(url string) (int64, error) {
	resp, err := http.Head(url)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	return resp.ContentLength, nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

下载文件分片

// 下载文件分片
func downloadPart(url string, file *os.File, startPos, endPos int64) error {
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return err
	}

	// 设置文件分片区间的请求头
	req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", startPos, endPos))
	resp, err := http.DefaultTransport.RoundTrip(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// 如果服务器返回的状态码不是 206 Partial Content,则说明下载失败
	if resp.StatusCode != http.StatusPartialContent {
		data, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
		}
		log.Println("unexpected data:", string(data))
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	// 文件指针移动到分片的起始位置
	_, err = file.Seek(startPos, 0)
	if err != nil {
		return err
	}

	// 写入分片数据到文件
	_, err = io.Copy(file, resp.Body)
	if err != nil {
		return err
	}

	return nil
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39

错误重试

// 重试函数
func retryWithWaitTime(retryCount int, fn func() error, waitTime time.Duration) error {
	var err error
	for i := 0; i < retryCount; i++ {
		e := fn()
		if e != nil {
			errors.Join(err, e)
			time.Sleep(waitTime)
			continue
		}

		return nil
	}

	return err
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

项目演示

在这里插入图片描述
在这里插入图片描述

最后

我是醉墨居士,如果这个项目对你有所帮助,希望你能多多支持,我们下期再见

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/知新_RL/article/detail/931588
推荐阅读
相关标签
  

闽ICP备14008679号