赞
踩
peewee
是一款轻量级 ORM 框架,它少量且达意的概念让人们易于学习和使用。
PS:本教程peewee版本为3.14.0
pip install peewee
不同数据库连接驱动
pip install pymysql
类 | 对应 |
---|---|
Model class | 数据表 |
Field instance | 表中的列 |
Model instance | 表中的行 |
import os
from peewee import *
from datetime import date
if os.path.exists('people.db'):
os.remove('people.db') # 存在则删掉,避免重复添加
db = SqliteDatabase('people.db') # 数据库使用SQLite
class Person(Model):
name = CharField()
birthday = DateField()
class Meta:
database = db # 使用people.db
class Pet(Model):
owner = ForeignKeyField(Person, backref='pets') # 外键
name = CharField()
animal_type = CharField()
class Meta:
database = db # 使用people.db
# 1.初始化
db.connect() # 连接数据库
db.create_tables([Person, Pet]) # 创建表
# 2.增
uncle_bob = Person(name='Bob', birthday=date(1960, 1, 15)) # 实例化
print('增加数据', uncle_bob.save()) # 增
grandma = Person.create(name='Grandma', birthday=date(1935, 3, 1)) # 增并返回实例
herb = Person.create(name='Herb', birthday=date(1950, 5, 5)) # 增并返回实例
bob_kitty = Pet.create(owner=uncle_bob, name='Kitty', animal_type='cat') # 增并返回实例
herb_fido = Pet.create(owner=herb, name='Fido', animal_type='dog') # 增并返回实例
herb_mittens = Pet.create(owner=herb, name='Mittens', animal_type='cat') # 增并返回实例
herb_mittens_jr = Pet.create(owner=herb, name='Mittens Jr', animal_type='cat') # 增并返回实例
# 3.删
print('删除数据', herb_mittens.delete_instance()) # 删除实例
# 4.改
grandma.name = 'Grandma L.' # 修改名字
print('更新数据', grandma.save()) # 更新
herb_fido.owner = uncle_bob # 修改外键
herb_fido.save() # 更新
# 5.查
grandma = Person.select().where(Person.name == 'Grandma L.').get() # 查单条
grandma = Person.get(Person.name == 'Grandma L.') # 简写
for person in Person.select(): # 查多条
print(person.name, end=' ')
print()
query = (Pet
.select(Pet, Person)
.join(Person)
.where(Pet.animal_type == 'cat'))
for pet in query: # 查多条
print('{}: {}. \t'.format(pet.owner.name, pet.name), end=' ')
print()
# 6.排序
for pet in Pet.select().where(Pet.owner == uncle_bob).order_by(Pet.name):
print(pet.name, end=' ')
print()
# 7.筛选
d1940 = date(1940, 1, 1)
d1960 = date(1960, 1, 1)
query = (Person
.select()
.where((Person.birthday < d1940) | (Person.birthday > d1960)))
for person in query:
print('{}: {}. \t'.format(person.name, person.birthday), end=' ')
print()
query = (Person
.select()
.where((Person.birthday.between(d1940, d1960))))
for person in query:
print('{}: {}. \t'.format(person.name, person.birthday), end=' ')
print()
# 8.聚合和预取
for person in Person.select():
print('{}: {} pets. \t'.format(person.name, person.pets.count()), end=' ') # 查两次,效率低
print()
query = (Person
.select(Person, fn.COUNT(Pet.id).alias('pet_count'))
.join(Pet, JOIN.LEFT_OUTER)
.group_by(Person)
.order_by(Person.name)) # 换成JOIN聚合效率高
for person in query:
print('{}: {} pets. \t'.format(person.name, person.pet_count), end=' ') # 查两次,效率低
print()
query = Person.select().order_by(Person.name).prefetch(Pet)
for person in query:
print('{}: '.format(person.name), end=' ')
for pet in person.pets:
print(pet.name, end=' ')
print()
db.close()
CREATE DATABASE
CREATE TABLE
INSERT INTO xxx VALUES ()
数据库 Shop
商品表 Product
SQL语句
-- 创建表
CREATE TABLE Product
(
product_id CHAR(4) NOT NULL,
product_name VARCHAR(100) NOT NULL,
product_type VARCHAR(32) NOT NULL,
sale_price INTEGER,
purchase_price INTEGER,
regist_date DATE,
PRIMARY KEY (product_id)
);
-- 插入数据
BEGIN TRANSACTION;
INSERT INTO Product VALUES ('0001', 'T恤衫', '衣服', 1000, 500, '2009-09-20');
INSERT INTO Product VALUES ('0002', '打孔器', '办公用品', 500, 320, '2009-9-11');
INSERT INTO Product VALUES ('0003', '运动T恤', '衣服', 4000, 2800, NULL);
INSERT INTO Product VALUES ('0004', '菜刀', '厨房用具', 3000, 2800, '2009-9-20');
INSERT INTO Product VALUES ('0005', '高压锅', '厨房用具', 6800, 5000, '2009-1-15');
INSERT INTO Product VALUES ('0006', '叉子', '厨房用具', 500, NULL, '2009-9-20');
INSERT INTO Product VALUES ('0007', '擦菜板', '厨房用具', 880, 790, '2008-4-28');
INSERT INTO Product VALUES ('0008', '圆珠笔', '办公用品', 100, NULL, '2009-11-11');
COMMIT;
代码
import os
from peewee import *
from datetime import date
DB_NAME = 'shop.db' # 数据库名
if os.path.exists(DB_NAME):
os.remove(DB_NAME) # 清空数据
db = SqliteDatabase(DB_NAME) # 数据库使用SQLite
class Product(Model):
product_id = CharField() # 商品编号
product_name = CharField() # 商品名称
product_type = CharField() # 商品种类
sale_price = IntegerField(null=True) # 销售单价
purchase_price = IntegerField(null=True) # 进货单价
regist_date = DateField(null=True) # 登记日期
class Meta:
database = db
db.connect() # 连接数据库
db.create_tables([Product]) # 创建表
# 插入数据
Product(product_id='0001', product_name='T恤衫', product_type='衣服', sale_price=1000, purchase_price=500, regist_date=date(2009, 9, 20)).save()
Product(product_id='0002', product_name='打孔器', product_type='办公用品', sale_price=500, purchase_price=320, regist_date=date(2009, 9, 11)).save()
Product(product_id='0003', product_name='运动T恤', product_type='衣服', sale_price=4000, purchase_price=2800, regist_date=None).save()
Product(product_id='0004', product_name='菜刀', product_type='厨房用具', sale_price=3000, purchase_price=2800, regist_date=date(2009, 9, 20)).save()
Product(product_id='0005', product_name='高压锅', product_type='厨房用具', sale_price=6800, purchase_price=5000, regist_date=date(2009, 1, 15)).save()
Product(product_id='0006', product_name='叉子', product_type='厨房用具', sale_price=500, purchase_price=None, regist_date=date(2009, 9, 20)).save()
Product(product_id='0007', product_name='擦菜板', product_type='厨房用具', sale_price=880, purchase_price=790, regist_date=date(2008, 4, 28)).save()
Product(product_id='0008', product_name='圆珠笔', product_type='办公用品', sale_price=100, purchase_price=None, regist_date=date(2009, 11, 11)).save()
db.close() # 关闭数据库
字段类型 | Sqlite | Postgresql | MySQL | 含义 |
---|---|---|---|---|
AutoField | integer | serial | integer | 自增主键 |
IntegerField | integer | integer | integer | 整数 |
FloatField | real | real | real | |
DoubleField | real | double precision | double precision | |
CharField | varchar | varchar | varchar | 变长字符串 |
FixedCharField | char | char | char | 定长字符串 |
TextField | text | text | text | |
DateTimeField | datetime | timestamp | datetime | |
DateField | date | date | date | 日期(年月日) |
TimeField | time | time | time | |
TimestampField | integer | integer | integer | |
BooleanField | integer | boolean | bool | |
ForeignKeyField | integer | integer | integer |
相关文档:
SQL语句
DROP TABLE Product;
代码
from peewee import *
DB_NAME = 'shop.db' # 数据库名
db = SqliteDatabase(DB_NAME) # 数据库使用SQLite
class Product(Model):
product_id = CharField() # 商品编号
product_name = CharField() # 商品名称
product_type = CharField() # 商品种类
sale_price = IntegerField(null=True) # 销售单价
purchase_price = IntegerField(null=True) # 进货单价
regist_date = DateField(null=True) # 登记日期
class Meta:
database = db
db.connect() # 连接数据库
db.create_tables([Product]) # 创建表
db.drop_tables([Product]) # 删除表
db.close() # 关闭数据库
注意:删除的表难以恢复!
ALTER TABLE xxx ADD COLUMN <列的定义>
(Oracle、SQL Server不用写COLUMN)ALTER TABLE xxx DROP COLUMN <列名>
RENAME TABLE old TO new
ALTER TABLE old RENAME TO new
RENAME TABLE old TO new
sp_rename 'old', 'new'
SQL语句
ALTER TABLE Product ADD COLUMN product_name_pinyin VARCHAR(100); --添加列
ALTER TABLE Product DROP COLUMN product_name_pinyin; --删除列(SQLite不支持)
ALTER TABLE Product RENAME TO Product1; --改表名
代码
from peewee import *
from playhouse.migrate import *
DB_NAME = 'shop.db' # 数据库名
db = SqliteDatabase(DB_NAME) # 数据库使用SQLite
migrator = SqliteMigrator(db) # 迁移工具
class Product(Model):
product_id = CharField() # 商品编号
product_name = CharField() # 商品名称
product_type = CharField() # 商品种类
sale_price = IntegerField(null=True) # 销售单价
purchase_price = IntegerField(null=True) # 进货单价
regist_date = DateField(null=True) # 登记日期
class Meta:
database = db
db.connect() # 连接数据库
db.create_tables([Product]) # 创建表
with db.atomic():
table = 'product' # 表名
migrate(
migrator.add_column(table, 'product_name_pinyin', CharField(null=True)), # 添加列
migrator.drop_column(table, 'product_name_pinyin'), # 删除列
migrator.rename_table(table, 'product1') # 改表名
)
db.close() # 关闭数据库
相关文档:
SELECT <列名>, ... FROM <表名>
SELECT * FROM <表名>
SELECT <列名> AS <列别名>
SELECT <常数> AS <列别名>
SELECT DISTINCT <列名>
SELECT <列名> FROM <表名> WHERE <条件表达式>
SQL语句
-- 查询指定列
SELECT product_id, product_name, purchase_price
FROM Product;
-- 查询所有列
SELECT *
FROM Product;
-- 为列设别名
SELECT product_id AS "商品编号",
product_name AS "商品名称",
purchase_price AS "进货单价"
FROM Product;
-- 设置常数
SELECT '商品' AS string,
38 AS number,
'2009-02-24' AS date,
product_id,
product_name
FROM Product;
-- 删除重复行
SELECT DISTINCT product_type
FROM Product;
-- 筛选
SELECT product_name, product_type
FROM Product
WHERE product_type = '衣服';
代码
import os
from peewee import *
from datetime import date
DB_NAME = 'shop.db' # 数据库名
db = SqliteDatabase(DB_NAME) # 数据库使用SQLite
class Product(Model):
product_id = CharField() # 商品编号
product_name = CharField() # 商品名称
product_type = CharField() # 商品种类
sale_price = IntegerField(null=True) # 销售单价
purchase_price = IntegerField(null=True) # 进货单价
regist_date = DateField(null=True) # 登记日期
class Meta:
database = db
db.connect() # 连接数据库
print(Product.select(Product.product_id, Product.product_name, Product.purchase_price)) # 查询指定列
# SELECT "t1"."product_id", "t1"."product_name", "t1"."purchase_price" FROM "product" AS "t1"
print(Product.select(SQL("*"))) # 查询所有列
# SELECT * FROM "product" AS "t1"
print(Product.select()) # 类似SELECT *,但会一一映射Model的字段定义
# SELECT "t1"."id", "t1"."product_id", "t1"."product_name", "t1"."product_type", "t1"."sale_price", "t1"."purchase_price", "t1"."regist_date" FROM "product" AS "t1"
print(Product.select(
Product.product_id.alias('商品编号'),
Product.product_name.alias('商品名称'),
Product.purchase_price.alias('进货单价'))) # 为列设别名
# SELECT "t1"."product_id" AS "商品编号", "t1"."product_name" AS "商品名称", "t1"."purchase_price" AS "进货单价" FROM "product" AS "t1"
print(Product.select(
Value('商品').alias('string'),
Value(38).alias('number'),
Value('2009-02-24').alias('date'),
Product.product_id,
Product.product_name
)) # 设置常数
# SELECT '商品' AS "string", 38 AS "number", '2009-02-24' AS "date", "t1"."product_id", "t1"."product_name" FROM "product" AS "t1"
print(Product.select(Product.product_type).distinct()) # 删除重复行
# SELECT DISTINCT "t1"."product_type" FROM "product" AS "t1"
print(Product.select(Product.product_name, Product.product_type).where(Product.product_type == '衣服'))
# SELECT "t1"."product_name", "t1"."product_type" FROM "product" AS "t1" WHERE ("t1"."product_type" = '衣服')
db.close() # 关闭数据库
看到57
scalar()
:第一个单元格的值scalar(as_tuple=True)
:第一行的值count()
:结果行数dicts()
:字典迭代器tuples()
:元组迭代器namedtuples()
:命名元组迭代器推荐使用 dicts()
或 命名元组 namedtuples()
,特别是在一些复杂查询下
from peewee import *
from datetime import date
db = SqliteDatabase(':memory:') # 内存
class Product(Model):
product_id = CharField() # 商品编号
product_name = CharField() # 商品名称
product_type = CharField() # 商品种类
sale_price = IntegerField(null=True) # 销售单价
purchase_price = IntegerField(null=True) # 进货单价
regist_date = DateField(null=True) # 登记日期
class Meta:
database = db
db.create_tables([Product]) # 创建表
Product(product_id='0001', product_name='T恤衫', product_type='衣服', sale_price=1000, purchase_price=500, regist_date=date(2009, 9, 20)).save()
Product(product_id='0002', product_name='打孔器', product_type='办公用品', sale_price=500, purchase_price=320, regist_date=date(2009, 9, 11)).save()
query = Product.select()
print(query.scalar()) # 第一个单元格的值
print(query.scalar(as_tuple=True)) # 第一行的值
print(query.count()) # 结果行数
print([i for i in query.dicts()]) # 作为字典返回
print([i for i in query.tuples()]) # 作为元组返回
print([i for i in query.namedtuples()]) # 作为命名元组返回
# 1
# (1, '0001', 'T恤衫', '衣服', 1000, 500, datetime.date(2009, 9, 20))
# 2
# [{'id': 1, 'product_id': '0001', 'product_name': 'T恤衫', 'product_type': '衣服', 'sale_price': 1000, 'purchase_price': 500, 'regist_date': datetime.date(2009, 9, 20)}, {'id': 2, 'product_id': '0002', 'product_name': '打孔器', 'product_type': '办公用品', 'sale_price': 500, 'purchase_price': 320, 'regist_date': datetime.date(2009, 9, 11)}]
# [(1, '0001', 'T恤衫', '衣服', 1000, 500, datetime.date(2009, 9, 20)), (2, '0002', '打孔器', '办公用品', 500, 320, datetime.date(2009, 9, 11))]
# [Row(id=1, product_id='0001', product_name='T恤衫', product_type='衣服', sale_price=1000, purchase_price=500, regist_date=datetime.date(2009, 9, 20)), Row(id=2, product_id='0002', product_name='打孔器', product_type='办公用品', sale_price=500, purchase_price=320, regist_date=datetime.date(2009, 9, 11))]
SQL常用函数
COUNT
:行数SUM
:合计值AVG
:平均值MAX
:最大值MIN
:最小值DISTINCT
:去除重复值
SQL语句
SELECT COUNT(*)
FROM Product;
SELECT SUM(sale_price), SUM(purchase_price)
FROM Product;
SELECT AVG(sale_price)
FROM Product;
SELECT MAX(sale_price)
FROM Product;
SELECT MIN(purchase_price)
FROM Product;
SELECT COUNT(DISTINCT product_type)
FROM Product;
代码
import os
from peewee import *
from datetime import date
DB_NAME = 'test.db' # 数据库名
if os.path.exists(DB_NAME):
os.remove(DB_NAME) # 清空数据
db = SqliteDatabase(DB_NAME) # 数据库使用SQLite
class Product(Model):
product_id = CharField() # 商品编号
product_name = CharField() # 商品名称
product_type = CharField() # 商品种类
sale_price = IntegerField(null=True) # 销售单价
purchase_price = IntegerField(null=True) # 进货单价
regist_date = DateField(null=True) # 登记日期
class Meta:
database = db
# 1.初始化
db.connect() # 连接数据库
db.create_tables([Product]) # 创建表
Product(product_id='0001', product_name='T恤衫', product_type='衣服', sale_price=1000, purchase_price=500,
regist_date=date(2009, 9, 20)).save()
Product(product_id='0002', product_name='打孔器', product_type='办公用品', sale_price=500, purchase_price=320,
regist_date=date(2009, 9, 11)).save()
Product(product_id='0003', product_name='运动T恤', product_type='衣服', sale_price=4000, purchase_price=2800,
regist_date=None).save()
Product(product_id='0004', product_name='菜刀', product_type='厨房用具', sale_price=3000, purchase_price=2800,
regist_date=date(2009, 9, 20)).save()
Product(product_id='0005', product_name='高压锅', product_type='厨房用具', sale_price=6800, purchase_price=5000,
regist_date=date(2009, 1, 15)).save()
Product(product_id='0006', product_name='叉子', product_type='厨房用具', sale_price=500, purchase_price=None,
regist_date=date(2009, 9, 20)).save()
Product(product_id='0007', product_name='擦菜板', product_type='厨房用具', sale_price=880, purchase_price=790,
regist_date=date(2008, 4, 28)).save()
Product(product_id='0008', product_name='圆珠笔', product_type='办公用品', sale_price=100, purchase_price=None,
regist_date=date(2009, 11, 11)).save()
# COUNT:行数
count = Product.select(fn.COUNT(SQL('*')))
print(count.scalar()) # 8
count = Product.select().count()
print(count) # 8
# SUM:合计值
sum = Product.select(fn.SUM(Product.sale_price), fn.SUM(Product.purchase_price)) # 进货单价和销售单价的合计值
print(sum.scalar(as_tuple=True)) # (16780, 12210)
# AVG:平均值
avg = Product.select(fn.AVG(Product.sale_price)) # 销售单价平均值
print(avg.scalar()) # 2097
# MAX:最大值
max = Product.select(fn.MAX(Product.sale_price)) # 销售单价最大值
print(max.scalar()) # 6800
# MIN:最小值
min = Product.select(fn.MIN(Product.purchase_price)) # 进货单价最小值
print(min.scalar()) # 320
# DISTINCT:去除重复值
distinct = Product.select(fn.COUNT(Product.product_type.distinct())) # 商品种类数
print(distinct.scalar()) # 3
distinct = Product.select(Product.product_type).distinct().count() # 商品种类数
print(distinct) # 3
使用聚合函数和GROUP BY子句需注意:
统计不同种类的商品数
SQL语句
SELECT product_type, COUNT(*)
FROM Product
GROUP BY product_type;
代码
group = (Product
.select(Product.product_type, fn.COUNT(SQL('*')))
.group_by(Product.product_type)) # 统计不同种类的商品数
for i in group.tuples():
print(i)
# ('办公用品', 2)
# ('厨房用具', 4)
# ('衣服', 2)
取值
print(getattr(group[0], 'COUNT(*)')) # 2
或用.alias('count')
group = (Product
.select(Product.product_type, fn.COUNT(SQL('*')).alias('count'))
.group_by(Product.product_type)) # 统计不同种类的商品数
print(group[0].product_type, group[0].count) # 办公用品 2
统计不同种类的商品数,取出商品数为2的
SQL
SELECT product_type, COUNT(*)
FROM Product
GROUP BY product_type
HAVING COUNT(*) = 2;
代码
having = (Product
.select(Product.product_type, fn.COUNT(SQL('*')))
.group_by(Product.product_type)
.having(fn.COUNT(SQL('*')) == 2)) # 统计不同种类的商品数,取出商品数为2
for i in having.tuples():
print(i)
# ('办公用品', 2)
# ('衣服', 2)
SQL
SELECT *
FROM Product
ORDER BY sale_price;
代码
order = (Product
.select()
.order_by(Product.sale_price)) # 统计不同种类的商品数,取出商品数为2
for i in order.tuples():
print(i)
# (8, '0008', '圆珠笔', '办公用品', 100, None, datetime.date(2009, 11, 11))
# (2, '0002', '打孔器', '办公用品', 500, 320, datetime.date(2009, 9, 11))
# (6, '0006', '叉子', '厨房用具', 500, None, datetime.date(2009, 9, 20))
# (7, '0007', '擦菜板', '厨房用具', 880, 790, datetime.date(2008, 4, 28))
# (1, '0001', 'T恤衫', '衣服', 1000, 500, datetime.date(2009, 9, 20))
# (4, '0004', '菜刀', '厨房用具', 3000, 2800, datetime.date(2009, 9, 20))
# (3, '0003', '运动T恤', '衣服', 4000, 2800, None)
# (5, '0005', '高压锅', '厨房用具', 6800, 5000, datetime.date(2009, 1, 15))
select()
:多次调用会替换
select_extend())
:多次调用会添加
where()
:多次调用相当于 AND
orwhere()
:多次调用相当于 OR
group_by())
:多次调用会替换
group_by_extend())
:多次调用会添加
函数:
+-*/
:四则运算ABS
:绝对值MOD
:求余ROUND
:四舍五入||
:拼接LENGTH
、LEN
:字符串长度(不同数据库,该函数有所不同)LOWER
:转为小写UPPER
:转为大写REPLACE
:字符串替换SUBSTRING
:字符串截取CURRENT_DATE
:当前日期CURRENT_TIME
:当前时间CURRENT_TIMESTAMP
:当前日期和时间EXTRACT
:截取日期元素CAST
:类型转换COALESCE
:将NULL转换为其他值谓词有:
查询包含字符串"ddd"的数据有以下情况:
SQL
SELECT *
FROM SampleLike
WHERE strcol LIKE 'ddd%';
SELECT *
FROM SampleLike
WHERE strcol LIKE '%ddd%';
SELECT *
FROM SampleLike
WHERE strcol LIKE '%ddd';
代码
import os
from peewee import *
DB_NAME = 'test.db' # 数据库名
if os.path.exists(DB_NAME):
os.remove(DB_NAME) # 清空数据
db = SqliteDatabase(DB_NAME) # 数据库使用SQLite
class SampleLike(Model):
strcol = CharField()
class Meta:
database = db
# 1.初始化
db.connect() # 连接数据库
db.create_tables([SampleLike]) # 创建表
SampleLike(strcol='abcddd').save()
SampleLike(strcol='dddabc').save()
SampleLike(strcol='abdddc').save()
SampleLike(strcol='abcdd').save()
SampleLike(strcol='ddabc').save()
SampleLike(strcol='abddc').save()
print('【前方一致】')
start = SampleLike.select().where(SampleLike.strcol.startswith('ddd'))
for i in start.tuples():
print(i)
print()
print('【中间一致】')
start = SampleLike.select().where(SampleLike.strcol.contains('ddd'))
for i in start.tuples():
print(i)
print()
print('【后方一致】')
start = SampleLike.select().where(SampleLike.strcol.endswith('ddd'))
for i in start.tuples():
print(i)
print()
查询销售单价100-1000的商品
SQL
SELECT *
FROM Product
WHERE sale_price BETWEEN 100 AND 1000;
SELECT *
FROM Product
WHERE sale_price > 100 AND sale_price < 1000;
代码
# BETWEEN:范围查询(包含临界值)
between = Product.select().where(Product.sale_price.between(100, 1000))
for i in between.tuples():
print(i)
print()
# (1, '0001', 'T恤衫', '衣服', 1000, 500, datetime.date(2009, 9, 20))
# (2, '0002', '打孔器', '办公用品', 500, 320, datetime.date(2009, 9, 11))
# (6, '0006', '叉子', '厨房用具', 500, None, datetime.date(2009, 9, 20))
# (7, '0007', '擦菜板', '厨房用具', 880, 790, datetime.date(2008, 4, 28))
# (8, '0008', '圆珠笔', '办公用品', 100, None, datetime.date(2009, 11, 11))
# < > AND:范围查询(不含临界值)
between = Product.select().where(Product.sale_price > 100 and Product.sale_price < 1000)
for i in between.tuples():
print(i)
# (2, '0002', '打孔器', '办公用品', 500, 320, datetime.date(2009, 9, 11))
# (6, '0006', '叉子', '厨房用具', 500, None, datetime.date(2009, 9, 20))
# (7, '0007', '擦菜板', '厨房用具', 880, 790, datetime.date(2008, 4, 28))
# (8, '0008', '圆珠笔', '办公用品', 100, None, datetime.date(2009, 11, 11))
查询进货单价为NULL和不为NULL的商品
SQL
SELECT *
FROM Product
WHERE purchase_price IS NULL;
SELECT *
FROM Product
WHERE purchase_price IS NOT NULL;
代码
# IS NULL:为空
null = (Product
.select()
.where(Product.purchase_price.is_null()))
for i in null.tuples():
print(i)
print()
# (6, '0006', '叉子', '厨房用具', 500, None, datetime.date(2009, 9, 20))
# (8, '0008', '圆珠笔', '办公用品', 100, None, datetime.date(2009, 11, 11))
# IS NOT NULL:不为空
null = (Product
.select()
.where(Product.purchase_price.is_null(False)))
for i in null.tuples():
print(i)
# (1, '0001', 'T恤衫', '衣服', 1000, 500, datetime.date(2009, 9, 20))
# (2, '0002', '打孔器', '办公用品', 500, 320, datetime.date(2009, 9, 11))
# (3, '0003', '运动T恤', '衣服', 4000, 2800, None)
# (4, '0004', '菜刀', '厨房用具', 3000, 2800, datetime.date(2009, 9, 20))
# (5, '0005', '高压锅', '厨房用具', 6800, 5000, datetime.date(2009, 1, 15))
# (7, '0007', '擦菜板', '厨房用具', 880, 790, datetime.date(2008, 4, 28))
查询进货单价为320、500、5000的商品
SQL
SELECT *
FROM Product
WHERE purchase_price = 320
OR purchase_price = 500
OR purchase_price = 5000;
SELECT *
FROM Product
WHERE purchase_price IN (320, 500, 5000);
代码
# IN:OR的简便用法
query = Product.select().where(Product.purchase_price.in_([320, 500, 5000]))
for i in query.tuples():
print(i)
# (1, '0001', 'T恤衫', '衣服', 1000, 500, datetime.date(2009, 9, 20))
# (2, '0002', '打孔器', '办公用品', 500, 320, datetime.date(2009, 9, 11))
# (5, '0005', '高压锅', '厨房用具', 6800, 5000, datetime.date(2009, 1, 15))
SQL
代码
数据库 Shop
商品表 Product
商店商品表 ShopProduct
数据准备
-- 创建表
CREATE TABLE ShopProduct
(
shop_id CHAR(4) NOT NULL,
shop_name VARCHAR(100) NOT NULL,
product_id CHAR(4) NOT NULL,
quantity INTEGER
);
-- 插入数据
BEGIN TRANSACTION;
INSERT INTO ShopProduct VALUES ('000A', '东京', '0001', 30);
INSERT INTO ShopProduct VALUES ('000A', '东京', '0002', 50);
INSERT INTO ShopProduct VALUES ('000A', '东京', '0003', 15);
INSERT INTO ShopProduct VALUES ('000B', '名古屋', '0002', 30);
INSERT INTO ShopProduct VALUES ('000B', '名古屋', '0003', 120);
INSERT INTO ShopProduct VALUES ('000B', '名古屋', '0004', 20);
INSERT INTO ShopProduct VALUES ('000B', '名古屋', '0006', 10);
INSERT INTO ShopProduct VALUES ('000B', '名古屋', '0007', 40);
INSERT INTO ShopProduct VALUES ('000C', '大阪', '0003', 20);
INSERT INTO ShopProduct VALUES ('000C', '大阪', '0004', 50);
INSERT INTO ShopProduct VALUES ('000C', '大阪', '0006', 90);
INSERT INTO ShopProduct VALUES ('000C', '大阪', '0007', 70);
INSERT INTO ShopProduct VALUES ('000D', '福冈', '0001', 100);
COMMIT;
SQL
SELECT SP.shop_id, SP.shop_name, SP.product_id, P.product_name
FROM Shopproduct AS SP
INNER JOIN Product AS P
ON SP.product_id = P.id;
代码
# 内联结:INNER JOIN
SP = ShopProduct.alias()
P = Product.alias()
innerjoin = (SP
.select(SP.shop_id, SP.shop_name, SP.product_id, P.product_name)
.join(P, on=(SP.product_id == P.id)))
for i in innerjoin.tuples():
print(i)
# ('000A', '东京', '0001', 'T恤衫')
# ('000A', '东京', '0002', '打孔器')
# ('000A', '东京', '0003', '运动T恤')
# ('000B', '名古屋', '0002', '打孔器')
# ('000B', '名古屋', '0003', '运动T恤')
# ('000B', '名古屋', '0004', '菜刀')
# ('000B', '名古屋', '0006', '叉子')
# ('000B', '名古屋', '0007', '擦菜板')
# ('000C', '大阪', '0003', '运动T恤')
# ('000C', '大阪', '0004', '菜刀')
# ('000C', '大阪', '0006', '叉子')
# ('000C', '大阪', '0007', '擦菜板')
# ('000D', '福冈', '0001', 'T恤衫')
if ProjectSettingModel.table_exists() is False:
ProjectSettingModel.create_table()
import os
from peewee import *
from datetime import date
if os.path.exists('people.db'):
os.remove('people.db')
db = SqliteDatabase('people.db')
class Person(Model):
name = CharField()
birthday = DateField()
class Meta:
database = db
db.connect()
db.create_tables([Person])
a = Person.create(name='a', birthday=date(1980, 1, 1))
b = Person.create(name='b', birthday=date(1997, 1, 1))
c = Person.create(name='c', birthday=date(2000, 1, 1))
# 筛选时间
date_1990 = date(1990, 1, 1)
date_1999 = date(1999, 12, 31)
query = (Person.select().where((Person.birthday.between(date_1990, date_1999))))
print('90后名单:', [person.name for person in query])
# 90后名单: ['b']
query = Person.select().where((Person.birthday < date_1990) | (Person.birthday > date_1999))
print('非90后名单:', [person.name for person in query])
# 非90后名单: ['a', 'c']
get_or_create()
。参数defaults
传入字典,用于填充值,而不用于查询。replace()
。无法指定部分字段。save()
。from peewee import *
DB_NAME = 'shop.db' # 数据库名
db = SqliteDatabase(DB_NAME) # 数据库使用SQLite
class Product(Model):
product_id = CharField() # 商品编号
product_name = CharField() # 商品名称
class Meta:
database = db
db.connect() # 连接数据库
db.create_tables([Product]) # 创建表
product, created = Product.get_or_create(id=1, defaults={'product_id': '0001', 'product_name': 'T恤衫'}) # 查询不了则新增
print(product, created)
product, created = Product.get_or_create(id=1, defaults={'product_id': '0001', 'product_name': 'T恤衫'}) # 查询不了则新增
print(product, created)
# 1 True
# 1 False
product = (Product.replace(id=1, product_id='0001', product_name='打孔器').execute()) # 修改不了则新增
print(product)
product = (Product.replace(id=2, product_id='0002', product_name='打孔器').execute()) # 修改不了则新增
print(product)
# 1
# 2
from peewee import *
db = SqliteDatabase('test')
class Product(Model):
product_id = CharField()
product_name = CharField()
class Meta:
database = db
db.connect()
db.create_tables([Product])
id1 = Product.create(**{'product_id': '0001', 'product_name': 'apple'})
rows = Product(**{'product_id': '0002', 'product_name': 'banana'}).save()
print(id1)
print(rows)
bulk_create(model_list, batch_size=None)
insert_many(rows, fields=None)
bulk_update(model_list, fields, batch_size=None)
from peewee import *
from datetime import datetime
db = SqliteDatabase('test')
class User(Model):
username = TextField(unique=True)
last_login = DateTimeField(null=True)
class Meta:
database = db
db.connect()
db.create_tables([User])
with db.atomic(): # 事务
data = [
User(username='a', last_login=datetime.now()),
User(username='b', last_login=datetime.now()),
]
print(User.bulk_create(data, batch_size=100)) # 批量创建
# None
data = [
{'username': 'c', 'last_login': datetime.now()},
{'username': 'd', 'last_login': datetime.now()},
]
print(User.insert_many(data).execute()) # 批量插入
# 2
# print(User.insert_many(data, fields=[User.username, User.last_login]).execute()) # 批量插入,可指定字段
users = User.select()
for user in users:
user.username += '1'
print(User.bulk_update(users, fields=[User.username])) # 批量更新
# 4
参数 | 含义 | 默认值 |
---|---|---|
database | 数据库名 | |
user | 用户名 | |
password | 密码 | |
host | 主机 | |
port | 端口号 | MySQL 3306 PostgreSQL 5432 |
charset | 字符集 |
SQLite
from peewee import *
db = SqliteDatabase('test')
class Product(Model):
product_id = CharField()
product_name = CharField()
class Meta:
database = db
db.connect()
db.create_tables([Product])
print(db.execute_sql('SELECT name FROM sqlite_master').fetchall()) # 列出所有数据库
# [('product',)]
MySQL
from peewee import *
db = MySQLDatabase('test', user='root', password='123456')
print([i for i in db.execute_sql('SHOW DATABASES').fetchall()]) # 列出所有数据库
# [('information_schema',), ('mysql',), ('performance_schema',), ('sys',), ('test',)]
PostgreSQL
from peewee import *
db = PostgresqlDatabase('postgres', user='postgres', password='123456')
print(db.execute_sql('SELECT datname FROM pg_database;').fetchall())
# [('postgres',), ('template1',), ('template0',), ('peewee_test',), ('exercises',)] # 列出所有数据库
详细阅读:Database
with XxxModel._meta.database.transaction():
with db.atomic() as txn:
try:
xxx
except:
txn.rollback()
from peewee import *
db = SqliteDatabase('test')
class Product(Model):
id = PrimaryKeyField()
product_id = CharField()
product_name = CharField()
class Meta:
database = db
db.connect()
db.create_tables([Product])
data = [
{'product_id': '0001', 'product_name': 'apple'},
{'product_id': '0002', 'product_name': 'apple1'},
]
for i in data:
Product.create(**i)
new = Product.select().where(Product.id == 1)[0]
new.id = None # 主键要设为None
new.product_id = '0003'
new.save()
参考这篇文章
SELECT * FROM Product AS t1
INNER JOIN (
SELECT id, MAX(create_at) AS create_at FROM Product GROUP BY product_type
) AS t2
ON t1.id=t2.id AND t1.create_at=t2.create_at;
from peewee import *
from datetime import datetime
db = SqliteDatabase('test')
class Product(Model):
id = PrimaryKeyField()
product_id = CharField()
product_name = CharField()
product_type = CharField()
create_at = DateTimeField()
class Meta:
database = db
db.connect()
db.create_tables([Product])
data = [
{'product_id': '0001', 'product_name': 'apple', 'product_type': 'fruit', 'create_at': datetime.now()},
{'product_id': '0002', 'product_name': 'banana', 'product_type': 'fruit', 'create_at': datetime.now()},
{'product_id': '0003', 'product_name': 'cat', 'product_type': 'animal', 'create_at': datetime.now()},
{'product_id': '0004', 'product_name': 'dog', 'product_type': 'animal', 'create_at': datetime.now()},
]
Product.insert_many(data).execute() # 批量插入
lastest = Product.select(Product.id, fn.MAX(Product.create_at).alias('create_at')).group_by(Product.product_type)
lastest = lastest.alias('lastest')
data = Product.select().join(lastest, on=((Product.id == lastest.c.id) & (Product.create_at == lastest.c.create_at)))
print([i for i in data.dicts()])
python -m pwiz -e mysql -H 127.0.0.1 -p 3306 -u root -P -o database_name
B1、B2、B3等不一定存在,且数量不限
import os
import operator
from functools import reduce
from peewee import *
if os.path.exists('test.db'):
os.remove('test.db') # 存在则删掉,避免重复添加
db = SqliteDatabase('test.db')
class Product(Model):
id = PrimaryKeyField()
name = CharField() # 名称
date = DateField(null=True) # 日期
class Meta:
database = db
db.connect()
db.create_tables([Product])
condi = (Product.id == 1)
condi_or = []
condi_or.append(Product.name == '123')
condi_or.append(Product.name == '456')
if condi_or:
condi_or = reduce(operator.or_, condi_or)
condi = condi & condi_or
query = Product.select().where(condi)
print(query.sql())
# ('SELECT "t1"."id", "t1"."name", "t1"."date" FROM "product" AS t1 WHERE (("t1"."id" = ?) AND (("t1"."name" = ?) OR ("t1"."name" = ?)))', [1, '123', '456'])
# 或
condi = (Product.id == 1)
condi_or = True
condi_or = condi_or | (Product.name == '123')
condi_or = condi_or | (Product.name == '456')
if condi_or:
condi = condi & condi_or
query = Product.select().where(condi)
print(query.sql())
# ('SELECT "t1"."id", "t1"."name", "t1"."date" FROM "product" AS t1 WHERE (("t1"."id" = ?) AND ((? OR ("t1"."name" = ?)) OR ("t1"."name" = ?)))', [1, True, '123', '456'])
import os
import operator
from functools import reduce
from peewee import *
if os.path.exists('test.db'):
os.remove('test.db') # 存在则删掉,避免重复添加
db = SqliteDatabase('test.db')
class Product(Model):
id = PrimaryKeyField()
name = CharField() # 名称
date = DateField(null=True) # 日期
class Meta:
database = db
db.connect()
db.create_tables([Product])
for name in range(1, 10):
Product(name=str(name)).save()
condi = (Product.id == 1)
condi_or = []
condi_or.append(Product.name == '1')
condi_or.append(Product.name == '2')
if condi_or:
condi_or = reduce(operator.or_, condi_or)
condi = condi | condi_or
query = Product.select().where(condi)
print(query.sql())
print([i for i in query.dicts()])
# ('SELECT "t1"."id", "t1"."name", "t1"."date" FROM "product" AS t1 WHERE (("t1"."id" = ?) AND (("t1"."name" = ?) OR ("t1"."name" = ?)))', [1, '123', '456'])
# [{'id': 1, 'name': '1', 'date': None}, {'id': 2, 'name': '2', 'date': None}]
旧版本将 x.model
改为 x.model_class
import os
from operator import *
from typing import Union
from collections.abc import Callable
from peewee import *
if os.path.exists('test.db'):
os.remove('test.db') # 存在则删掉,避免重复添加
db = SqliteDatabase('test.db')
class Person(Model):
id = PrimaryKeyField()
name = CharField()
class Meta:
database = db
class Product(Model):
id = PrimaryKeyField()
name = CharField()
class Meta:
database = db
def batch_where(query, operator: Union[Callable, str] = eq, **kwargs):
"""批量添加查询条件,默认为=="""
querys = query if isinstance(query, list) else [query]
for i, x in enumerate(querys):
for column, value in kwargs.items():
if isinstance(operator, str):
querys[i] = x.where(methodcaller(operator, value)(getattr(x.model, column)))
else:
querys[i] = x.where(operator(getattr(x.model, column), value))
return querys if isinstance(query, list) else querys[0]
db.connect()
db.create_tables([Person, Product])
for name in range(10):
Person(name=str(name)).save()
Product(name=str(name)).save()
products = Product.select()
print(len(products)) # 10
products = batch_where(products, id=1, name='1')
print(len(products)) # 1
q1 = Product.select()
q2 = Person.select()
q1, q2 = batch_where([q1, q2], id=1)
print(len(q1), len(q2)) # 1 1
q1 = Product.select()
q2 = Person.select()
q1, q2 = batch_where([q1, q2], operator=gt, id=5)
print(len(q1), len(q2)) # 5 5
products = Product.select()
products = batch_where(products, operator='in_', id=[1, 2, 3])
print(len(products)) # 3
SQL语句及其种类
SQL基本书写规则
数据库、表、列的命名规则
注释的写法
--
/* */
安装psycopg2
pip install psycopg2
移动到clubdata.sql所在目录
cd desktop
导入数据
psql -U postgres -f clubdata.sql -x -q
进入PostgreSQL
psql -U postgres -d peewee_test
查询(一定要加分号!!!)
SELECT * FROM facilities;
cd为PostgreSQL的模式,类似目录,此处为了方便没有定义
model.py
from peewee import *
DB_NAME = 'peewee_test' # 数据库名
db = PostgresqlDatabase(DB_NAME, user='postgres', password='123456') # 数据库使用PostgreSQL
class BaseModel(Model):
class Meta:
database = db
class Member(BaseModel):
"""会员表"""
memid = AutoField() # 自增主键
surname = CharField() # 姓
firstname = CharField() # 名
address = CharField(max_length=300) # 地址
zipcode = IntegerField() # 邮政编码
telephone = CharField() # 电话号
recommendedby = ForeignKeyField('self', backref='recommended', column_name='recommendedby', null=True) # 推荐人
joindate = DateTimeField() # 添加时间
class Meta:
table_name = 'members'
class Facility(BaseModel):
"""场所表"""
facid = AutoField() # 主键
name = CharField() # 设施名
membercost = DecimalField(decimal_places=2) # 会员费用
guestcost = DecimalField(decimal_places=2) # 非会员费用
initialoutlay = DecimalField(decimal_places=2) # 办会员费用
monthlymaintenance = DecimalField(decimal_places=2) # 月维护费
class Meta:
table_name = 'facilities'
class Booking(BaseModel):
"""预订表"""
bookid = AutoField() # 主键
facility = ForeignKeyField(Facility, column_name='facid') # 场地ID
member = ForeignKeyField(Member, column_name='memid') # 会员ID
starttime = DateTimeField() # 开始时间
slots = IntegerField() # 位数
class Meta:
table_name = 'bookings'
SELECT * FROM facilities;
from model import *
query = Facility.select()
for i in query:
print(i.facid, i.name)
# 0 Tennis Court 1
# 1 Tennis Court 2
# 2 Badminton Court
# 3 Table Tennis
SELECT name, membercost FROM facilities;
from model import *
query = Facility.select(Facility.name, Facility.membercost)
for i in query:
print(i.name, i.membercost)
# Tennis Court 1 5
# Tennis Court 2 5
# Badminton Court 0
筛选会员费用大于0的场所
SELECT * FROM facilities WHERE membercost > 0;
from model import *
query = Facility.select().where(Facility.membercost > 0)
for i in query:
print(i.name, i.membercost)
# Tennis Court 1 5
# Tennis Court 2 5
# Massage Room 1 35
# Massage Room 2 35
# Squash Court 3.5
筛选会员费用大于0,且会员费用小于月维护费50分之一的场所
SELECT *
FROM facilities
WHERE membercost > 0 AND membercost < (monthlymaintenance / 50);
from model import *
query = Facility.select().where(
Facility.membercost > 0,
Facility.membercost < (Facility.monthlymaintenance / 50)
)
for i in query:
print(i.name, i.membercost, i.monthlymaintenance)
# Massage Room 1 35 3000
# Massage Room 2 35 3000
包含Tennis的场所
SELECT * FROM facilities WHERE name ILIKE '%tennis%';
from model import *
query = Facility.select().where(Facility.name.contains('tennis'))
query = Facility.select().where(Facility.name ** '%tennis%')
for i in query:
print(i.name)
# Tennis Court 1
# Tennis Court 2
# Table Tennis
匹配facid为1和5的场所
SELECT * FROM facilities WHERE facid IN (1, 5);
from model import *
query = Facility.select().where(Facility.facid.in_([1, 5]))
query = Facility.select().where((Facility.facid == 1) | (Facility.facid == 5))
for i in query:
print(i.name)
# Tennis Court 2
# Massage Room 2
以维护费用为100为界分出便宜和昂贵的场所
SELECT name,
CASE WHEN monthlymaintenance > 100 THEN 'expensive' ELSE 'cheap' END
FROM facilities;
from model import *
cost = Case(None, [(Facility.monthlymaintenance > 100, 'expensive')], 'cheap')
query = Facility.select(Facility.name, cost.alias('cost'))
for i in query:
print(i.name, i.cost)
# Tennis Court 1 expensive
# Tennis Court 2 expensive
# Badminton Court cheap
更多例子查阅文档:Case
列出2012.9之后加入的会员
SELECT memid, surname, firstname, joindate FROM members
WHERE joindate >= '2012-09-01';
import datetime
from model import *
query = (Member
.select(Member.memid, Member.surname, Member.firstname, Member.joindate)
.where(Member.joindate >= datetime.date(2012, 9, 1)))
for i in query:
print(i.memid, i.surname, i.firstname, i.joindate)
# 24 Sarwin Ramnaresh 2012-09-01 08:44:42
# 26 Jones Douglas 2012-09-02 18:43:05
# 27 Rumney Henrietta 2012-09-05 08:42:35
列出十个不同的姓并排序
SELECT DISTINCT surname FROM members ORDER BY surname LIMIT 10;
from model import *
query = (Member
.select(Member.surname)
.order_by(Member.surname)
.limit(10)
.distinct())
for i in query:
print(i.surname)
# Bader
# Baker
# Boothe
# Butters
# Coplin
# Crumpet
# Dare
# Farrell
# Genting
# GUEST
|
+
&
-
SELECT surname FROM members UNION SELECT name FROM facilities;
from model import *
lhs = Member.select(Member.surname)
rhs = Facility.select(Facility.name)
query = lhs | rhs
for i in query:
print(i.surname)
# Hunt
# Farrell
# Tennis Court 2
# Table Tennis
# Dare
查询最后一个会员的注册日期
SELECT MAX(joindate) FROM members;
from model import *
query = Member.select(fn.MAX(Member.joindate))
print(query.scalar())
# 2012-09-26 18:08:45
查询最后一个注册会员的姓名和注册日期
SELECT firstname, surname, joindate FROM members
WHERE joindate = (SELECT MAX(joindate) FROM members);
from model import *
MemberAlias = Member.alias()
subq = MemberAlias.select(fn.MAX(MemberAlias.joindate))
query = (Member
.select(Member.firstname, Member.surname, Member.joindate)
.where(Member.joindate == subq))
print(query.scalar(as_tuple=True))
for i in query:
print(i.firstname, i.surname, i.joindate)
# ('Darren', 'Smith', datetime.datetime(2012, 9, 26, 18, 8, 45))
# Darren Smith 2012-09-26 18:08:45
查询David Farrell的预订开始时间
SELECT starttime FROM bookings
INNER JOIN members ON (bookings.memid = members.memid)
WHERE surname = 'Farrell' AND firstname = 'David';
from model import *
query = (Booking
.select(Booking.starttime)
.join(Member)
.where((Member.surname == 'Farrell') &
(Member.firstname == 'David')))
for i in query:
print(i.starttime)
# 2012-09-18 09:00:00
# 2012-09-18 13:30:00
# 2012-09-18 17:30:00
查询网球场在2012-09-21的开始预订时间
SELECT starttime, name
FROM bookings
INNER JOIN facilities ON (bookings.facid = facilities.facid)
WHERE date_trunc('day', starttime) = '2012-09-21':: date
AND name ILIKE 'tennis%'
ORDER BY starttime, name;
import datetime
from model import *
query = (Booking
.select(Booking.starttime, Facility.name)
.join(Facility)
.where(
(fn.date_trunc('day', Booking.starttime) == datetime.date(2012, 9, 21)) &
Facility.name.startswith('Tennis'))
.order_by(Booking.starttime, Facility.name))
for i in query:
print(i.starttime, i.facility.name)
# 2012-09-21 08:00:00 Tennis Court 1
# 2012-09-21 08:00:00 Tennis Court 2
# 2012-09-21 09:30:00 Tennis Court 1
# 2012-09-21 10:00:00 Tennis Court 2
注意print()里的是
i.facility.name
查询推荐过其他会员的会员
SELECT DISTINCT m.firstname, m.surname
FROM members AS m2
INNER JOIN members AS m ON (m.memid = m2.recommendedby)
ORDER BY m.surname, m.firstname;
from model import *
MA = Member.alias()
query = (Member
.select(Member.firstname, Member.surname)
.join(MA, on=(MA.recommendedby == Member.memid))
.order_by(Member.surname, Member.firstname)
.distinct())
for i in query:
print(i.firstname, i.surname)
# Florence Bader
# Timothy Baker
# Gerald Butters
查询所有会员及其推荐人
SELECT m.firstname, m.surname, r.firstname, r.surname
FROM members AS m
LEFT OUTER JOIN members AS r ON (m.recommendedby = r.memid)
ORDER BY m.surname, m.firstname;
from model import *
MA = Member.alias()
query = (Member
.select(Member.firstname, Member.surname, MA.firstname, MA.surname)
.join(MA, JOIN.LEFT_OUTER, on=(Member.recommendedby == MA.memid))
.order_by(Member.surname, Member.firstname))
for i in query:
print(i.firstname, i.surname)
if i.recommendedby:
print(' ', i.recommendedby.firstname, i.recommendedby.surname)
# Florence Bader
# Ponder Stibbons
# Anne Baker
# Ponder Stibbons
SELECT DISTINCT m.firstname || ' ' || m.surname AS member, f.name AS facility
FROM members AS m
INNER JOIN bookings AS b ON (m.memid = b.memid)
INNER JOIN facilities AS f ON (b.facid = f.facid)
WHERE f.name LIKE 'Tennis%'
ORDER BY member, facility;
from model import *
fullname = Member.firstname + ' ' + Member.surname
query = (Member
.select(fullname.alias('member'), Facility.name.alias('facility'))
.join(Booking)
.join(Facility)
.where(Facility.name.startswith('Tennis'))
.order_by(fullname, Facility.name)
.distinct())
for i in query:
print(i.member, i.booking.facility.facility)
# Anne Baker Tennis Court 1
# Anne Baker Tennis Court 2
# Burton Tracy Tennis Court 1
查询2012-09-14的预订列表,会员或非会员消费超过30,注意非会员的ID总是0,按消费降序,不使用子查询。
SELECT m.firstname || ' ' || m.surname AS member,
f.name AS facility,
(CASE WHEN m.memid = 0 THEN f.guestcost * b.slots
ELSE f.membercost * b.slots END) AS cost
FROM members AS m
INNER JOIN bookings AS b ON (m.memid = b.memid)
INNER JOIN facilities AS f ON (b.facid = f.facid)
WHERE (date_trunc('day', b.starttime) = '2012-09-14') AND
((m.memid = 0 AND b.slots * f.guestcost > 30) OR
(m.memid > 0 AND b.slots * f.membercost > 30))
ORDER BY cost DESC;
import datetime
from model import *
cost = Case(Member.memid, (
(0, Facility.guestcost * Booking.slots),
), (Facility.membercost * Booking.slots)) # 第二个参数是tuple,用于放多个WHEN THEN
fullname = Member.firstname + ' ' + Member.surname
query = (Member.select(fullname.alias('member'), Facility.name.alias('facility'), cost.alias('cost'))
.join(Booking)
.join(Facility)
.where((fn.date_trunc('day', Booking.starttime) == datetime.date(2012, 9, 14)) &
(cost > 30))
.order_by(SQL('cost').desc()))
for i in query.namedtuples():
print(i.member, i.facility, i.cost)
# GUEST GUEST Massage Room 2 320
# GUEST GUEST Massage Room 1 160
# GUEST GUEST Massage Room 1 160
# for i in query:
# print(i.member, i.booking.facility.facility, i.cost)
用namedtuples比直接遍历方便
for i in query:
print(i.member, i.booking.facility.facility, i.cost)
不使用JOIN
SELECT DISTINCT m.firstname || ' ' || m.surname AS member,
(SELECT r.firstname || ' ' || r.surname
FROM members AS r
WHERE m.recommendedby = r.memid) AS recommended
FROM members AS m ORDER BY member;
from model import *
MA = Member.alias()
fullname = Member.firstname + ' ' + Member.surname
subq = MA.select(MA.firstname + ' ' + MA.surname).where(Member.recommendedby == MA.memid)
query = Member.select(fullname.alias('member'), subq.alias('recommended')).order_by(fullname)
for i in query.namedtuples():
print('{} | {}'.format(i.member, i.recommended))
# Anna Mackenzie | Darren Smith
# Anne Baker | Ponder Stibbons
# Burton Tracy | None
SELECT member, facility, cost from (
SELECT
m.firstname || ' ' || m.surname as member,
f.name as facility,
CASE WHEN m.memid = 0 THEN b.slots * f.guestcost
ELSE b.slots * f.membercost END AS cost
FROM members AS m
INNER JOIN bookings AS b ON m.memid = b.memid
INNER JOIN facilities AS f ON b.facid = f.facid
WHERE date_trunc('day', b.starttime) = '2012-09-14'
) as bookings
WHERE cost > 30
ORDER BY cost DESC;
import datetime
from model import *
cost = Case(Member.memid, (
(0, Facility.guestcost * Booking.slots),
), (Facility.membercost * Booking.slots))
fullname = Member.firstname + ' ' + Member.surname
iq = (Member
.select(fullname.alias('member'), Facility.name.alias('facility'), cost.alias('cost'))
.join(Booking)
.join(Facility)
.where(fn.date_trunc('day', Booking.starttime) == datetime.date(2012, 9, 14)))
query = (Member
.select(iq.c.member, iq.c.facility, iq.c.cost)
.from_(iq)
.where(iq.c.cost > 30)
.order_by(SQL('cost').desc()))
for i in query.dicts():
print(i['member'], i['facility'], i['cost'])
# GUEST GUEST Massage Room 2 320
# GUEST GUEST Massage Room 1 160
# GUEST GUEST Massage Room 1 160
INSERT INTO "facilities" ("facid", "name", "membercost", "guestcost",
"initialoutlay", "monthlymaintenance") VALUES (9, 'Spa', 20, 30, 100000, 800);
from model import *
res = Facility.insert({
Facility.facid: 9,
Facility.name: 'Spa',
Facility.membercost: 20,
Facility.guestcost: 30,
Facility.initialoutlay: 100000,
Facility.monthlymaintenance: 800}).execute()
# OR:
res = (Facility
.insert(facid=9, name='Spa', membercost=20, guestcost=30,
initialoutlay=100000, monthlymaintenance=800)
.execute())
INSERT INTO "facilities" ("facid", "name", "membercost", "guestcost", "initialoutlay", "monthlymaintenance") VALUES (9, 'Spa', 20, 30, 100000, 800);
INSERT INTO "facilities" ("facid", "name", "membercost", "guestcost", "initialoutlay", "monthlymaintenance") VALUES (10, 'Squash Court 2', 3.5, 17.5, 5000, 80);
from model import *
data = [
{'facid': 9, 'name': 'Spa', 'membercost': 20, 'guestcost': 30, 'initialoutlay': 100000, 'monthlymaintenance': 800},
{'facid': 10, 'name': 'Squash Court 2', 'membercost': 3.5, 'guestcost': 17.5, 'initialoutlay': 5000, 'monthlymaintenance': 80}
]
res = Facility.insert_many(data).execute()
插入一条相同的数据
INSERT INTO "facilities" ("facid", "name", "membercost", "guestcost", "initialoutlay", "monthlymaintenance")
SELECT (SELECT (MAX("facid") + 1) FROM "facilities") AS _, 'Spa', 20, 30, 100000, 800;
将第二个网球场的创办费用改为10000
UPDATE facilities SET initialoutlay = 10000 WHERE name = 'Tennis Court 2';
from model import *
res = (Facility
.update({Facility.initialoutlay: 10000})
.where(Facility.name == 'Tennis Court 2')
.execute())
print(res)
# OR:
res = (Facility
.update(initialoutlay=10000)
.where(Facility.name == 'Tennis Court 2')
.execute())
print(res)
提高网球场的价格,会员费用为6,非会员费用为30
UPDATE facilities SET membercost=6, guestcost=30 WHERE name ILIKE 'Tennis%';
from model import *
nrows = (Facility
.update(membercost=6, guestcost=30)
.where(Facility.name.startswith('Tennis'))
.execute())
print(nrows)
# 2
让第二个网球场的价格比第一个网球场贵10%
UPDATE facilities SET
membercost = (SELECT membercost * 1.1 FROM facilities WHERE facid = 0),
guestcost = (SELECT guestcost * 1.1 FROM facilities WHERE facid = 0)
WHERE facid = 1;
-- OR --
WITH new_prices (nmc, ngc) AS (
SELECT membercost * 1.1, guestcost * 1.1
FROM facilities WHERE name = 'Tennis Court 1')
UPDATE facilities
SET membercost = new_prices.nmc, guestcost = new_prices.ngc
FROM new_prices
WHERE name = 'Tennis Court 2'
from model import *
sq1 = Facility.select(Facility.membercost * 1.1).where(Facility.facid == 0)
sq2 = Facility.select(Facility.guestcost * 1.1).where(Facility.facid == 0)
res = (Facility
.update(membercost=sq1, guestcost=sq2)
.where(Facility.facid == 1)
.execute())
# OR:
cte = (Facility
.select(Facility.membercost * 1.1, Facility.guestcost * 1.1)
.where(Facility.name == 'Tennis Court 1')
.cte('new_prices', columns=('nmc', 'ngc')))
res = (Facility
.update(membercost=SQL('new_prices.nmc'), guestcost=SQL('new_prices.ngc'))
.with_cte(cte)
.from_(cte)
.where(Facility.name == 'Tennis Court 2')
.execute())
DELETE FROM bookings;
from model import *
nrows = Booking.delete().execute()
print(nrows)
DELETE FROM members WHERE memid = 37;
from model import *
nrows = Member.delete().where(Member.memid == 37).execute()
print(nrows)
# 1
DELETE FROM members WHERE NOT EXISTS (
SELECT * FROM bookings WHERE bookings.memid = members.memid);
from model import *
subq = Booking.select().where(Booking.member == Member.memid)
nrows = Member.delete().where(~fn.EXISTS(subq)).execute()
print(nrows)
SELECT COUNT(facid) FROM facilities;
from model import *
query = Facility.select(fn.COUNT(Facility.facid))
count = query.scalar()
print(count)
# OR:
count = Facility.select().count()
print(count)
# 12
非会员费用大于10
SELECT COUNT(facid) FROM facilities WHERE guestcost >= 10;
from model import *
query = Facility.select(fn.COUNT(Facility.facid)).where(Facility.guestcost >= 10)
count = query.scalar()
print(count)
# OR:
count = Facility.select().where(Facility.guestcost >= 10).count()
print(count)
# 9
SELECT recommendedby, COUNT(memid) FROM members
WHERE recommendedby IS NOT NULL
GROUP BY recommendedby
ORDER BY recommendedby;
from model import *
query = (Member
.select(Member.recommendedby, fn.COUNT(Member.memid))
.where(Member.recommendedby.is_null(False))
.group_by(Member.recommendedby)
.order_by(Member.recommendedby))
for i in query.namedtuples():
print(i.recommendedby, i.count)
# 1 5
# 2 3
# 3 1
SELECT facid, SUM(slots) FROM bookings GROUP BY facid ORDER BY facid;
from model import *
query = (Booking
.select(Booking.facid, fn.SUM(Booking.slots))
.group_by(Booking.facid)
.order_by(Booking.facid))
for i in query.namedtuples():
print(i.facility, i.sum)
# 0 1320
# 1 1278
# 2 1209
SELECT facid, SUM(slots)
FROM bookings
WHERE (date_trunc('month', starttime) = '2012-09-01'::date)
GROUP BY facid
ORDER BY SUM(slots);
import datetime
from model import *
query = (Booking
.select(Booking.facility, fn.SUM(Booking.slots))
.where(fn.date_trunc('month', Booking.starttime) == datetime.date(2012, 9, 1))
.group_by(Booking.facility)
.order_by(fn.SUM(Booking.slots)))
for i in query.namedtuples():
print(i.facility, i.sum)
# 5 122
# 3 422
# 7 426
SELECT facid, date_part('month', starttime), SUM(slots)
FROM bookings
WHERE date_part('year', starttime) = 2012
GROUP BY facid, date_part('month', starttime)
ORDER BY facid, date_part('month', starttime);
from model import *
month = fn.date_part('month', Booking.starttime)
query = (Booking
.select(Booking.facility, month, fn.SUM(Booking.slots))
.where(fn.date_part('year', Booking.starttime) == 2012)
.group_by(Booking.facility, month)
.order_by(Booking.facility, month))
for i in query.namedtuples():
print(i.facility, i.sum)
# 0 270
# 0 459
# 0 591
SELECT COUNT(DISTINCT memid) FROM bookings;
-- OR --
SELECT COUNT(1) FROM (SELECT DISTINCT memid FROM bookings) AS _;
from model import *
query = Booking.select(fn.COUNT(Booking.member.distinct()))
count = query.scalar()
print(count)
# OR:
query = Booking.select(Booking.member).distinct()
count = query.count()
print(count)
# 30
SELECT facid, SUM(slots) FROM bookings
GROUP BY facid
HAVING SUM(slots) > 1000
ORDER BY facid;
from model import *
query = (Booking
.select(Booking.facility, fn.SUM(Booking.slots))
.group_by(Booking.facility)
.having(fn.SUM(Booking.slots) > 1000)
.order_by(Booking.facility))
for i in query.namedtuples():
print(i.facility, i.sum)
# 0 1320
# 1 1278
# 2 1209
# 4 1404
# 6 1104
SELECT f.name, SUM(b.slots * (
CASE WHEN b.memid = 0 THEN f.guestcost ELSE f.membercost END)) AS revenue
FROM bookings AS b
INNER JOIN facilities AS f ON b.facid = f.facid
GROUP BY f.name
ORDER BY revenue;
from model import *
revenue = fn.SUM(Booking.slots * Case(None, (
(Booking.member == 0, Facility.guestcost),
), Facility.membercost))
query = (Facility
.select(Facility.name, revenue.alias('revenue'))
.join(Booking)
.group_by(Facility.name)
.order_by(SQL('revenue')))
for i in query.namedtuples():
print(i.name, i.revenue)
# Table Tennis 180
# Snooker Table 240
# Pool Table 270
SELECT f.name, SUM(b.slots * (
CASE WHEN b.memid = 0 THEN f.guestcost ELSE f.membercost END)) AS revenue
FROM bookings AS b
INNER JOIN facilities AS f ON b.facid = f.facid
GROUP BY f.name
HAVING SUM(b.slots * (CASE WHEN b.memid = 0 THEN f.guestcost ELSE f.membercost END)) < 1000
ORDER BY revenue;
from model import *
revenue = fn.SUM(Booking.slots * Case(None, (
(Booking.member == 0, Facility.guestcost),
), Facility.membercost))
query = (Facility
.select(Facility.name, revenue.alias('revenue'))
.join(Booking)
.group_by(Facility.name)
.having(revenue < 1000)
.order_by(SQL('revenue')))
for i in query.namedtuples():
print(i.name, i.revenue)
# Table Tennis 180
# Snooker Table 240
# Pool Table 270
SELECT facid, SUM(slots) FROM bookings
GROUP BY facid
ORDER BY SUM(slots) DESC
LIMIT 1;
from model import *
query = (Booking
.select(Booking.facility, fn.SUM(Booking.slots))
.group_by(Booking.facility)
.order_by(fn.SUM(Booking.slots).desc())
.limit(1))
facid, nslots = query.scalar(as_tuple=True)
print(facid, nslots)
# 4 1404
只适用于PostgreSQL
SELECT facid, date_part('month', starttime), SUM(slots)
FROM bookings
WHERE date_part('year', starttime) = 2012
GROUP BY ROLLUP(facid, date_part('month', starttime))
ORDER BY facid, date_part('month', starttime);
from model import *
month = fn.date_part('month', Booking.starttime)
query = (Booking
.select(Booking.facility,
month.alias('month'),
fn.SUM(Booking.slots))
.where(fn.date_part('year', Booking.starttime) == 2012)
.group_by(fn.ROLLUP(Booking.facility, month))
.order_by(Booking.facility, month))
for i in query.namedtuples():
print(i)
# Row(facility=0, month=7.0, sum=270)
# Row(facility=0, month=8.0, sum=459)
# Row(facility=0, month=9.0, sum=591)
一个位置持续半个小时
SELECT f.facid, f.name, SUM(b.slots) * .5
FROM facilities AS f
INNER JOIN bookings AS b ON (f.facid = b.facid)
GROUP BY f.facid, f.name
ORDER BY f.facid;
from model import *
query = (Facility
.select(Facility.facid, Facility.name, (fn.SUM(Booking.slots) * .5).alias('hours'))
.join(Booking)
.group_by(Facility.facid, Facility.name)
.order_by(Facility.facid))
for i in query.namedtuples():
print(i.facid, i.name, i.hours)
# 0 Tennis Court 1 660.0
# 1 Tennis Court 2 639.0
# 2 Badminton Court 604.5
SELECT m.surname, m.firstname, m.memid, min(b.starttime) as starttime
FROM members AS m
INNER JOIN bookings AS b ON b.memid = m.memid
WHERE starttime >= '2012-09-01'
GROUP BY m.surname, m.firstname, m.memid
ORDER BY m.memid;
import datetime
from model import *
query = (Member
.select(Member.surname, Member.firstname, Member.memid,
fn.MIN(Booking.starttime).alias('starttime'))
.join(Booking)
.where(Booking.starttime >= datetime.date(2012, 9, 1))
.group_by(Member.surname, Member.firstname, Member.memid)
.order_by(Member.memid))
for i in query.namedtuples():
print(i.surname, i.firstname, i.memid, i.starttime)
# GUEST GUEST 0 2012-09-01 08:00:00
# Smith Darren 1 2012-09-01 09:00:00
# Smith Tracy 2 2012-09-01 11:30:00
只适用于PostgreSQL
SELECT COUNT(*) OVER(), firstname, surname
FROM members ORDER BY joindate;
from model import *
query = (Member
.select(fn.COUNT(Member.memid).over(), Member.firstname,
Member.surname)
.order_by(Member.joindate))
for i in query.namedtuples():
print(i.count, i.firstname, i.surname)
# 30 GUEST GUEST
# 30 Darren Smith
# 30 Tracy Smith
只适用于PostgreSQL
SELECT row_number() OVER (ORDER BY joindate), firstname, surname
FROM members ORDER BY joindate;
from model import *
query = (Member
.select(fn.row_number().over(order_by=[Member.joindate]),
Member.firstname, Member.surname)
.order_by(Member.joindate))
for i in query.namedtuples():
print(i.row_number, i.firstname, i.surname)
# 1 GUEST GUEST
# 2 Darren Smith
# 3 Tracy Smith
只适用于PostgreSQL
SELECT facid, total FROM (
SELECT facid, SUM(slots) AS total,
rank() OVER (order by SUM(slots) DESC) AS rank
FROM bookings
GROUP BY facid
) AS ranked WHERE rank = 1;
from model import *
rank = fn.rank().over(order_by=[fn.SUM(Booking.slots).desc()])
subq = (Booking
.select(Booking.facility, fn.SUM(Booking.slots).alias('total'),
rank.alias('rank'))
.group_by(Booking.facility))
query = (Select(columns=[subq.c.facid, subq.c.total])
.from_(subq)
.where(subq.c.rank == 1)
.bind(db))
for i in query.namedtuples():
print(i.facid, i.total)
# 4 1404
只适用于PostgreSQL
SELECT firstname, surname,
((SUM(bks.slots)+10)/20)*10 as hours,
rank() over (order by ((sum(bks.slots)+10)/20)*10 desc) as rank
FROM members AS mems
INNER JOIN bookings AS bks ON mems.memid = bks.memid
GROUP BY mems.memid
ORDER BY rank, surname, firstname;
from model import *
hours = ((fn.SUM(Booking.slots) + 10) / 20) * 10
query = (Member
.select(Member.firstname, Member.surname, hours.alias('hours'),
fn.rank().over(order_by=[hours.desc()]).alias('rank'))
.join(Booking)
.group_by(Member.memid)
.order_by(SQL('rank'), Member.surname, Member.firstname))
for i in query.namedtuples():
print(i.rank, i.hours, i.firstname, i.surname)
# 1 1200 GUEST GUEST
# 2 340 Darren Smith
# 3 330 Tim Rownam
只适用于PostgreSQL
SELECT name, rank FROM (
SELECT f.name, RANK() OVER (ORDER BY SUM(
CASE WHEN memid = 0 THEN slots * f.guestcost
ELSE slots * f.membercost END) DESC) AS rank
FROM bookings
INNER JOIN facilities AS f ON bookings.facid = f.facid
GROUP BY f.name) AS subq
WHERE rank <= 3
ORDER BY rank;
from model import *
total_cost = fn.SUM(Case(None, (
(Booking.member == 0, Booking.slots * Facility.guestcost),
), (Booking.slots * Facility.membercost)))
subq = (Facility
.select(Facility.name,
fn.RANK().over(order_by=[total_cost.desc()]).alias('rank'))
.join(Booking)
.group_by(Facility.name))
query = (Select(columns=[subq.c.name, subq.c.rank])
.from_(subq)
.where(subq.c.rank <= 3)
.order_by(subq.c.rank)
.bind(db))
for i in query.namedtuples():
print(i.rank, i.name)
# 1 Massage Room 1
# 2 Tennis Court 2
# 3 Tennis Court 1
只适用于PostgreSQL
根据收益将场所分为高、中、低三个组
SELECT name,
CASE class WHEN 1 THEN 'high' WHEN 2 THEN 'average' ELSE 'low' END
FROM (
SELECT f.name, ntile(3) OVER (ORDER BY SUM(
CASE WHEN memid = 0 THEN slots * f.guestcost ELSE slots * f.membercost
END) DESC) AS class
FROM bookings INNER JOIN facilities AS f ON bookings.facid = f.facid
GROUP BY f.name
) AS subq
ORDER BY class, name;
from model import *
cost = fn.SUM(Case(None, (
(Booking.member == 0, Booking.slots * Facility.guestcost),
), (Booking.slots * Facility.membercost)))
subq = (Facility
.select(Facility.name,
fn.NTILE(3).over(order_by=[cost.desc()]).alias('klass'))
.join(Booking)
.group_by(Facility.name))
klass_case = Case(subq.c.klass, [(1, 'high'), (2, 'average')], 'low')
query = (Select(columns=[subq.c.name, klass_case])
.from_(subq)
.order_by(subq.c.klass, subq.c.name)
.bind(db))
for i in query.namedtuples():
print(i.case, i.name)
# high Massage Room 1
# high Tennis Court 1
# high Tennis Court 2
WITH RECURSIVE recommenders(recommender) as (
SELECT recommendedby FROM members WHERE memid = 27
UNION ALL
SELECT mems.recommendedby
FROM recommenders recs
INNER JOIN members AS mems ON mems.memid = recs.recommender
)
SELECT recs.recommender, mems.firstname, mems.surname
FROM recommenders AS recs
INNER JOIN members AS mems ON recs.recommender = mems.memid
ORDER By memid DESC;
from model import *
base = (Member
.select(Member.recommendedby)
.where(Member.memid == 27)
.cte('recommenders', recursive=True, columns=('recommender',)))
MA = Member.alias()
recursive = (MA
.select(MA.recommendedby)
.join(base, on=(MA.memid == base.c.recommender)))
cte = base.union_all(recursive)
query = (cte
.select_from(cte.c.recommender, Member.firstname, Member.surname)
.join(Member, on=(cte.c.recommender == Member.memid))
.order_by(Member.memid.desc()))
for i in query.namedtuples():
print(i.recommender, i.firstname, i.surname)
# 20 Matthew Genting
# 5 Gerald Butters
# 1 Darren Smith
from peewee import *
db = SqliteDatabase('test')
base = Select(columns=(
Value(1).alias('n'),
Value(0).alias('fib_n'),
Value(1).alias('next_fib_n'))).cte('fibonacci', recursive=True)
n = (base.c.n + 1).alias('n')
recursive_term = Select(columns=(
n,
base.c.next_fib_n,
base.c.fib_n + base.c.next_fib_n)).from_(base).where(n < 10)
fibonacci = base.union_all(recursive_term)
query = fibonacci.select_from(fibonacci.c.n, fibonacci.c.fib_n)
results = list(query.execute(db))
print(results)
# [{'n': 1, 'fib_n': 0}, {'n': 2, 'fib_n': 1}, {'n': 3, 'fib_n': 1}, {'n': 4, 'fib_n': 2}, {'n': 5, 'fib_n': 3}, {'n': 6, 'fib_n': 5}, {'n': 7, 'fib_n': 8}, {'n': 8, 'fib_n': 13}, {'n': 9, 'fib_n': 21}, {'n': 10, 'fib_n': 34}]
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。