赞
踩
本文的前两个部分最早属于旧文《学术论文GPT的源码解读与微调:从ChatPaper到七月论文审稿GPT第1版》,但为了让每一篇文章各自的内容更好地呈现,我今天做了以下三个改动
如此,mamba那篇解读可以专注mamba的解读,不把过多篇幅放在mamba之外的RWKV上,且原来论文审稿第一版本身微调的RWKV,故刚好需要介绍下RWKV
且对于学术论文GPT的源码解读与微调本来就还得解读下gpt_academic,故把ChatPaper和gpt_academic这两个开源系统独立成本文,也更好
本文
ChatPaper的自身定位是全流程加速科研:论文总结+专业级翻译+润色+审稿+审稿回复。因为论文更多是PDF格式,故针对PDF的对话、总结、翻译,便不可避免地涉及到PDF的解析
// 待更
使用OpenAI的GPT模型进行论文审查的脚本。它首先定义了一个Reviewer类来处理审查工作,然后在if __name__ == '__main__':语句下使用argparse处理命令行参数,并调用chat_reviewer_main函数来开始审查过程
# Immutable record of the command-line options consumed by the reviewer.
ReviewerParams = namedtuple(
    "ReviewerParams",
    ("paper_path", "file_format", "research_fields", "language"),
)
def contains_chinese(text):
    """Return True if any character of ``text`` lies in U+4E00..U+9FFF."""
    return any('\u4e00' <= char <= '\u9fff' for char in text)
def insert_sentence(text, sentence, interval):
    """Insert ``sentence`` after every ``interval`` tokens of each line.

    Lines containing Chinese characters are tokenised with ``jieba`` and
    re-joined without a separator; all other lines are split on whitespace
    and re-joined with single spaces. The token count restarts on every line.
    """
    rebuilt_lines = []
    for raw_line in text.split('\n'):
        if contains_chinese(raw_line):
            tokens, glue = list(jieba.cut(raw_line)), ''
        else:
            tokens, glue = raw_line.split(), ' '

        pieces = []
        for position, token in enumerate(tokens, start=1):
            pieces.append(token)
            # Every ``interval``-th token is followed by the inserted sentence.
            if position % interval == 0:
                pieces.append(sentence)

        rebuilt_lines.append(glue.join(pieces))

    return '\n'.join(rebuilt_lines)
# Reviewer: two-stage paper review driven by the OpenAI chat API.
class Reviewer:
    """Review parsed papers with ChatGPT.

    ``stage_1`` asks the model which extra sections it wants to read;
    ``chat_review`` sends the assembled text and returns the review;
    ``review_by_chatgpt`` runs both for every paper and exports the result.
    """

    def __init__(self, args=None):
        """Load the API keys from ``apikey.ini`` and store the review settings.

        ``args`` must expose ``language``, ``file_format`` and
        ``research_fields`` (for example a ``ReviewerParams`` instance).
        """
        # Anything other than 'en' (including 'zh') answers in Chinese.
        self.language = 'English' if args.language == 'en' else 'Chinese'
        # Stored on the instance so prompt builders do not rely on a global ``args``.
        self.research_fields = args.research_fields
        self.config = configparser.ConfigParser()
        self.config.read('apikey.ini')
        # The value is written like a Python list literal: strip the brackets
        # and quotes, then split on commas.
        self.chat_api_list = self.config.get('OpenAI', 'OPENAI_API_KEYS')[1:-1].replace('\'', '').split(',')
        self.chat_api_list = [api.strip() for api in self.chat_api_list if len(api) > 5]
        self.cur_api = 0
        self.file_format = args.file_format
        self.max_token_num = 4096
        self.encoding = tiktoken.get_encoding("gpt2")

    def validateTitle(self, title):
        """Replace characters that are illegal in file names with underscores."""
        rstr = r"[\/\\\:\*\?\"\<\>\|]"  # '/ \ : * ? " < > |'
        new_title = re.sub(rstr, "_", title)
        return new_title

    def _next_api_key(self):
        """Return the current API key and advance round-robin over all keys."""
        key = self.chat_api_list[self.cur_api]
        # Modulo rotation so the last key in the list is used as well.
        self.cur_api = (self.cur_api + 1) % len(self.chat_api_list)
        return key

    def stage_1(self, paper):
        """Ask the model which (at most two) extra sections it wants to see.

        This is the first of the two functions implemented for the review.
        Returns the list of section headings chosen by the model, with
        surrounding whitespace removed and empty entries dropped.
        """
        text = ''
        text += 'Title: ' + paper.title + '. '
        text += 'Abstract: ' + paper.section_texts['Abstract']

        # Keep the user message below half the context window minus 800 tokens;
        # the cut position is estimated from the character/token ratio.
        text_token = len(self.encoding.encode(text))
        if text_token > self.max_token_num/2 - 800:
            input_text_index = int(len(text)*((self.max_token_num/2)-800)/text_token)
            text = text[:input_text_index]

        openai.api_key = self._next_api_key()

        messages = [
            {"role": "system",
             "content": f"You are a professional reviewer in the field of {self.research_fields}. "
                        f"I will give you a paper. You need to review this paper and discuss the novelty and originality of ideas, correctness, clarity, the significance of results, potential impact and quality of the presentation. "
                        f"Due to the length limitations, I am only allowed to provide you the abstract, introduction, conclusion and at most two sections of this paper."
                        f"Now I will give you the title and abstract and the headings of potential sections. "
                        f"You need to reply at most two headings. Then I will further provide you the full information, includes aforementioned sections and at most two sections you called for.\n\n"
                        f"Title: {paper.title}\n\n"
                        f"Abstract: {paper.section_texts['Abstract']}\n\n"
                        f"Potential Sections: {paper.section_names[2:-1]}\n\n"
                        f"Follow the following format to output your choice of sections:"
                        f"{{chosen section 1}}, {{chosen section 2}}\n\n"},
            {"role": "user", "content": text},
        ]

        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
        )

        result = ''.join(choice.message.content for choice in response.choices)
        print(result)

        # The reply is "A, B": strip each piece so it can match a section name exactly.
        return [heading.strip() for heading in result.split(',') if heading.strip()]

    def chat_review(self, text):
        """Send ``text`` to the model and return the formatted review.

        The review format is read from ``ReviewFormat.txt``. A no-copy notice
        is inserted every 15 tokens and an ethics statement is appended.
        """
        openai.api_key = self._next_api_key()

        # Tokens reserved for the review prompt itself.
        review_prompt_token = 1000

        # Cut the paper text so that prompt + paper fit the context window.
        text_token = len(self.encoding.encode(text))
        input_text_index = int(len(text)*(self.max_token_num-review_prompt_token)/text_token)
        input_text = "This is the paper for your review:" + text[:input_text_index]

        with open('ReviewFormat.txt', 'r') as file:
            review_format = file.read()

        messages = [
            {"role": "system",
             "content": "You are a professional reviewer in the field of " + self.research_fields
                        + ". Now I will give you a paper. You need to give a complete review opinion according to the following requirements and format:"
                        + review_format
                        + " Please answer in {}.".format(self.language)},
            {"role": "user", "content": input_text},
        ]

        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
        )

        result = ''.join(choice.message.content for choice in response.choices)

        result = insert_sentence(result, '**Generated by ChatGPT, no copying allowed!**', 15)
        result += "\n\n⚠伦理声明/Ethics statement:\n--禁止直接复制生成的评论用于任何论文审稿工作!\n--Direct copying of generated comments for any paper review work is prohibited!"

        print("********"*10)
        print(result)
        print("********"*10)
        print("prompt_token_used:", response.usage.prompt_tokens)
        print("completion_token_used:", response.usage.completion_tokens)
        print("total_token_used:", response.usage.total_tokens)
        print("response_time:", response.response_ms/1000.0, 's')

        return result

    def review_by_chatgpt(self, paper_list):
        """Review every paper in ``paper_list`` and export one file per paper.

        Calls ``stage_1`` to pick sections, then ``chat_review`` on the title,
        abstract, introduction, conclusion and the chosen sections.
        NOTE(review): the accompanying article says the upstream version also
        has a tenacity retry mechanism; no retry is visible in this listing.
        """
        htmls = []

        for paper_index, paper in enumerate(paper_list):
            sections_of_interest = self.stage_1(paper)

            text = ''
            text += 'Title:' + paper.title + '. '
            text += 'Abstract: ' + paper.section_texts['Abstract']

            intro_title = next((item for item in paper.section_names if 'ntroduction' in item.lower()), None)
            if intro_title is not None:
                text += 'Introduction: ' + paper.section_texts[intro_title]

            # Lower-cased like the introduction lookup so "CONCLUSION" is found too.
            conclusion_title = next((item for item in paper.section_names if 'onclusion' in item.lower()), None)
            if conclusion_title is not None:
                text += 'Conclusion: ' + paper.section_texts[conclusion_title]

            for heading in sections_of_interest:
                if heading in paper.section_names:
                    text += heading + ': ' + paper.section_texts[heading]

            chat_review_text = self.chat_review(text=text)

            htmls.append('## Paper:' + str(paper_index+1))
            htmls.append('\n\n\n')
            htmls.append(chat_review_text)

            # Timestamp truncated to the hour, e.g. "2023-05-01-13".
            date_str = str(datetime.datetime.now())[:13].replace(' ', '-')
            export_path = os.path.join('./', 'output_file')
            os.makedirs(export_path, exist_ok=True)

            mode = 'w' if paper_index == 0 else 'a'

            file_name = os.path.join(export_path, date_str+'-'+self.validateTitle(paper.title)+"."+self.file_format)

            self.export_to_markdown("\n".join(htmls), file_name=file_name, mode=mode)

            # Start the next paper with an empty buffer.
            htmls = []
