# [web-page residue] 赞 ("like" button)
# [web-page residue] 踩 ("dislike" button)
import sqlalchemy

print(sqlalchemy.__version__)
print(dir(sqlalchemy))
# Names exported by the top-level ``sqlalchemy`` package, grouped by purpose.
# Column types:
#   BINARY | LargeBinary | VARBINARY | SMALLINT | SmallInteger | INT | INTEGER | Integer | BIGINT | BigInteger
#   DECIMAL | FLOAT | Float | NUMERIC | Numeric | REAL
#   CHAR | NCHAR | VARCHAR | NVARCHAR | CLOB | BLOB | TEXT | Text | String
#   BOOLEAN | Boolean
#   DATE | TIME | DATETIME | DDL | TIMESTAMP | Date | Time | DateTime
#   Enum | Sequence | ARRAY | JSON
# Constraints:
#   Constraint | PrimaryKeyConstraint | ForeignKey | ForeignKeyConstraint | Index | CheckConstraint | UniqueConstraint
# Schema objects and misc:
#   BLANK_SCHEMA | Column | ColumnDefault | DefaultClause | FetchedValue | Interval | MetaData
#   | ThreadLocalMetaData | Table | TypeDecorator | PassiveDefault | PickleType | Unicode | UnicodeText
# Table construction: column | table | tablesample (roughly comparable to a view?) | collate
# Statement types: delete | update | select | subquery | insert
# distinct
# Multi-table joins: join | outerjoin | alias (like AS)
# Used after WHERE: all_ | any_ | exists | and_ | or_ | not_ | between | true | false | null
#   | nullsfirst (rows whose sort key is NULL go first; MySQL has no NULLS FIRST|LAST syntax)
#   | nullslast | bindparam | case | within_group
# Combining queries: union | union_all (keeps duplicate rows)
#   | except_ (difference: rows in the first query but not the second; not supported by MySQL)
#   | except_all | intersect | intersect_all
# Used after ORDER BY: asc | desc
# Exceptions module: exc
# Engine module: engine | engine_from_config | create_engine
# Utility modules: util | cutils
# Connection-pool module: pool
# Schema module: schema
# Statement module: sql
# SQL functions: func | funcfilter
# processors: processors | cprocessors
# event: event | events
# inspection: inspection | inspect
# cresultproxy: cresultproxy
# dialects: dialects
# log: log
# types: types
# interfaces: interfaces
# ['cast', 'extract', 'lateral', 'literal', 'literal_column', 'modifier', 'outparam', 'over', 'text', 'tuple_', 'type_coerce']

# In-memory SQLite database. The trailing colon in ':memory:' matters:
# 'sqlite:///:memory' (no trailing colon) would be treated as a file path.
# NOTE: the ``encoding`` argument is only accepted by SQLAlchemy 1.x.
sqlite3_engine = sqlalchemy.create_engine('sqlite:///:memory:', echo=True, encoding='UTF-8')
# URL formats for other drivers:
#   mysql-python (MySQLdb on Python 2): mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
#   pymysql: mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
#   mysql-connector: mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
#   cx_oracle: oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
#   http://docs.sqlalchemy.org/en/latest/dialects/index.html
print(dir(sqlite3_engine))
# sqlite3_engine.url: 'sqlite:///:memory:'
# sqlite3_engine.driver: 'pysqlite'
# sqlite3_engine.echo: True
# sqlite3_engine.name: 'sqlite'
# transaction = sqlite3_engine.begin()
# connection = sqlite3_engine.connect(**args)
# sqlite3_engine.dialect  # <sqlalchemy.dialects.sqlite.pysqlite.SQLiteDialect_pysqlite at 0x217dc71a080>
# sqlite3_engine.dispose()  # Closes the engine's pool; connections still in use are detached
#                           # from it but are reclaimed on their own once they finish.
# sqlite3_engine.execute(stat, *multiparams, **params)
# shard1 = sqlite3_engine.execution_options(shard_id='shard1')  # engine carrying the execution option shard_id='shard1'
# from sqlalchemy import event
# from sqlalchemy.engine import Engine
# shards = {"default": "base", "shard_1": "db1", "shard_2": "db2"}
# @event.listens_for(Engine, "before_cursor_execute")
# def _switch_shard(conn, cursor, stmt, params, context, executemany):
#     shard_id = conn._execution_options.get('shard_id', "default")
#     current_shard = conn.info.get("current_shard", None)
#     if current_shard != shard_id:
#         cursor.execute("use %s" % shards[shard_id])
#         conn.info["current_shard"] = shard_id
# shard1.get_execution_options()
# shard1.update_execution_options(**opt)
# sqlite3_engine.has_table(table_name, schema=None)
# sqlite3_engine.table_names(schema=None, connection=None)
# sqlite3_engine.logging_name  # empty
# sqlite3_engine.logger  # <sqlalchemy.log.InstanceLogger at 0x217dd8c5048>
# ['dispatch', 'engine', 'pool', 'raw_connection', 'run_callable', 'scalar', 'schema_for_object', 'transaction']
# Catalogue of database URL formats accepted by create_engine().
# NOTE(review): each call presumably needs the matching DB driver to be
# installed; only the final in-memory SQLite engine is relied on later.
from sqlalchemy import create_engine

