赞
踩
mongodb, mysql和elasticsearch 功能较相似,所以打算用一套接口将它们封装起来
如下:
class StorageBase: def __init__(self, host=None, port=None, database=None, table=None, location=None, account=None, password=None, url=None): self.host = host self.port = port self.database = database self.table = table self.location = location self.account = account self.password = password self.url = url def build_connect(self): raise NotImplementedError # 增 def add(self, table_collection_index, data, id=None): raise NotImplementedError # 删 def delete(self, table_collection_index, condition=None, data=None, id=None): raise NotImplementedError # 改 def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None): raise NotImplementedError # 查 def search(self, table_collection_index, condition=None): raise NotImplementedError # 没有就新增,有就更新 def add_or_update(self, table_collection_index, data, condition=None): raise NotImplementedError
class MongodbStore(StorageBase): def __init__(self, host=MONGODB_CONNECTION_HOST, port=MONGODB_CONNECTION_PORT, database=MONGODB_DATABASE, account=MONGODB_DATABASE_USER, password=MONGODB_DATABASE_PASS): self.client = None self.db = None StorageBase.__init__(self, host=host, port=port, database=database, account=account, password=password) def build_connect(self): try: connection = f"mongodb://{self.account}:{self.password}@{self.host}:{self.port}/{self.database}" self.client = pymongo.MongoClient(connection) self.db = self.client[self.database] except Exception as e: print("失败:{0}".format(e)) self.db = None else: print(f"连接成功") return self.db # 增 # data要是列表形式,否则报错 # 添加单个可以是 单个元素的数组 def add(self, table_collection_index, data, id=None): try: result = self.db[table_collection_index].insert_many(data) print(f"新增成功") except Exception as e: print("失败:{0}".format(e)) result = None else: print(f"新增成功") return result def add_one(self, table_collection_index, data): try: result = self.db[table_collection_index].insert_one(data) except Exception as e: print("失败:{0}".format(e)) result = None else: print(f"新增成功") return result # 没有就新增,有就更新 def add_or_update(self, table_collection_index, data, condition=None): existing_doc = self.search_one(table_collection_index, conditions=None) if existing_doc: # 如果文档存在,则更新 self.update_one(table_collection_index, condition, { "key": "set", "value": data }) print('数据已更新。') else: # 如果文档不存在,则插入新文档 self.add_one(table_collection_index, data) print('新数据已插入。') # 删 def delete(self, table_collection_index, condition=None, data=None, id=None): try: result = self.db[table_collection_index].delete_many(parse_mongodb_condition(data)) except Exception as e: print("失败:{0}".format(e)) result = None else: print(f"删除成功") return result def delete_one(self, table_collection_index, data): try: result = self.db[table_collection_index].delete_one(parse_mongodb_condition(data)) except Exception as e: print("失败:{0}".format(e)) result = None else: print(f"删除成功") return result # 改 # search_condition 指定查询条件 # update_condition 指定更新条件 def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None): try: result = self.db[table_collection_index].update_many(parse_mongodb_condition(condition), parse_mongodb_condition(update_condition)) except Exception as e: logging.error(f"失败:{e}") result = None else: print( f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条") return result # 改单个 def update_one(self, table_collection_index=None, search_condition=None, update_condition=None): try: result = self.db[table_collection_index].update_one(parse_mongodb_condition(search_condition), parse_mongodb_condition(update_condition)) except Exception as e: print("失败:{0}".format(e)) result = None else: print( f"匹配的数据{result.matched_count if result else 0}条,影响的数据{result.modified_count if result else 0}条") return result # 计数 def count(self, table_collection_index, conditions): try: result = self.db[table_collection_index].count_documents(parse_mongodb_condition(conditions)) except Exception as e: print("失败:{0}".format(e)) result = None else: print(f"共有{result.matched_count if result else 0}条数据") return result # 查 def search(self, table_collection_index, conditions=None): try: results = self.db[table_collection_index].find(parse_mongodb_condition(conditions) if conditions else {}) except Exception as e: print("失败:{0}".format(e)) results = None return results def search_one(self, table_collection_index, conditions=None): try: result = self.db[table_collection_index].find_one(parse_mongodb_condition(conditions) if conditions else {}) print(f"查询到的内容:{result}") except Exception as e: print("失败:{0}".format(e)) result = None return result # 排序 def sort(self, table_collection_index, sort_key, conditions=None, skip_num=None, limit_num=None, sort_type=pymongo.ASCENDING): try: query = self.db[table_collection_index].find( parse_mongodb_condition(conditions) if conditions else {}).sort(sort_key, sort_type) if skip_num and limit_num: results = query.skip(skip_num).limit(limit_num) elif skip_num: results = query.skip(skip_num) elif limit_num: results = query.limit(limit_num) else: results = query except Exception as e: print("失败:{0}".format(e)) result = None return results # 测试代码 if __name__ == "__main__": # 建立连接 mongodb_connect = MongodbStore() mongodb_connect.build_connect() # student1 = { # "id": 1, # "name": "Jordan3", # "age": 23, # "gender": "male" # } # # student2 = { # "name": "Mike", # "age": 21, # "gender": "male" # } # mongodb_connect.add("students", [student1]) # result = mongodb_connect.delete("students", {"name": "Mike"}) # result = mongodb_connect.update("students", {"age": {"#gt": 23}}, {"$inc": {"age": 1}}) # print(result.matched_count, result.modified_count) # 自定义分隔符号 # condition = parse_mongodb_condition("age*>*20;name*regex*'^J.*'", ";", "*") condition_list = [ { "key": "age", "condition": ">", "value": "20" }, { "key": "name", "condition": "regex", "value": "'^J.*'" }, ] # 不设置condition的话表示等于 # search_list = [ # { # "key": "age", # "value": "20" # }, # ] # 条件表达式解析 mongodb_connect.search("students", condition_list) mongodb_connect.sort("students", 'age', condition_list, None, None, pymongo.DESCENDING)
然后用的字典表如下
MONGODB_CONDITION_DICTIONARY = { '<': '$lt', '>': '$gt', '<=': '$lte', '>=': '$gte', '!=': '$ne', 'in': '$in', 'notin': '$nin', 'regex': '$regex', 'exists': '$exists', 'type': '$type', 'mod': '$mod', 'inc': '$inc', 'search': '$search', 'text': '$text', 'set': '$set', }
这里要注意的是考虑到mongodb搜索条件的语法有些复杂,因此对其进行了一些简化,简化的思路是把条件转化成简单的键值对对象,下面看代码
# 优化该方法的思路 # 1. 使用列表推导式替代了外部循环。 # 2. 使用 str.join() 方法替代了内部循环中的字符串连接操作,这样可以减少不必要的字符串创建和连接操作,提高代码效率。 # 3. 使用 dict.values() 方法直接获取字典中的所有值,避免了在内部循环中通过键获取值的操作。 # 将条件对象数组转化为条件字符串 def turn_list_to_condition_str(list, split_cond=',', split_key=':'): conditions_list = [ split_key.join([str(value) for value in search.values()]) for search in list ] return split_cond.join(conditions_list) # 该方法将条件字符串转换为条件对象 def parse_mongodb_condition(conditions, split_cond=',', split_key=':'): # 传入字符串规则 # 例子:"age:>:20,name:regex:'^J.*'" # ,分隔多个条件 # age:>:20 age是字段 >是条件 20是值 中间用:分隔 # 分隔符默认是,和: 也可以自定义 # 分割字符串 pairs = turn_list_to_condition_str(conditions).split(split_cond) # 构建字典 condition_dict = {} # 避免了在每次循环中都多次调用 pair.split(split_key) for pair in pairs: split_pair = pair.split(split_key) key = replace_chars_by_dit(split_pair[0], MONGODB_CONDITION_DICTIONARY) if len(split_pair) == 3: cond = replace_chars_by_dit(split_pair[1], MONGODB_CONDITION_DICTIONARY) condition_dict[key] = {cond: eval(split_pair[2])} elif len(split_pair) == 2: condition_dict[key] = eval(split_pair[1]) # print(f'解析后的条件表达式是{condition_dict}') return condition_dict # 测试代码 if __name__ == "__main__": # condition_str = "age:20,name:regex:'^J.*'" # print(parse_mongodb_condition(condition_str)) search_list = [ { "key": "age", "condition": ">", "value": "20" }, { "key": "name", "condition": "regex", # 注意字符串的值要帶单引号'' "value": "'^J.*'" }, ] search_list = { "key": "set", "value": "20" }, print(parse_mongodb_condition(search_list))
# 可以和关系数据库的概念对比理解 # Relational DB -> Databases -> Tables -> Rows -> Columns # Elasticsearch -> Indices -> Types -> Documents -> Fields class ElasticsearchStore(StorageBase): def __init__(self, host=ELASTICSEARCH_HOST, port=ELASTICSEARCH_PORT, account=ELASTICSEARCH_USERNAME, password=ELASTICSEARCH_PASSWORD, verify_certs=ELASTICSEARCH_VERIFY_CERTS): self.es = None self.verify_certs = verify_certs StorageBase.__init__(self, host=host, port=port, account=account, password=password) def build_connect(self, default=True): try: if default: self.es = Elasticsearch() else: url = f"https://[{self.account}:{self.password}@]{self.host}:{self.port}" self.es = Elasticsearch(url, verify_certs=self.verify_certs) except Exception as e: print("失败:{0}".format(e)) self.es = None else: print(f"连接成功") return self.es # 增 def add(self, table_collection_index, data, id=None): try: result = self.es.index(index=table_collection_index, body=data, id=id) print(f"新增成功") except Exception as e: print("失败:{0}".format(e)) result = None else: print(f"新增成功") return result # 删 def delete(self, table_collection_index, condition=None, data=None, id=None): try: result = self.es.delete(index=table_collection_index, id=id, ignore=[400, 404]) print(f"删除成功") except Exception as e: print("失败:{0}".format(e)) result = None else: print(f"删除成功") return result # 改 def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None): try: result = self.es.update(index=table_collection_index, body=data, id=id, ignore=[400, 404]) except Exception as e: logging.error(f"失败:{e}") result = None else: print(f"更新成功") return result # 获取 def get(self, table_collection_index, id): try: result = self.es.get(index=table_collection_index, id=id, ignore=[400, 404]) except Exception as e: logging.error(f"失败:{e}") result = None else: print(f"获取成功{result}") return result # 查 # dsl = { # 'query': { # 'match': { # 'title': '高考 圆梦' # } # } # } def search(self, table_collection_index, condition=None): try: results = self.es.search(index=table_collection_index, body=condition) except Exception as e: logging.error(f"失败:{e}") results = None else: print(f"搜索成功{results}") return results # 测试代码 if __name__ == "__main__": # 建立连接 elasticsearch_store = ElasticsearchStore() elasticsearch_store.build_connect() # doc = { # 'author': 'kimchy', # 'text': 'Elasticsearch: cool. bonsai cool.', # 'timestamp': datetime.now(), # } # elasticsearch_connect.add('test-index', doc, 1) # elasticsearch_connect.get('test-index',1) doc1 = { 'author': 'kimchy', 'text': 'test', 'timestamp': datetime.now(), } elasticsearch_store.update('test-index', doc1, 1) # elasticsearch_store.delete('test-index', 1) elasticsearch_store.search('test-index', {"query": {"match_all": {}}})
class MysqlStore(StorageBase): def __init__(self, host=MYSQL_HOST, port=MYSQL_PORT, account=MYSQL_USER, password=MYSQL_PASSWORD, database=MYSQL_DATABASE): self.db = None self.cursor = None StorageBase.__init__(self, host=host, port=port, account=account, password=password, database=database) def build_connect(self) -> pymysql.cursors.Cursor: """ 建立数据库连接。 Returns: pymysql.cursors.Cursor: 数据库游标对象,用于执行 SQL 语句。 """ try: self.db = pymysql.connect(host=self.host, user=self.account, password=self.password, port=self.port, database=self.database) self.cursor = self.db.cursor() except Exception as e: print(f"连接失败:{e}") self.db = None self.cursor = None else: print("连接成功") return self.cursor # 增 # table 表 # data 新增数据 键值对对象 def add(self, table_collection_index, data, id=None): # data.keys()返回的是键的数组 keys = ', '.join(data.keys()) # 下面这段是构造多个%s最为占位符,有几个字段就构造几个 values = ', '.join(['%s'] * len(data)) # 要执行的sql语句 sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(table=table_collection_index, keys=keys, values=values) try: self.cursor.execute(sql, tuple(data.values())) self.db.commit() print(f"新增成功") except Exception as e: print("失败:{0}".format(e)) self.db.rollback() finally: print(f"{sql}") # 没有新增,有就更新 # table 表 # data 新增数据 键值对对象 def add_or_update(self, table_collection_index, data, condition=None): # data.keys()返回的是键的数组 keys = ', '.join(data.keys()) # 下面这段是构造多个%s最为占位符,有几个字段就构造几个 values = ', '.join(['%s'] * len(data)) # 要执行的sql语句 # 如果主键已经存在,就执行更新操作 sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE '.format(table=table_collection_index, keys=keys, values=values) update = ', '.join(["{key} = %s".format(key=key) for key in data]) sql += update try: self.cursor.execute(sql, tuple(data.values()) * 2) self.db.commit() print(f"新增成功") except Exception as e: print("失败:{0}".format(e)) self.db.rollback() finally: print(f"{sql}") # 删 # table 表名 # condition 删除条件 def delete(self, table_collection_index, condition=None, data=None, id=None): sql = 'DELETE FROM {table} WHERE {condition}'.format(table=table, condition=condition) try: self.cursor.execute(sql) self.db.commit() print(f"删除成功") except Exception as e: print("失败:{0}".format(e)) self.db.rollback() finally: print(f"{sql}") # 改 # table 表名 # data 数据 # condition 条件 def update(self, table_collection_index, data=None, condition=None, id=None, update_condition=None): # data.keys()返回的是键的数组 keys = ', '.join(data.keys()) # 下面这段是构造多个%s最为占位符,有几个字段就构造几个 values = ', '.join(['%s'] * len(data)) # 要跟新的数据 update = ','.join(["{key} = %s".format(key=key) for key in data]) # 要执行的sql语句 sql = 'UPDATE {table} SET {update} WHERE {condition}'.format(table=table_collection_index, update=update, condition=condition) try: self.cursor.execute(sql, tuple(data.values())) self.db.commit() print(f"更新成功") except Exception as e: print("失败:{0}".format(e)) self.db.rollback() finally: print(f"{sql}") # 查 def search(self, table_collection_index, condition=None): # 要执行的sql语句 sql = 'SELECT * FROM {table} WHERE {condition}'.format(table=table_collection_index, condition=condition) try: self.cursor.execute(sql) # fetchall获取结果的所有数据 results = self.cursor.fetchall() # fetchall得到的是二重元祖,其中每个元素都是一条记录 # 要注意的是fetch的内部实现中有一个偏移指针,用来指向查询结果,偏移指针最开始指向第一条数据,取了一次数据后,指针偏移到下一条数据, # fetchone被调用后,结果的偏移指针会指向下一条数据,fetchall方法返回的是从偏移指针指向的数据一直到结束的所有数据,所有fetchall获取的数据会少一条 for row in results: print(row) except Exception as e: print("失败:{0}".format(e)) # 测试代码 if __name__ == "__main__": # 建立连接 mysql_connect = MysqlStore() mysql_connect.build_connect() data = { "name": "Bob", "age": 22 } table = "students" mysql_connect.search(table, 'age>=20')
最后要注意的是
数据库连接的相关信息都放在统一的文件里并且设置为默认的值,也可以初始化的时候传入相应的值
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。