def chat_reviewer_main(args):
    """Collect the PDF(s) named by ``args.paper_path`` and review them.

    ``args.paper_path`` is either a single ``.pdf`` file or a directory that
    is walked recursively for ``.pdf`` files.
    """
    reviewer1 = Reviewer(args=args)
    paper_list = []
    if args.paper_path.endswith(".pdf"):
        paper_list.append(Paper(path=args.paper_path))
    else:
        for root, dirs, files in os.walk(args.paper_path):
            print("root:", root, "dirs:", dirs, 'files:', files)
            for filename in files:
                # Every PDF found is queued for review.
                if filename.endswith(".pdf"):
                    paper_list.append(Paper(path=os.path.join(root, filename)))
    print("------------------paper_num: {}------------------".format(len(paper_list)))
    for paper_index, paper in enumerate(paper_list):
        # basename handles the separator of the running platform.
        print(paper_index, os.path.basename(paper.path))
    reviewer1.review_by_chatgpt(paper_list=paper_list)
# The main program defines the command-line argument parsing and calls
# chat_reviewer_main.
if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # (flag, default, help) for every string option.
    for flag, default_value, help_text in (
        ("--paper_path", '', "path of papers"),
        ("--file_format", 'txt', "output file format"),
        ("--research_fields", 'computer science, artificial intelligence and reinforcement learning', "the research fields of paper"),
        ("--language", 'en', "output lauguage, en or zh"),
    ):
        parser.add_argument(flag, type=str, default=default_value, help=help_text)

    reviewer_args = ReviewerParams(**vars(parser.parse_args()))
    start_time = time.time()
    chat_reviewer_main(args=reviewer_args)
    print("review time:", time.time() - start_time)
当然,这个项目的论文审稿部分更多是用的ChatGPT的API审稿,我司在API的基础上进一步做了微调的工作,比如如何通过论文审阅语料微调出一个论文审稿GPT(甚至通过10万量级的paper+review语料微调/训练),详见本文的第三部分或我司的「大模型项目开发线下营」
通过这个项目文件:ChatPaper/scipdf_parser-master/scipdf/pdf/parse_pdf.py可以看到以下内容
def validate_url(path: str):
    """Return True when ``path`` matches the URL pattern below.

    The pattern accepts an http/https/ftp/ftps scheme, then a domain name,
    ``localhost`` or a dotted IPv4 address, an optional port, and an optional
    path or query string. Matching is case-insensitive.
    """
    url_pattern = re.compile(
        r"^(?:http|ftp)s?://"  # scheme
        r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|"  # domain
        r"localhost|"  # localhost
        r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"  # IPv4 address
        r"(?::\d+)?"  # optional port
        r"(?:/?|[/?]\S+)$",  # optional path / query
        re.IGNORECASE,
    )
    matched = url_pattern.match(path)
    return matched is not None
这是代码中的核心功能,用 GROBID 服务从 PDF 文档中解析 XML 或 BeautifulSoup 格式的信息
如果 fulltext 参数为 True,则解析整篇文章;否则,只解析标题
可以从本地或云端的 GROBID 服务中获取数据
def parse_pdf(
    pdf_path: str,
    fulltext: bool = True,
    soup: bool = False,
    return_coordinates: bool = True,
    grobid_url: str = GROBID_URL,
):
    """Parse a PDF into TEI XML (or BeautifulSoup) with a GROBID service.

    See http://grobid.readthedocs.io/en/latest/Install-Grobid/ for running
    GROBID locally (``./gradlew run`` after unpacking the distribution).

    Parameters
    ==========
    pdf_path: str or bytes, path, URL or raw bytes of the PDF
    fulltext: bool, if True parse the whole article, otherwise only the header
    soup: bool, if True return a BeautifulSoup of the XML instead of text
    return_coordinates: bool, if True ask GROBID to add coordinates for
        persName, figure, ref, formula and biblStruct elements
    grobid_url: str, base URL of the GROBID service

    Output
    ======
    parsed_article: the XML text, a BeautifulSoup of it when ``soup`` is True,
        or None when the input could not be read (URL not ending in ``.pdf``,
        missing file, unsupported type)

    Example
    =======
    >> parsed_article = parse_pdf(pdf_path, fulltext=True, soup=True)
    """
    if fulltext:
        url = "%s/api/processFulltextDocument" % grobid_url
    else:
        url = "%s/api/processHeaderDocument" % grobid_url

    # Multipart form fields sent alongside the PDF. (None, value) marks a
    # plain form field rather than a file upload.
    form_fields = []
    if return_coordinates:
        form_fields += [
            ("teiCoordinates", (None, element))
            for element in ("persName", "figure", "ref", "formula", "biblStruct")
        ]

    def _post(payload):
        # Send the PDF together with the coordinate fields collected above.
        return requests.post(url, files=form_fields + [("input", payload)]).text

    parsed_article = None
    if isinstance(pdf_path, str):
        if validate_url(pdf_path):
            # Same case-insensitive suffix test decides both URL outcomes.
            if op.splitext(pdf_path)[-1].lower() != ".pdf":
                print("输入的URL必须以``.pdf``结尾")
            else:
                page = urllib.request.urlopen(pdf_path).read()
                parsed_article = _post(page)
        elif op.exists(pdf_path):
            # Close the file as soon as the upload has finished.
            with open(pdf_path, "rb") as pdf_file:
                parsed_article = _post(pdf_file)
    elif isinstance(pdf_path, bytes):
        parsed_article = _post(pdf_path)

    if soup and parsed_article is not None:
        parsed_article = BeautifulSoup(parsed_article, "lxml")
    return parsed_article
