赞
踩
最近业务上有个需求,需要采集某些公司披露的年度报告中的信息,因为 pdf 解析工具的效果不太理想,因此需要人工查找录入到oracle数据库。为了提高效率,我借助chatgpt搭建了一个小型的录入工具以提高录入速度。
我描述了需求后它给出了模板代码,我一步步测试,它一步步给出优化方案,中间他出现了比较多的语法错误和”伪造参数“,需要人工矫正。
总体感受是,它的知识面很广,利用的好的话可以大大提升造轮子的效率,但是对需求描述比较严格,需要清晰的描述出需求后(包括前因后果细节等等),才能给出符合预期的代码
最终代码如下:
import tkinter as tk from tkinter import ttk import cx_Oracle import re class TableEditor: def __init__(self, db_config): self.db_config = db_config self.root = tk.Tk() self.root.title("Table Editor") self.root.state('zoomed') self.root.configure(bg="#f0f0f0") # Create a Canvas widget canvas = tk.Canvas(self.root, bg="#f0f0f0") canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True, padx=10, pady=10) # Create a Scrollbar widget scrollbar = ttk.Scrollbar(self.root, orient=tk.VERTICAL, command=canvas.yview) scrollbar.pack(side=tk.RIGHT, fill=tk.Y, padx=10, pady=10) # Configure the Canvas widget to use the Scrollbar widget canvas.configure(yscrollcommand=scrollbar.set) # Create a Frame widget inside the Canvas widget self.table_frame = tk.Frame(canvas) self.table_frame.bind('<Configure>', lambda e: canvas.configure(scrollregion=canvas.bbox('all'))) canvas.create_window((0, 0), window=self.table_frame, anchor='nw', tags=('self.table_frame',)) # bind mousewheel to scrollbar self.root.bind('<MouseWheel>', lambda e: canvas.yview_scroll(int(-1 * (e.delta / 120)), 'units')) self.create_table() self.create_submit_button() self.create_synchronize_button() self.create_transform_button() self.root.mainloop() def create_table(self): # Get column names and comments from the database column_names, comments, usage_find, usage_find_hk = self.get_column_names_from_db() # Create table header tk.Label(self.table_frame, text="字段来源_港股", bg="#f0f0f0", fg="#333333", font=("黑体", 12)).grid(row=0, column=0) tk.Label(self.table_frame, text="字段来源", bg="#f0f0f0", fg="#333333", font=("黑体", 12)).grid(row=0, column=1) tk.Label(self.table_frame, text="字段注释", bg="#f0f0f0", fg="#333333", font=("黑体", 12)).grid(row=0, column=2) tk.Label(self.table_frame, text="字段", bg="#f0f0f0", fg="#333333", font=("黑体", 12)).grid(row=0, column=3) tk.Label(self.table_frame, text="值(元)", bg="#f0f0f0", fg="#333333", font=("黑体", 12)).grid(row=0, column=4) # Create table data self.column0_data = usage_find_hk self.column1_data = usage_find self.column2_data = comments self.column3_data = column_names self.column4_data = [''] * len(column_names) for i in range(len(self.column3_data)): # 第一列使用 Entry 控件 entry_var_1 = tk.StringVar(value=self.column0_data[i]) entry = tk.Entry(self.table_frame, textvariable=entry_var_1, state=tk.NORMAL, width=50) entry.grid(row=i + 1, column=0) # 第二列使用 Entry 控件 entry_var_2 = tk.StringVar(value=self.column1_data[i]) entry = tk.Entry(self.table_frame, textvariable=entry_var_2, state=tk.NORMAL, width=50) entry.grid(row=i + 1, column=1) # 第三列使用 Entry 控件 entry_var_3 = tk.StringVar(value=self.column2_data[i]) entry = tk.Entry(self.table_frame, textvariable=entry_var_3, state=tk.NORMAL) entry.grid(row=i + 1, column=2) tk.Label(self.table_frame, text=self.column3_data[i]).grid(row=i + 1, column=3) # column4_var = tk.StringVar(value=self.column4_data[i]) column4_var = tk.StringVar(value=self.column4_data[i] or "") column4_entry = tk.Entry(self.table_frame, textvariable=column4_var, state=tk.NORMAL, name=f"col4_{i}") column4_entry.grid(row=i + 1, column=4) # 保存输入框的变量,以便在 update_db 中使用 column4_var.trace_add('write', lambda name, index, mode, var=column4_var, i=i: self.column4_data.__setitem__(i, var.get())) self.table_frame.grid_rowconfigure(i + 1, minsize=25) def create_submit_button(self): submit_button = tk.Button(self.root, text="Submit", command=self.update_db) submit_button.pack(side=tk.BOTTOM, padx=10, pady=10) def create_synchronize_button(self): submit_button = tk.Button(self.root, text="Synchronize", command=self.synchronize_db) submit_button.pack(side=tk.BOTTOM, padx=10, pady=20) def create_transform_button(self): submit_button = tk.Button(self.root, text="Transform", command=self.transform_data) submit_button.pack(side=tk.BOTTOM, padx=10, pady=100) def get_column_names_from_db(self): # Connect to Oracle database with cx_Oracle.connect(**self.db_config) as conn: with conn.cursor() as cursor: # Replace 'table_name' with the actual name of your table cursor.execute("select A.column_name ,B.comments,C.FIND,C.FIND_HK \ from user_tab_columns A,user_col_comments B ,LOOKUP C \ where A.Table_Name = B.Table_Name and A.Column_Name = B.Column_Name \ and A.Table_Name = 'TPROP' and C.TPROP_COL=A.column_name order by A.COLUMN_ID") sql_data = cursor.fetchall() column_names = [row[0] for row in sql_data] comments = [row[1] for row in sql_data] usage_find = [row[2] for row in sql_data] usage_find_hk = [row[3] for row in sql_data] return column_names, comments, usage_find, usage_find_hk def update_db(self): # Connect to Oracle database with cx_Oracle.connect(**self.db_config) as conn: with conn.cursor() as cursor: # Replace 'table_name' and 'column_name' with the actual names of your table and column for i, value in enumerate(self.column3_data): if not self.column4_data[i]: continue comp_index = self.column3_data.index('COMPNAME') enddate_index = self.column3_data.index('ENDDATE') if i == comp_index or i == enddate_index: continue sql = f"UPDATE TPROP SET {self.column3_data[i]}='{str(self.column4_data[i])}' \ WHERE COMPNAME='{str(self.column4_data[comp_index])}' \ AND ENDDATE='{str(self.column4_data[enddate_index])}'" try: cursor.execute(sql) except Exception as e: print(self.column3_data[i], '------>', e) raise e conn.commit() # 关闭窗口 self.root.destroy() def get_column4_data_from_db(self): # Connect to Oracle database with cx_Oracle.connect(**self.db_config) as conn: with conn.cursor() as cursor: # Replace 'table_name' and 'column_name' with the actual names of your table and column comp_index = self.column3_data.index('COMPNAME') enddate_index = self.column3_data.index('ENDDATE') sql = f"SELECT * FROM TPROP \ WHERE COMPNAME='{str(self.column4_data[comp_index])}' \ AND ENDDATE='{str(self.column4_data[enddate_index])}'" cursor.execute(sql) sql_data = list(cursor.fetchall()[0]) return sql_data def synchronize_db(self): cur_data = self.get_column4_data_from_db() for i, value in enumerate(cur_data): if not value: continue var = tk.StringVar(value=value) var.set(self.column4_data[i]) # 更新表格中的数据 column4_entry = self.table_frame.grid_slaves(row=i + 1, column=4)[0] column4_entry.delete(0, tk.END) column4_entry.insert(0, value) self.column4_data[i] = value def transform_data(self): cur_data = self.column4_data for i, value in enumerate(cur_data): if not value: continue if type(value) == str: if self.column3_data[i][0] == 'T': try: new_value = re.sub(r'[^\d\.-]', '', value) except Exception as e: print(self.column3_data[i],i,value) raise e var = tk.StringVar(value=new_value) var.set(self.column4_data[i]) # 更新表格中的数据 column4_entry = self.table_frame.grid_slaves(row=i + 1, column=4)[0] column4_entry.delete(0, tk.END) column4_entry.insert(0, new_value) self.column4_data[i] = new_value if __name__ == '__main__': # Replace 'username', 'password', and 'host:port/service_name' with the actual values for your Oracle database db_config = {'user': 'xbookadmin', 'password': 'mypwd', 'dsn': 'host:port/ORCL'} table_editor = TableEditor(db_config)
代码的运行结果如图:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。