赞
踩
Peewee是一个简单小巧的Python ORM,它非常容易学习,并且使用起来很直观。
如果想快速入门,请参考官方的Quckstart。
在官方的Quckstart中,Peewee中 Model
类、fields
和model 实例
与数据库的映射关系如下:
也就是说,一个Model类代表一个数据库的表
,一个Field字段代表数据库中的一个字段
,而一个model类实例化对象则代表数据库中的一行
。至于Peewee的实现原理,我暂时没有看源代码,但觉得和廖雪峰老师的使用元类这个文章的例子实现类似。
在使用的时候,根据需求先定义好Model,然后可以通过 create_tables()
创建表
在peewee模块中,如果已经配置好了mysql数据库的信息,而不想定义Model,可以使用execute_sql()
执行一条sql语句
- from peewee import *
-
- # 连接数据库
- database = MySQLDatabase('test', user='root', host='localhost', port=3306)
- or
- # settings = {'host': 'localhost', 'password': '', 'port': 3306, 'user': 'root'}
- # database = peewee.MySQLDatabase("test",**settings)
- database.execute_sql('')
第一种方式:先定义Model,然后通过db.create_tables()
创建或Model.create_table()
创建表。
例如,我们需要建一个Person表,我们定义的Model如下:
- from peewee import *
-
- # 连接数据库
- database = MySQLDatabase('test', user='root', host='localhost', port=3306)
-
- # 定义Person
- class Person(Model):
- name = CharField(verbose_name='姓名', max_length=10, null=False, index=True)
- passwd = CharField(verbose_name='密码', max_length=20, null=False, default='123456')
- email = CharField(verbose_name='邮件', max_length=50, null=True, unique=True)
- gender = IntegerField(verbose_name='姓别', null=False, default=1)
- birthday = DateField(verbose_name='生日', null=True, default=None)
- is_admin = BooleanField(verbose_name='是否是管理员', default=True)
- class Meta:
- database = db # 这里是数据库链接,为了方便建立多个表,可以把这个部分提炼出来形成一个新的类
- table_name = 'persons' # 这里可以自定义表名
然后,我们就可以创建表了
- # 创建表
- Person.create_table()
-
- # 创建表也可以这样, 可以创建多个
- # database.create_tables([Person])
其中,CharField、DateField、BooleanField等这些类型与数据库中的数据类型一一对应,我们直接使用它就行,至于CharField => varchar(255)
这种转换Peewee已经为我们做好了 。
全部数据类型
Field Type | Sqlite | Postgresql | MySQL |
---|---|---|---|
IntegerField | integer | integer | integer |
BigIntegerField | integer | bigint | bigint |
SmallIntegerField | integer | smallint | smallint |
AutoField | integer | serial | integer |
FloatField | real | real | real |
DoubleField | real | double precision | double precision |
DecimalField | decimal | numeric | numeric |
CharField | varchar | varchar | varchar |
FixedCharField | char | char | char |
TextField | text | text | longtext |
BlobField | blob | bytea | blob |
BitField | integer | bigint | bigint |
BigBitField | blob | bytea | blob |
UUIDField | text | uuid | varchar(40) |
DateTimeField | datetime | timestamp | datetime |
DateField | date | date | date |
TimeField | time | time | time |
TimestampField | integer | integer | integer |
IPField | integer | bigint | bigint |
BooleanField | integer | boolean | bool |
BareField | untyped | not supported | not supported |
ForeignKeyField | integer | integer | integer |
全部属性
- null = False – 可否为空
- index = False – index索引
- unique = False – unique索引
- column_name = None – string representing the underlying column to use if different, useful for legacy databases
- default = None – 默认值,如果callable, 会调用生成!
- primary_key = False – 主键
- constraints = None - a list of one or more constraints, e.g. [Check('price > 0')]
- sequence = None – sequence to populate field (if backend supports it)
- collation = None – collation to use for ordering the field / index
- unindexed = False – indicate field on virtual table should be unindexed (SQLite-only)
- choices = None – an optional iterable containing 2-tuples of value, display
- help_text = None – string representing any helpful text for this field
- verbose_name = None – string representing the “user-friendly” name of this field
第二种方式:
已经存在过数据库,则直接通过python -m pwiz
批量创建Model。
上面我已经创建好了test
库,并且创建了Person
表那么,我可以使用下面命令:
- # 指定mysql,用户为root,host为localhost,数据库为test
- python -m pwiz -e mysql -u root -H localhost --password test > testModel.py
然后,输入密码,pwiz
脚本会自动创建Model,内容如下:
- from peewee import *
-
- database = MySQLDatabase('test', **{'charset': 'utf8', 'use_unicode': True, 'host': 'localhost', 'user': 'root', 'password': ''})
-
- class UnknownField(object):
- def __init__(self, *_, **__): pass
-
- class BaseModel(Model):
- class Meta:
- database = database
-
- class Person(BaseModel):
- birthday = DateField()
- is_relative = IntegerField()
- name = CharField()
-
- class Meta:
- table_name = 'person'
链接数据库
- db.is_closed() # 判断数据库是不是链接
- db.connect() # 数据库链接
一、增
直接创建示例,然后使用save()就添加了一条新数据
- # 添加一条数据
- p = Person(name='liuchungui', birthday=date(1990, 12, 20), is_relative=True)
- p.save()
- data = [
- {'facid': 9, 'name': 'Spa', 'membercost': 20, 'guestcost': 30,
- 'initialoutlay': 100000, 'monthlymaintenance': 800},
- {'facid': 10, 'name': 'Squash Court 2', 'membercost': 3.5,
- 'guestcost': 17.5, 'initialoutlay': 5000, 'monthlymaintenance': 80}]
- query = Facility.insert_many(data) # 插入了多个
-
- with db.atomic(): # 一次链接
- for data_dict in data_source:
- MyModel.create(**data_dict)
-
- User.insert(username='Mickey').execute() # >>> 返回主键
- # insert_from 是指从一个表查数据快速差到另一个表
- query = (TweetArchive
- .insert_from(
- Tweet.select(Tweet.user, Tweet.message),
- fields=[Tweet.user, Tweet.message])
- .execute())
二、删
使用delete().where().execute()进行删除,where()是条件,execute()负责执行语句。若是已经查询出来的实例,则直接使用delete_instance()删除。
- # 删除姓名为perter的数据
- Person.delete().where(Person.name == 'perter').execute()
-
- # 已经实例化的数据, 使用delete_instance
- p = Person(name='liuchungui', birthday=date(1990, 12, 20), is_relative=False)
- p.id = 1
- p.save()
- p.delete_instance()
三、改
若是,已经添加过数据的的实例或查询到的数据实例,且表拥有primary key
时,此时使用save()就是修改数据;若是未拥有实例,则使用update().where()进行更新数据。
- # 已经实例化的数据,指定了id这个primary key,则此时保存就是更新数据
- p = Person(name='liuchungui', birthday=date(1990, 12, 20), is_relative=False)
- p.id = 1
- p.save()
-
- # 更新birthday数据
- q = Person.update({Person.birthday: date(1983, 12, 21)}).where(Person.name == 'liuchungui')
- q.execute()
四、查
单条数据使用Person.get()就行了,也可以使用Person.select().where().get()。若是查询多条数据,则使用Person.select().where(),去掉get()就行了。语法很直观,select()就是查询,where是条件,get是获取第一条数据。
- # 查询单条数据
- p = Person.get(Person.name == 'liuchungui')
- print(p.name, p.birthday, p.is_relative)
-
- # 使用where().get()查询
- p = Person.select().where(Person.name == 'liuchungui').get()
- print(p.name, p.birthday, p.is_relative)
-
- #字典展示
- query = User.select().dicts()
- for row in query:
- print(row)
-
- # 查询多条数据
- persons = Person.select().where(Person.is_relative == True)
- for p in persons:
- print(p.name, p.birthday, p.is_relative)
-
- #复合条件
- query1 = Person.select().where((Person.name == "fff0") | (Person.name == "sss1"))
-
- query2 = Person.select().where((Person.name == "fff") & (Person.is_relative == True))
-
- #去重
- Person.select(Person.name).order_by(Person.name).limit(10).distinct()
- #聚合函数
- Person.select(fn.MAX(Person.birthday))
- # 添加一条数据
- Person(name='liuchungui', birthday=date(1990, 12, 20), is_relative=True).save()
- 或者
- data={name:'liuchungui',birthday:date(1990, 12, 20), is_relative:True}
- Person.create(**data)
-
- # 删除姓名为perter的数据
- Person.delete().where(Person.name == 'perter').execute()
-
- # 更新数据
- Person.update(birthday=date(1983, 12, 21)).where(Person.name == 'liuchungui').execute()
- 或者
- data={name:'liuchungui',birthday:date(1990, 12, 20), is_relative:True}
- Person.update(data)
-
- # 查询单条数据
- person =Person.get_or_none(Person.name == 'liuchungui')
-
- # 查询多条数据
- persons =persons = Person.select().where(Person.is_relative == True)
-
- # 查询name为liuchungui的Person数量, 返回数量为1
- num = Person.select().where(Person.name == 'liuchungui').count()
-
- # 按照创建时间降序排序
- persons = Person.select().order_by(Person.create_time.desc())
-
- # 按照创建时间升序排序
- persons = Person.select().order_by(Person.create_time.asc())
-
- # 按照创建时间升序前5个
- persons = Person.select().order_by(Person.create_time.asc()).limit(5)
-
- # 查询湖南和湖北的 (| & 用法), 注意需要用()将Person.province == '湖南'包一层
- persons = Person.select().where((Person.province == '湖南') | (Person.province == '湖北'))
- #In 查询
- query = Facility.select().where(Facility.facid.in_([1, 5]))
-
- # 联表查询
- query = (Tweet.select(Tweet.name, Person.username).join(Person, on=(Tweet.id == Person.tweet_id)).order_by(Person.timestamp.desc()))
-
- # %使用,查询省份中含有 湖 字,sql语句:select * from person where province like '%湖%'
- persons = Person.select().where(Person.province % '%湖%')
-
- #模糊查询
- #SELECT * FROM person WHERE name ILIKE '%tennis%';
- Person.select().where(Person.name ** "%fff%")
- Person.select().where(Facility.name.contains('tennis'))
-
- # <<使用,查询省份属于湖北和湖南的,对应sql语句:select * from person where province in ('湖南', '湖北')
- persons = Person.select().where(Person.province << ['湖南', '湖北'])
-
- # >>使用,查询省份为空的,sql语句: select * from person where province is Null
- persons = Person.select().where(Person.province >> None)
-
- # paginate方法使取得某页数据容易,两个参数:page_number, items_per_page desc()倒叙 asc()正序
- Person.select().order_by(Person.id.dese()).paginate(2, 10)
-
- # group_by 用法 统计相同名字人的数量 alias起别名 查看数量用obj.num_tweets
- Model.select(Model,fn.count(Model.name).alias('num_tweets')).group_by(Model.name)
-
- #执行SQL
- database = MySQLDatabase('test', **{'charset': 'utf8', 'use_unicode': True, 'host': 'localhost', 'user': 'root', 'password': ''})
- database.execute_sql('')
- query = MyModel.raw('SELECT * FROM my_table WHERE data = %s', user_data)
- query = MyModel.select().where(SQL('Some SQL expression %s' % user_data))
连接数据库时,推荐使用 playhouse 中的 db_url 模块。db_url 的 connect
方法可以通过传入的 URL 字符串,生成数据库连接。
- from playhouse.db_url import connect
-
- mysql_config_url='mysql://root:root@localhost:3306/network'
- db = connect(mysql_config_url)
表关系ForeignKey
- class Pet(peewee.Model):
- name = peewee.CharField()
- owner = peewee.ForeignKeyField(Person,related_name="pets",backref="petties")
- # backref是反查的字段,如果有related_name用related_name反查,如果没有直接用petties反查 e.g. [i.name for i in Person.get(name="aaa").petties]
-
- class Meta:
- database = db
-
- #自关联
- class Category(Model):
- name = CharField()
- parent = ForeignKeyField('self', null=True, backref='children')
- # 注意自关联永远是null = True
-
- #正查
- dog1 = Pet.get(name="dog1")
- dog1.owner.name
-
- # 反查
- aaa = Person.get(name="aaa").pets # pets为related_name字段,如果没写用backref字段
- for a in aaa:
- print(i.name)
peewee 的连接池,使用时需要显式的关闭连接。下面先说下为什么,最后会给出推荐的使用方法,避免进坑。
Connections will not be closed exactly when they exceed their stale_timeout. Instead, stale connections are only closed when a new connection is requested.
这里引用官方文档的提示。大致说:“超时连接不会自动关闭,只会在有新的请求时是才会关闭
所以,每次操作完数据库就关闭连接实例。
用法1:使用with
- def send_rule():
- with db.execution_context():
- # A new connection will be opened or, if using a connection pool,
- # pulled from the pool of available connections. Additionally, a
- # transaction will be started.
- for user in get_all_user():
- user_id = user['id']
- rule = Rule(user_id)
- rule_dict = rule.slack_rule(index)
- .....do something.....
用法2:使用Flask hook
- @app.before_request
- def _db_connect():
- database.connect()
- #
- # This hook ensures that the connection is closed when we've finished
- # processing the request.
- @app.teardown_request
- def _db_close(exc):
- if not database.is_closed():
- database.close()
- #
- #
- # 更优雅的用法:
- from playhouse.flask_utils import FlaskDB
- from dock_fastgear.model.base import db
- #
- app = Flask(__name__)
- FlaskDB(app, db) # 这样就自动做了上面的事情(具体实现可查看http://docs.peewee-orm.com/en/latest/peewee/playhouse.html?highlight=Flask%20DB#flask-utils)
这里没有什么大坑,就是有两点需要注意:
首先,查询的结果都是该 Model 的 object,注意不是 dict。如果想让结果为 dict,需要 playhouse
模块的工具方法进行转化:from playhouse.shortcuts import model_to_dict
其次,get
方法只会返回一条记录
- from playhouse.shortcuts import model_to_dict
-
- projects = Model.select().where(*cond).paginate(page_index, page_size)
- result = [model_to_dict(project) for project in projects]
peewee事务
atomic和rollback
- with db.atomic() as transaction: # Opens new transaction.
- try:
- save_some_objects()
- except ErrorSavingData:
- # Because this block of code is wrapped with "atomic", a
- # new transaction will begin automatically after the call
- # to rollback().
- transaction.rollback()
- error_saving = True
-
- #装饰器模式
- @db.atomic()
- def create_user(username):
- # This statement will run in a transaction. If the caller is already
- # running in an `atomic` block, then a savepoint will be used instead.
- return User.create(username=username)
-
- create_user('charlie')
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。