# --- PostgreSQL ---
# default
engine = create_engine('postgresql://scott:tiger@localhost/mydatabase')
# psycopg2
engine = create_engine('postgresql+psycopg2://scott:tiger@localhost/mydatabase')
# pg8000
engine = create_engine('postgresql+pg8000://scott:tiger@localhost/mydatabase')

# --- MySQL ---
# default
engine = create_engine('mysql://scott:tiger@localhost/foo')
# mysqlclient (a maintained fork of MySQL-Python)
engine = create_engine('mysql+mysqldb://scott:tiger@localhost/foo')
# PyMySQL
engine = create_engine('mysql+pymysql://scott:tiger@localhost/foo')

# --- Oracle ---
engine = create_engine('oracle://scott:tiger@127.0.0.1:1521/sidname')
engine = create_engine('oracle+cx_oracle://scott:tiger@tnsname')

# --- Microsoft SQL Server ---
# pyodbc
engine = create_engine('mssql+pyodbc://scott:tiger@mydsn')
# pymssql -- general form is mssql+pymssql://user:pass@hostname:port/dbname;
# the port must be numeric, so a concrete value is used here.
engine = create_engine('mssql+pymssql://scott:tiger@hostname:1433/dbname')

# --- SQLite ---
# sqlite://<nohostname>/<path>
# where <path> is relative:
engine = create_engine('sqlite:///foo.db')
# Unix/Mac - 4 initial slashes in total
engine = create_engine('sqlite:////absolute/path/to/foo.db')
# Windows
engine = create_engine('sqlite:///C:\\path\\to\\foo.db')
# Windows alternative using raw string
engine = create_engine(r'sqlite:///C:\path\to\foo.db')
# Empty path: in-memory database. This is the ``engine`` used below.
engine = create_engine('sqlite://')
from sqlalchemy import Table, Column, Integer, String, MetaData, ForeignKey

# Core (non-ORM) table definitions, registered on one shared MetaData.
metadata = MetaData()

users = Table(
    'users',
    metadata,
    Column('id', Integer, primary_key=True),
    Column('name', String),
    Column('fullname', String),
)

addresses = Table(
    'addresses',
    metadata,
    Column('id', Integer, primary_key=True),
    # The type is given as None; the column carries a foreign key to users.id.
    Column('user_id', None, ForeignKey('users.id')),
    Column('email_address', String, nullable=True),
)

# Emit CREATE TABLE for every table registered on ``metadata``.
metadata.create_all(sqlite3_engine)

# List the attributes and methods available on a Table object.
print(dir(users))
# ['__and__', '__bool__', '__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__invert__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__nonzero__', '__or__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__visit_name__', '__weakref__', '_annotate', '_annotations', '_autoincrement_column', '_autoload', '_clone', '_cloned_set', '_cols_populated', '_columns', '_compiler', '_compiler_dispatch', '_constructor', '_copy_internals', '_deannotate', '_execute_on_connection', '_extra_dependencies', '_extra_kwargs', '_from_objects', '_hide_froms', '_init', '_init_collections', '_init_existing', '_init_items', '_is_clone_of', '_is_from_container', '_is_join', '_is_lateral', '_is_lexical_equivalent', '_is_select', '_kw_reg_for_dialect', '_kw_reg_for_dialect_cls', '_kw_registry', '_memoized_property', '_negate', '_order_by_label_element', '_params', '_populate_column_collection', '_prefixes', '_refresh_for_new_column', '_reset_exported', '_schema_item_copy', '_select_iterable', '_set_parent', '_set_parent_with_dispatch', '_sorted_constraints', '_textual', '_translate_schema', '_validate_dialect_kwargs', '_with_annotations', 'add_is_dependent_on', 'alias', 'append_column', 'append_constraint', 'append_ddl_listener', 'argument_for', 'bind', 'c', 'columns', 'comment', 'compare', 'compile', 'constraints', 'correspond_on_equivalents', 'corresponding_column', 'count', 'create', 'delete', 'description', 'dialect_kwargs', 'dialect_options', 'dispatch', 'drop', 'exists', 'foreign_key_constraints', 'foreign_keys', 'fullname', 'get_children', 'implicit_returning', 'indexes', 'info', 'insert', 'is_clause_element', 'is_derived_from', 'is_selectable', 'join', 'key', 'kwargs', 'lateral', 'metadata', 'name', 'named_with_column', 'outerjoin', 'params', 'primary_key', 'quote', 'quote_schema', 
# 'replace_selectable', 'schema', 'select', 'selectable', 'self_group', 'supports_execution', 'tablesample', 'tometadata', 'unique_params', 'update']
# The main statement builders on a Table are insert/select/update/delete.
ins = users.insert()
print(ins)  # 'INSERT INTO users (id, name, fullname) VALUES (:id, :name, :fullname)'

