赞
踩
AI+逻辑推理比赛是由上海科学智能研究院、复旦大学联合阿里云在上智院·天池平台发布“第二届世界科学智能大赛”的逻辑推理赛道:复杂推理能力评估,该比赛聚焦于通过解决复杂的逻辑推理题,测试大型语言模型的逻辑推理能力。选手需要通过分析推理数据,利用机器学习、深度学习算法或者大语言模型,建立预测模型。比赛的研究成果将有助于评估和改进模型的逻辑推理能力,这也对于开发更智能、更有效的人工智能系统具有重要意义。
Datawhale的官方速通文档:Datawhale
根据此文档可以速通baseline,即使是像我这样的零基础小白也可以获得第一次的分数。对新人十分友好。
本篇文章基于官方文档和day1直播内容进行归纳总结。
赛事链接:http://competition.sais.com.cn/competitionDetail/532231/format
申请账号并完善个人信息,点击报名即可。
为了保证可以顺利体验大模型,同时满足比赛要求,采用阿里开源大模型Qwen系列,通过api的形式调用,使用的模型是目前限时免费的 qwen1.5-1.8b-chat 模型。
链接:https://dashscope.console.aliyun.com/apiKey
进入网页后注册并实名认证,在总览界面点击“去开通”开通灵积模型服务
然后在“API-KEY管理”中点击“创建新的API-KEY”,生成的API-KEY最好保存起来,在代码中需要用到该API-KEY。
ModelScope旨在打造下一代开源的模型即服务共享平台,为泛AI开发者提供灵活、易用、低成本的一站式模型服务产品,让模型应用更简单。
链接:https://www.modelscope.cn/my/mynotebook/preset
点击链接进入官网,登录账号,在“我的Notebook”中点击“启动”,启动完成后,点击“查看Notebook”进入环境。
若是新用户则此界面会要求绑定阿里云账号,根据网页提示完成绑定即可。
在官方手册中下载代码及测试集,然后上传文件
找到dashscope.api_key="sk-"这段代码,在双引号中填写Step2申请到的API-KEY,点击▶▶一键运行代码,大概需要30分钟。
运行完成后,下载结果文件upload.jsonl
链接:http://competition.sais.com.cn/competitionDetail/532231/mySubmissions
点击链接进入比赛界面,点击“提交结果”,上传上一步跑出来的结果文件upload.jsonl即可。等待评分大概需要 2 分钟,评分结束后可以在“我的成绩”看到自己提交的评分结果。
通过代码注释和day1直播内容,粗略了解所提供代码各部分的作用。
!pip install scipy openai tiktoken retry dashscope loguru
下载所需库文件。
- from multiprocessing import Process, Manager
- import json
- import os
- from pprint import pprint
- import re
- from tqdm import tqdm
- import random
-
- import uuid
- import openai
- import tiktoken
- import json
- import numpy as np
- import requests
- from retry import retry
- from scipy import sparse
- #from rank_bm25 import BM25Okapi
- #import jieba
- from http import HTTPStatus
- import dashscope
-
-
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from loguru import logger
- import json
- import time
- from tqdm import tqdm
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
引用大部分的库文件,主要是做多进程处理、异步任务执行、日志记录、http请求、json处理等多项内容。
- logger.remove() # 移除默认的控制台输出
- logger.add("logs/app_{time:YYYY-MM-DD}.log", level="INFO", rotation="00:00", retention="10 days", compression="zip")
-
- MODEL_NAME = 'qwen2-7b-instruct'
主要是做日志记录和模型名称的定义。
logger是一个日志的记录器对象,通常用于记录程序运行过程中的信息、警告错误等情况,通过logger.remove()的方法可以停止这种默认行为。logger.add()主要是作用是添加新的日志文件。
dashscope.api_key="sk-****"
引用API-KEY
- def api_retry(MODEL_NAME, query):
- max_retries = 5
- retry_delay = 60 # in seconds
- attempts = 0
- while attempts < max_retries:
- try:
- return call_qwen_api(MODEL_NAME, query)
- except Exception as e:
- attempts += 1
- if attempts < max_retries:
- logger.warning(f"Attempt {attempts} failed for text: {query}. Retrying in {retry_delay} seconds...")
- time.sleep(retry_delay)
- else:
- logger.error(f"All {max_retries} attempts failed for text: {query}. Error: {e}")
- raise
定义api重试函数,引入重试机制
- def call_qwen_api(MODEL_NAME, query):
- # 这里采用dashscope的api调用模型推理,通过http传输的json封装返回结果
- messages = [
- {'role': 'user', 'content': query}]
- response = dashscope.Generation.call(
- MODEL_NAME,
- messages=messages,
- result_format='message', # set the result is message format.
- )
- if response.status_code == HTTPStatus.OK:
- # print(response)
- return response['output']['choices'][0]['message']['content']
- else:
- print('Request id: %s, Status code: %s, error code: %s, error message: %s' % (
- response.request_id, response.status_code,
- response.code, response.message
- ))
- raise Exception()
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
这段代码定义了一个名为 call_qwen_api 的函数,该函数用于调用一个名为 dashscope.Generation 的 API 来生成文本。以下是该代码的功能、用途和特点的详细介绍:
- # 这里定义了prompt推理模版
-
- def get_prompt(problem, question, options):
-
- options = '\n'.join(f"{'ABCDEFG'[i]}. {o}" for i, o in enumerate(options))
-
- prompt = f"""你是一个逻辑推理专家,擅长解决逻辑推理问题。以下是一个逻辑推理的题目,形式为单项选择题。所有的问题都是(close-world assumption)闭世界假设,即未观测事实都为假。请逐步分析问题并在最后一行输出答案,最后一行的格式为"答案是:A"。题目如下:
- ### 题目:
- {problem}
- ### 问题:
- {question}
- {options}
- """
- # print(prompt)
- return prompt
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
大模型的prompt提示词工程,作用是根据我们给它下的一个定义然后生成一个逻辑推理问题的提示prompt。该函数的三个参数:problem主要用于展示逻辑推理问题的背景或者情境,question主要针对于问题里面的具体问题是什么,option表示问题的多个选项,通常是列表的形式。其中
options = '\n'.join(f"{'ABCDEFG'[i]}. {o}" for i, o in enumerate(options))
的作用是将列表的选项转换为字符串的形式,在每个选项前面加上对应字母,并用换行符进行分割,也就是说将文字形式转换为电脑可读的形式。
- # 这里使用extract抽取模获得抽取的结果
-
- def extract(input_text):
- ans_pattern = re.compile(r"答案是:(.)", re.S)
-
- problems = ans_pattern.findall(input_text)
- # print(problems)
- if(problems == ''):
- return 'A'
- return problems[0]
定义一函数用于多线程处理数据集。具体来说就是通过一些表达方式查找文本中包含的答案,然后返回找到的第一个答案。如果没有找到这个问题的答案,那么返回字符‘A’。
- def process_datas(datas,MODEL_NAME):
- results = []
- with ThreadPoolExecutor(max_workers=16) as executor:
- future_data = {}
- lasttask = ''
- lastmark = 0
- lens = 0
- for data in tqdm(datas, desc="Submitting tasks", total=len(datas)):
- problem = data['problem']
- for id,question in enumerate(data['questions']):
- prompt = get_prompt(problem,
- question['question'],
- question['options'],
- )
-
- future = executor.submit(api_retry, MODEL_NAME, prompt)
-
- future_data[future] = (data,id)
- time.sleep(0.6) # 控制每0.5秒提交一个任务
- lens += 1
- for future in tqdm(as_completed(future_data), total=lens, desc="Processing tasks"):
- # print('data',data)
- data = future_data[future][0]
- problem_id = future_data[future][1]
- try:
- res = future.result()
- extract_response = extract(res)
- # print('res',extract_response)
- data['questions'][problem_id]['answer'] = extract_response
- results.append(data)
- # print('data',data)
-
- except Exception as e:
- logger.error(f"Failed to process text: {data}. Error: {e}")
-
- return results
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
定义了process_datas函数用于处理数据集,使用多线程处理任务。函数接受数据,每一个数据是包含一个question和多个problem,接着通过调用api调用大模型来获取问题的答案,并将最后的结果存储在返回的列表中。
- def main(ifn, ofn):
- if os.path.exists(ofn):
- pass
- data = []
- # 按行读取数据
- with open(ifn) as reader:
- for line in reader:
- sample = json.loads(line)
- data.append(sample)
- datas = data
- # print(data)
- # 均匀地分成多个数据集
- return_list = process_datas(datas,MODEL_NAME)
- print(len(return_list))
- print("All tasks finished!")
- return return_list
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
定义主函数,作用是读取训练数据集,对其进行处理变成程序可操作对象的列表供process_datas函数处理,并返回处理结果。
- def evaluate(ofn):
- data = []
- with open(ofn) as reader:
- for line in reader:
- sample = json.loads(line)
- data.append(sample)
-
- pse = 0
- cnt = 0
- tot = 0
- for task in data:
- for question in task['questions']:
-
- if MODEL_NAME in question:
- tot += 1
- cnt += question[MODEL_NAME] == question['answer']
- else:
- pse += 1
-
- print(cnt, tot, cnt/tot, pse)
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
这段代码用于评估该模型在处理任务中的表现。pse记录缺少的样本数量,cnt记录模型回答问题正确的样本数量,tot记录模型回答的样本数量,因此cnt/tot表示模型回答正确率,打印这些指标。
- if __name__ == '__main__':
-
- a = extract("""根据欧几里得算法,逐步解析计算两个数6和7的最大公约数(gcd)的步骤如下:
- 1. 判断6和7是否相等:不相等。
- 2. 判断6和7大小关系,7 > 6,所以用更大的数7减去较小的数6得到结果1。
- 3. 现在计算6和1的最大公约数。
- 4. 6 > 1,根据算法用更大的数6减去较小的数1得到结果5。
- 5. 再计算5和1的最大公约数。
- 6. 5 > 1,用5减去1得到结果4。
- 7. 再计算4和1的最大公约数。
- 8. 4 > 1,用4减去1得到结果3。
- 9. 再计算3和1的最大公约数。
- 10. 3 > 1,用3减去1得到结果2。
- 11. 再计算2和1的最大公约数。
- 12. 2 > 1,用2减去1得到结果1。
- 13. 最后计算1和1的最大公约数,两数相等,gcd即为这两个数,也就是1。
- 因此,6和7的最大公约数是1。
- 答案是:C.""")
-
- print(a)
- return_list = main('round1_test_data.jsonl', 'upload.jsonl')
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
主程序执行。中间从"a=extract(……"到"print(a)"部分可能是验证extract函数功能。下一句则执行先前定义的主函数,输入测试数据集round1_test_data.jsonl,输出结果返回给return_list。
主程序执行完成之后会进行问题检测,对结果进行一个整合修正,是结果更规整。
- def has_complete_answer(questions):
- # 这里假设完整答案的判断逻辑是:每个question都有一个'answer'键
- for question in questions:
- if 'answer' not in question:
- return False
- return True
判断每个问题集合中的小问题是否都有答案。
- def filter_problems(data):
- result = []
- problem_set = set()
-
- for item in data:
- # print('处理的item' ,item)
- problem = item['problem']
- if problem in problem_set:
- # 找到已存在的字典
- for existing_item in result:
- if existing_item['problem'] == problem:
- # 如果当前字典有完整答案,替换已存在的字典
- if has_complete_answer(item['questions']):
- existing_item['questions'] = item['questions']
- existing_item['id'] = item['id']
- break
- else:
- # 如果当前字典有完整答案,添加到结果列表
- if has_complete_answer(item['questions']):
- result.append(item)
- problem_set.add(problem)
-
- return result
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
定义过滤器,实现过滤问题集合中的重复问题并保留答案。
- return_list
- return_list = filter_problems(return_list)
- sorted_data = sorted(return_list, key=lambda x: int(str(x['id'])[-3:]))
- print(sorted_data)
调用过滤器和排序函数对结果列表进行过滤和排序,并打印结果。
sorted_data
让打印结果更规整。
- def find_missing_ids(dict_list):
- # 提取所有序号
- extracted_ids = {int(d['id'][-3:]) for d in dict_list}
-
- # 创建0-500的序号集合
- all_ids = set(range(500))
-
- # 找出缺失的序号
- missing_ids = all_ids - extracted_ids
-
- return sorted(missing_ids)
-
- # 示例字典列表
- dict_list = sorted_data
-
- # 找出缺失的序号
- missing_ids = find_missing_ids(dict_list)
- print("缺失的序号:", missing_ids)
![](https://csdnimg.cn/release/blogv2/dist/pc/img/newCodeMoreWhite.png)
查找缺少的序号并打印出来。
len(missing_ids)
打印缺少序号的数量。
- data = []
- with open('round1_test_data.jsonl') as reader:
- for id,line in enumerate(reader):
- if(id in missing_ids):
- sample = json.loads(line)
- for question in sample['questions']:
- question['answer'] = 'A'
- sorted_data.append(sample)
- sorted_data = sorted(sorted_data, key=lambda x: int(str(x['id'])[-3:]))
-
- with open('upload.jsonl', 'w') as writer:
- for sample in sorted_data:
- writer.write(json.dumps(sample, ensure_ascii=False))
- writer.write('\n')
补充缺少序号的问题并赋予答案为A,对补充后的结果重新排序并写入文件upload.jsonl中得到输出文件。
经过此次体验我初步熟悉了应用大语言模型进行逻辑推理的流程,对代码内容有了较为粗浅的了解。第一次上分成绩为0.6566,希望在今后的学习中再接再厉,成绩可以得到提高。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。