赞
踩
用lama-cleaner进行批量去水印,去除视频水印。
lama-cleaner移除图片水印大概流程:先在本地创建一个服务,然后通过浏览器标注需要处理的水印,然后发送请求,本地进行处理,并返回。因此,视频去水印的原理就是先提取视频的每一帧,然后依次发送给lama-cleaner进行处理,最终再合成一个视频。(个人理解)
创作灵感来自:
https://github.com/david419kr/video-watermark-removal-script-for-lama-cleaner
pip install lama-cleaner
lama-cleaner --model=lama --device=cpu --port=8080
file_type = is_video_or_image(self.file_path)
- def get_mask_img(self):
- cap = cv2.VideoCapture(self.file_path)
- # 设置视频帧指针到第100帧
- cap.set(cv2.CAP_PROP_POS_FRAMES, 99) # 注意:帧计数从0开始,所以第100帧是第99索引
- success, frame = cap.read() # 读取当前帧(第100帧)
- if success: # 确保成功读取帧
- cv2.imwrite('mask_frame.png', frame)
- else:
- print("Failed to retrieve frame.")
- cap.release()
- input('mask修改好了,请回车')
- self.mask = open('mask_frame.png', "rb").read()
原图
处理后的
- def video_to_frames(self):
- # 确保存放帧的目录存在
- if not os.path.exists(self.frames_path):
- os.makedirs(self.frames_path)
- else:
- recreate_folder(self.frames_path)
- if not os.path.exists(self.frames_out_folder):
- os.makedirs(self.frames_out_folder)
- # 使用OpenCV读取视频
- vidcap = cv2.VideoCapture(self.file_path)
- # 获取视频的总帧数
- self.fps = int(vidcap.get(cv2.CAP_PROP_FPS))
- success, image = vidcap.read()
- count = 0
- while success:
- # 保存当前帧图片
- frame_path = os.path.join(self.frames_path, f"{str(count).zfill(8)}.jpg")
- #cv2.imwrite(frame_path, image)不能保存中文路径
- cv2.imencode('.jpg', image)[1].tofile(frame_path) # 存储成功
- success, image = vidcap.read() # 读取下一帧
- count += 1
- print(f"Finished! Extracted {count} frames.")
- def extract_audio_from_video(self):
- """
- 从视频文件中提取音频并保存。
- 参数:
- file_path (str): 视频文件的路径。
- audio_path (str): 要保存的音频文件的路径。
- """
- if not os.path.exists(self.audio_path):
- os.makedirs(self.audio_path)
- video_clip = VideoFileClip(self.file_path)
- audio_clip = video_clip.audio
-
- audio_clip.write_audiofile(os.path.join(self.audio_path, 'out.mp3'))
- audio_clip.close()
- video_clip.close()
- for i in tqdm(os.listdir(self.frames_path), desc="Processing frames"):
- self.removeWatermark(img_path=os.path.join(self.frames_path, i), mask=self.mask, image_name=i)
- def removeWatermark(self, img_path, mask, image_name):
- url = "http://127.0.0.1:8080/inpaint"
- # print(img_path)
- image = open(img_path, "rb").read()
- response = requests.post(
- url,
- files={"image": image, "mask": mask},
- data={
- "ldmSteps": 25,
- "ldmSampler": "plms",
- "hdStrategy": "Crop",
- "zitsWireframe": True,
- "hdStrategyCropMargin": 196,
- "hdStrategyCropTrigerSize": 800,
- "hdStrategyResizeLimit": 2048,
- "prompt": "",
- "negativePrompt": "",
- "useCroper": False,
- "croperX": 64,
- "croperY": -16,
- "croperHeight": 512,
- "croperWidth": 512,
- "sdScale": 1.0,
- "sdMaskBlur": 5,
- "sdStrength": 0.75,
- "sdSteps": 50,
- "sdGuidanceScale": 7.5,
- "sdSampler": "uni_pc",
- "sdSeed": -1,
- "sdMatchHistograms": False,
- "cv2Flag": "INPAINT_NS",
- "cv2Radius": 5,
- "paintByExampleSteps": 50,
- "paintByExampleGuidanceScale": 7.5,
- "paintByExampleMaskBlur": 5,
- "paintByExampleSeed": -1,
- "paintByExampleMatchHistograms": False,
- "paintByExampleExampleImage": None,
- "p2pSteps": 50,
- "p2pImageGuidanceScale": 1.5,
- "p2pGuidanceScale": 7.5,
- "controlnet_conditioning_scale": 0.4,
- "controlnet_method": "control_v11p_sd15_canny",
- "paint_by_example_example_image": None,
- },
- )
-
- with open(f'{self.frames_out_folder}/{image_name}', "wb") as f:
- f.write(response.content)
- # print(image_name)
- def frames_to_video(self):
- """
- 将指定文件夹内的图像帧合并成视频文件。
- 参数:
- - input_folder: 包含图像帧的文件夹路径。
- - output_video_file: 输出视频文件的路径。
- - fps: 视频的帧率(每秒帧数)。
- """
-
- output_video_file = os.path.join(self.video_folder, 'out.mp4')
- if not os.path.exists(self.video_folder):
- os.makedirs(self.video_folder)
-
- # 获取文件夹内所有文件的列表,并排序(确保顺序正确)
- frame_files = sorted([os.path.join(self.frames_out_folder, f) for f in os.listdir(self.frames_out_folder) if
- os.path.isfile(os.path.join(self.frames_out_folder, f))])
-
- # 读取第一帧以获取视频尺寸
- frame = cv2.imread(frame_files[0])
- height, width, layers = frame.shape
-
- # 定义视频编码器和创建VideoWriter对象
- fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 或者使用 'XVID',根据输出格式而定
- video = cv2.VideoWriter(output_video_file, fourcc, self.fps, (width, height))
- print(output_video_file)
- # 遍历所有帧,添加到视频中
- for file in frame_files:
- frame = cv2.imread(file)
- video.write(frame)
-
- # 释放VideoWriter对象
- video.release()
- def merge_audio_video(self):
- """
- 合并音频和视频文件。
- 参数:
- video_file_path (str): 视频文件的路径。
- audio_file_path (str): 音频文件的路径。
- output_file_path (str): 输出视频文件的路径。
- """
- video_file_path = os.path.join(self.video_folder, 'out.mp4')
- audio_file_path = os.path.join(self.audio_path, 'out.mp3')
- name = os.path.basename(self.file_path)
- output_file_path = os.path.join(self.result_folder, name)
-
- if not os.path.exists(self.result_folder):
- os.makedirs(self.result_folder)
- # 加载视频文件
- video_clip = VideoFileClip(video_file_path)
- # 加载音频文件
- audio_clip = AudioFileClip(audio_file_path)
- # 设置视频的音频为加载的音频文件
- final_clip = video_clip.set_audio(audio_clip)
- # 导出合并后的视频文件
- print(output_file_path)
- final_clip.write_videofile(output_file_path, codec='libx264', audio_codec='aac')
- # lama-cleaner --model=lama --device=cpu --port=8080
- import cv2
- import requests
- import shutil
- from moviepy.editor import VideoFileClip, AudioFileClip
- import os
- from tqdm import tqdm
-
- def recreate_folder(folder_path):
- try:
- shutil.rmtree(folder_path) # 删除文件夹及其所有内容
- os.makedirs(folder_path) # 创建新的同名文件夹
- except Exception as e:
- print('Failed to delete and recreate folder. Reason: %s' % e)
-
-
- def is_video_or_image(file_path):
- video_extensions = ['.mp4', '.mov', '.avi', '.mkv']
- image_extensions = ['.jpg', '.jpeg', '.png', '.gif']
- _, file_extension = os.path.splitext(file_path)
-
- if file_extension.lower() in video_extensions:
- return 'video'
- elif file_extension.lower() in image_extensions:
- return 'image'
- else:
- return 'unknown'
-
-
- class removeVideoWatermark():
- def __init__(self, file_path):
- self.mask = None
- self.file_path = file_path
- self.fps = 30
- folder_name = os.path.basename(self.file_path).split('.')[0]
- self.tmp_folder = os.path.join('C:\\Users\\35785\\Desktop\\tmp', folder_name)
- self.frames_path = os.path.join('C:\\Users\\35785\\Desktop\\tmp', folder_name, 'frames')
- self.audio_path = os.path.join('C:\\Users\\35785\\Desktop\\tmp', folder_name, 'audio')
- self.frames_out_folder = os.path.join('C:\\Users\\35785\\Desktop\\tmp', folder_name, 'frames_out')
- self.video_folder = os.path.join('C:\\Users\\35785\\Desktop\\tmp', folder_name, 'video')
- self.result_folder = os.path.join('C:\\Users\\35785\\Desktop\\tmp', folder_name, 'result')
-
- def start(self):
- file_type = is_video_or_image(self.file_path)
- if file_type == 'image':
- # 待完善
- pass
- elif file_type == 'video':
- self.get_mask_img()
- self.video_to_frames()
- self.extract_audio_from_video()
- # 不使用多线程
- for i in tqdm(os.listdir(self.frames_path), desc="Processing frames"):
- self.removeWatermark(img_path=os.path.join(self.frames_path, i), mask=self.mask, image_name=i)
- self.frames_to_video()
- self.merge_audio_video()
- else:
- pass
-
- def get_mask_img(self):
- cap = cv2.VideoCapture(self.file_path)
- # 设置视频帧指针到第100帧
- cap.set(cv2.CAP_PROP_POS_FRAMES, 99) # 注意:帧计数从0开始,所以第100帧是第99索引
- success, frame = cap.read() # 读取当前帧(第100帧)
- if success: # 确保成功读取帧
- cv2.imwrite('mask_frame.png', frame)
- else:
- print("Failed to retrieve frame.")
- cap.release()
- input('mask修改好了,请回车')
- self.mask = open('mask_frame.png', "rb").read()
-
- def merge_audio_video(self):
- """
- 合并音频和视频文件。
- 参数:
- video_file_path (str): 视频文件的路径。
- audio_file_path (str): 音频文件的路径。
- output_file_path (str): 输出视频文件的路径。
- """
- video_file_path = os.path.join(self.video_folder, 'out.mp4')
- audio_file_path = os.path.join(self.audio_path, 'out.mp3')
- name = os.path.basename(self.file_path)
- output_file_path = os.path.join(self.result_folder, name)
-
- if not os.path.exists(self.result_folder):
- os.makedirs(self.result_folder)
- # 加载视频文件
- video_clip = VideoFileClip(video_file_path)
- # 加载音频文件
- audio_clip = AudioFileClip(audio_file_path)
- # 设置视频的音频为加载的音频文件
- final_clip = video_clip.set_audio(audio_clip)
- # 导出合并后的视频文件
- print(output_file_path)
- final_clip.write_videofile(output_file_path, codec='libx264', audio_codec='aac')
-
- def extract_audio_from_video(self):
- """
- 从视频文件中提取音频并保存。
- 参数:
- file_path (str): 视频文件的路径。
- audio_path (str): 要保存的音频文件的路径。
- """
- if not os.path.exists(self.audio_path):
- os.makedirs(self.audio_path)
- video_clip = VideoFileClip(self.file_path)
- audio_clip = video_clip.audio
-
- audio_clip.write_audiofile(os.path.join(self.audio_path, 'out.mp3'))
- audio_clip.close()
- video_clip.close()
-
- def video_to_frames(self):
- # 确保存放帧的目录存在
- if not os.path.exists(self.frames_path):
- os.makedirs(self.frames_path)
- else:
- recreate_folder(self.frames_path)
- if not os.path.exists(self.frames_out_folder):
- os.makedirs(self.frames_out_folder)
- # 使用OpenCV读取视频
- vidcap = cv2.VideoCapture(self.file_path)
- # 获取视频的总帧数
- self.fps = int(vidcap.get(cv2.CAP_PROP_FPS))
- success, image = vidcap.read()
- count = 0
- while success:
- # 保存当前帧图片
- frame_path = os.path.join(self.frames_path, f"{str(count).zfill(8)}.jpg")
- # cv2.imwrite(frame_path, image)中文路径会保存不了
- cv2.imencode('.jpg', image)[1].tofile(frame_path) # 存储成功
- success, image = vidcap.read() # 读取下一帧
- count += 1
- print(f"Finished! Extracted {count} frames.")
-
- def removeWatermark(self, img_path, mask, image_name):
- url = "http://127.0.0.1:8080/inpaint"
- # print(img_path)
- image = open(img_path, "rb").read()
- response = requests.post(
- url,
- files={"image": image, "mask": mask},
- data={
- "ldmSteps": 25,
- "ldmSampler": "plms",
- "hdStrategy": "Crop",
- "zitsWireframe": True,
- "hdStrategyCropMargin": 196,
- "hdStrategyCropTrigerSize": 800,
- "hdStrategyResizeLimit": 2048,
- "prompt": "",
- "negativePrompt": "",
- "useCroper": False,
- "croperX": 64,
- "croperY": -16,
- "croperHeight": 512,
- "croperWidth": 512,
- "sdScale": 1.0,
- "sdMaskBlur": 5,
- "sdStrength": 0.75,
- "sdSteps": 50,
- "sdGuidanceScale": 7.5,
- "sdSampler": "uni_pc",
- "sdSeed": -1,
- "sdMatchHistograms": False,
- "cv2Flag": "INPAINT_NS",
- "cv2Radius": 5,
- "paintByExampleSteps": 50,
- "paintByExampleGuidanceScale": 7.5,
- "paintByExampleMaskBlur": 5,
- "paintByExampleSeed": -1,
- "paintByExampleMatchHistograms": False,
- "paintByExampleExampleImage": None,
- "p2pSteps": 50,
- "p2pImageGuidanceScale": 1.5,
- "p2pGuidanceScale": 7.5,
- "controlnet_conditioning_scale": 0.4,
- "controlnet_method": "control_v11p_sd15_canny",
- "paint_by_example_example_image": None,
- },
- )
-
- with open(f'{self.frames_out_folder}/{image_name}', "wb") as f:
- f.write(response.content)
- # print(image_name)
-
- def frames_to_video(self):
- """
- 将指定文件夹内的图像帧合并成视频文件。
- 参数:
- - input_folder: 包含图像帧的文件夹路径。
- - output_video_file: 输出视频文件的路径。
- - fps: 视频的帧率(每秒帧数)。
- """
-
- output_video_file = os.path.join(self.video_folder, 'out.mp4')
- if not os.path.exists(self.video_folder):
- os.makedirs(self.video_folder)
-
- # 获取文件夹内所有文件的列表,并排序(确保顺序正确)
- frame_files = sorted([os.path.join(self.frames_out_folder, f) for f in os.listdir(self.frames_out_folder) if
- os.path.isfile(os.path.join(self.frames_out_folder, f))])
-
- # 读取第一帧以获取视频尺寸
- frame = cv2.imread(frame_files[0])
- height, width, layers = frame.shape
-
- # 定义视频编码器和创建VideoWriter对象
- fourcc = cv2.VideoWriter_fourcc(*'mp4v') # 或者使用 'XVID',根据输出格式而定
- video = cv2.VideoWriter(output_video_file, fourcc, self.fps, (width, height))
- print(output_video_file)
- # 遍历所有帧,添加到视频中
- for file in frame_files:
- frame = cv2.imread(file)
- video.write(frame)
-
- # 释放VideoWriter对象
- video.release()
-
-
- a = removeVideoWatermark(file_path=r'C:\Users\35785\Downloads\Video\tmp.mp4')
- a.start()
-
总结
提示:没有用多线程,处理起来可能慢。可以修改请求参数,选用不同的处理模型。
lama-cleaner运行时自动会创建一个网页操作界面的,我是通过网页调试看每次处理图像,他发送的请求得知的参数。
先启动lama-cleaner,在终端运行
lama-cleaner --model=lama --device=cpu --port=8080
访问 http://127.0.0.1:8080,在设置里选择好自己想要的模型,然后随便选择一张图片,打开网页调试中的网络,然后随便在图片上涂抹一下,进行处理,在调试窗口就可以看到相应的请求参数。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。