def parse_authors(article):
    """Parse authors from a given BeautifulSoup of an article.

    Returns the author names joined with "; ". Each name is
    "first [middle] last"; missing parts are empty strings. Returns "" when
    the article has no ``sourcedesc`` element.
    """
    source_desc = article.find("sourcedesc")
    if source_desc is None:
        return ""

    authors = []
    for author in source_desc.find_all("persname"):
        firstname = author.find("forename", {"type": "first"})
        firstname = firstname.text.strip() if firstname is not None else ""

        middlename = author.find("forename", {"type": "middle"})
        middlename = middlename.text.strip() if middlename is not None else ""

        lastname = author.find("surname")
        lastname = lastname.text.strip() if lastname is not None else ""

        # Compare by value: identity against a literal is not reliable.
        if middlename != "":
            authors.append(firstname + " " + middlename + " " + lastname)
        else:
            authors.append(firstname + " " + lastname)
    return "; ".join(authors)
def parse_date(article):
    """Parse date from a given BeautifulSoup of an article.

    Returns the ``when`` attribute of the first ``date`` under
    ``publicationstmt``, or "" when either element is missing.
    """
    pub_stmt = article.find("publicationstmt")
    if pub_stmt is None:
        # Nothing to search in; same result as a missing date element.
        return ""

    date_tag = pub_stmt.find("date")
    return date_tag.attrs.get("when") if date_tag is not None else ""
def parse_abstract(article):
    """Parse abstract from a given BeautifulSoup of an article.

    Concatenates the text of the non-string grandchildren of the
    ``abstract`` element. Returns "" when there is no ``abstract`` element.
    """
    div = article.find("abstract")
    if div is None:
        return ""

    abstract = ""
    for p in list(div.children):
        # Skip bare strings and empty child elements.
        if not isinstance(p, NavigableString) and len(list(p)) > 0:
            abstract += " ".join(
                [elem.text for elem in p if not isinstance(elem, NavigableString)]
            )
    return abstract
def parse_sections(article, as_list: bool = False):
    """Parse the list of sections from a given BeautifulSoup of an article.

    Parameters
    ==========
    as_list: bool, if True a multi-child section's text is a list of
        paragraphs instead of one newline-joined string

    Returns a list of dicts with keys ``heading``, ``text``,
    ``n_publication_ref`` and ``n_figure_ref``; an empty list when the
    article has no ``text`` element.
    """
    article_text = article.find("text")
    if article_text is None:
        return []

    divs = article_text.find_all("div", attrs={"xmlns": "http://www.tei-c.org/ns/1.0"})
    sections = []
    for div in divs:
        div_list = list(div.children)
        if len(div_list) == 0:
            heading = ""
            text = ""
        elif len(div_list) == 1:
            # A lone string is a heading; a lone element is body text.
            if isinstance(div_list[0], NavigableString):
                heading = str(div_list[0])
                text = ""
            else:
                heading = ""
                text = div_list[0].text
        else:
            text = []
            heading = div_list[0]
            if isinstance(heading, NavigableString):
                heading = str(heading)
                p_all = div_list[1:]
            else:
                heading = ""
                p_all = div_list
            for p in p_all:
                if p is not None:
                    try:
                        text.append(p.text)
                    except AttributeError:
                        # Child without a ``text`` attribute: skip it.
                        pass
            if not as_list:
                text = "\n".join(text)

        # Value comparison; a list ``text`` always counts as non-empty here.
        if heading != "" or text != "":
            ref_dict = calculate_number_of_references(div)
            sections.append(
                {
                    "heading": heading,
                    "text": text,
                    "n_publication_ref": ref_dict["n_publication_ref"],
                    "n_figure_ref": ref_dict["n_figure_ref"],
                }
            )
    return sections
def calculate_number_of_references(div):
    """Count the ``ref`` elements of a section by their ``type`` attribute.

    Returns ``{"n_publication_ref": <type == "bibr">,
    "n_figure_ref": <type == "figure">}``.
    """
    ref_types = [ref.attrs.get("type") for ref in div.find_all("ref")]
    return {
        "n_publication_ref": ref_types.count("bibr"),
        "n_figure_ref": ref_types.count("figure"),
    }
def parse_references(article):
    """Parse the list of references from a given BeautifulSoup of an article.

    Returns a list of dicts with keys ``title``, ``journal``, ``year`` and
    ``authors``; an empty list when the article has no ``text`` element or
    no references ``div``.
    """
    article_text = article.find("text")
    if article_text is None:
        return []

    references = article_text.find("div", attrs={"type": "references"})
    references = references.find_all("biblstruct") if references is not None else []

    reference_list = []
    for reference in references:
        # Article-level title first, monograph-level title as a fallback.
        title = reference.find("title", attrs={"level": "a"})
        if title is None:
            title = reference.find("title", attrs={"level": "m"})
        title = title.text if title is not None else ""

        # Journal title first, publisher as a fallback.
        journal = reference.find("title", attrs={"level": "j"})
        journal = journal.text if journal is not None else ""
        if journal == "":
            journal = reference.find("publisher")
            journal = journal.text if journal is not None else ""

        year = reference.find("date")
        year = year.attrs.get("when") if year is not None else ""

        authors = []
        for author in reference.find_all("author"):
            firstname = author.find("forename", {"type": "first"})
            firstname = firstname.text.strip() if firstname is not None else ""
            middlename = author.find("forename", {"type": "middle"})
            middlename = middlename.text.strip() if middlename is not None else ""
            lastname = author.find("surname")
            lastname = lastname.text.strip() if lastname is not None else ""

            if middlename != "":
                authors.append(firstname + " " + middlename + " " + lastname)
            else:
                authors.append(firstname + " " + lastname)
        authors = "; ".join(authors)

        reference_list.append(
            {"title": title, "journal": journal, "year": year, "authors": authors}
        )
    return reference_list
def parse_figure_caption(article):
    """Parse the list of figures/tables from a given BeautifulSoup of an article.

    Returns a list of dicts with keys ``figure_label``, ``figure_type``,
    ``figure_id``, ``figure_caption`` and ``figure_data``. Missing label,
    description or table elements yield "" instead of raising.
    """

    def _text_of(node):
        # Text of an optional element.
        return node.text if node is not None else ""

    figures_list = []
    for figure in article.find_all("figure"):
        figure_type = figure.attrs.get("type") or ""
        figure_id = figure.attrs.get("xml:id") or ""

        label = _text_of(figure.find("label"))
        if figure_type == "table":
            # Tables carry a description plus the cell text.
            caption = _text_of(figure.find("figdesc"))
            data = _text_of(figure.table)
        else:
            # Other figures: the whole element text is the caption.
            caption = figure.text
            data = ""

        figures_list.append(
            {
                "figure_label": label,
                "figure_type": figure_type,
                "figure_id": figure_id,
                "figure_caption": caption,
                "figure_data": data,
            }
        )
    return figures_list
def parse_figures(
    pdf_folder: str,
    jar_path: str = PDF_FIGURES_JAR_PATH,
    resolution: int = 300,
    output_folder: str = "figures",
):
    """Extract figures from scientific PDFs with pdffigures2.

    Parameters
    ==========
    pdf_folder: str, path of a folder that contains only PDF files
    jar_path: str, path of the pdffigures2 assembly jar
    resolution: int, resolution of the output figures
    output_folder: str, folder that receives the parsed data and the figures

    Output
    ======
    Creates ``output_folder/data`` and ``output_folder/figures`` and runs the
    jar with a 20 second timeout. Prints a status message; returns None.
    """
    if not op.isdir(output_folder):
        os.makedirs(output_folder)

    data_path = op.join(output_folder, "data")
    figure_path = op.join(output_folder, "figures")
    if not op.exists(data_path):
        os.makedirs(data_path)
    if not op.exists(figure_path):
        os.makedirs(figure_path)

    # Either path may exist as a regular file, in which case nothing is run.
    if op.isdir(data_path) and op.isdir(figure_path):
        args = [
            "java",
            "-jar",
            jar_path,
            pdf_folder,
            "-i",
            str(resolution),
            "-d",
            op.join(op.abspath(data_path), ""),
            "-m",
            op.join(op.abspath(figure_path), ""),  # end path with "/"
        ]
        completed = subprocess.run(
            args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=20
        )
        if completed.returncode == 0:
            print("完成从PDFs中提取图形!")
        else:
            # Report the failure instead of claiming success.
            print(
                "从PDFs中提取图形失败(退出码 %d):%s"
                % (completed.returncode, completed.stderr.decode(errors="replace"))
            )
    else:
        print(
            "您可能需要检查output文件夹路径中的``data``和``figures``。"
        )