# values() restricts the statement to the columns given.
ins = users.insert().values(name='jack', fullname='Jack Jones')
print(ins)  # 'INSERT INTO users (name, fullname) VALUES (:name, :fullname)'
print(ins.compile().params)  # {'fullname': 'Jack Jones', 'name': 'jack'}

# Executing: insert | update | delete
conn = sqlite3_engine.connect()
result = conn.execute(ins)

# With an engine bound, str(ins) renders using that engine's dialect.
ins.bind = engine
print(ins)  # 'INSERT INTO users (name, fullname) VALUES (?, ?)'
print(result.inserted_primary_key)  # [1]

# Values can also be supplied at execution time.
ins = users.insert()
result = conn.execute(ins, id=2, name='wendy', fullname='Wendy Williams')

# Passing a list of dicts executes the statement once per dict ("executemany").
results = conn.execute(addresses.insert(), [
    {'user_id': 1, 'email_address': 'jack@yahoo.com'},
    {'user_id': 1, 'email_address': 'jack@msn.com'},
    {'user_id': 2, 'email_address': 'www@www.org'},
    {'user_id': 2, 'email_address': 'wendy@aol.com'},
])
# The executemany form is available for insert | update | delete.
from sqlalchemy.sql import select

# SELECT every column of ``users`` and iterate over the rows.
sel = select([users])
result = conn.execute(sel)
for row in result:
    print(row)
result.close()

# Fetch a single row and read its columns three ways.
result = conn.execute(sel)
row = result.fetchone()
# rows = result.fetchall()
print("name:", row['name'], "; fullname:", row['fullname'])  # by column name
print("name:", row[1], "; fullname:", row[2])  # by position
print("name:", row[users.c.name], "; fullname:", row[users.c.fullname])  # by Column object
result.close()

# Two tables and no WHERE clause: every users row paired with every addresses row.
for row in conn.execute(select([users, addresses])):
    print(row)

# Restrict the pairs to matching rows with a WHERE clause.
s = select([users, addresses]).where(users.c.id == addresses.c.user_id)
for row in conn.execute(s):
    print(row)
# Python operators on Column objects build SQL expressions instead of
# evaluating to booleans; printing an expression shows the SQL it renders.
print(users.c.id == addresses.c.user_id)  # users.id = addresses.user_id
print(users.c.id == 7)  # users.id = :id_1
print((users.c.id == 7).compile().params)  # {u'id_1': 7}
print(users.c.id != 7)  # users.id != :id_1
# ``== None`` is intentional here: it renders IS NULL (``is None`` cannot be overloaded).
print(users.c.name == None)  # users.name IS NULL
print('fred' > users.c.name)  # users.name < :name_1
print(users.c.id + addresses.c.id)  # users.id + addresses.id
print(users.c.name + users.c.fullname)  # users.name || users.fullname

# The same string concatenation rendered for MySQL. Compiling against the
# dialect object avoids building an engine, which would need a MySQL driver.
from sqlalchemy.dialects import mysql
print((users.c.name + users.c.fullname).compile(dialect=mysql.dialect()))  # concat(users.name, users.fullname)

# op() emits an arbitrary operator.
print(users.c.name.op('tiddlywinks')('foo'))  # users.name tiddlywinks :name_1
# Custom-operator examples.
# NOTE(review): `somecolumn` and `MySpecialType` are not defined anywhere in
# the visible file -- presumably placeholders for any column expression and a
# user-defined type; define them before running this section.
# op('&') applied to the integer 0xff.
print(somecolumn.op('&')(0xff)) #
from sqlalchemy import type_coerce
# type_coerce() wraps the '-%>' operator expression together with MySpecialType().
expr = type_coerce(somecolumn.op('-%>')('foo'), MySpecialType())
stmt = select([expr])
# bool_op() builds an expression with the custom '-->' operator.
somecolumn.bool_op('-->')('some value')
from sqlalchemy.sql import and_, or_, not_

