当前位置:   article > 正文

python小技巧大应用--实现自己的SQLite3 ORM(全)_python自定义orm操作sqllite3数据库

python自定义orm操作sqllite3数据库

上篇文章初步实现了SQLite3 ORM查询功能,经过一周多的努力,终于将SQLite3的CRUD全部功能实现了ORM.这回只上代码,实现示例及实现结果,其他相关内容请参见我的相关文章

1.建立uitl_SQLiteORM.py工具模块

  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. __author__ = 'TianJiang Gui'
  4. import asyncio, logging
  5. import sqlite3
  6. #---gtj 设置log输出格式
  7. logging.basicConfig(format='%(asctime)s - %(pathname)s[line:%(lineno)d] - %(levelname)s: %(message)s',
  8. level=logging.INFO)
  9. # ---gtj sqlite数据库文件名
  10. db = 'sqlite-data.db'
  11. def log(sql, args=()):
  12. logging.info('SQL: %s' % sql)
  13. # logging.info('SQL: %s' % sql.replace('?', '%s'), args or ())
  14. # 返回字典方法,传入一个执行过查询sql语句的cursor对象
  15. def dict_factory(cursor, row):
  16. d = {}
  17. for idx, col in enumerate(cursor.description):
  18. d[col[0]] = row[idx]
  19. return d
  20. def db_Select(sql, args, size=None):
  21. log(sql, args)
  22. connect = sqlite3.connect(db)
  23. connect.row_factory = dict_factory
  24. cursor = connect.cursor()
  25. try:
  26. cursor.execute(sql,args)
  27. rs = cursor.fetchall() # 获取查询结果 sqlite_dict(cursor)#
  28. rows = len(rs)
  29. logging.info('rows returned: %s' % rows)
  30. except Exception as e:
  31. print(u'查询错误...', e)
  32. connect.rollback()
  33. finally:
  34. cursor.close()
  35. connect.close()
  36. return rs
  37. def db_Instert(sql, args, autocommit=True):
  38. log(sql,args)
  39. connect = sqlite3.connect(db)
  40. cursor = connect.cursor()
  41. try:
  42. cursor.execute(sql, args)
  43. connect.commit()
  44. print(u'插入成功影响行数1')
  45. except Exception as e:
  46. print(u'插入错误...', e)
  47. connect.rollback()
  48. finally:
  49. cursor.close()
  50. connect.close()
  51. return 1
  52. def db_Update(sql, args, autocommit=True):
  53. log(sql,args)
  54. connect = sqlite3.connect(db)
  55. cursor = connect.cursor()
  56. try:
  57. cursor.execute(sql, args)
  58. rows = cursor.rowcount
  59. print("待更新行数:" + str(rows))
  60. connect.commit()
  61. print(u'修改成功...')
  62. except Exception as e:
  63. print(u'插入错误...', e)
  64. connect.rollback()
  65. finally:
  66. cursor.close()
  67. connect.close()
  68. return 1
  69. # ---gtj 删除
  70. # delete_sql = "DELETE from userinfo WHERE `name` = 'gtj2'"
  71. def db_Delete(sql, args, autocommit=True):
  72. log(sql,args)
  73. connect = sqlite3.connect(db)
  74. cursor = connect.cursor()
  75. try:
  76. cursor.execute(sql,args)
  77. rows = cursor.rowcount
  78. logging.info('rows returned: %s' % rows)
  79. print("待删除行数:" + str(rows))
  80. connect.commit()
  81. print(u'删除成功...')
  82. except Exception as e:
  83. print(u'删除错误...', e)
  84. connect.rollback()
  85. finally:
  86. cursor.close()
  87. connect.close()
  88. return 1
  89. def create_args_string(num):
  90. L = []
  91. for n in range(num):
  92. L.append('?')
  93. return ', '.join(L)
  94. class Field(object):
  95. def __init__(self, name, column_type, primary_key, default):
  96. self.name = name
  97. self.column_type = column_type
  98. self.primary_key = primary_key
  99. self.default = default
  100. def __str__(self):
  101. return '<%s, %s:%s>' % (self.__class__.__name__, self.column_type, self.name)
  102. # return '<%s:%s>' % (self.__class__.__name__, self.name)
  103. class StringField(Field):
  104. def __init__(self, name=None, primary_key=False, default=None, ddl='varchar(100)'):
  105. super().__init__(name, ddl, primary_key, default)
  106. class BooleanField(Field):
  107. def __init__(self, name=None, default=False):
  108. super().__init__(name, 'boolean', False, default)
  109. class IntegerField(Field):
  110. def __init__(self, name=None, primary_key=False, default=0):
  111. super().__init__(name, 'bigint', primary_key, default)
  112. class FloatField(Field):
  113. def __init__(self, name=None, primary_key=False, default=0.0):
  114. super().__init__(name, 'float', primary_key, default)
  115. class TextField(Field):
  116. def __init__(self, name=None, default=None):
  117. super().__init__(name, 'text', False, default)
  118. class ModelMetaclass(type):
  119. def __new__(cls, name, bases, attrs):
  120. if name=='Model':
  121. return type.__new__(cls, name, bases, attrs)
  122. tableName = attrs.get('__table__', None) or name
  123. logging.info('found model: %s (table: %s)' % (name, tableName))
  124. mappings = dict()
  125. fields = []
  126. primaryKey = None
  127. for k, v in attrs.items():
  128. if isinstance(v, Field):
  129. logging.info(' found mapping: %s ==> %s' % (k, v))
  130. mappings[k] = v
  131. if v.primary_key:
  132. # 找到主键:
  133. if primaryKey:
  134. raise Exception('Duplicate primary key for field: %s' % k)
  135. primaryKey = k
  136. else:
  137. fields.append(k)
  138. if not primaryKey:
  139. raise Exception('Primary key not found.')
  140. for k in mappings.keys():
  141. attrs.pop(k)
  142. escaped_fields = list(map(lambda f: '`%s`' % f, fields))
  143. attrs['__mappings__'] = mappings # 保存属性和列的映射关系
  144. attrs['__table__'] = tableName
  145. attrs['__primary_key__'] = primaryKey # 主键属性名
  146. attrs['__fields__'] = fields # 除主键外的属性名
  147. attrs['__select__'] = 'select `%s`, %s from `%s`' % (primaryKey, ', '.join(escaped_fields), tableName)
  148. attrs['__insert__'] = 'insert into `%s` (%s, `%s`) values (%s)' % (tableName, ', '.join(escaped_fields), primaryKey, create_args_string(len(escaped_fields) + 1))
  149. attrs['__update__'] = 'update `%s` set %s where `%s`=?' % (tableName, ', '.join(map(lambda f: '`%s`=?' % (mappings.get(f).name or f), fields)), primaryKey)
  150. attrs['__delete__'] = 'delete from `%s` where `%s`=?' % (tableName, primaryKey)
  151. return type.__new__(cls, name, bases, attrs)
  152. class Model(dict, metaclass=ModelMetaclass):
  153. def __init__(self, **kw):
  154. super(Model, self).__init__(**kw)
  155. def __getattr__(self, key):
  156. try:
  157. print('gtj key %s==>%s' % (key, self[key]))
  158. return self[key]
  159. except KeyError:
  160. raise AttributeError(r"'Model' object has no attribute '%s'" % key)
  161. def __setattr__(self, key, value):
  162. self[key] = value
  163. def getValue(self, key):
  164. return getattr(self, key, None)
  165. def getValueOrDefault(self, key):
  166. value = getattr(self, key, None)
  167. if value is None:
  168. field = self.__mappings__[key]
  169. if field.default is not None:
  170. value = field.default() if callable(field.default) else field.default
  171. logging.debug('using default value for %s: %s' % (key, str(value)))
  172. setattr(self, key, value)
  173. return value
  174. @classmethod
  175. def findAll(cls, where=None, args=None, **kw):
  176. ' find objects by where clause. '
  177. sql = [cls.__select__]
  178. if where:
  179. sql.append('where')
  180. sql.append(where)
  181. if args is None:
  182. args = []
  183. orderBy = kw.get('orderBy', None)
  184. if orderBy:
  185. sql.append('order by')
  186. sql.append(orderBy)
  187. limit = kw.get('limit', None)
  188. if limit is not None:
  189. sql.append('limit')
  190. if isinstance(limit, int):
  191. sql.append('?')
  192. args.append(limit)
  193. elif isinstance(limit, tuple) and len(limit) == 2:
  194. sql.append('?, ?')
  195. args.extend(limit)
  196. else:
  197. raise ValueError('Invalid limit value: %s' % str(limit))
  198. rs = db_Select(' '.join(sql), args)
  199. return [cls(**r) for r in rs]
  200. @classmethod
  201. def find(cls, pk):
  202. ' find object by primary key. '
  203. rs = db_Select('%s where `%s`=?' % (cls.__select__, cls.__primary_key__), [pk], 1)
  204. if len(rs) == 0:
  205. return None
  206. return cls(**rs[0])
  207. def save(self):
  208. args = list(map(self.getValueOrDefault, self.__fields__))
  209. pk=self.getValueOrDefault(self.__primary_key__)
  210. args.append(pk)
  211. print('args:',type(args),str(args))
  212. rows = db_Instert(self.__insert__, args)
  213. if rows != 1:
  214. logging.info('failed to insert record: affected rows: %s' % rows)
  215. return 0
  216. return pk
  217. def update(self):
  218. args = list(map(self.getValue, self.__fields__))
  219. args.append(self.getValue(self.__primary_key__))
  220. rows = db_Update(self.__update__, args)
  221. if rows != 1:
  222. logging.warning('failed to update by primary key: affected rows: %s' % rows)
  223. def remove(self,pk):
  224. #args = [self.getValue(self.__primary_key__)]
  225. rows = db_Delete(self.__delete__, [pk])
  226. if rows != 1:
  227. logging.warning('failed to remove by primary key: affected rows: %s' % rows)
  228. return 0
  229. return pk