def parse_formulas(article):
    """Parse the list of formulas from a given BeautifulSoup of an article.

    Returns a list of dicts with keys ``formula_id``, ``formula_text`` and
    ``formula_coordinates``. The coordinates are a list of floats parsed from
    the comma-separated ``coords`` attribute, or "" when it is absent.
    """
    formulas_list = []
    for formula in article.find_all("formula"):
        # ``.get`` so a formula without an id yields "" instead of KeyError.
        formula_id = formula.attrs.get("xml:id") or ""
        formula_text = formula.text
        formula_coordinates = formula.attrs.get("coords") or ""
        if formula_coordinates != "":
            formula_coordinates = [float(x) for x in formula_coordinates.split(",")]
        formulas_list.append(
            {
                "formula_id": formula_id,
                "formula_text": formula_text,
                "formula_coordinates": formula_coordinates,
            }
        )
    return formulas_list
def convert_article_soup_to_dict(article, as_list: bool = False):
    """Convert a BeautifulSoup article into a plain dictionary.

    The layout is similar to the output of
    https://github.com/allenai/science-parse/

    Parameters
    ==========
    article: BeautifulSoup, or None
    as_list: bool, forwarded to ``parse_sections``

    Output
    ======
    None when ``article`` is None, otherwise a dict with the keys
    ``title``, ``authors``, ``pub_date``, ``abstract``, ``sections``,
    ``references``, ``figures``, ``formulas`` and ``doi``.
    """
    if article is None:
        return None

    title_tag = article.find("title", attrs={"type": "main"})
    article_dict = {
        "title": title_tag.text.strip() if title_tag is not None else "",
        "authors": parse_authors(article),
        "pub_date": parse_date(article),
        "abstract": parse_abstract(article),
        "sections": parse_sections(article, as_list=as_list),
        "references": parse_references(article),
        "figures": parse_figure_caption(article),
        "formulas": parse_formulas(article),
    }

    doi_tag = article.find("idno", attrs={"type": "DOI"})
    article_dict["doi"] = doi_tag.text if doi_tag is not None else ""
    return article_dict
def parse_pdf_to_dict(
    pdf_path: str,
    fulltext: bool = True,
    soup: bool = True,
    as_list: bool = False,
    return_coordinates: bool = True,
    grobid_url: str = GROBID_URL,
):
    """Parse the given PDF and return the article as a dictionary.

    Parameters
    ==========
    pdf_path: str, path of the publication or article
    fulltext: bool, whether to extract the full text
    soup: bool, whether ``parse_pdf`` returns a BeautifulSoup
    as_list: bool, forwarded to ``convert_article_soup_to_dict``
    return_coordinates: bool, whether to request coordinates
    grobid_url: str, URL of the GROBID server, ``GROBID_URL`` by default

    Output
    =====
    article_dict: whatever ``convert_article_soup_to_dict`` returns for the
        parsed article
    """
    return convert_article_soup_to_dict(
        parse_pdf(
            pdf_path,
            fulltext=fulltext,
            soup=soup,
            return_coordinates=return_coordinates,
            grobid_url=grobid_url,
        ),
        as_list=as_list,
    )
这个函数的目的是解析给定的PDF文件,并将其转换为一个结构化的字典。首先,它使用parse_pdf函数来解析PDF,然后使用convert_article_soup_to_dict函数将解析后的BeautifulSoup对象转换为字典具体包含如下功能(这个基于GPT4的文献总结工具的项目auto-draft也提供类似的功能)
// 待更
# Registry of the text-embedding backends, keyed by model name.
import os

from langchain.embeddings import HuggingFaceEmbeddings

# The OpenAI backend is only built when an API key is present in the environment.
openai_api_key = os.getenv("OPENAI_API_KEY")
if openai_api_key is None:
    openai_embedding = None
else:
    from langchain.embeddings.openai import OpenAIEmbeddings
    openai_embedding = OpenAIEmbeddings(
        model="text-embedding-ada-002",
        openai_api_key=openai_api_key,
    )

# HuggingFace backend: runs on the CPU and leaves the embeddings unnormalised.
model_name = 'sentence-transformers/all-MiniLM-L6-v2'
model_kwargs = {'device': 'cpu'}
encode_kwargs = {'normalize_embeddings': False}
all_minilm_l6_v2 = HuggingFaceEmbeddings(
    model_name=model_name,
    model_kwargs=model_kwargs,
    encode_kwargs=encode_kwargs,
)

EMBEDDINGS = {
    "text-embedding-ada-002": openai_embedding,
    "all-MiniLM-L6-v2": all_minilm_l6_v2,
}
// 待更
定义了一个Knowledge类,该类使用关键词字典从数据库中搜索相关内容,并可以将这些内容转化为提示文本或JSON格式
- import tiktoken # 导入tiktoken模块,用于计算tokens数量
- from random import shuffle # 从random模块导入shuffle函数,用于随机打乱列表
-
# Token counting is done with the tiktoken encoding used for "gpt-4".
tokenizer_name = tiktoken.encoding_for_model('gpt-4')
tokenizer = tiktoken.get_encoding(tokenizer_name.name)


def tiktoken_len(text):
    """Return the number of tokens in ``text``.

    ``disallowed_special=()`` means special-token text is encoded like
    ordinary text instead of raising.
    """
    return len(tokenizer.encode(text, disallowed_special=()))
-
class Knowledge:
    """Collect passages from a vector database and render them as prompts or JSON."""

    def __init__(self, db):
        self.db = db  # object exposing ``similarity_search_with_score``
        self.contents = []  # collected {"content": ..., "score": ...} dicts

    def collect_knowledge(self, keywords_dict, max_query):
        """Search the database for every keyword and store the hits.

        keywords_dict:
            e.g. {"machine learning": 5, "language model": 2}; only the keys
            are used here.
        max_query: maximum number of hits requested per keyword; nothing is
            searched when it is not positive.
        """
        db = self.db
        if max_query > 0:
            for kw in keywords_dict:
                docs = db.similarity_search_with_score(kw, k=max_query)
                # Iterate over what was actually returned: the database may
                # hold fewer than ``max_query`` matching documents.
                for doc, score in docs:
                    self.contents.append(
                        {"content": doc.page_content.replace('\n', ' '), "score": score}
                    )
        # NOTE(review): the listing lost its indentation; the shuffle is
        # assumed to run at method level — confirm against upstream.
        shuffle(self.contents)

    def to_prompts(self, max_tokens=2048):
        """Render the contents as "Reference i: ..." lines within ``max_tokens``."""
        if len(self.contents) == 0:
            return ""
        prompts = []
        tokens = 0
        for idx, content in enumerate(self.contents):
            prompt = "Reference {}: {}\n".format(idx, content["content"])
            tokens += tiktoken_len(prompt)
            # Stop before the entry that would reach the token budget.
            if tokens >= max_tokens:
                break
            prompts.append(prompt)
        return "".join(prompts)

    def to_json(self):
        """Return the contents as ``{"<index>": {"content", "score"}}``; scores become strings."""
        if len(self.contents) == 0:
            return {}
        output = {}
        for idx, content in enumerate(self.contents):
            output[str(idx)] = {
                "content": content["content"],
                "score": str(content["score"])
            }
        print(output)
        return output