# Conjunctions built with and_(), or_() and not_().
print(and_(
    users.c.name.like('j%'),
    users.c.id == addresses.c.user_id,
    or_(
        addresses.c.email_address == 'wendy@aol.com',
        addresses.c.email_address == 'jack@yahoo.com',
    ),
    not_(users.c.id > 5),
))
# users.name LIKE :name_1 AND users.id = addresses.user_id AND (addresses.email_address = :email_address_1 OR addresses.email_address = :email_address_2) AND users.id <= :id_1

# Equivalent to the above using the &, | and ~ operators, but harder to read.
print(
    users.c.name.like('j%')
    & (users.c.id == addresses.c.user_id)
    & (
        (addresses.c.email_address == 'wendy@aol.com')
        | (addresses.c.email_address == 'jack@yahoo.com')
    )
    & ~(users.c.id > 5)
)

# where() must be called on the select() object, not on its column list.
s = select(
    [(users.c.fullname + ', ' + addresses.c.email_address).label('title')]
).where(and_(
    users.c.id == addresses.c.user_id,
    users.c.name.between('m', 'z'),
    or_(
        addresses.c.email_address.like('%@aol.com'),
        addresses.c.email_address.like('%@msn.com'),
    ),
))
rows = conn.execute(s).fetchall()

# Equivalent to the above: chained where() calls are joined with AND.
s = (
    select([(users.c.fullname + ', ' + addresses.c.email_address).label('title')])
    .where(users.c.id == addresses.c.user_id)
    .where(users.c.name.between('m', 'z'))
    .where(or_(
        addresses.c.email_address.like('%@aol.com'),
        addresses.c.email_address.like('%@msn.com'),
    ))
)
from sqlalchemy.sql import text, bindparam

# Textual SQL with named bind parameters (:x, :y, ...).
# NOTE(review): concat() and the double-quoted ", " literal look MySQL-flavoured
# while ``conn`` is a SQLite connection -- confirm this runs on the target DB.
s = text('select concat(users.fullname, ", ", addresses.email_address) as title from users, addresses where users.id=addresses.user_id and users.name between :x and :y and (addresses.email_address like :e1 or addresses.email_address like :e2)')
conn.execute(s, x='m', y='z', e1='%@aol.com', e2='%@msn.com').fetchall()

# -------- Customising the bind parameters
s = text("select * from users where users.name between :x and :y")
# Either bind the values ahead of execution ...
s = s.bindparams(x='m', y='z')
conn.execute(s)
# ... or declare the parameter types and supply the values at execution time.
s = s.bindparams(bindparam("x", type_=String), bindparam("y", type_=String))
conn.execute(s, {'x': 'm', 'y': 'z'})

# ---------- Customising the result column types
# Either by name ...
s = s.columns(id=Integer, name=String)
# ... or positionally with Column objects, in the same order as the SELECT list.
s = text("select name, id from users")
s = s.columns(users.c.name, users.c.id)
# Condensed ORM cheat-sheet: model definition, session use, querying and
# relationships. The same material is walked through step by step further
# down the file.
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, func, ForeignKey, exists
from sqlalchemy.orm import sessionmaker, aliased, relationship

Base = declarative_base()


class User(Base):
    # ORM mapping for the ``users`` table.
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    nickname = Column(String)


print(User.__table__)
Base.metadata.create_all(engine)

Session = sessionmaker(bind=engine)
# Session.configure(bind=engine)
session = Session()

# Adding objects.
session.add(User(name='ed', fullname='Ed Jones', nickname='edsnickname'))
session.add_all([
    User(name='wendy', fullname='Wendy Williams', nickname='windy'),
    User(name='mary', fullname='Mary Contrary', nickname='mary'),
    User(name='fred', fullname='Fred Flintstone', nickname='freddy'),
])

# Querying.
our_user = session.query(User).filter_by(name='ed').first()
query = session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser']))
for row in session.query(User).order_by(User.id):
    print(row.name, row.fullname)
for name, fullname in session.query(User.name, User.fullname):
    print(name, fullname)
for row in session.query(User, User.name, User.name.label('name_label')).all():
    print(row.User, row.name, row.name_label)
user_alias = aliased(User, name='user_alias')
for row in session.query(user_alias, user_alias.name).all():
    print(row.user_alias)

# Fetch methods available on a Query.
# NOTE(review): listed as an API reference; one() and scalar() raise unless
# the query matches a suitable number of rows, so this is not meant to be run as-is.
query.all()
query.first()
query.one()
query.one_or_none()
query.scalar()
query.count()

