赞
踩
作为服务来说,最好能够以规范的方式向外提供数据查询。这里探讨如何利用flask_sqlalchemy定义、组织数据表,并提供查询的方式。
目的:要在flask服务之下方便的调取表格型数据
【内容稍多,剩下一些以后再修补吧】
flask_sqlalchemy是从sqlalchemy发展而来,主要是对几种结构化数据库进行ORM处理。简单来说就是用对象的方式来操作数据库。
一些操作内容可以参考这个教程
以下按创建数据库对象、初始化数据库、数据库基本操作(增删改查)三部分进行操作介绍。
这里还是先做一个假设:这里的数据对象是为flask web服务设计的。因此在操作之前,其文档结构如下:
├── app
│ ├── auth
│ ├── datamodel.py
│ ├── static
│ └── templates
├── config.py
├── init_user.py
├── manager_debug.py
其中,app是整个的项目文件夹,其中与本词内容相关的有datamodel.py和manager_debug.py两个文件。
_ init _.py : 在app的初始化中实例了SQLAlchemy对象(db)
# >>>>>>>>>>>>>>>>>>> 1 数据对象
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
db = SQLAlchemy()
datamodel.py: 导入了db, 剩下的许多包是为了定义用户的类导入的。主要是关于登录状态、密码等的。
from . import db , ValidationError, login_manager from flask import current_app from datetime import datetime from flask_login import UserMixin, login_required, current_user from flask_login import login_user, logout_user, AnonymousUserMixin # 密码的哈希创建及校验 from werkzeug.security import generate_password_hash, check_password_hash # 定义确认方法 from itsdangerous import TimedJSONWebSignatureSerializer as Serializer class Users(UserMixin, db.Model): __tablename__ = 'users' # 1 ID类 id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) # 唯一约束,加索引 email = db.Column(db.String(64), unique=True, index=True) # 唯一约束,加索引 mobile = db.Column(db.String(11), unique=True, index=True) # 唯一约束,加索引 password_hash = db.Column(db.String(128)) confirmed = db.Column(db.Boolean, default=False) register_time = db.Column(db.DateTime(), default=datetime.utcnow) ...
manager_debug.py:这里导入了各种对应的数据模型(Users等),并且使用Manager为该app做了一个shell(开启并保持上下文)。我们之后就要通过shell进行数据操作。
from app import create_app from flask_script import Manager, Shell from flask import current_app # 数据对象 from app import db from app.datamodel import Users, Roles, Caps, Trans, Servs app = create_app('test_msg') # 增加markdown for jinja from markdown import markdown # markdown 测试:Markdown:使用python实现服务器端的Markdown到HTML的转化。 @app.template_filter('md') def markdown_to_html(txt): return markdown(txt) manager = Manager(app) # 制作Shell字典 shell_dict = {} shell_dict['app'] = app shell_dict['db'] = db shell_dict['Users'] = Users shell_dict['Roles'] = Roles shell_dict['Caps'] = Caps shell_dict['Trans'] = Trans shell_dict['Servs'] = Servs def make_shell_context(shell_dict=shell_dict): return shell_dict manager.add_command('shell', Shell(make_context=make_shell_context))
如何确定是设立外键还是反向引用?
class Users(UserMixin, db.Model):class Users(UserMixin, db.Model): ... # users 和 roles是 「多对一」的关系,产生一个新列,作为外键 role_id = db.Column(db.Integer, db.ForeignKey('roles.id')) ... class Roles(db.Model): ... users = db.relationship('Users', backref='role', lazy='dynamic') ... # 某个用户 some_user = Users() # 引用role some_user.role # 某个角色 some_role = Roles() # 引用user some_role --- In [4]: some_user = Users().query.filter_by(username='admin').first() In [5]: some_user Out[5]: <User 'admin'> In [6]: some_user.role Out[6]: <Role 'Super_Manager'> --- In [14]: some_role = Roles.query.filter_by(name='Super_Manager').first() In [15]: some_role Out[15]: <Role 'Super_Manager'> In [16]: some_role.users.all() Out[16]: [<User 'admin'>]
切换到项目目录下,使用命令进入shell,可以看到创建了app并打开了一个ipython交互器。
└─ $ python3 manager_debug.py shell
*** creating app by Config class
db.session.add
db.session.commit
# 方法一:使用db
db.session.query
# 方法二:使用对象
Users.query.filter_by().all()
可以用,但是感觉还是有点麻烦,而且有些bug比较龟毛。个人觉得还是直接写一个数据库搬迁的脚本方便。(所以核心的数据和运行数据分开,搬起来不累)
±---------------------+
| Tables_in YOUR DB |
±---------------------+
| caps |
| roles |
| servs |
| trans |
| users
在开发程序的过程中,你会发现有时需要修改数据库模型,而且修改之后还需要更新数据库。
仅当数据库表不存在时,Flask-SQLAlchemy 才会根据模型进行创建。因此,更新表的唯一
方式就是先删除旧表,不过这样做会丢失数据库中的所有数据。
更新表的更好方法是使用数据库迁移框架。源码版本控制工具可以跟踪源码文件的变化,
类似地,数据库迁移框架能跟踪数据库模式的变化,然后增量式的把变化应用到数据库中。— 狗书(77 of 229)
用migrate的方法,目的是既改变数据结构,又保存了数据。再赞一下狗书,大概看十分钟,操作十分钟就验证完毕了,的确很清晰易懂。总共就五步(第五步取消迁移可以不算)
第一步:修改manager_debug.py,增加数据库迁移功能。增加了之后才能执行后面的几条命令。
# --- 增加迁移
from flask_migrate import Migrate,MigrateCommand #flask 迁移数据
migrate = Migrate(app, db) # flask 迁移数据. 传入2个对象一个是flask的app对象,一个是SQLAlchemy
manager = Manager(app)
manager.add_command('db',MigrateCommand)#flask 迁移数据 ,给manager添加一个db命令并且传入一个MigrateCommand的类
第二步:初始化,生成migrations文件夹
python3 manager_debug.py db init
第三步:自动创建迁移脚本,这个在我看来有点像是一个链条的起点。
python3 manager_debug.py db migrate -m "Initial migration"
第四步:应用更改,增加本次新增的列,可以看到,在保留的数据情况下增加了一个列。
python3 manager_debug.py db upgrade
第五步:使用downgrade, 删除本次修改,把test_col删除
python3 manager_debug.py db downgrade
不过正如狗书提到的,数据库的设计本身更加重要。未来我会在图算法的内容里探讨使用图的方法(结合neo4j)建立一个既灵活又稳定的数据库。
数据库的设计和使用是很重要的话题,甚至有整本的书对其进行介绍。你应该把本章视做一个概览,更高级的话题会在后续各章中讨论。
一个用户可能有n个上传文件。这个表(upfiles)的字段如下:
id | 原始文件名 | 英文+日期文件名 | 文件名MD5 | 日期(字符) | 创建日期(Datetime) | 更新日期(datetime) | 计算状态 |
---|---|---|---|---|---|---|---|
主键 | 可以是任何合法字符(头尾无空格) | 英文+日期 | MD5 | 日期字符作为子文件夹分类目录 | datetime格式用于排序和筛选 | 反馈的时间 | 根据响应结果修改 |
文件名MD5 | 文件序号 |
---|---|
主键 | 外键 |
文件序号 | 年份 | 行业 | 序号 | 指标号 | 标签组 | 当前值 | 参考值 | 创建日期(Datetime) |
---|---|---|---|---|---|---|---|---|
主键 | 按年划分的话 | 如果有行业的话 | 对指标指定的序 | 指标名称 | 标签组 | 当前计算的值 | 参考的值 | 计算完成的时间 |
注:如果对指标有解释可以另外挂一张表
标签组 | 标签号 | 创建日期 | 是否有效 |
---|
指标号 | 指标名称 | 指标解释 |
---|
标签号 | 标签名称 | 标签解释 |
---|
整个逻辑应该是,给用户表反向挂文件表,给文件表反向挂指标表和标签表。从冗余占用空间和存储的角度看,通过某个id(xxx_id)将某个字段替代,然后再建一张新表连接。这样大量重复的存储都是存数字(id), 而不是长文本。举个栗子,如果要直接存用户角色字段,那么可能是administrator之类的,但如果用role_id表示,可能就是1。反向连接的意义则在于我们可以通过另一端的入口往回找,例如我们看有多少的角色为administrator的用户。
如果增加了新表直接db.create_all(), 创建新表但不会处理老表(类似 create table if not exists)
class Users(UserMixin, db.Model): __tablename__ = 'users' ... upfiles = db.relationship('Upfiles', backref='users', lazy='dynamic') ... ''' upfiles: 上传文件表 1 id 1 2 original_filename abc 3 en_filename abc_20200101_123456.xlsx 4 en_filename_md5 xxxxxxxx 5 date_str 2020-01-01 6 create_datetime 2020-01-01:00:00:00 7 update_datetime 2020-01-01:00:00:00 8 status xxxx ''' class Upfiles(db.Model): __tablename__ = 'upfiles' id = db.Column(db.Integer, primary_key=True) original_filename = db.Column(db.String(64)) en_filename = db.Column(db.String(64)) en_filename_md5 = db.Column(db.String(32)) date_str = db.Column(db.String(10)) create_time = db.Column(db.DateTime(), default=datetime.utcnow) update_datetime = db.Column(db.DateTime()) status = db.Column(db.String(10)) # 反向链接 users = db.Column(db.Integer, db.ForeignKey('users.id')) # 展示名称 def __repr__(self): return '<Upfiles %r>' % self.name
from app.datamodel import Users, Roles, Caps, Trans, Servs, Upfiles
shell_dict['Upfiles'] = Upfiles
python3 manager_debug.py db upgrade
python3 manager_debug.py shell
# 新建一个文件 some_file_dict = {} some_file_dict['original_filename'] = 'abc' some_file_dict['en_filename'] = 'abc_20200101_123456.xlsx' some_file_dict['en_filename_md5'] = 'xxxxxxxx' some_file_dict['date_str'] = '2020-01-01' some_file = Upfiles(**some_file_dict) some_file.user_id = 4 db.session.add(some_file) db.session.commit() some_file = Upfiles.query.filter_by(id=1).first() # --- 结果增加了文件,并且两端都挂上了,实验成功 In [2]: some_file = Upfiles.query.filter_by(id=1).first() ...: In [3]: some_file.users Out[3]: <User 'andy'> In [4]: some_user = Users.query.filter_by(username='andy').first() ...: In [5]: some_user.upfiles Out[5]: <sqlalchemy.orm.dynamic.AppenderBaseQuery at 0x11ddfec50>
同步的,数据库操作也成功了。
看看某个版本的迁移文件,可以参考 , 狗书里没有介绍太多,所以使用的时候有点小问题。我不知道怎么切换分支到原始状态,所以就直接只在一个分支上修改。似乎如果没有改变字段,upgrade只会这里也不怎么展示。算了,这里放弃思考了,反正能实现改动,小问题我就直接改版本里的文件了。
"""Initial migration Revision ID: 8cd4a6b16d2e Revises: Create Date: 2020-08-30 23:36:58.010444 """ from alembic import op import sqlalchemy as sa # revision identifiers, used by Alembic. revision = '8cd4a6b16d2e' down_revision = None branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('users', sa.Column('test_col', sa.Integer(), nullable=True)) # ### end Alembic commands ### def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.drop_column('users', 'test_col') # ### end Alembic commands ###
常用功能吧,flask的pagination挺简洁的。切到shell里面,进行数据库操作。下面按照每页三条的限制查询并返回第一页。当然,配合Jinja才最好,参考
page_idx = 1
per_page = 3
# paginate 测试
users_pagi = Users.query.order_by('username').paginate(page_idx, per_page, error_out = False)
users_pagi_res_list = users_pagi.items
users_pagi_res_list1 = users_pagi.next().items
print(users_pagi_res_list)
print(users_pagi_res_list1)
---
[<User 'a'>, <User 'b'>, <User 'c'>]
[<User 'd'>, <User 'e'>, <User 'f'>]
该有的几个属性都有,而且很容易用(假设查询实例是pagi):
1.pagi.total: 总记录数
2.pagi.pages:总页数
3.pagi.has_next: 是否有下页
4.pagi.has_prev: 是否有前一页
5.pagi.items: 当前分页实例的内容(对象列表)
6.pagi.iter_pages() : 页码的迭代对象
7.pagi.next(): 下一个迭代分页对象
8.pagi.prev(): 上一个迭代分页对象
9.pagi.next_num: 下一页页码
10.pagi.prev_num: 上一页页码
11.pagi.page: 当前页码
12.page.per_page: 每页的条数
13.page.query: 查询对象
一个查询数据并转为数据框的例子。因为本身是通过数据对象操作和连接的,因此反而怎么好写函数。先这样吧。
user_uploads = Upfiles.query.filter_by(user_id= 4).all() # 以行为单位进行连接 res_list = [] for res in user_uploads: tem_dict = OrderedDict() # 本表字段 tem_dict['filename'] = res.original_filename # 外键 tem_dict['user_id'] = res.user_id # 外键表字段 tem_dict['user_name'] = res.users.username res_list.append(tem_dict) res_df = pd.DataFrame(res_list)
假设要完成一次线上的修改,需要迁移/修改的部分:
1 数据对象 datamodels.py(修改服务端文件)。 将更新好的文件一次性写入覆盖。
2 数据表初始化 db_modified_v1.py(增加服务端文件)。 准备在shell里create all。
3 修改manager_dubug/prod.py (修改服务端文件)。增加新的数据类。
4 新的视图函数
db_modified_v1.py
from app import db from app.datamodel import Users, Roles, Caps, Trans, Servs, generate_password_hash, Upfiles, Kpi, Reports, ReportType import dateutil.parser import pytz import pandas as pd import DataManipulation as dm from collections import OrderedDict from datetime import datetime ''' 如果做好了迁移可以db.drop_all()再db.create_all()。否则只能create_all() ''' # 本机上drop all ,服务器上不要 # db.drop_all() # 新建表 db.create_all() print('* db create all tables define in app.datamodel')
manager_dubug/prod.py 本次增加4个类:
# v1新增
from app.datamodel import Upfiles, ReportType, Reports, Kpi
shell_dict['Upfiles'] = Upfiles
shell_dict['ReportType'] = ReportType
shell_dict['Reports'] = Reports
shell_dict['Kpi'] = Kpi
本地测试操作:
python3 manager_debug.py shell
In [1]: run db_modified_v1.py
* db create all tables define in app.datamodel
import pandas as pd import pymysql as pyl import numpy as np import DataManipulation as dm from datetime import datetime # 如果是本地的话,写入对应的数据表哦 if env.lower() == 'local': user_df = pd.read_excel('server_user.xlsx', index=False) # user_df = user_df.dropna() user_df['mobile'] = user_df['mobile'].fillna(0).apply(int).apply(str) # 制作val_list field_list = list(user_df.columns) field_list = ['id', 'username', 'email', 'mobile','password_hash','confirmed','register_time','ttl'] val_list = [] for i in range(len(user_df)): tem_dict = dict(user_df.iloc[i]) if tem_dict['id'] > 2: tem_tuple = tuple([tem_dict[x] for x in field_list]) val_list.append(tem_tuple) dm.mysql_insert_rows('users', field_list, val_list, cfg_mysql) ... 其他静态表写入
服务端操作:
1 更新datamodels.py,新建db_modified_v1.py ,更新manager_dubug/prod.py 推送至服务端
2 静态表的写入
3 新数据表的静态数据。数据表.py(本地连接服务端数据库修改)。使用pymysql将一些静态数据写入。
1 静态表(静态资源):一般不依赖其他表(或资源),使用时一般在服务器启动时一次性载入
2 动态表(动态资源):在使用时进行即时的查询/调用,有可能会有外键字段(即依赖于其他表或资源)。
静态表的意义。
动态表:
从维护方式上,数据表(资源)可以分为主表和日志表。
1 主表(Master Table)。一个ID对应唯一的一行,或者是一个对象。
2 日志表(Log Table)。一个ID对应着一次操作。
主表通常是模型所需的数据形态。从张量的角度来看,每个ID相当于每条记录的标志,对应多维矩阵第一层的序号。每条记录则可视为一维向量,所以传统结构化数据的模型处理的都是二维矩阵(sklearn)。如果使用pytorch处理图片,那么就是三维或者四维矩阵,对应的数据是灰度图(只有一个矩阵)或者彩色图(RGB矩阵)。主表通常有create_time和update_time。
日志表通常是时间序列模型需要的数据形态。通常来说只有create_time。有时会通过提取日志表展示的简单时序特征送给主表,例如从交易日志表中提取特征给到主表,用于建模。
日志表的量是非常大的,其中也会有许多错误 ,因此有必要做一些tag。例如设置一个source字段,代表了来自不同source的数据。一个想象中的日志表:
id | log_id | val | val_id | source | source_id | create_time | is_enable | opr_time |
---|---|---|---|---|---|---|---|---|
自增ID | 时间戳微秒级整型数 * 1e6 + random(1, 1e6) | 文本/数值 | abc123 | 更新源 | 更新算法id | 创建时间 | 是否有效 | 操作时间 |
从图的角度上,数据表(资源)可以分为属性和关系。
1 属性表(Attr Table)
2 关系表(Rel Table)
为什么要从图的角度来看?
图里自然只有节点和边,节点和边都可以有n个属性。现在假设节点和边大致是对的,但是可能有错,也可能需要进行修改。创建节点和边时,我们可以有一个大致准确的ID(例如使用身份证作)。但有一些数据我们只有大致准确的ID,例如电话号码,可能A号码和B号码是同一个人的。
如果我们想象一个完美的情况:在一个全部由实体构成的世界里,每个实体的每个变化都被忠实的记录,任何时刻都可以进行诸如A近期向外拨了多少电话的查询(假设A有固化、手机、微信语音…)。
当然,完美的世界不存在。但是通过一些技术和机制,我们可以构建一个大致的世界,然后通过有组织的推断以及有限的标记来达到应用的目的。例如,在有限的信息之下,我们建立了A和B两个节点(但其实他们可能是同一节点)。通过某个source的算法(source_id)我们做出了合并节点的推断,然后建立一个新节点C,C拥有A和B的合并属性。除了普通的属性之外,C还拥有其他的操作属性/标签。例如C.is_infer 表示C节点是否是推断生成的,C.is_manual表示是否是手工修改的, C.is_correct是由人做出的对错标签…
通过诸多的推断完成大部分数据的融合以及缺失数据修补,通过算法来管理海量的推断,通过人工的干预/标记来加强算法的效果,在应用时可以选择一个合适的Schema来完成任务。这样就完成了不完美前提下数据-> 信息 -> 决策的整个过程。
方法:
从ID的角度,可以分为自动/手动,有序/无序,数值/字符这几种形式。
1 自动:数据库自增ID。
2 手动:用户定义的ID包括有序和无序。
是否需要ID?是否可以预计/安排ID?
什么数据用什么数据库存?
维度的切割。例如不同公司的客户放不同表。
mysql 分库。例如按月份分库分表。
note:
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。