这个代码文件主要实现了以下功能
References
类之外Reference类的说明
.bib
文件中读取论文,并用search_paper_abstract
方法填充缺失的摘要{"paper_id": "paper summary"}
。待完成的任务(todo)
bib_papers
;bib_papers
;一些基本的工具
def evaluate_cosine_similarity(v1, v2):
    """Return the cosine similarity of ``v1`` and ``v2``.

    Returns 0.0 when either vector has zero norm or when the computation
    raises ``ValueError`` (e.g. mismatched lengths).
    """
    try:
        denominator = norm(v1) * norm(v2)
        if denominator == 0:
            # A zero vector has no direction; avoid dividing by zero (NaN).
            return 0.0
        return np.dot(v1, v2) / denominator
    except ValueError:
        return 0.0
def chunks(lst, chunk_size=MAX_BATCH_SIZE):
    """Yield consecutive slices of ``lst`` holding at most ``chunk_size`` items."""
    yield from (
        lst[start : start + chunk_size]
        for start in range(0, len(lst), chunk_size)
    )
def embed(papers):
    """POST ``papers`` to ``URL`` in batches and map paper id to embedding.

    Raises ``RuntimeError`` as soon as a batch is answered with a status
    other than 200.
    """
    embeddings_by_paper_id: Dict[str, List[float]] = {}
    for batch in chunks(papers):
        # ``json=`` lets requests serialise the batch.
        response = requests.post(URL, json=batch)
        if response.status_code != 200:
            raise RuntimeError("Sorry, something went wrong, please try later!")

        embeddings_by_paper_id.update(
            (pred["paper_id"], pred["embedding"]) for pred in response.json()["preds"]
        )
    return embeddings_by_paper_id
def get_embeddings(paper_title, paper_description):
    """Return a paper dict for the given title/abstract with its embedding attached.

    The paper is embedded under the fixed id "target_paper".
    """
    target_paper = {
        "title": paper_title,
        "abstract": paper_description,
        "paper_id": "target_paper",
    }
    target_paper["embeddings"] = embed([target_paper])["target_paper"]
    return target_paper
k
# ...papers (end of the preceding list item: the top-k most relevant papers).
def get_top_k(papers_dict, paper_title, paper_description, k=None):
    """Return the ``k`` papers most similar to the described target paper.

    ``papers_dict`` maps paper ids to dicts that contain an ``embeddings``
    vector. Every entry gets a ``cos_sim`` value written into it, and the
    returned entries have ``embeddings`` removed (both in place). With
    ``k=None`` all papers are returned, ordered by similarity.
    """
    target_paper = get_embeddings(paper_title, paper_description)
    papers = papers_dict

    max_num_papers = len(papers)
    num_papers = max_num_papers if k is None else min(k, max_num_papers)

    target_embedding_vector = target_paper["embeddings"]

    # Score every candidate against the target.
    for paper_info in papers.values():
        paper_info["cos_sim"] = evaluate_cosine_similarity(
            paper_info["embeddings"], target_embedding_vector
        )

    ranked = sorted(papers.items(), key=lambda item: item[1]["cos_sim"], reverse=True)
    sorted_papers = dict(ranked[:num_papers])

    # Embeddings are dropped from the result.
    for paper_info in sorted_papers.values():
        paper_info.pop("embeddings", None)

    return sorted_papers
def remove_newlines(serie):
    """Flatten an abstract to one line to shorten prompts.

    Real newlines and literal backslash-n sequences become spaces, then every
    run of consecutive spaces is collapsed to a single space.
    """
    import re

    serie = serie.replace('\n', ' ')
    serie = serie.replace('\\n', ' ')
    # The listing's replace(' ', ' ') was a no-op; collapse space runs instead.
    return re.sub(r' {2,}', ' ', serie)
从.bib文件加载论文信息
def load_papers_from_bibtex(bib_file_path):
    """Load papers from a ``.bib`` file as a list of dicts.

    Entries without a title are skipped. A missing abstract is looked up with
    ``search_paper_abstract``. ``link`` is always the empty string.
    """
    with open(bib_file_path) as bibtex_file:
        bib_database = bibtexparser.load(bibtex_file)

    bib_papers = []
    for entry in bib_database.entries:
        title = entry.get("title")
        if title is None:
            continue

        abstract = entry.get("abstract")
        if abstract is None:
            abstract = search_paper_abstract(title)

        bib_papers.append(
            {
                "paper_id": entry.get("ID"),
                "title": title,
                "link": "",
                "abstract": abstract,
                "authors": entry.get("author"),
                "year": entry.get("year"),
                "journal": entry.get("journal"),
            }
        )
    return bib_papers
search_paper_abstract
# The search_paper_abstract function looks up an abstract by title.
def search_paper_abstract(title):
    """Return the abstract of the first ``scholarly`` hit for ``title``.

    Best-effort: returns "" when no free proxy is available, the search
    fails or yields nothing, or the hit carries no abstract.
    """
    pg = ProxyGenerator()
    if not pg.FreeProxies():
        return ""

    try:
        scholarly.use_proxy(pg)
        found_paper = next(scholarly.search_pubs(title))
    except Exception:
        # Network errors and an empty result (StopIteration) both mean "no abstract".
        return ""

    try:
        abstract = found_paper['bib']['abstract']
    except (KeyError, TypeError):
        return ""
    return remove_newlines(abstract)
计算文本的tokens数量
tokenizer
# The `tokenizer` object is used to count how many tokens a given text has.
tokenizer_name = tiktoken.encoding_for_model('gpt-4')
tokenizer = tiktoken.get_encoding(tokenizer_name.name)


def tiktoken_len(text):
    """Return the number of tokens in ``text``.

    ``disallowed_special=()`` means special-token text is encoded like
    ordinary text instead of raising.
    """
    return len(tokenizer.encode(text, disallowed_special=()))
使用Semantic Scholar (SS) API搜索论文
parse_search_results
函数
这部分主要关于从搜索结果中提取学术论文的相关信息:
该函数的目的是对传入的搜索结果进行解析,并将其转换为一个论文信息列表。
&
替换为\&
。ss_search
# ...the ss_search method, after which the function below processes the
# search results.
def parse_search_results(search_results_ss):
    """Convert raw Semantic Scholar results into a list of paper dicts.

    Papers without an abstract or without an embedding are skipped.
    NOTE(review): ``tldr`` is not defined in this listing; presumably it is a
    flag of the enclosing function — confirm.
    """
    if len(search_results_ss) == 0:
        return []

    papers_ss = []
    for raw_paper in search_results_ss:
        if raw_paper["abstract"] is None:
            continue

        authors_str, last_name = extract_author_info(raw_paper['authors'])
        year_str = str(raw_paper['year'])
        title = raw_paper['title']

        # A missing venue is treated like an empty one; "&" is escaped.
        journal = (raw_paper['venue'] or "").replace("&", "\\&")
        if not journal:
            journal = "arXiv preprint"

        paper_id = extract_paper_id(last_name, year_str, title).lower()
        link = externalIds2link(raw_paper['externalIds'])

        # Prefer the tldr summary when requested and available.
        if tldr and raw_paper['tldr'] is not None:
            abstract = raw_paper['tldr']['text']
        else:
            abstract = remove_newlines(raw_paper['abstract'])

        embeddings_dict = raw_paper.get('embedding')
        if embeddings_dict is None:
            continue
        embeddings = embeddings_dict['vector']

        papers_ss.append(
            {
                "paper_id": paper_id,
                "title": title,
                "abstract": abstract,
                "link": link,
                "authors": authors_str,
                "year": year_str,
                "journal": journal,
                "embeddings": embeddings,
            }
        )
    return papers_ss
