赞
踩
- import os
- import time
- import hashlib
- import re
- from concurrent.futures import ProcessPoolExecutor
- from functools import partial
-
def generate_md5_for_file(file_path, block_size=4096):
    """Compute the MD5 digest of a single file.

    The file is opened in binary mode and read in chunks so that large files
    are never loaded into memory in one piece.

    Args:
        file_path: Path of the file to hash.
        block_size: Number of bytes requested per read call.

    Returns:
        A ``(file_path, hex_digest)`` tuple. The path is echoed back so a
        sequence of results can be turned directly into a dict.

    Raises:
        ValueError: If ``block_size`` is 0.
        OSError: If the file cannot be opened or read.
    """
    # read(0) returns b"" immediately, which would end the loop below at once
    # and report the digest of empty input for every file.
    if block_size == 0:
        raise ValueError("block_size must not be 0")

    md5_hash = hashlib.md5()
    with open(file_path, "rb") as f:
        # iter() with a sentinel keeps calling f.read(block_size) until it
        # returns b"" (end of file).
        for byte_block in iter(partial(f.read, block_size), b""):
            md5_hash.update(byte_block)
    return file_path, md5_hash.hexdigest()
-
def generate_md5_for_files_parallel(folder_path, block_size=4096):
    """Hash every file under a folder with MD5, using a pool of processes.

    The folder is walked recursively with ``os.walk``; each file found is
    handed to ``generate_md5_for_file`` in a worker process.

    Args:
        folder_path: Root folder to scan.
        block_size: Read chunk size forwarded to ``generate_md5_for_file``.

    Returns:
        A dict mapping each file path to its MD5 hex digest. The dict is
        empty when no files are found (``os.walk`` ignores listing errors by
        default, so a missing folder also yields an empty dict).

    Raises:
        Any exception raised while hashing a file in a worker is re-raised
        here when the results are collected.
    """
    # Collect the paths before creating the pool so that an empty folder does
    # not pay for starting worker processes.
    file_paths = [
        os.path.join(root, name)
        for root, _, names in os.walk(folder_path)
        for name in names
    ]
    if not file_paths:
        return {}

    hash_one = partial(generate_md5_for_file, block_size=block_size)
    with ProcessPoolExecutor() as executor:
        # Each result is a (path, digest) pair, so dict() can consume the
        # result iterator directly.
        return dict(executor.map(hash_one, file_paths))
-
def write_md5_to_file(md5_dict, output_file):
    """Write MD5 digests and their file paths to a text file.

    One line is written per entry, in the dict's iteration order, formatted
    as ``<md5> <path>`` with a single space between the two fields.

    Args:
        md5_dict: Mapping of file path to MD5 hex digest.
        output_file: Path of the text file to create or overwrite.

    Raises:
        OSError: If the output file cannot be opened or written.
    """
    with open(output_file, "w") as f:
        # Hand all lines to the file object in one call instead of issuing a
        # separate write per entry.
        f.writelines(
            f"{md5_value} {file_path}\n"
            for file_path, md5_value in md5_dict.items()
        )
-
def check_duplicate_md5(file_path):
    """Report duplicate MD5 values found in a ``<md5> <path>`` listing.

    Each non-blank line is split at its first space into a digest and a
    path. The first path seen for a digest is remembered as the original;
    every later line with the same digest is printed as a duplicate.

    Args:
        file_path: Path of the text file holding one ``<md5> <path>`` entry
            per line.

    Raises:
        ValueError: If a non-blank line contains no space separator.
        OSError: If the listing cannot be opened or read.
    """
    first_seen = {}
    with open(file_path, "r") as f:
        for line_number, raw_line in enumerate(f, start=1):
            entry = raw_line.strip()
            if not entry:
                continue

            # A distinct local name is used for the hashed path so the
            # file_path argument is not overwritten while reading.
            md5_value, separator, hashed_path = entry.partition(" ")
            if not separator:
                raise ValueError(
                    f"Line {line_number} of '{file_path}' is not in "
                    f"'<md5> <path>' format: {entry!r}"
                )

            if md5_value in first_seen:
                print(f"Duplicate MD5 found: {md5_value}")
                print(f"Original file: {first_seen[md5_value]}")
                print(f"Duplicate file: {hashed_path}\n")
            else:
                first_seen[md5_value] = hashed_path
