赞
踩
前两篇介绍了全栈系统里面移动端和前端:
移动端篇:H5+搭建移动端应用
前端篇:Vue2.0搭建PC前端
项目线上地址:项目访问链接,账号:admin 密码:admin
本文讲述用flask创建一个简单的后台系统,系统包含以下内容:
登录模块:利用flask中session实现
用户部门模型:定义部门信息数据模型
用户职位模型:定义职位信息数据模型
用户分组模型:定义分组信息数据模型
用户角色模型:定义角色信息数据模型
用户信息模型:定义用户信息数据模型,演示用户增、删、改、查业务实现
开发语言:python3.7
数据库:MySQL5.7
开发框架:Flask0.10.1
开发工具:PyCharm2008
其他工具1:XAPP(集成apach和mysql)
其他工具2:Navicat 11.0.10
项目创建完成后运行如下图:
为了项目工程化,我们需要创建一个文件包FlaskDemo,创建好后,把资源文件static和模板文件templates移到包里面,如图
项目里面需要用到session、数据库访问、邮件、WebSocket,需要提前安装,命令如下:
venv\Scripts\python.exe -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple/ -r requirements.txt
requirements.txt内容如下:
blinker==1.4 cachelib==0.1 certifi==2020.4.5.1 cffi==1.14.0 chardet==3.0.4 click==7.1.2 Flask==1.1.2 Flask-Cors==3.0.8 Flask-Mail==0.9.1 Flask-Session==0.3.2 Flask-Sockets==0.2.1 Flask-SQLAlchemy==2.4.3 gevent==20.5.2 gevent-websocket==0.10.1 greenlet==0.4.15 idna==2.9 itsdangerous==1.1.0 Jinja2==2.11.2 MarkupSafe==1.1.1 pycparser==2.20 PyMySQL==0.9.3 PySnooper==0.4.1 redis==3.5.2 requests==2.23.0 six==1.15.0 SQLAlchemy==1.3.17 urllib3==1.25.9 Werkzeug==1.0.1 zope.event==4.4 zope.interface==5.1.0
项目目录结构如上图,文件和目录的说明如下:
controls:数据模型定义和业务操作
logs:日志文件目录
sources:用户上传资源文件目录
static:前端页面打包发布后存放的目录
templates:flask模板文件目录,这个目录下面的html是浏览器访问的入口
viewsclient:定义路由,并把请求分发到相应的数据模型
init.py:创建Flask App,初始化数据模型等操作
common.py:定义公共方法
config.py:flask配置文件
manager.py:flask启动管理
models.py:数据模型初始化
status_code.py:定义API请求返回码
tests:单元测试代码目录
这个文件里面配置flask的基本配置:如数据库引擎、登录验证、邮件等
class Config: DEBUG = False # 默认数据库引擎,数据库根目录 SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://{0}:{1}@{2}:3306'.format(DB_USER, DB_PWD, DB_ADDR) # 配置多个数据库连接 SQLALCHEMY_BINDS = { 'client': 'mysql+pymysql://{0}:{1}@{2}:3306/{3}?charset=utf8'.format(DB_USER, DB_PWD, DB_ADDR, DB_NAME) } # 在app设置里开启自动提交会出现 sqlalchemy.exc.InvalidRequestError: Can't reconnect until invalid transaction # is rolled back SQLALCHEMY_COMMIT_ON_TEARDOWN = False SQLALCHEMY_TRACK_MODIFICATIONS = False # 关闭数据追踪,避免内存资源浪费 # session配置,用于登录验证 # openssl rand -hex 32 SECRET_KEY = "dc96245b9fa5070a4994f761a772592b0147b32ccd9c0e3150bb30371e727f70" # 将session存储到redis中 SESSION_TYPE = "redis" SESSION_USE_SIGNER = True PERMANENT_SESSION_LIFETIME = 30*60 # 秒 # 文件上传配置 UPLOAD_FOLDER = 'static/uploads/' # 上传目录 MAX_CONTENT_LENGTH = 100 * 1024 * 1024 # 上传大小限制 # 邮件发送设置 MAIL_SERVER = 'smtp.ym.163.com' # 邮箱服务器 MAIL_USERNAME = 'xxxxxxxQS@163.com' # 邮箱用户 MAIL_PASSWORD = 'xxxxxxx' # 用户密码
这个文件里面配置flask启动管理
def create_app(config): app = Flask(__name__) app.config.from_object(config) app.url_map.converters['html'] = HTMLConverter # session初始化,将session存储到redis中 Session(app) # 系统需要的资源初始化 system_init() # 日志处理 import logging from logging.handlers import RotatingFileHandler logging.basicConfig(level=logging.DEBUG) file_log_handler = RotatingFileHandler(SYSTEM_LOGS_FILE, maxBytes=1024 * 1024 * 10, backupCount=10) # 日志文件最大10M,最多备份10个 formatter = logging.Formatter('%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(message)s') file_log_handler.setFormatter(formatter) logging.getLogger().addHandler(file_log_handler) logging.getLogger().setLevel(logging.DEBUG) # 处理日志不能显示到终端的问题 return app
这个文件定义接口返回码
class RET: OK = "0" DBERR = "4001" NODATA = "4002" DATAEXIST = "4003" DATAERR = "4004" SESSIONERR = "4101" LOGINERR = "4102" PARAMERR = "4103" USERERR = "4104" ROLEERR = "4105" PWDERR = "4106" CODEERR = "4107" AUTHEERR = "4108" KEYEERR = "4109" LOGINEERR = "4110" REQERR = "4201" IPERR = "4202" THIRDERR = "4301" IOERR = "4302" SERVERERR = "4500" UNKOWNERR = "4501" DATATYPE = '4006' DATAMISS = '4005' ret_map = { RET.OK: u"成功", RET.DBERR: u"数据库查询错误", RET.NODATA: u"无数据", RET.DATAEXIST: u"数据已存在", RET.DATAERR: u"数据错误", RET.SESSIONERR: u"用户未登录", RET.LOGINERR: u"用户登录失败", RET.PARAMERR: u"参数错误", RET.USERERR: u"用户不存在或未激活", RET.ROLEERR: u"用户身份错误", RET.PWDERR: u"密码错误", RET.CODEERR: u"验证码错误", RET.AUTHEERR: u"权限错误", RET.KEYEERR: u"由于您长时间未进行任何操作,出于数据安全性要求,请重新登录", RET.LOGINEERR: u"用户已经登录", RET.REQERR: u"非法请求或请求次数受限", RET.IPERR: u"IP受限", RET.THIRDERR: u"第三方系统错误", RET.IOERR: u"文件读写错误", RET.SERVERERR: u"内部错误", RET.UNKOWNERR: u"未知错误", RET.DATATYPE: u"数据类型错误", RET.DATAMISS: u"参数缺失", }
这个文件定义公共方法
def system_init(): # 系统需要的资源初始化 my_mkdir(SYSTEM_LOGS_DIR) # 创建系统日志目录 my_mkdir(CLIENT_SOURCES_DIR) # 创建客户资源目录 def my_mkdir(path: str): # 当面目录下创建文件夹 if not os.path.exists(path): os.mkdir(path) return path def get_md5(data: str, salt: bool=True): if salt: return hashlib.md5('1qa{0}{1}[;.]'.format(data, time.time()).encode(encoding='UTF-8')).hexdigest() else: return hashlib.md5(data.encode(encoding='UTF-8')).hexdigest() def get_curr_format_time(): return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
这个文件创建Flask App
# 创建Flask App app = create_app(DevelopConfig) # 跨域请求 CORS(app, supports_credentials=True) # 创建WebSocket,用于服务器主动向web端推送消息,避免web端轮询查询 sockets = Sockets(app) # 用于发送邮件 mail = Mail(app) # 2.初始化数据库 db.init_app(app) # 3.注册蓝图 app.register_blueprint(client_page, url_prefix='/') def init_data(): with app.app_context(): # 初始化数据库配置 init_database() @sockets.route('/echo') def echo_socket(ws): key = str(ws).split(' ')[-1].rstrip('>') # 用作服务器给web端推送的标识 while not ws.closed: ws.send(json.dumps({'wskey': key}, ensure_ascii=False)) # 回传给clicent """ 服务端必须接收到客户端发的消息才能保持该服务运行,如果ws.receive()没有接收到客户端发送的 消息,那么它会关闭与客户端建立的链接 底层解释:Read and return a message from the stream. If `None` is returned, then the socket is considered closed/errored. 所以客户端只建立连接,不与服务端交互通信,则无法实现自由通信状态,之后在客户端代码处会有详细内容。 """ message = ws.receive() # 接收到消息 if message is not None: ws.send(json.dumps({'wskey': key}, ensure_ascii=False)) # 回传给clicent init_data()
这个文件初始化数据模型
from FlaskDemo.common import DB_NAME from FlaskDemo.controls.Base import db, create_db_client from FlaskDemo.controls.AccountDpart import AccountDpart from FlaskDemo.controls.AccountGroup import AccountGroup from FlaskDemo.controls.AccountPost import AccountPost from FlaskDemo.controls.AccountRole import AccountRole from FlaskDemo.controls.AccountUsers import AccountUsers def create_client_db(): # 创建所以客户数据表 create_db_client(DB_NAME) # 初始化表数据 AccountDpart().init_data() # 初始化部门数据 AccountRole.init_data() # 初始化角色数据 AccountPost.init_data() # 初始化职位数据 AccountUsers.init_data() # 初始化用户数据 AccountGroup.init_data() # 初始化用户组数据 def init_database(): # 创建客户数据库和数据表 create_client_db()
这个文件夹定义接口路由,采用的思路:定义增、删、改、查基本路由,再分发到具体的数据模型处理,这样避免定义很多个接口
定义蓝图,关联url接口地址
client_page = Blueprint('client_page', __name__) # 定义蓝图
定义跟路由关联的接口函数
@client_page.route('/')
def index():
return render_template('tempclient/login.html', error='')
浏览器输入:http://127.0.0.1:5000/就会跳转到跟路由关联的函数,即会跳转到tempclient目录下面的login.html文件
定义登录接口
# 登录入口
@client_page.route('/login', methods=['POST'])
def login():
# 接收参数
params = request.form
logging.debug(json.dumps(params, indent=4, ensure_ascii=False))
username = params.get('username') # 用户名
password = params.get('password') # 密码
code = params.get('code', '1234') # 验证码
logging.debug('username={0}, pwd={1}, code={2}'.format(username, password, code))
定义登出接口
# 用户退出
@client_page.route('/logout', methods=['POST'])
@client_is_login
def logout():
# 删除当前session中的用户名,就退出登录
if 'username' in session:
session.pop('username', None)
return jsonify(code=RET.OK, msg=ret_map[RET.OK])
定义基本路由:get、add、update、delete
# 查询数据接口 @client_page.route('/get/<qrytype>/<qryfunc>', methods=['GET']) @client_is_login def get(qrytype: str, qryfunc: str): args = request.args.to_dict() logging.debug('[get]qrytype = {0}, qryfunc = {1}'.format(qrytype, qryfunc)) logging.debug(json.dumps(args, indent=4, ensure_ascii=False)) return handle_requests(qrytype, qryfunc, args) # 新增数据接口 @client_page.route('/add/<qrytype>/<qryfunc>', methods=['POST']) @client_is_login def add(qrytype: str, qryfunc: str): args = request.form.to_dict() logging.debug('[add]qrytype = {0}, qryfunc = {1}'.format(qrytype, qryfunc)) logging.debug(json.dumps(args, indent=4, ensure_ascii=False)) return handle_requests(qrytype, qryfunc, args) # 编辑数据接口 @client_page.route('/update/<qrytype>/<qryfunc>/<Id>', methods=['PUT']) @client_is_login def update(qrytype: str, qryfunc: str, Id: int): logging.debug('[update]qrytype = {0}, qryfunc = {1}, Id = {2}'.format(qrytype, qryfunc, Id)) for form in request.form: args = form break logging.debug(json.dumps(args, indent=4, ensure_ascii=False)) return handle_requests(qrytype, qryfunc, args, Id) # 删除数据接口 @client_page.route('/delete/<qrytype>/<qryfunc>/<Id>', methods=['DELETE']) @client_is_login def delete(qrytype: str, qryfunc: str, Id: int): args = request.form.to_dict() logging.debug('[delete]qrytype = {0}, qryfunc = {1}, Id = {2}'.format(qrytype, qryfunc, Id)) logging.debug(json.dumps(args, indent=4, ensure_ascii=False)) return handle_requests(qrytype, qryfunc, args, Id)
路由分发函数
后台定义函数get_type_query,获取字典:{‘数据模型名称’: 数据模型对象}
def get_type_query():
typedict = {}
for clazz in db.Model._decl_class_registry.values():
try:
typedict[clazz.__name__] = clazz
except Exception as e:
print(e)
# print(typedict)
return typedict
函数运行结果:
{'AccountDpart': <class 'FlaskDemo.controls.AccountDpart.AccountDpart'>,
'AccountGroup': <class 'FlaskDemo.controls.AccountGroup.AccountGroup'>,
'AccountPost': <class 'FlaskDemo.controls.AccountPost.AccountPost'>,
'AccountRole': <class 'FlaskDemo.controls.AccountRole.AccountRole'>,
'AccountUsers': <class 'FlaskDemo.controls.AccountUsers.AccountUsers'>}
web前端调用url传入格式:http://127.0.0.1:5000/get/[qrytype]/[qryfunc]
qrytype:查询类型,即对应的数据模型
qryfunc:数据模型中定义的具体业务函数
如查询用户表中所有用户数据,可以这样写:http://127.0.0.1:5000/get/AccountUsers/get_value_list
@pysnooper.snoop('pysnooper.log') # 输出到文件 def handle_requests(qrytype: str, qryfunc: str, argc, Id: int = None): dbClass = get_type_query().get(qrytype) if not dbClass: return jsonify(code=RET.PARAMERR, msg=u'查询类型错误') dbcls = dbClass() if isinstance(argc, str): try: argcjson = json.loads(argc) except Exception as e: logging.error('[handle_requests]Failed to json.load, {0}'.format(e)) logging.error(traceback.format_exc()) return jsonify(code=RET.PARAMERR, msg='Failed to json.load, {0}'.format(e)) elif isinstance(argc, dict): argcjson = argc else: return jsonify(code=RET.PARAMERR, msg=u'argc查询类型错误') # print(dir(dbcls)) if qryfunc in dir(dbcls): if Id: value = dbcls.__getattribute__(qryfunc)(argcjson, Id) else: value = dbcls.__getattribute__(qryfunc)(argcjson) return value else: return jsonify(code=RET.PARAMERR, msg=qryfunc + '方法不存在,请联系开发人员')
这数据模型和业务逻辑目录
这个文件定义数据模型基类BaseModel,定义全局数据库对象db,所有数据模型继承BaseModel和db,并在BaseModel类
中定义基本的数据操作函数,执行出错会事务回滚:
update:更新记录
add_update:增加一条记录
add_all_update:增加多条记录
delete:删除一条记录
to_json:SQLAlchemy转json对象
# 客户数据库连接,用作数据模型 db = SQLAlchemy() class BaseModel(object): def update(self): # 更新记录 try: db.session.commit() # 事务提交 except Exception as e: db.session.rollback() # 事务回滚 logging.error('[BaseModel->update]Failed to run update, {0}'.format(e)) logging.error(traceback.format_exc()) return False return True def add_update(self): # 增加一条记录 try: db.session.add(self) # 添加数据对象 db.session.commit() # 事务提交 except Exception as e: db.session.rollback() # 事务回滚 logging.error('[BaseModel->add_update]Failed to run add_update, {0}'.format(e)) logging.error(traceback.format_exc()) return False return True def add_all_update(self, datas: list): # 增加多条记录 try: db.session.add_all(datas) # 添加数据对象 db.session.commit() # 事务提交 except Exception as e: db.session.rollback() # 事务回滚 logging.error('[BaseModel->add_all_update]Failed to run add_all_update, {0}'.format(e)) logging.error(traceback.format_exc()) return False return True def delete(self): # 删除一条记录 try: db.session.delete(self) # 删除数据对象 db.session.commit() # 事务提交 except Exception as e: db.session.rollback() # 事务回滚 logging.error('[BaseModel->delete]Failed to run delete, {0}'.format(e)) logging.error(traceback.format_exc()) return False return True def to_json(self): # SQLAlchemy转json对象 fields = {} for field in [x for x in self.__dict__ if not x.startswith('_') and x != 'metadata']: data = self.__getattribute__(field) try: if isinstance(data, datetime): data = data.strftime('%Y-%m-%d %H:%M:%S') json.dumps(data) # this will fail on non-encodable values, like other classes fields[field] = data except TypeError: fields[field] = None logging.error(traceback.format_exc()) return fields
这个文件是部门数据模型AccountDpart,定义用户部门信息,定义了3个业务函数:
init_data:初始化部门数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
# 客户部门信息数据表模型 class AccountDpart(BaseModel, db.Model): __bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义 __tablename__ = 'account_depart' # 设置信息->部门信息表 Id = db.Column(db.INTEGER, primary_key=True) # 部门ID, Name = db.Column(db.String(255), unique=True) # 部门名称 Remarks = db.Column(db.String(255)) # 部门描述 RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间 @staticmethod def init_data(): datadict = [{'Name': '系统部', 'Remarks': '系统超级管理员'}, {'Name': '总经办', 'Remarks': '总经理办公室'}] for data in datadict: result = AccountDpart.query.filter(AccountDpart.Name == data.get('Name', '')).first() if not result: AccountDpart(Name=data.get('Name', ''), Remarks=data.get('Remarks', '')).add_update() @staticmethod def get_count(argc: dict): value = {'count': db.session.query(func.count(AccountDpart.Id)).scalar()} return {'code': RET.OK, 'data': value} @staticmethod def get_value_list(argc: dict): results = db.session.query(AccountDpart).all() values = [] for res in results: values.append([res.to_json()]) return {'code': RET.OK, 'data': values}
这个文件是用户分组数据模型,定义用户分组信息
init_data:初始化分组数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
# 客户用户组信息数据表模型 class AccountGroup(BaseModel, db.Model): __bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义 __tablename__ = 'account_group' # 账户信息->用户组表 Id = db.Column(db.INTEGER, primary_key=True) GroupName = db.Column(db.String(255), unique=True) # 用户组名称 Permiss = db.Column(db.Text) # 用户组权限,json数组,对应访问页面的信息 Remarks = db.Column(db.String(255)) # 备注 RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间 @staticmethod def init_data(): datadict = [{'Id': 1, 'GroupName': '系统管理组', 'Permiss': '{"首页":[], "设置":[]}', 'Remarks': '超级管理组'}] for data in datadict: result = AccountGroup.query.filter(AccountGroup.Id == data.get('Id')).first() if not result: AccountGroup(Id=data.get('Id', 1), GroupName=data.get('GroupName', ''), Permiss=data.get('Permiss', ''), Remarks=data.get('Remarks', '')).add_update() @staticmethod def get_count(argc: dict): value = {'count': db.session.query(func.count(AccountGroup.Id)).scalar()} return {'code': RET.OK, 'data': value} @staticmethod def get_value_list(argc: dict): results = db.session.query(AccountGroup).all() values = [] for res in results: values.append([res.to_json()]) return {'code': RET.OK, 'data': values}
这个文件是用户职位数据模型,定义用户职位信息
init_data:初始化职位数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
# 客户职位信息数据表模型 class AccountPost(BaseModel, db.Model): __bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义 __tablename__ = 'account_post' # 设置信息->职位信息表 Id = db.Column(db.INTEGER, primary_key=True) # 职位ID, DepartId = db.Column(db.INTEGER, nullable=False) # 部门ID,对应数据表cli_acc_depart的ID Name = db.Column(db.String(255)) # 职位名称 Remarks = db.Column(db.String(255)) # 职位描述 Authview = db.Column(db.Text, default='{}') # 用户查看权限 Authoper = db.Column(db.Text, default='{}') # 用户操作权限 RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间 @staticmethod def init_data(): datadict = [{'DepartId': 1, 'Name': '系统管理员', 'Remarks': '系统管理员'}, {'DepartId': 2, 'Name': '普通用户', 'Remarks': '普通用户'}] for data in datadict: result = AccountPost.query.filter(AccountPost.DepartId == data.get('DepartId')).first() if not result: AccountPost(DepartId=data.get('DepartId', 2), Name=data.get('Name', ''), Remarks=data.get('Remarks', '')).add_update() @staticmethod def get_count(argc: dict): value = {'count': db.session.query(func.count(AccountPost.Id)).scalar()} return {'code': RET.OK, 'data': value} @staticmethod def get_value_list(argc: dict): results = db.session.query(AccountPost, AccountDpart).filter(AccountPost.DepartId == AccountDpart.Id).all() values = [] for res in results: values.append([res[0].to_json(), res[1].to_json()]) return {'code': RET.OK, 'data': values}
这个文件是用户角色数据模型,定义用户角色信息
init_data:初始化分组数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
# 客户角色信息数据表模型 class AccountRole(BaseModel, db.Model): __bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义 __tablename__ = 'account_role' # 账户信息->用户角色表 Id = db.Column(db.INTEGER, primary_key=True) # 角色ID,1超级管理员,>1表示自定义权限 RoleName = db.Column(db.String(255), unique=True) # 角色名称 Permiss = db.Column(db.Text) # 角色权限,json数组,对应访问页面的信息 Remarks = db.Column(db.String(255)) # 备注 RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间 @staticmethod def init_data(): datadict = [{'Id': 1, 'RoleName': '系统管理员', 'Permiss': '["增加","删除","修改","查询","发送","下载","安装","上传"]', 'Remarks': '系统管理员'}, {'Id': 2, 'RoleName': '普通用户', 'Permiss': '["查询","发送","下载","安装","上传"]', 'Remarks': '普通用户'} ] for data in datadict: result = AccountRole.query.filter(AccountRole.Id == data.get('Id')).first() if not result: AccountRole(Id=data.get('Id', 1), RoleName=data.get('RoleName', ''), Permiss=data.get('Permiss', ''), Remarks=data.get('Remarks', '')).add_update() @staticmethod def get_count(argc: dict): value = {'count': db.session.query(func.count(AccountRole.Id)).scalar()} return {'code': RET.OK, 'data': value} @staticmethod def get_value_list(argc: dict): results = db.session.query(AccountRole).all() values = [] for res in results: values.append([res.to_json()]) return {'code': RET.OK, 'data': values}
这个文件是分组数据模型,定义用户分组信息
init_data:初始化分组数据,数据模型里面根据需要定义该函数
get_count:查询数据表中数据数量,一般用于前端页面分页,所有数据模型中都要定义
get_value_list:查询和该模型相关的所有数据,所有数据模型中都要定义
add_value:增加用户数据,所有数据模型中都要定义
update_value:更新用户数据,根据用户ID来更新,所有数据模型中都要定义
delete_value:删除用户数据,根据用户ID来更新,所有数据模型中都要定义
# 客户用户信息数据表模型 class AccountUsers(BaseModel, db.Model): __bind_key__ = 'client' # 数据库引擎,这就表示数据库名称,在config.py中定义 __tablename__ = 'account_users' # 账户信息->用户表 Id = db.Column(db.INTEGER, primary_key=True) Name = db.Column(db.String(50), unique=True) # 用户名称 Pwd = db.Column(db.String(300)) # 用户密码 Nick = db.Column(db.String(150)) # 用户姓名 PostId = db.Column(db.INTEGER, nullable=False) # 职位ID Mobile = db.Column(db.String(150)) # 用户手机号码 Email = db.Column(db.String(150)) # 用户邮箱地址 RoleIds = db.Column(db.String(255)) # 用户角色ID,json数组 GroupIds = db.Column(db.String(255)) # 用户组ID,json数组 Authview = db.Column(db.Text, default='{}') # 用户查看权限 Authoper = db.Column(db.Text, default='{}') # 用户操作权限 Valid = db.Column(db.INTEGER, default=1) # 账号是否有效:0注销,1生效 RegTime = db.Column(TIMESTAMP, server_default=func.current_timestamp(), index=True) # 检验限制创建时间 @staticmethod def init_data(): datadict = [ {'Id': 1, 'Name': 'superadmin', 'Pwd': generate_password_hash('superadmin'), 'Nick': '超级管理员', 'PostId': 1}, {'Id': 2, 'Name': 'admin', 'Pwd': generate_password_hash('admin'), 'Nick': '超级管理员', 'PostId': 1}, {'Id': 3, 'Name': 'general', 'Pwd': generate_password_hash('general'), 'Nick': '超级管理员', 'PostId': 2}] for data in datadict: result = AccountUsers.query.filter(AccountUsers.Id == data.get('Id')).first() if not result: AccountUsers(Id=data.get('Id'), Name=data.get('Name'), Pwd=data.get('Pwd'), Nick=data.get('Nick'), PostId=data.get('PostId')).add_update() @staticmethod def get_count(argc: dict): value = {'count': db.session.query(func.count(AccountUsers.Id)).scalar()} return {'code': RET.OK, 'data': value} @staticmethod def get_value_list(argc: dict): skip, limit = argc.get('skip', 0), argc.get('limit', 100) results = db.session.query(AccountUsers, AccountPost, AccountDpart). \ filter(AccountUsers.PostId == AccountPost.Id).filter(AccountPost.DepartId == AccountDpart.Id).\ offset(skip).limit(limit).all() values = [] for res in results: values.append([res[0].to_json(), res[1].to_json(), res[2].to_json()]) return {'code': RET.OK, 'data': values} @staticmethod def add_value(argc: dict): Name = argc.get('Name') # 用户名 Nick = argc.get('Nick') # 姓名 PostId = argc.get('PostId') # 用户职位ID # 验证参数是否存在 if not all([Name, Nick, PostId]): return {'code': RET.PARAMERR, 'msg': ret_map[RET.PARAMERR]} db_user = AccountUsers.query.filter(AccountUsers.Name == Name).first() if db_user: return {'code': RET.PARAMERR, 'msg': '用户名已经存在'} db_post = AccountPost.query.filter(AccountPost.Id == PostId).first() if not db_post: return {'code': RET.PARAMERR, 'msg': '部门ID不存在'} if AccountUsers(Name=Name, Pwd=generate_password_hash('123456'), Nick=Nick, PostId=PostId).add_update(): return {'code': RET.OK, 'msg': ret_map[RET.OK]} else: logging.error(u'增加用户更新数据库失败,用户名:{0}'.format(Name)) return {'code': RET.DBERR, 'msg': ret_map[RET.DBERR]} @staticmethod def update_value(argc: dict, Id: int): db_user = AccountUsers.query.filter(AccountUsers.Id == Id).first() if not db_user: return {'code': RET.PARAMERR, 'msg': '用户ID不存在'} for k, v in argc.items(): if hasattr(db_user, k): db_user.__setattr__(k, v) if db_user.add_update(): return {'code': RET.OK, 'msg': ret_map[RET.OK]} else: logging.error(u'编辑用户更新数据库失败') return {'code': RET.DBERR, 'msg': ret_map[RET.DBERR]} @staticmethod def delete_value(argc: dict, Id: int): db_user = AccountUsers.query.filter(AccountUsers.Id == Id).first() if not db_user: return {'code': RET.PARAMERR, 'msg': '用户ID不存在'} if db_user.delete(): return {'code': RET.OK, 'msg': ret_map[RET.OK]} else: return {'code': RET.DBERR, 'msg': ret_map[RET.DBERR]}
后台源码:https://download.csdn.net/download/yyt593891927/12522457
访问地址:http://127.0.0.1:5000
默认用户名:admin
默认密码:admin
本文完整讲述了利用flask创建后台系统,下一章介绍用Vue2.0创建系统对应的前端页面。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。