赞
踩
# -*- encoding: utf-8 -*- # @Author: SWHL # @Contact: liekkaskono@163.com from pathlib import Path from typing import Dict, List, Union import filetype from ..utils import logger from .image_loader import ImageLoader from .office_loader import OfficeLoader from .pdf_loader import PDFLoader from .txt_loader import TXTLoader INPUT_TYPE = Union[str, Path] class FileLoader: def __init__(self) -> None: self.file_map = { "office": ["docx", "doc", "ppt", "pptx", "xlsx", "xlx"], "image": ["jpg", "png", "bmp", "tif", "jpeg"], "txt": ["txt", "md"], "pdf": ["pdf"], } self.img_loader = ImageLoader() self.office_loader = OfficeLoader() self.pdf_loader = PDFLoader() self.txt_loader = TXTLoader() def __call__(self, file_path: INPUT_TYPE) -> Dict[str, List[str]]: all_content = {} file_list = self.get_file_list(file_path) for file_path in file_list: file_name = file_path.name if file_path.suffix[1:] in self.file_map["txt"]: content = self.txt_loader(file_path) all_content[file_name] = content continue file_type = self.which_type(file_path) if file_type in self.file_map["office"]: content = self.office_loader(file_path) elif file_type in self.file_map["pdf"]: content = self.pdf_loader(file_path) elif file_type in self.file_map["image"]: content = self.img_loader(file_path) else: logger.warning("%s does not support.", file_path) continue all_content[file_name] = content return all_content def get_file_list(self, file_path: INPUT_TYPE): if not isinstance(file_path, Path): file_path = Path(file_path) if file_path.is_dir(): return file_path.rglob("*.*") return [file_path] @staticmethod def which_type(content: Union[bytes, str, Path]) -> str: kind = filetype.guess(content) if kind is None: raise TypeError(f"The type of {content} does not support.") return kind.extension def sorted_by_suffix(self, file_list: List[str]) -> Dict[str, str]: sorted_res = {k: [] for k in self.file_map} for file_path in file_list: if file_path.suffix[1:] in self.file_map["txt"]: sorted_res["txt"].append(file_path) continue file_type = self.which_type(file_path) if file_type in self.file_map["office"]: sorted_res["office"].append(file_path) continue if file_type in self.file_map["pdf"]: sorted_res["pdf"].append(file_path) continue if file_type in self.file_map["image"]: sorted_res["image"].append(file_path) continue return sorted_res
# -*- encoding: utf-8 -*- # @Author: SWHL # @Contact: liekkaskono@163.com from pathlib import Path from typing import List, Union from rapidocr_pdf import PDFExtracter from ..text_splitter.chinese_text_splitter import ChineseTextSplitter class PDFLoader: def __init__( self, ): self.extracter = PDFExtracter() self.splitter = ChineseTextSplitter(pdf=True) def __call__(self, pdf_path: Union[str, Path]) -> List[str]: contents = self.extracter(pdf_path) split_contents = [self.splitter.split_text(v[1]) for v in contents] return sum(split_contents, [])
# -*- encoding: utf-8 -*- # @Author: SWHL # @Contact: liekkaskono@163.com from pathlib import Path from typing import List, Union from ..text_splitter.chinese_text_splitter import ChineseTextSplitter from ..utils.utils import read_txt class TXTLoader: def __init__(self) -> None: self.splitter = ChineseTextSplitter() def __call__(self, txt_path: Union[str, Path]) -> List[str]: contents = read_txt(txt_path) split_contents = [self.splitter.split_text(v) for v in contents] return sum(split_contents, [])
# -*- encoding: utf-8 -*- # @Author: SWHL # @Contact: liekkaskono@163.com from pathlib import Path from typing import Union from extract_office_content import ExtractOfficeContent from ..text_splitter.chinese_text_splitter import ChineseTextSplitter class OfficeLoader: def __init__(self) -> None: self.extracter = ExtractOfficeContent() self.splitter = ChineseTextSplitter() def __call__(self, office_path: Union[str, Path]) -> str: contents = self.extracter(office_path) split_contents = [self.splitter.split_text(v) for v in contents] return sum(split_contents, [])
split_text1 方法:这个方法用于分句操作,尤其针对 PDF 格式的文本。它使用正则表达式和字符串操作来将输入的文本划分成句子。具体步骤包括:
def split_text1(self, text: str) -> List[str]: if self.pdf: text = re.sub(r"\n{3,}", "\n", text) text = re.sub("\s", " ", text) text = text.replace("\n\n", "") sent_sep_pattern = re.compile( '([﹒﹔﹖﹗.。!?]["’”」』]{0,2}|(?=["‘“「『]{1,2}|$))' ) # del :; # 正则表达式用于识别句子中的断句标点和引号,并在这些位置进行句子的分割 sent_list = [] for ele in sent_sep_pattern.split(text): ele = ele.strip() if sent_sep_pattern.match(ele) and sent_list: sent_list[-1] += ele elif ele: sent_list.append(ele) return sent_list
split_text 方法:这个方法用于分句操作,包括针对各种标点符号和断句规则的处理。具体步骤包括:
根据 pdf 标志对输入文本进行预处理,去除多余的换行和空白。
使用正则表达式将文本按照断句标点分割为句子,并进行适当的处理。
针对单字符断句符、英文省略号、中文省略号、双引号等情况进行断句处理。
去除文本末尾多余的空白字符。
将文本按行划分,并去除空行。
对每行文本进行进一步处理,如果句子长度超过指定的 sentence_size,则进行递归式的断句操作。
def split_text(self, text: str) -> List[str]: ##此处需要进一步优化逻辑 if self.pdf: text = re.sub(r"\n{3,}", r"\n", text) text = re.sub("\s", " ", text) text = re.sub("\n\n", "", text) text = re.sub(r"([;;.!?。!?\?])([^”’])", r"\1\n\2", text) # 单字符断句符 text = re.sub(r'(\.{6})([^"’”」』])', r"\1\n\2", text) # 英文省略号 text = re.sub(r'(\…{2})([^"’”」』])', r"\1\n\2", text) # 中文省略号 text = re.sub(r'([;;!?。!?\?]["’”」』]{0,2})([^;;!?,。!?\?])', r"\1\n\2", text) # 如果双引号前有终止符,那么双引号才是句子的终点,把分句符\n放到双引号后,注意前面的几句都小心保留了双引号 text = text.rstrip() # 段尾如果有多余的\n就去掉它 # 很多规则中会考虑分号;,但是这里我把它忽略不计,破折号、英文双引号等同样忽略,需要的再做些简单调整即可。 ls = [i for i in text.split("\n") if i] for ele in ls: if len(ele) > self.sentence_size: ele1 = re.sub(r'([,,.]["’”」』]{0,2})([^,,.])', r"\1\n\2", ele) ele1_ls = ele1.split("\n") for ele_ele1 in ele1_ls: if len(ele_ele1) > self.sentence_size: ele_ele2 = re.sub( r'([\n]{1,}| {2,}["’”」』]{0,2})([^\s])', r"\1\n\2", ele_ele1 ) ele2_ls = ele_ele2.split("\n") for ele_ele2 in ele2_ls: if len(ele_ele2) > self.sentence_size: ele_ele3 = re.sub( '( ["’”」』]{0,2})([^ ])', r"\1\n\2", ele_ele2 ) ele2_id = ele2_ls.index(ele_ele2) ele2_ls = ( ele2_ls[:ele2_id] + [i for i in ele_ele3.split("\n") if i] + ele2_ls[ele2_id + 1 :] ) ele_id = ele1_ls.index(ele_ele1) ele1_ls = ( ele1_ls[:ele_id] + [i for i in ele2_ls if i] + ele1_ls[ele_id + 1 :] ) id = ls.index(ele) ls = ls[:id] + [i.strip() for i in ele1_ls if i] + ls[id + 1 :] return ls
adapt_array 和 convert_array 函数:这两个函数分别用于将 NumPy 数组转换为二进制数据,以便存储到 SQLite 数据库中,以及将存储在数据库中的二进制数据转换回 NumPy 数组。
DBUtils 类:这个类用于与 SQLite 数据库进行交互,主要用于存储和检索向量化的文本数据。以下是类的主要部分:
# -*- encoding: utf-8 -*- # @Author: SWHL # @Contact: liekkaskono@163.com import io import sqlite3 import time from typing import Dict, List, Optional import faiss import numpy as np from ..utils.logger import logger # adapt_array 和 convert_array 函数:这两个函数分别用于将 NumPy 数组转换为二进制数据,以便存储到 SQLite 数据库中,以及将存储在数据库中的二进制数据转换回 NumPy 数组。 def adapt_array(arr): out = io.BytesIO() np.save(out, arr) out.seek(0) return sqlite3.Binary(out.read()) def convert_array(text): out = io.BytesIO(text) out.seek(0) return np.load(out, allow_pickle=True) sqlite3.register_adapter(np.ndarray, adapt_array) sqlite3.register_converter("array", convert_array) class DBUtils: def __init__( self, db_path: str, ) -> None: self.db_path = db_path self.table_name = "embedding_texts" self.vector_nums = 0 self.max_prompt_length = 4096 self.connect_db() def connect_db( self, ): con = sqlite3.connect(self.db_path, detect_types=sqlite3.PARSE_DECLTYPES) cur = con.cursor() cur.execute( f"create table if not exists {self.table_name} (id integer primary key autoincrement, file_name TEXT, embeddings array UNIQUE, texts TEXT)" ) return cur, con def load_vectors( self, ): cur, _ = self.connect_db() cur.execute(f"select file_name, embeddings, texts from {self.table_name}") all_vectors = cur.fetchall() self.file_names = np.vstack([v[0] for v in all_vectors]).squeeze() all_embeddings = np.vstack([v[1] for v in all_vectors]) self.all_texts = np.vstack([v[2] for v in all_vectors]).squeeze() self.search_index = faiss.IndexFlatL2(all_embeddings.shape[1]) self.search_index.add(all_embeddings) self.vector_nums = len(all_vectors) def count_vectors( self, ): cur, _ = self.connect_db() cur.execute(f"select file_name from {self.table_name}") all_vectors = cur.fetchall() return len(all_vectors) def search_local( self, embedding_query: np.ndarray, top_k: int = 5, ) -> Optional[Dict[str, List[str]]]: s = time.perf_counter() cur_vector_nums = self.count_vectors() if cur_vector_nums <= 1: return None, 0 if cur_vector_nums != self.vector_nums: self.load_vectors() _, I = self.search_index.search(embedding_query, top_k) top_index = I.squeeze().tolist() search_contents = self.all_texts[top_index] file_names = [self.file_names[idx] for idx in top_index] dup_file_names = list(set(file_names)) dup_file_names.sort(key=file_names.index) search_res = {v: [] for v in dup_file_names} for file_name, content in zip(file_names, search_contents): search_res[file_name].append(content) elapse = time.perf_counter() - s return search_res, elapse def insert(self, file_name: str, embeddings: np.ndarray, texts: List): cur, con = self.connect_db() file_names = [file_name] * len(embeddings) t1 = time.perf_counter() insert_sql = f"insert or ignore into {self.table_name} (file_name, embeddings, texts) values (?, ?, ?)" cur.executemany(insert_sql, list(zip(file_names, embeddings, texts))) elapse = time.perf_counter() - t1 logger.info( f"Insert {len(embeddings)} data, total is {len(embeddings)}, cost: {elapse:4f}s" ) con.commit() def get_files(self): cur, _ = self.connect_db() search_sql = f"select distinct file_name from {self.table_name}" cur.execute(search_sql) search_res = cur.fetchall() search_res = [v[0] for v in search_res] return search_res def __enter__(self): return self def __exit__(self, *a): self.cur.close() self.con.close()
Faiss(Facebook AI Similarity Search)是一个由 Facebook AI Research 开发的用于高效相似性搜索的库。它主要用于处理大规模高维向量数据,例如文本嵌入、图像特征等。Faiss 提供了一系列优化的算法和数据结构,能够在大型数据集中快速进行相似性搜索,找到与给定查询向量最相似的数据点。
在文本嵌入或图像特征检索等应用中,常常需要找到与给定查询向量最接近的嵌入向量。这是一项计算密集型任务,特别是在高维空间中。Faiss 通过使用各种索引结构(如平面索引、多索引等)以及高度优化的搜索算法(如倒排索引、乘积量化、哈希等)来加速这些相似性搜索操作。
具体而言,Faiss 为用户提供了一些核心组件:
在上述代码示例中,self.search_index 表示 Faiss 的搜索索引,self.search_index.add(all_embeddings) 将嵌入向量添加到索引中,从而建立索引以加速相似性搜索。
总之,Faiss 提供了强大的工具和算法,用于处理大规模高维向量数据的相似性搜索,这在文本检索、图像搜索等应用中非常有用。
在 Faiss 中,一般的工作流程是首先进行聚类,然后构建索引,最后执行查询。具体来说,顺序是这样的:
聚类(Clustering):首先,你需要将你的数据集进行聚类,通常使用 k-means 或其他聚类算法。这一步的目的是将数据划分成若干个聚类中心,每个聚类中心代表一个虚拟的向量,而不是实际的数据向量。
构建索引(Indexing):一旦聚类完成,对于每个聚类中心,Faiss 会构建一个索引结构,如 k-d 树或多叉树。这些索引结构将帮助加速后续的近似搜索操作。这个阶段实际上是在为每个聚类中心创建一个能够快速定位附近向量的数据结构。
查询(Querying):一旦索引构建完成,你可以使用查询向量来寻找最接近的近邻。Faiss 会使用之前构建的索引结构来定位可能包含相似向量的聚类中心,然后在这些聚类中心的附近搜索实际的近邻向量,最终返回近似的最近邻结果。
因此,聚类是在数据预处理阶段,构建索引是在为每个聚类中心创建索引结构,而查询是在实际搜索阶段。这样的顺序可以显著提高搜索的效率,特别是在处理大规模数据集时。
类似于词嵌入,句子嵌入将整个句子映射到向量空间中。这可以通过将句子中的词嵌入进行合并或组合,或者通过使用预训练的模型(如BERT、GPT等)来获得句子的嵌入表示。
这个过程的核心思想是捕捉文本的语义和语境信息,以便计算机能够更好地理解和处理文本数据。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。