def split_and_check_duplicate_part(filename, part_index, seen_parts):
    """Check one underscore-separated field of a filename for repeats.

    The filename is split on ``"_"``. If it has exactly four fields, the
    field at ``part_index`` is compared against ``seen_parts``: a repeat is
    reported on stdout, a new value is added to the set. Filenames with any
    other number of fields are reported and otherwise ignored.

    Args:
        filename: Name to inspect.
        part_index: Index of the field to compare (into the four fields).
        seen_parts: Set of field values seen so far; updated in place.

    Raises:
        IndexError: If ``part_index`` is outside the four fields.
    """
    parts = filename.split("_")
    if len(parts) != 4:
        print(f'File "{filename}" does not have four parts.')
        return

    selected_part = parts[part_index]
    if selected_part in seen_parts:
        print(f'Duplicate part found at index {part_index}: {selected_part}')
    else:
        seen_parts.add(selected_part)
-
def process_folder(folder_path, part_index):
    """Check every entry name in a folder for a repeated filename field.

    Each name returned by ``os.listdir`` (not recursive) is passed to
    ``split_and_check_duplicate_part`` together with one shared set, so a
    field value is reported as a duplicate the second time it appears
    anywhere in the folder.

    Args:
        folder_path: Folder whose entry names are inspected.
        part_index: Index of the underscore-separated field to compare.

    Raises:
        OSError: If the folder cannot be listed.
    """
    seen_parts = set()
    for filename in os.listdir(folder_path):
        split_and_check_duplicate_part(filename, part_index, seen_parts)
-
def find_max_execution_time(file_path):
    """Return the largest execution time recorded in a log file.

    Every line is searched for the text
    ``Program execution time: <digits> microseconds``; the largest integer
    found is returned.

    Args:
        file_path: Path of the log file to scan.

    Returns:
        The maximum execution time, in microseconds, as an int.

    Raises:
        FileNotFoundError: If ``file_path`` does not exist.
        Exception: For any other failure, including a log that contains no
            matching line. The original error is attached as ``__cause__``.
    """
    pattern = re.compile(r'Program execution time: (\d+) microseconds')
    try:
        with open(file_path, 'r') as file:
            # Stream the matches through max() instead of collecting them all
            # in a list first; default=None marks "no match at all".
            max_number = max(
                (int(match.group(1))
                 for match in map(pattern.search, file)
                 if match),
                default=None,
            )
        if max_number is None:
            raise ValueError("No execution time found in the file.")
        return max_number
    except FileNotFoundError as err:
        raise FileNotFoundError(f"Error: File '{file_path}' not found.") from err
    except Exception as e:
        # Kept as a generic Exception with the same message for existing
        # callers; chaining preserves the underlying error for debugging.
        raise Exception(f"An error occurred: {e}") from e
-
if __name__ == "__main__":
    # Script entry point: report the maximum logged execution time, check
    # filenames for a repeated field, then hash all files and report
    # duplicate contents. The guard is also required so that worker
    # processes importing this module do not re-run the script.

    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by system clock adjustments.
    start_time = time.perf_counter()

    # Folder of files to inspect and the log file to scan.
    folder_path = r"D:/outputFile/bmp"
    file_path = r"D:/log.txt"

    try:
        max_execution_time = find_max_execution_time(file_path)
        print(f"The maximum execution time is: {max_execution_time} microseconds")
    except Exception as e:
        # A missing or unusable log is reported but does not stop the
        # remaining checks.
        print(e)

    # Index of the underscore-separated filename field to compare.
    selected_part_index = 1
    process_folder(folder_path, selected_part_index)

    # Output file for the digests and the read chunk size used for hashing.
    MD5_file = "D:/md5sums.txt"
    block_size = 8192

    md5_dict = generate_md5_for_files_parallel(folder_path, block_size=block_size)
    write_md5_to_file(md5_dict, MD5_file)
    print(f"MD5 values generated and saved to {MD5_file}")

    # Re-read the generated listing and report files with identical digests.
    check_duplicate_md5(MD5_file)

    end_time = time.perf_counter()

    # Total run time, converted from seconds to milliseconds.
    execution_time = (end_time - start_time) * 1000
    print(f"Function execution time: {execution_time} milliseconds")
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。