# Counting.
session.query(func.count(User.name), User.name).group_by(User.name).all()  # [(1, u'ed'), (1, u'fred'), (1, u'mary'), (1, u'wendy')]
session.query(func.count('*')).select_from(User).scalar()  # 4
session.query(func.count(User.id)).scalar()  # 4

# Change tracking, commit and rollback.
our_user.nickname = 'eddie'
print(session.dirty)
print(session.new)
session.commit()
# A pending object that has been added is dropped from the session on rollback.
fake_user = User(name='fakeuser', fullname='Invalid', nickname='12345')
session.add(fake_user)
session.rollback()
fake_user in session  # False


class Address(Base):
    # ORM mapping for the ``addresses`` table; each row points at one User.
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String(50), nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    user = relationship('User', back_populates='addresses')  # the related User instance

    def __repr__(self):
        return '<Address(email_address="%s")>' % self.email_address


# back_populates must name the attribute on the other class (Address.user).
User.addresses = relationship('Address', order_by=Address.id, back_populates='user')  # the related Address instances
Base.metadata.create_all(engine)

# Joins; ``...`` stands for whatever filter criteria are needed.
query.join(Address).filter(...)
query.outerjoin(User.addresses).filter(...)

# Subquery counting addresses per user, outer-joined back to User.
stmt = session.query(Address.user_id, func.count('*').label('address_count')).group_by(Address.user_id).subquery()
result = session.query(User, stmt.c.address_count).outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, CHAR, VARCHAR, Enum, TIMESTAMP

Base = declarative_base()


class User(Base):
    # Declarative mapping for the ``users`` table, with extra columns
    # (test1..test4) demonstrating further column types.
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    nickname = Column(String)
    test1 = Column(CHAR(20))
    test2 = Column(VARCHAR(20))
    test3 = Column('test3', Enum('item1', 'item2'))
    test4 = Column('test4', TIMESTAMP)

    def __repr__(self):
        # Optional: this method does not have to be implemented.
        shown = (self.name, self.fullname, self.nickname)
        return "<User(name='%s', fullname='%s', nickname='%s')>" % shown


print(User.__table__)
# Table('users', MetaData(bind=None), Column('id', Integer(), table=<users>, primary_key=True, nullable=False), Column('name', String(), table=<users>), Column('fullname', String(), table=<users>), Column('nickname', String(), table=<users>), schema=None)
# NOTE(review): the recorded output above does not list the test1..test4
# columns -- presumably captured before they were added.
Base.metadata.create_all(engine)
# Session walkthrough: create objects, add them to a session, then query,
# modify, commit and roll back. The trailing comments record the author's
# observed output.
ed_user = User(name='ed', fullname='Ed Jones', nickname='edsnickname')
print(ed_user.name, ed_user.fullname, ed_user.nickname, ed_user.id) # 'ed' 'Ed Jones' 'edsnickname' None
from sqlalchemy.orm import sessionmaker
Session = sessionmaker(bind=engine)
# Session = sessionmaker() # If no engine exists yet, bind one later with Session.configure(bind=engine)
session = Session()
session.add(ed_user) # Not sent to the database yet, only held as pending; a query on the session sends it first
our_user = session.query(User).filter_by(name='ed').first()
print(ed_user is our_user) # True
session.add_all([
User(name='wendy', fullname='Wendy Williams', nickname='windy'),
User(name='mary', fullname='Mary Contrary', nickname='mary'),
User(name='fred', fullname='Fred Flintstone', nickname='freddy')
])
ed_user.nickname = 'eddie'
print(session.dirty) # IdentitySet([<User(name='ed', fullname='Ed Jones', nickname='eddie')>])
print(session.new) # IdentitySet([<User(name='wendy', fullname='Wendy Williams', nickname='windy')>, <User(name='mary', fullname='Mary Contrary', nickname='mary')>, <User(name='fred', fullname='Fred Flintstone', nickname='freddy')>])
session.commit()
# Make two uncommitted changes, observe them through a query, then roll back.
ed_user.name = 'Edwardo'
fake_user = User(name='fakeuser', fullname='Invalid', nickname='12345')
session.add(fake_user)
session.query(User).filter(User.name.in_(['Edwardo', 'fakeuser'])).all() # [<User(name='Edwardo', fullname='Ed Jones', nickname='eddie')>, <User(name='fakeuser', fullname='Invalid', nickname='12345')>]
session.rollback()
ed_user.name # u'ed'
fake_user in session # False
session.query(User).filter(User.name.in_(['ed', 'fakeuser'])).all() # [<User(name='ed', fullname='Ed Jones', nickname='eddie')>]
# Iterating a query over whole entities.
for instance in session.query(User).order_by(User.id):
    print(instance.name, instance.fullname)