-
- # 使用关键字进行搜索
- raw_results = ss_search(keyword, limit=counts)
- # 如果获取到了原始搜索结果
- if raw_results is not None:
- # 提取搜索结果数据
- search_results = raw_results.get("data")
- # 如果搜索结果是空的,设置为空列表
- if search_results is None:
- search_results = []
- # 如果没有获取到原始搜索结果,设置为空列表
- else:
- search_results = []
- # 解析搜索结果并返回
- results = parse_search_results(search_results)
- return results
References
类该类用于管理论文引用:
//待更
chat_paper.py,包含一个Paper类、一个Reader类和chat_paper_main函数。该程序的功能是:根据读者输入的搜索查询和感兴趣的关键词,从arXiv数据库中获取文章,并对文章进行摘要和总结。程序使用OpenAI的GPT-3模型生成文本摘要,使用arxiv包获取arXiv数据库中的文章,并将摘要和总结以markdown文件的形式保存下来。
- def parse_pdf(self): # 定义一个方法来解析PDF文件
- self.pdf = fitz.open(self.path) # 使用fitz库打开指定路径的pdf文件
- self.text_list = [page.get_text() for page in self.pdf] # 从每一页中提取文本并存放到列表中
- self.all_text = ' '.join(self.text_list) # 将每一页的文本连接成一个完整的字符串
- self.section_page_dict = self._get_all_page_index() # 获取段落与其对应的页码字典
- print("section_page_dict", self.section_page_dict) # 打印该段落与页码的对应字典
- self.section_text_dict = self._get_all_page() # 获取段落与其对应的内容字典
- self.section_text_dict.update({"title": self.title}) # 将标题添加到段落内容字典中
- self.section_text_dict.update({"paper_info": self.get_paper_info()}) # 获取论文的信息并添加到字典中
- self.pdf.close() # 关闭pdf文件
- def _get_all_page_index(self):
- # 定义需要寻找的章节名称列表
- section_list = ["Abstract",
- 'Introduction', 'Related Work', 'Background',
- "Preliminary", "Problem Formulation",
- 'Methods', 'Methodology', "Method", 'Approach', 'Approaches',
- # exp
- "Materials and Methods", "Experiment Settings",
- 'Experiment', "Experimental Results", "Evaluation", "Experiments",
- "Results", 'Findings', 'Data Analysis',
- "Discussion", "Results and Discussion", "Conclusion",
- 'References']
- # 初始化一个字典来存储找到的章节和它们在文档中出现的页码
- section_page_dict = {}
- # 遍历每一页文档
- for page_index, page in enumerate(self.pdf):
- # 获取当前页面的文本内容
- cur_text = page.get_text()
- # 遍历需要寻找的章节名称列表
- for section_name in section_list:
- # 将章节名称转换成大写形式
- section_name_upper = section_name.upper()
- # 如果当前页面包含"Abstract"这个关键词
- if "Abstract" == section_name and section_name in cur_text:
- # 将"Abstract"和它所在的页码加入字典中
- section_page_dict[section_name] = page_index
- # 如果当前页面包含章节名称,则将章节名称和它所在的页码加入字典中
- else:
- if section_name + '\n' in cur_text:
- section_page_dict[section_name] = page_index
- elif section_name_upper + '\n' in cur_text:
- section_page_dict[section_name] = page_index
- # 返回所有找到的章节名称及它们在文档中出现的页码
- return section_page_dict
- def _get_all_page(self):
- """
- 获取PDF文件中每个页面的文本信息,并将文本信息按照章节组织成字典返回。
- """
- text = '' # 初始化空字符串用于临时储存文本
- text_list = [] # 初始化列表用于储存每一页的文本
- section_dict = {} # 初始化章节字典
-
- text_list = [page.get_text() for page in self.pdf] # 从每一页获取文本
- for sec_index, sec_name in enumerate(self.section_page_dict): # 遍历章节页码字典
- print(sec_index, sec_name, self.section_page_dict[sec_name]) # 打印章节索引、章节名和章节起始页码
- if sec_index <= 0 and self.abs: # 如果是第一个章节并且存在摘要,则跳过
- continue
- else:
- start_page = self.section_page_dict[sec_name] # 获取章节的起始页码
- # 如果当前章节不是最后一个,则获取下一个章节的起始页码作为当前章节的结束页码
- if sec_index < len(list(self.section_page_dict.keys()))-1:
- end_page = self.section_page_dict[list(self.section_page_dict.keys())[sec_index+1]]
- else: # 否则当前章节的结束页码为PDF的最后一页
- end_page = len(text_list)
- print("start_page, end_page:", start_page, end_page) # 打印起始和结束页码
-
- cur_sec_text = '' # 初始化当前章节的文本
- # 如果起始页码和结束页码相同,说明章节在同一页内
- if end_page - start_page == 0:
- next_sec = list(self.section_page_dict.keys())[sec_index+1]
- # 下面的代码是为了确定当前章节的文本的起始和结束位置
- # 这部分代码处理可能存在的大小写不一致的问题
- start_i = text_list[start_page].find(sec_name) if text_list[start_page].find(sec_name) != -1 else text_list[start_page].find(sec_name.upper())
- end_i = text_list[start_page].find(next_sec) if text_list[start_page].find(next_sec) != -1 else text_list[start_page].find(next_sec.upper())
- cur_sec_text += text_list[start_page][start_i:end_i]
- else: # 否则,章节可能跨越多页
- for page_i in range(start_page, end_page):
- # 下面的代码是为了确定在每一页中章节文本的起始和结束位置
- if page_i == start_page:
- start_i = text_list[start_page].find(sec_name) if text_list[start_page].find(sec_name) != -1 else text_list[start_page].find(sec_name.upper())
- cur_sec_text += text_list[page_i][start_i:]
- elif page_i < end_page:
- cur_sec_text += text_list[page_i]
- elif page_i == end_page:
- next_sec = list(self.section_page_dict.keys())[sec_index+1]
- end_i = text_list[start_page].find(next_sec) if text_list[start_page].find(next_sec) != -1 else text_list[start_page].find(next_sec.upper())
- cur_sec_text += text_list[page_i][:end_i]
- # 在当前章节的文本中去除多余的换行符
- section_dict[sec_name] = cur_sec_text.replace('-\n', '').replace('\n', ' ')
- return section_dict # 返回章节字典
- def get_paper_info(self): # 定义一个方法获取论文的信息
- first_page_text = self.pdf[self.title_page].get_text() # 从PDF的标题页中提取文本
- if "Abstract" in self.section_text_dict.keys(): # 如果"Abstract"(摘要)在字典的关键字中
- abstract_text = self.section_text_dict['Abstract'] # 从字典中获取摘要的文本
- else: # 否则
- abstract_text = self.abs # 使用self.abs作为摘要的文本
- first_page_text = first_page_text.replace(abstract_text, "") # 从首页面文本中移除摘要内容
- return first_page_text # 返回处理后的首页面文本
- def get_title(self):
-     """Guess the paper title from the text set in the largest fonts.
-
-     Pass 1 records the font size of the first span of the first line of
-     every text block. Pass 2 joins (space-separated) the first-span text of
-     each block whose size is within 0.3 of the largest or second-largest
-     recorded size, provided the text is longer than 4 characters and does
-     not contain "arXiv". ``self.title_page`` is set to the page index of the
-     last block that contributed.
-
-     Returns:
-         The assembled title with newlines replaced by spaces.
-     """
-     doc = self.pdf # the already opened PDF document
-     max_font_size = 0 # largest font size seen so far
-     max_string = "" # text of the span with the largest font size
-     max_font_sizes = [0]
-     for page_index, page in enumerate(doc): # walk every page
-         text = page.get_text("dict") # page text as a dict
-         blocks = text["blocks"] # list of blocks on the page
-         for block in blocks: # walk every block
-             if block["type"] == 0 and len(block['lines']): # type 0 with at least one line
-                 if len(block["lines"][0]["spans"]):
-                     font_size = block["lines"][0]["spans"][0]["size"] # size of the first span of the first line
-                     max_font_sizes.append(font_size)
-                     if font_size > max_font_size: # new maximum
-                         max_font_size = font_size # remember the size
-                         max_string = block["lines"][0]["spans"][0]["text"] # and its text
-     max_font_sizes.sort()
-     print("max_font_sizes", max_font_sizes[-10:])
-     cur_title = ''
-
-     for page_index, page in enumerate(doc): # walk every page again
-         text = page.get_text("dict") # page text as a dict
-         blocks = text["blocks"] # list of blocks on the page
-         for block in blocks: # walk every block
-             if block["type"] == 0 and len(block['lines']): # type 0 with at least one line
-                 if len(block["lines"][0]["spans"]):
-                     cur_string = block["lines"][0]["spans"][0]["text"] # text of the first span
-                     font_flags = block["lines"][0]["spans"][0]["flags"] # flags of the first span (not used below)
-                     font_size = block["lines"][0]["spans"][0]["size"] # size of the first span
-                     # print(font_size)
-                     # Keep blocks set in (about) the largest or second-largest size.
-                     if abs(font_size - max_font_sizes[-1]) < 0.3 or abs(font_size - max_font_sizes[-2]) < 0.3:
-                         # print("The string is bold.", max_string, "font_size:", font_size, "font_flags:", font_flags)
-                         # Ignore very short strings and anything mentioning "arXiv".
-                         if len(cur_string) > 4 and "arXiv" not in cur_string:
-                             # print("The string is bold.", max_string, "font_size:", font_size, "font_flags:", font_flags)
-                             if cur_title == '' :
-                                 cur_title += cur_string
-                             else:
-                                 cur_title += ' ' + cur_string
-                         self.title_page = page_index
-                         # break
-     title = cur_title.replace('\n', ' ')
-     return title
Reader类包含了下载文章、筛选文章以及使用OpenAI的GPT-3模型生成文本摘要和总结的方法。主要方法有:
- # 遍历论文列表
- for paper_index, paper in enumerate(paper_list):
- # 第一步:用title,abs和introduction进行总结
- text = ''
- text += 'Title:' + paper.title
- text += 'Url:' + paper.url
- text += 'Abstract:' + paper.abs
- text += 'Paper_info:' + paper.section_text_dict['paper_info']
- # 添加introduction
- text += list(paper.section_text_dict.values())[0]
- chat_summary_text = ""
-
- # 尝试与聊天机器人对话以获取摘要
- try:
- chat_summary_text = self.chat_summary(text=text)
- except Exception as e: # 捕获所有异常
- print("summary_error:", e)
- import sys
- exc_type, exc_obj, exc_tb = sys.exc_info() # 获取异常信息
- fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
- print(exc_type, fname, exc_tb.tb_lineno)
- if "maximum context" in str(e): # 如果错误信息中包含特定字符串
- current_tokens_index = str(e).find("your messages resulted in") + len(
- "your messages resulted in") + 1
- offset = int(str(e)[current_tokens_index:current_tokens_index + 4])
- summary_prompt_token = offset + 1000 + 150
- chat_summary_text = self.chat_summary(text=text, summary_prompt_token=summary_prompt_token)
-
- # 添加到html列表中
- htmls.append('## Paper:' + str(paper_index + 1))
- htmls.append('\n\n\n')
- htmls.append(chat_summary_text)
其次,第二步:总结方法 - # 第二步:总结方法。
- # 由于有些文章的方法章节名是算法名,所以简单的通过关键词来筛选很难获取
- method_key = ''
- for parse_key in paper.section_text_dict.keys():
- if 'method' in parse_key.lower() or 'approach' in parse_key.lower():
- method_key = parse_key
- break
-
- # 如果找到方法关键词
- if method_key != '':
- text = ''
- method_text = ''
- summary_text = ''
- summary_text += "<summary>" + chat_summary_text
- method_text += paper.section_text_dict[method_key]
- text = summary_text + "\n\n<Methods>:\n\n" + method_text
- chat_method_text = ""
- try:
- chat_method_text = self.chat_method(text=text)
- except Exception as e:
- print("method_error:", e)
- import sys
- exc_type, exc_obj, exc_tb = sys.exc_info()
- fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
- print(exc_type, fname, exc_tb.tb_lineno)
- if "maximum context" in str(e):
- current_tokens_index = str(e).find("your messages resulted in") + len(
- "your messages resulted in") + 1
- offset = int(str(e)[current_tokens_index:current_tokens_index + 4])
- method_prompt_token = offset + 800 + 150
- chat_method_text = self.chat_method(text=text, method_prompt_token=method_prompt_token)
- htmls.append(chat_method_text)
- else:
- chat_method_text = ''
- htmls.append("\n" * 4)
最后,第三步:总结全文并打分 - # 第三步:总结全文并打分。
- conclusion_key = ''
- for parse_key in paper.section_text_dict.keys():
- if 'conclu' in parse_key.lower():
- conclusion_key = parse_key
- break
-
- text = ''
- conclusion_text = ''
- summary_text = ''
- summary_text += "<summary>" + chat_summary_text + "\n <Method summary>:\n" + chat_method_text
- if conclusion_key != '':
- conclusion_text += paper.section_text_dict[conclusion_key]
- text = summary_text + "\n\n<Conclusion>:\n\n" + conclusion_text
- else:
- text = summary_text
- chat_conclusion_text = ""
- try:
- chat_conclusion_text = self.chat_conclusion(text=text)
- except Exception as e:
- print("conclusion_error:", e)
- import sys
- exc_type, exc_obj, exc_tb = sys.exc_info()
- fname = os.path.split(exc_tb.tb_frame.f_code.co_filename)[1]
- print(exc_type, fname, exc_tb.tb_lineno)
- if "maximum context" in str(e):
- current_tokens_index = str(e).find("your messages resulted in") + len(
- "your messages resulted in") + 1
- offset = int(str(e)[current_tokens_index:current_tokens_index + 4])
- conclusion_prompt_token = offset + 800 + 150
- chat_conclusion_text = self.chat_conclusion(text=text, conclusion_prompt_token=conclusion_prompt_token)
- htmls.append(chat_conclusion_text)
- htmls.append("\n" * 4)
-
- # 整合成一个文件并保存
- date_str = str(datetime.datetime.now())[:13].replace(' ', '-')
- export_path = os.path.join(self.root_path, 'export')
- if not os.path.exists(export_path):
- os.makedirs(export_path)
- mode = 'w' if paper_index == 0 else 'a'
- file_name = os.path.join(export_path,
- date_str + '-' + self.validateTitle(paper.title[:80]) + "." + self.file_format)
- self.export_to_markdown("\n".join(htmls), file_name=file_name, mode=mode)
- htmls = []
- def chat_summary(self, text, summary_prompt_token=1100):
- # 设置OpenAI API密钥
- openai.api_key = self.chat_api_list[self.cur_api]
- # 更新API密钥索引,用于循环使用多个API密钥(如果有)
- self.cur_api += 1
- self.cur_api = 0 if self.cur_api >= len(self.chat_api_list) - 1 else self.cur_api
-
- # 计算输入文本的token数量
- text_token = len(self.encoding.encode(text))
- # 计算截断文本的索引,确保总的token数量不超过限制
- clip_text_index = int(len(text) * (self.max_token_num - summary_prompt_token) / text_token)
- # 获取截断后的文本
- clip_text = text[:clip_text_index]
-
- # 定义聊天机器人的交互消息
- messages = [
- {"role": "system",
- "content": "You are a researcher in the field of [" + self.key_word + "] who is good at summarizing papers using concise statements"},
- {"role": "assistant",
- "content": "This is the title, author, link, abstract and introduction of an English document. I need your help to read and summarize the following questions: " + clip_text},
- {"role": "user", "content": """
- ...(这部分是详细的指示内容,为了简洁我略过了)...
- """.format(self.language, self.language, self.language)},
- ]
-
- # 根据API类型调用相应的方法
- if openai.api_type == 'azure':
- response = openai.ChatCompletion.create(
- engine=self.chatgpt_model,
- messages=messages,
- )
- else:
- response = openai.ChatCompletion.create(
- model=self.chatgpt_model,
- messages=messages,
- )
-
- # 从响应中提取机器人的回复
- result = ''
- for choice in response.choices:
- result += choice.message.content
-
- # 打印结果和使用的token数量以及响应时间
- print("summary_result:\n", result)
- print("prompt_token_used:", response.usage.prompt_tokens,
- "completion_token_used:", response.usage.completion_tokens,
- "total_token_used:", response.usage.total_tokens)
- print("response_time:", response.response_ms / 1000.0, 's')
-
- # 返回结果
- return result
- def chat_method(self, text, method_prompt_token=800):
- # 设置OpenAI的API key
- openai.api_key = self.chat_api_list[self.cur_api]
-
- # 将当前API索引递增,以便下次使用不同的API key
- self.cur_api += 1
-
- # 如果当前API索引超出API key列表的长度,则将其重置为0(实现循环使用API key列表)
- self.cur_api = 0 if self.cur_api >= len(self.chat_api_list) - 1 else self.cur_api
-
- # 使用encoding方法计算输入文本的token数量
- text_token = len(self.encoding.encode(text))
-
- # 根据最大token数量和方法提示token计算需要裁剪的文本长度
- clip_text_index = int(len(text) * (self.max_token_num - method_prompt_token) / text_token)
-
- # 根据上面计算的索引裁剪文本
- clip_text = text[:clip_text_index]
-
- # 定义要发送到ChatGPT的消息列表
- messages = [
- # 定义系统角色的消息,描述用户的专业背景和能力
- {"role": "system", "content": "You are a researcher in the field of [" + self.key_word + "] who is good at summarizing papers using concise statements"},
-
- # 定义助手角色的消息,描述要助手完成的任务
- {"role": "assistant", "content": "This is the <summary> and <Method> part of an English document, where <summary> you have summarized, but the <Methods> part, I need your help to read and summarize the following questions." + clip_text},
-
- # 定义用户角色的消息,给出具体的问题和期望格式
- {"role": "user", "content": """
- 7. Describe in detail the methodological idea of this article. Be sure to use {} answers (proper nouns need to be marked in English). For example, its steps are.
- - (1):...
- - (2):...
- - (3):...
- - .......
- Follow the format of the output that follows:
- 7. Methods: \n\n
- - (1):xxx;\n
- - (2):xxx;\n
- - (3):xxx;\n
- ....... \n\n
- Be sure to use {} answers (proper nouns need to be marked in English), statements as concise and academic as possible, do not repeat the content of the previous <summary>, the value of the use of the original numbers, be sure to strictly follow the format, the corresponding content output to xxx, in accordance with \n line feed, ....... means fill in according to the actual requirements, if not, you can not write.
- """.format(self.language, self.language)},
- ]
-
- # 根据API类型选择适当的调用方法
- if openai.api_type == 'azure':
- response = openai.ChatCompletion.create(
- engine=self.chatgpt_model,
- messages=messages,
- )
- else:
- response = openai.ChatCompletion.create(
- model=self.chatgpt_model,
- messages=messages,
- )
-
- # 从返回的答案中初始化一个空字符串用于保存结果
- result = ''
-
- # 遍历返回的选择,将内容添加到结果字符串中
- for choice in response.choices:
- result += choice.message.content
-
- # 打印方法的结果和相关的token使用情况
- print("method_result:\n", result)
- print("prompt_token_used:", response.usage.prompt_tokens,
- "completion_token_used:", response.usage.completion_tokens,
- "total_token_used:", response.usage.total_tokens)
-
- # 打印响应时间
- print("response_time:", response.response_ms / 1000.0, 's')
-
- # 返回结果字符串
- return result
- def chat_conclusion(self, text, conclusion_prompt_token=800):
- # 设置OpenAI的API密钥
- openai.api_key = self.chat_api_list[self.cur_api]
-
- # 使当前API索引递增,以便下次使用不同的API密钥
- self.cur_api += 1
-
- # 如果当前API索引超过API密钥列表的长度,将其重置为0
- self.cur_api = 0 if self.cur_api >= len(self.chat_api_list) - 1 else self.cur_api
-
- # 使用encoding方法计算输入文本的token数量
- text_token = len(self.encoding.encode(text))
-
- # 计算需要裁剪的文本长度,以适应模型的最大token限制
- clip_text_index = int(len(text) * (self.max_token_num - conclusion_prompt_token) / text_token)
-
- # 裁剪文本
- clip_text = text[:clip_text_index]
-
- # 定义要发送给ChatGPT的消息列表
- messages = [
- # 系统角色的消息,描述用户作为一个审稿人的背景
- {"role": "system", "content": "You are a reviewer in the field of [" + self.key_word + "] and you need to critically review this article"},
-
- # 助手角色的消息,描述要助手完成的任务
- {"role": "assistant", "content": "This is the <summary> and <conclusion> part of an English literature, where <summary> you have already summarized, but <conclusion> part, I need your help to summarize the following questions:" + clip_text},
-
- # 用户角色的消息,提供具体问题和预期的答案格式
- {"role": "user", "content": """
- 8. Make the following summary.Be sure to use {} answers (proper nouns need to be marked in English).
- - (1):What is the significance of this piece of work?
- - (2):Summarize the strengths and weaknesses of this article in three dimensions: innovation point, performance, and workload.
- .......
- Follow the format of the output later:
- 8. Conclusion: \n\n
- - (1):xxx;\n
- - (2):Innovation point: xxx; Performance: xxx; Workload: xxx;\n
-
- Be sure to use {} answers (proper nouns need to be marked in English), statements as concise and academic as possible, do not repeat the content of the previous <summary>, the value of the use of the original numbers, be sure to strictly follow the format, the corresponding content output to xxx, in accordance with \n line feed, ....... means fill in according to the actual requirements, if not, you can not write.
- """.format(self.language, self.language)},
- ]
-
- # 根据API类型选择适当的方法来获取模型的答案
- if openai.api_type == 'azure':
- response = openai.ChatCompletion.create(
- engine=self.chatgpt_model,
- messages=messages,
- )
- else:
- response = openai.ChatCompletion.create(
- model=self.chatgpt_model,
- messages=messages,
- )
-
- # 初始化结果字符串
- result = ''
-
- # 遍历模型返回的答案,将其添加到结果字符串中
- for choice in response.choices:
- result += choice.message.content
-
- # 打印结论部分的结果和token使用情况
- print("conclusion_result:\n", result)
- print("prompt_token_used:", response.usage.prompt_tokens,
- "completion_token_used:", response.usage.completion_tokens,
- "total_token_used:", response.usage.total_tokens)
-
- # 打印响应时间
- print("response_time:", response.response_ms / 1000.0, 's')
-
- # 返回结果字符串
- return result
// 待更
chatpaper代码运行后得到的部分结果 输出:标题、作者、单位、 关键词、相关链接及 Summary。其中
//待更
// 待更
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。