当前位置:   article > 正文

使用 Python 对文件进行 MD5 校验、比对重复文件_Python 对 byte 进行 MD5 去重

python 对byte进行 md5 去重
  1. import os
  2. import time
  3. import hashlib
  4. import re
  5. from concurrent.futures import ProcessPoolExecutor
  6. from functools import partial
  7. def generate_md5_for_file(file_path, block_size=4096):
  8. # Calculate the MD5 hash for a given file
  9. md5_hash = hashlib.md5()
  10. with open(file_path, "rb") as f:
  11. for byte_block in iter(partial(f.read, block_size), b""):
  12. md5_hash.update(byte_block)
  13. return file_path, md5_hash.hexdigest()
  14. def generate_md5_for_files_parallel(folder_path, block_size=4096):
  15. # Generate MD5 hashes for all files in a folder using parallel processing
  16. md5_dict = {}
  17. with ProcessPoolExecutor() as executor:
  18. # Get all file paths in the specified folder
  19. file_paths = [os.path.join(root, file) for root, _, files in os.walk(folder_path) for file in files]
  20. # Use parallel processing to calculate MD5 hashes for each file
  21. results = executor.map(partial(generate_md5_for_file, block_size=block_size), file_paths)
  22. # Update the dictionary with the calculated MD5 values
  23. md5_dict.update(results)
  24. return md5_dict
  25. def write_md5_to_file(md5_dict, output_file):
  26. # Write MD5 values and file paths to a text file
  27. with open(output_file, "w") as f:
  28. for file_path, md5_value in md5_dict.items():
  29. f.write(f"{md5_value} {file_path}\n")
  30. def check_duplicate_md5(file_path):
  31. # Check for duplicate MD5 values in a text file
  32. md5_dict = {}
  33. with open(file_path, "r") as f:
  34. for line in f:
  35. line = line.strip()
  36. if line:
  37. md5_value, file_path = line.split(" ", 1)
  38. if md5_value in md5_dict:
  39. # Print information about duplicate MD5 values
  40. print(f"Duplicate MD5 found: {md5_value}")
  41. print(f"Original file: {md5_dict[md5_value]}")
  42. print(f"Duplicate file: {file_path}\n")
  43. else:
  44. md5_dict[md5_value] = file_path
  45. def split_and_check_duplicate_part(filename, part_index, seen_parts):
  46. # Split a filename using "_" and check for duplicate parts
  47. parts = filename.split("_")
  48. if len(parts) == 4:
  49. selected_part = parts[part_index]
  50. if selected_part in seen_parts:
  51. # Print information about duplicate parts
  52. print(f'Duplicate part found at index {part_index}: {selected_part}')
  53. else:
  54. seen_parts.add(selected_part)
  55. else:
  56. # Print information if the filename does not have four parts
  57. print(f'File "{filename}" does not have four parts.')
  58. def process_folder(folder_path, part_index):
  59. # Process all filenames in a folder
  60. files = os.listdir(folder_path)
  61. seen_parts = set()
  62. for filename in files:
  63. # Call the split_and_check_duplicate_part function
  64. split_and_check_duplicate_part(filename, part_index, seen_parts)
  65. def find_max_execution_time(file_path):
  66. # Find the maximum execution time from a log file
  67. try:
  68. with open(file_path, 'r') as file:
  69. numbers = []
  70. pattern = re.compile(r'Program execution time: (\d+) microseconds')
  71. for line in file:
  72. match = pattern.search(line)
  73. if match:
  74. numbers.append(int(match.group(1)))
  75. if not numbers:
  76. raise ValueError("No execution time found in the file.")
  77. max_number = max(numbers)
  78. return max_number
  79. except FileNotFoundError:
  80. raise FileNotFoundError(f"Error: File '{file_path}' not found.")
  81. except Exception as e:
  82. raise Exception(f"An error occurred: {e}")
  83. if __name__ == "__main__":
  84. # Record the start time of the program
  85. start_time = time.time()
  86. # Set the folder path and log file path
  87. folder_path = r"D:/outputFile/bmp"
  88. file_path = r"D:/log.txt"
  89. try:
  90. # Try to find and print the maximum execution time
  91. max_execution_time = find_max_execution_time(file_path)
  92. print(f"The maximum execution time is: {max_execution_time} microseconds")
  93. except Exception as e:
  94. # Print an error message if an exception occurs
  95. print(e)
  96. # Set the index of the part to be compared
  97. selected_part_index = 1
  98. # Call the process_folder function to handle filenames
  99. process_folder(folder_path, selected_part_index)
  100. # Set the MD5 file path and block size
  101. MD5_file = "D:/md5sums.txt"
  102. block_size = 8192
  103. # Generate MD5 values for files in parallel and write them to a file
  104. md5_dict = generate_md5_for_files_parallel(folder_path, block_size=block_size)
  105. write_md5_to_file(md5_dict, MD5_file)
  106. # Print a message indicating successful MD5 generation
  107. print(f"MD5 values generated and saved to {MD5_file}")
  108. # Check for duplicate MD5 values in the generated file
  109. check_duplicate_md5(MD5_file)
  110. # Record the end time of the program
  111. end_time = time.time()
  112. # Calculate the total execution time in milliseconds
  113. execution_time = (end_time - start_time) * 1000
  114. print(f"Function execution time: {execution_time} milliseconds")

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/340172
推荐阅读
相关标签
  

闽ICP备14008679号