2.测试实例文件test_SQLiteORM.py

  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. ' a test SQLite ORM util module '
  4. __author__ = 'TianJiang Gui'
  5. from util_SQLiteORM import *
  6. import time, uuid
  7. #---gtj 数值型随机数
  8. def next_id():
  9. return int(time.time() * 1000)#'%015d' % (
  10. class Userinfo(Model):
  11. __table__ = 'userinfo'
  12. id = IntegerField(primary_key=True, default=next_id)
  13. name = StringField(ddl='varchar(20)')
  14. age = IntegerField()
  15. def main():
  16. u = Userinfo(name='Test', age=23)
  17. uid=u.save()
  18. if uid>0:
  19. print('插入新纪录uid=%s成功!'%uid)
  20. #---gtj 按id查用户信息
  21. user = Userinfo.find(uid)
  22. print('新插入记录内容为==>id:', user.id, 'name:', user.name, 'age:', user.age)
  23. users = Userinfo.findAll()#'name=?',['gtj12']
  24. for user in users:
  25. print(type(user),user)
  26. print('id:',user.id,'name:',user.name,'age:',user.age)
  27. #---gtj 更新
  28. u = Userinfo(id=uid, name='Test112', age=112)
  29. u.update()
  30. user = Userinfo.find(uid)
  31. print('id:', user.id, 'name:', user.name, 'age:', user.age)
  32. #---gtj 按uid删除操作
  33. did =Userinfo().remove(uid)
  34. if did > 0:
  35. print('主键为uid=%s已成功删除!'%did)
  36. else:
  37. print('主键为uid=%s不存在,删除失败...'%uid)
  38. if __name__ == '__main__':
  39. main()

3.运行结果如下:

        1)插入一条新数据并返回uid

        2)按uid查询获得用户信息

         3)查询表中所有记录

         4)更新uid记录

        5)删除uid记录

 

总结:代码写得比较粗糙,但已基本能实现应用 

相关建立SQLite表及操作的内容请参见我的其他文章:

python小技巧大应用--测试SQLite是否好用

 python小技巧大应用--自己写sqlite3的CRUD供应用调用

最后,请多关注,点赞我:)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/笔触狂放9/article/detail/480961
推荐阅读
相关标签
  

闽ICP备14008679号