# ed Ed Jones
# wendy Wendy Williams
# mary Mary Contrary
# fred Fred Flintstone

# Querying individual columns yields tuples.
for name, fullname in session.query(User.name, User.fullname):
    print(name, fullname)

# Rows mixing entities and columns are addressable by name; label() renames a column.
for row in session.query(User, User.name, User.name.label('name_label')).all():
    print(row.User, row.name, row.name_label)

# aliased() gives the entity a different name within the query.
from sqlalchemy.orm import aliased
user_alias = aliased(User, name='user_alias')
for row in session.query(user_alias, user_alias.name).all():
    print(row.user_alias)
# <User(name='ed', fullname='Ed Jones', nickname='eddie')>
# <User(name='wendy', fullname='Wendy Williams', nickname='windy')>
# <User(name='mary', fullname='Mary Contrary', nickname='mary')>
# <User(name='fred', fullname='Fred Flintstone', nickname='freddy')>

# filter() accepts more flexible SQL expression constructs than filter_by().
for name, in session.query(User.name).filter_by(fullname='Ed Jones'):
    pass
for name, in session.query(User.name).filter(User.fullname == 'Ed Jones'):
    pass
# LIMIT and OFFSET can be expressed with Python slicing or the limit()/offset() methods.
# Reference list of common filter() operators, followed by the methods that
# fetch results from a Query.
query.filter(User.name == 'ed') # equals
query.filter(User.name != 'ed') # not equals
query.filter(User.name.like('%ed%')) # like
query.filter(User.name.ilike('%ed%')) # like (case-insensitive)
query.filter(User.name.in_(['ed', 'wendy', 'jack'])) # in
query.filter(User.name.in_(session.query(User.name).filter(User.name.like('%ed%')))) # in subquery
query.filter(~User.name.in_(['ed', 'wendy', 'jack'])) # not in
query.filter(User.name == None) # IS NULL; alternatively query.filter(User.name.is_(None))
query.filter(User.name != None) # IS NOT NULL; alternatively query.filter(User.name.isnot(None))
from sqlalchemy import and_, or_, not_
query.filter(and_(User.name == 'ed', User.fullname == 'Ed Jones')) # and; alternatively query.filter(User.name == 'ed', User.fullname == 'Ed Jones') or query.filter(User.name == 'ed').filter(User.fullname == 'Ed Jones')
query.filter(or_(User.name == 'ed', User.fullname == 'Ed Jones')) # or
query.filter(User.name.match('wendy')) # match
query = session.query(User).filter(User.name.like('%ed')).order_by(User.id)
query.all() # returns a list
query.first()
query.one() # raises if there are several rows or none
query.one_or_none() # raises on several rows; otherwise returns the single result or None
query.scalar() # calls one() and, on success, returns the first column of that row
from sqlalchemy import text

# text() fragments can be used inside filter() and order_by().
for user in session.query(User).filter(text('id < 224')).order_by(text('id')).all():
    print(user)

# Bind parameters in a text fragment are supplied with params().
session.query(User).filter(text('id < :value and name = :name')).params(value=224, name='fred').order_by(User.id).one()

# To run a complete SQL statement, pass it to from_statement().
session.query(User).from_statement(text('select * from users where name = :name')).params(name='ed').all()

# columns() binds the result columns to mapped attributes; the query returns User objects.
stmt = text('select name, id, fullname, nickname from users where name = :name')
stmt = stmt.columns(User.name, User.id, User.fullname, User.nickname)
session.query(User).from_statement(stmt).params(name='ed').all()  # [<User(name='ed', fullname='Ed Jones', nickname='eddie')>]

# The same for individual columns instead of the whole entity.
stmt = text('select name, id from users where name = :name')
stmt = stmt.columns(User.name, User.id)
session.query(User.id, User.name).from_statement(stmt).params(name='ed').all()  # [(1, u'ed')]

# Counting.
query.count()

from sqlalchemy import func

session.query(func.count(User.name), User.name).group_by(User.name).all()  # [(1, u'ed'), (1, u'fred'), (1, u'mary'), (1, u'wendy')]
session.query(func.count('*')).select_from(User).scalar()  # 4
session.query(func.count(User.id)).scalar()  # 4
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship


class Address(Base):
    # ORM mapping for the ``addresses`` table; each row references one User.
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String(50), nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    # The related User instance; paired with User.addresses below.
    user = relationship('User', back_populates='addresses')

    def __repr__(self):
        return '<Address(email_address="%s")>' % self.email_address


