cd <where_you_want>
git clone https://github.com/THUDM/CogVLM2.git
cd CogVLM2
mkdir -p models # directory for the model weights
Download the int4 model from ModelScope:
# Model download: put the following into any throwaway .py file and run it
from modelscope import snapshot_download
model_dir = snapshot_download('ZhipuAI/cogvlm2-llama3-chinese-chat-19B-int4')
After the download, the model normally lands under ~/.cache/modelscope/hub/; mv it into <where_you_want>/CogVLM2/models.
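For example (a sketch only; the exact sub-directory layout under the modelscope cache depends on your modelscope version, so verify the real path first):

mv ~/.cache/modelscope/hub/ZhipuAI/cogvlm2-llama3-chinese-chat-19B-int4 <where_you_want>/CogVLM2/models/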
The inference demos live under CogVLM2/basic_demo. In cli_demo.py, point the model path at the local weights:
ROOT_PATH = "<where_you_want>/CogVLM2/"
MODEL_NAME = "cogvlm2-llama3-chinese-chat-19B-int4"
MODEL_PATH = os.path.join(ROOT_PATH, "models", MODEL_NAME) # assumes the model files were placed as described in 1.1
When launching with python cli_demo.py, the --quant 4 flag is no longer needed. chainlit run web_demo.py works the same way: change its model path as well, following 1.2.1.
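Assuming both demos are launched from basic_demo after the path edits above, the run commands look like this:

cd <where_you_want>/CogVLM2/basic_demo
python cli_demo.py          # CLI demo; no --quant flag needed for the int4 weights
chainlit run web_demo.py    # web demo, after the same MODEL_PATH change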
Since my work involves OCR, I tried it out on that task specifically:
import torch
import argparse
from PIL import Image
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
import os
import glob
import time
from datetime import datetime
import zipfile

# Command-line arguments
parser = argparse.ArgumentParser(description="CogVLM2 Batch OCR Demo")
parser.add_argument('--quant', type=int, choices=[4, 8],
                    help='Enable 4-bit or 8-bit precision loading', default=0)
# --root_path: project root, defaults to the parent of basic_demo, i.e. CogVLM2
parser.add_argument('--root_path', type=str, help='Path to the project root directory',
                    default=os.path.join(os.getcwd(), os.pardir))
# --imgpath: directory holding the images to OCR, defaults to ./images under the current directory
default_imgpath = os.path.join(os.getcwd(), 'images')
parser.add_argument('--imgpath', type=str, help='Path to the image directory', default=default_imgpath)
args = parser.parse_args()

# Project path, model path, device and precision settings
ROOT_PATH = args.root_path
MODEL_NAME = "cogvlm2-llama3-chinese-chat-19B-int4"
MODEL_PATH = os.path.join(ROOT_PATH, "models", MODEL_NAME)
# MODEL_PATH = "THUDM/cogvlm2-llama3-chat-19B"
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'
TORCH_TYPE = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.get_device_capability()[0] >= 8 else torch.float16

# The int4 weights must be loaded in 4-bit mode
if 'int4' in MODEL_PATH:
    args.quant = 4


def check_cuda():
    # Check GPU memory: an unquantized load needs roughly 48GB
    if torch.cuda.is_available() and torch.cuda.get_device_properties(0).total_memory < 48 * 1024 ** 3 and not args.quant:
        print("GPU memory is less than 48GB. Please use cli_demo_multi_gpus.py or pass `--quant 4` or `--quant 8`.")
        exit()
    return True


def get_imgfile_list(path: str, extensions: list[str] = ['.jpg', '.jpeg', '.png', '.bmp', '.gif']) -> list[str]:
    """
    List all image files under the given directory.
    :param path: directory to search
    :param extensions: accepted image file extensions
    :return: list of matching file paths
    """
    # Normalize the path and make sure it is an existing directory
    clean_path = os.path.abspath(os.path.normpath(path))
    if not os.path.isdir(clean_path):
        raise ValueError(f"Invalid path: {path}")
    # Walk the directory and match files by extension instead of by regex
    matching_files = []
    allowed = [ext.lower() for ext in extensions]
    for dirpath, dirnames, filenames in os.walk(clean_path):
        for filename in filenames:
            _, file_extension = os.path.splitext(filename)
            if file_extension.lower() in allowed:
                matching_files.append(os.path.join(dirpath, filename))
    return matching_files


def get_imgdata(img_file: str):
    # Open the image and convert it to RGB; abort if the file is unreadable
    try:
        image_data = Image.open(img_file).convert('RGB')
    except IOError:
        print(f"Error: Invalid image file: {img_file}")
        exit(1)
    return image_data


def gen_summary(summary: dict, log_path: str):
    # summary maps output file name -> inference time; report the average and write a log file
    file_count = len(summary)
    if file_count == 0:
        print("No files were OCRed.")
        return
    total_time = sum(summary.values())
    avg_time = total_time / file_count
    print(f"Total {file_count} files OCRed, model inference average time: {avg_time:.4f} seconds")
    # Write the summary to a timestamped log file inside log_path
    logfile = os.path.join(log_path, f'{datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.log')
    with open(logfile, 'a') as f:
        f.write(f"Total {file_count} files OCRed, model inference average time: {avg_time:.4f} seconds\n")
        for k, v in summary.items():
            f.write(f"{k}: {v:.4f} seconds\n")


def main():
    if not check_cuda():
        exit(1)

    # Load the tokenizer
    tokenizer = AutoTokenizer.from_pretrained(
        MODEL_PATH,
        trust_remote_code=True
    )
    print(f"Step1: tokenizer initiated. args.imgpath={args.imgpath}")

    # Load the model with the requested precision / quantization
    if args.quant == 4:
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_PATH,
            torch_dtype=TORCH_TYPE,
            trust_remote_code=True,
            quantization_config=BitsAndBytesConfig(load_in_4bit=True),
            low_cpu_mem_usage=True
        ).eval()
    elif args.quant == 8:
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_PATH,
            torch_dtype=TORCH_TYPE,
            trust_remote_code=True,
            quantization_config=BitsAndBytesConfig(load_in_8bit=True),
            low_cpu_mem_usage=True
        ).eval()
    else:
        model = AutoModelForCausalLM.from_pretrained(
            MODEL_PATH,
            torch_dtype=TORCH_TYPE,
            trust_remote_code=True
        ).eval().to(DEVICE)
    print(f'Step2: model loaded with quant={args.quant}')

    # OCR_OUTPUT_PATH is the ocr_output folder under ROOT_PATH.
    # Create it if missing; otherwise archive the existing .md files into a
    # timestamped zip and then delete them.
    OCR_OUTPUT_PATH = os.path.join(ROOT_PATH, "ocr_output")
    if not os.path.exists(OCR_OUTPUT_PATH):
        os.mkdir(OCR_OUTPUT_PATH)
    else:
        # Zip up all .md files in OCR_OUTPUT_PATH, named with the current time
        zip_file_name = os.path.join(OCR_OUTPUT_PATH, f"{datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}.zip")
        with zipfile.ZipFile(zip_file_name, 'w', zipfile.ZIP_DEFLATED) as zip_file:
            for file in glob.glob(os.path.join(OCR_OUTPUT_PATH, "*.md")):
                zip_file.write(file, os.path.basename(file))
        # Remove the old .md files
        for file in glob.glob(os.path.join(OCR_OUTPUT_PATH, "*.md")):
            os.remove(file)
        print(f"Step3: {OCR_OUTPUT_PATH} is cleaned and {zip_file_name} is created")

    # OCR prompt (in Chinese): transcribe the full image text, list any
    # watermark text separately, and return the result in Markdown.
    prompts = '请OCR图片全文,水印文字请单独列出,返回结果请使用Markdown格式。'
    gen_kwargs = {
        "max_new_tokens": 2048,
        "pad_token_id": 128002,
        "top_k": 1,
    }
    summary = {}  # output file name -> inference time

    img_filelist = get_imgfile_list(args.imgpath)
    # For every image: open and convert it, run the model, and save the response
    # as a Markdown file with the same base name as the input image.
    for img_file in img_filelist:
        mark_time = time.time()
        image_data = get_imgdata(img_file)
        basename = os.path.splitext(os.path.basename(img_file))[0]
        input_by_model = model.build_conversation_input_ids(
            tokenizer,
            query=prompts,
            images=[image_data],
            template_version='chat'
        )
        # Assemble the generation inputs
        inputs = {
            'input_ids': input_by_model['input_ids'].unsqueeze(0).to(DEVICE),
            'token_type_ids': input_by_model['token_type_ids'].unsqueeze(0).to(DEVICE),
            'attention_mask': input_by_model['attention_mask'].unsqueeze(0).to(DEVICE),
            'images': [[input_by_model['images'][0].to(DEVICE).to(TORCH_TYPE)]] if image_data is not None else None,
        }
        print(f"\nImage: {os.path.basename(img_file)} OCRing", end='...')
        out_file = os.path.join(OCR_OUTPUT_PATH, basename + '.md')
        with torch.no_grad():
            outputs = model.generate(**inputs, **gen_kwargs)
            # Drop the prompt part of the sequence before decoding
            outputs = outputs[:, inputs['input_ids'].shape[1]:]
            response = tokenizer.decode(outputs[0], skip_special_tokens=True)
        with open(out_file, 'w', encoding='utf-8') as f:
            f.write(response)
        generate_time = time.time() - mark_time
        # Record the inference time keyed by the output file name
        summary[os.path.basename(out_file)] = generate_time
        print(f"Done! Infer time: {generate_time:.4f} seconds")

    gen_summary(summary, OCR_OUTPUT_PATH)


if __name__ == "__main__":
    main()
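A hypothetical invocation of the batch script above, assuming it is saved as batch_ocr_demo.py under basic_demo and the source images sit in basic_demo/images:

cd <where_you_want>/CogVLM2/basic_demo
python batch_ocr_demo.py --imgpath ./images
# per-image .md results and a timestamped .log summary land in <where_you_want>/CogVLM2/ocr_output/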