# The related Address instances of a user. back_populates must name the
# attribute on Address that points back here, i.e. ``user``.
User.addresses = relationship('Address', order_by=Address.id, back_populates='user')
Base.metadata.create_all(engine)
# Working with related objects.
jack = User(name='jack', fullname='Jack Bean', nickname='gjffdd')
jack.addresses  # [] by default
jack.addresses = [Address(email_address='jack@google.com'), Address(email_address='j25@yahoo.com')]
jack.addresses[1]  # <Address(email_address='j25@yahoo.com')>
jack.addresses[1].user  # <User(name='jack', fullname='Jack Bean', nickname='gjffdd')>
session.add(jack)
session.commit()

# Implicit join expressed through filter().
for u, a in session.query(User, Address).filter(User.id==Address.user_id).filter(Address.email_address=='jack@google.com').all():
    print(u, a, sep='\n')
# <User(name='jack', fullname='Jack Bean', nickname='gjffdd')>
# <Address(email_address='jack@google.com')>

# Because of the foreign key, join() knows how to relate the two tables.
for u in session.query(User).join(Address).filter(Address.email_address=='jack@google.com').all():
    print(u)

# Without a foreign key, spell the join out in one of these ways:
query.join(Address, User.id==Address.user_id)
query.join(User.addresses)
query.join(Address, User.addresses)
query.join('addresses')
# There is also LEFT OUTER JOIN:
query.outerjoin(User.addresses)

# When the query contains several entities:
query = session.query(User, Address).select_from(Address).join(User)  # select from Address first, then join to User

# Joining the same table twice requires aliases.
adalias1 = aliased(Address)
adalias2 = aliased(Address)
pairs = (
    session.query(User.name, adalias1.email_address, adalias2.email_address)
    .join(adalias1, User.addresses)
    .join(adalias2, User.addresses)
    .filter(adalias1.email_address=='jack@google.com')
    .filter(adalias2.email_address=='j25@yahoo.com')
)
for username, email1, email2 in pairs:
    print(username, email1, email2)
# Target SQL for the subquery example:
#   select users.*, adr_count.address_count from users left outer join
#   (select user_id, count(*) as address_count from addresses group by user_id) as adr_count
#   on users.id = adr_count.user_id
# Subquery counting addresses per user, outer-joined back to User so that
# users without addresses still appear.
stmt = session.query(Address.user_id, func.count('*').label('address_count')).group_by(Address.user_id).subquery()
result = session.query(User, stmt.c.address_count).outerjoin(stmt, User.id==stmt.c.user_id).order_by(User.id)

# Mapping an entity onto a subquery: the subquery has to select the Address
# entity itself so that aliased() can map Address's columns onto it. The
# per-user count subquery above only exposes user_id and address_count.
stmt = session.query(Address).filter(Address.email_address != 'j25@yahoo.com').subquery()
adalias1 = aliased(Address, stmt)
for user, address in session.query(User, adalias1).join(adalias1, User.addresses):
    print(user, address, sep='\t')
from sqlalchemy.sql import exists

# Explicit EXISTS: users that have at least one address.
stmt = exists().where(Address.user_id==User.id)
for name, in session.query(User.name).filter(stmt):
    print(name)

# Equivalent, using the relationship's any() operator.
for name, in session.query(User.name).filter(User.addresses.any()):
    print(name)

# any() also accepts criteria limiting which related rows count.
for name, in session.query(User.name).filter(User.addresses.any(Address.email_address.like('%google%'))):
    print(name)

# has() is the counterpart of any() for many-to-one relationships.
session.query(Address).filter(~Address.user.has(User.name=='jack')).all()

# Common relationship operators.
# NOTE(review): `someuser` and `someaddress` are not defined in the visible
# file -- presumably placeholders for a User / Address instance.
query.filter(Address.user == someuser)  # many-to-one "equals" comparison
query.filter(Address.user != someuser)  # many-to-one "not equals" comparison
query.filter(Address.user == None)  # many-to-one comparison
query.filter(User.addresses.contains(someaddress))  # one-to-many
query.filter(User.addresses.any(Address.email_address == 'bar'))  # one-to-many
query.filter(User.addresses.any(email_address='bar'))  # one-to-many, keyword form
query.filter(Address.user.has(name='ed'))  # used for scalar references
session.query(Address).with_parent(someuser, 'addresses')  # used for any relationship
# Eager loading: three ways to load related objects together with the parent.

# selectinload
from sqlalchemy.orm import selectinload
jack = session.query(User).options(selectinload(User.addresses)).filter_by(name='jack').one()
print(jack, jack.addresses, sep='\n')

# joinedload
from sqlalchemy.orm import joinedload
jack = session.query(User).options(joinedload(User.addresses)).filter_by(name='jack').one()
print(jack, jack.addresses, sep='\n')

# contains_eager: used together with an explicit join() on the relationship.
from sqlalchemy.orm import contains_eager
jacks_addresses = session.query(Address).join(Address.user).filter(User.name=='jack').options(contains_eager(Address.user)).all()
print(jacks_addresses[0].user, jacks_addresses, sep='\n')
# Deleting without cascade configured.
session.delete(jack)
session.query(User).filter_by(name='jack').count()  # 0
session.query(Address).filter(Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])).count()  # 2
# SQLAlchemy does not delete objects that depend on the deleted one through
# a foreign key on its own; that has to be configured explicitly.
session.close()

# Re-declare the models with delete cascade configured on User.addresses.
Base = declarative_base()


class User(Base):
    # ``users`` mapping whose addresses are deleted along with the user.
    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
    nickname = Column(String)
    # TODO: key point. The attribute name and back_populates must pair up
    # with Address.user below.
    addresses = relationship('Address', back_populates='user', cascade='all, delete, delete-orphan')

    def __repr__(self):
        return "<User(name='%s', fullname='%s', nickname='%s')>" % (self.name, self.fullname, self.nickname)


class Address(Base):
    # ``addresses`` mapping; each row references one User.
    __tablename__ = 'addresses'

    id = Column(Integer, primary_key=True)
    email_address = Column(String, nullable=False)
    user_id = Column(Integer, ForeignKey('users.id'))
    user = relationship('User', back_populates='addresses')

    def __repr__(self):
        return "<Address(email_address='%s')>" % self.email_address


Base.metadata.create_all(engine)

# An address can be removed on its own without affecting the user.
jack = session.query(User).get(5)  # get() looks up by primary key
del jack.addresses[1]
session.query(Address).filter(Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])).count()  # 1

# Deleting the user now deletes the related addresses as well.
session.delete(jack)
session.query(User).filter_by(name='jack').count()  # 0
session.query(Address).filter(Address.email_address.in_(['jack@google.com', 'j25@yahoo.com'])).count()  # 0
from sqlalchemy import Table, Text

# Association table for the many-to-many link between posts and keywords.
post_keywords = Table(
    "post_keywords",
    Base.metadata,
    Column('post_id', ForeignKey('posts.id'), primary_key=True),
    Column('keyword_id', ForeignKey('keywords.id'), primary_key=True),
)


class BlogPost(Base):
    # A blog post written by a user and tagged with keywords.
    __tablename__ = 'posts'

    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey('users.id'))
    headline = Column(String(255), nullable=False)
    body = Column(Text)
    # many to many BlogPost<->Keyword
    keywords = relationship('Keyword', secondary=post_keywords, back_populates='posts')

    def __init__(self, headline, body, author):
        self.author = author
        self.headline = headline
        self.body = body

    def __repr__(self):
        parts = (self.headline, self.body, self.author)
        return "BlogPost(%r, %r, %r)" % parts


class Keyword(Base):
    # A unique keyword that can be attached to many posts.
    __tablename__ = 'keywords'

    id = Column(Integer, primary_key=True)
    keyword = Column(String(50), nullable=False, unique=True)
    posts = relationship('BlogPost', secondary=post_keywords, back_populates='keywords')

    def __init__(self, keyword):
        self.keyword = keyword


# Two-way User <-> BlogPost relationship, attached after both classes exist.
BlogPost.author = relationship(User, back_populates="posts")
User.posts = relationship(BlogPost, back_populates="author", lazy="dynamic")
Base.metadata.create_all(engine)

# Create a post for wendy and tag it.
wendy = session.query(User).filter_by(name='wendy').one_or_none()  # not None
post = BlogPost("Wendy's Blog Post", "This is a test", wendy)
session.add(post)
for word in ('wendy', 'firstpost'):
    post.keywords.append(Keyword(word))

# Posts carrying the keyword 'firstpost'.
session.query(BlogPost).filter(BlogPost.keywords.any(keyword='firstpost')).all()
# [BlogPost("Wendy's Blog Post", 'This is a test', <User(name='wendy', fullname='Wendy Williams', nickname='windy')>)]

# The same, restricted to wendy's posts.
session.query(BlogPost).filter(BlogPost.author==wendy).filter(BlogPost.keywords.any(keyword='firstpost')).all()
# [BlogPost("Wendy's Blog Post", 'This is a test', <User(name='wendy', fullname='Wendy Williams', nickname='windy')>)]

# The same again, starting from wendy's own ``posts`` relationship.
wendy.posts.filter(BlogPost.keywords.any(keyword='firstpost')).all()
# [web-page residue] Copyright © 2003-2013 www.wpsshop.cn. All rights reserved.