赞
踩
图库在处理关系上比传统关系型数据库好很多,因此计划使用neo4j作为flask的数据库后端
flask一般使用sqlalchemy(ORM)连接mysql作为数据库后端,我觉得不是非常灵活。在定义各个数据表的关系时需要分别在对应的数据对象里声明,还要遵守一定的规范,如果说仅仅是比较固定的表格还听合适,问题是变化
。无论是mysql数据库本身,又或者表和表的关系之间,一旦有变化调整起来都是很复杂的。
变化归图,不变归表
我认为可以把经常发生变化的部分抽出来,放到图里处理,例如用户的权限管理;不变的部分例如报表,放到mysql里,然后为他们各自启动一个flask服务,各归各的。
于是一个系统可以分为几个,几十个甚至几百个微服务,通过超链接进行联系,并通过ngingx实现高可用。唯一通用的服务就是授权(鉴权)服务。
要修改并使用图库作为后端,要先修改数据对象:为数据对象添加图库的操作函数。(未来如果并发大了,可以考虑使用多个neo4j,或者使用redis作为缓存库)
使用py2neo作为代理,向数据库发起cypher请求。库里之前存在一个示例节点,我们把这个节点及其关系先删掉。
match (n:Person{name:'Andy'})-[r]-() delete r,n
重新建立一个节点:
create (n:Person{email:'test@test.com',name:'Andy'}) return n
因为查询时主要通过email进行,所以我们为这个属性增加索引。不过这里我还有点点困惑,neo4j好像不能给一个属性增加唯一性索引,而如果想建好索引后再加唯一性约束就会报错。只能理解为索引的优化算法为了速度,不能管唯一性了。
CREATE INDEX ON :Person(email);
如果再执行唯一性约束就会失败
create constraint on (n:Person) assert n.email is unique
要保证不重复创建节点,就只有自己保证使用match , merge方法了
我们通过这种方法给节点增加一个属性来看看节点会不会增加
情况一:匹配并试图创建一个节点,如果节点已经存在,那么这条命令无效
merge (n:Person{email:'test@test.com'})
on create set n.createTime = timestamp()
return n
节点没有增加,也没有改变属性。
情况二:匹配并试图更改一个节点的状态,匹配到了就设置状态
merge (n:Person{email:'test@test.com'})
on match set n.createTime = timestamp()
return n
匹配到的节点创建了时间戳
如果节点没有匹配到,那么这样执行会创建节点,但不会设置属性
merge (n:Person{email:'test1@test1.com'})
on match set n.createTime = timestamp()
return n
这种写法是不太好的,我们把这个错误的节点删除掉(用节点属性直接删除)
match (n{email:'test1@test1.com'}) delete n
情况三:直接指明对于融合节点要做的两种操作,增(create),改(set)
merge (n{email:'test1@test1.com'})
on create set n.createTime = timestamp()
on match set n.createTime = timestamp()
return n
使用schema命令可以看到索引已经创建好了(neo4j创建索引的速度还可以,1千万节点大约3分钟创建索引),查询基本都是毫秒级返回的。
原来的用户类大致是这样的:
... 配置时的定义 config.py SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://YOURUSER:YOURPWD@localhost:3306/YOURDB?charset=utf8' SQLALCHEMY_TRACK_MODIFICATIONS = True SQLALCHEMY_COMMIT_ON_TEARDOWN = True ... 登录时的视图处理 views.py ... def login(): form = LoginForm() if form.validate_on_submit(): user = Users.query.filter_by(email=form.email.data).first() if user is not None and user.verify_password(form.password.data): login_user(user, form.remember_me.data) # login登记了该用户 return redirect(request.args.get('next') or url_for('main.index')) flash('无效用户名或密码') ... 数据模型定义 models.py ... class Users(UserMixin, db.Model): __tablename__ = 'users' # 1 ID类 id = db.Column(db.Integer, primary_key=True) username = db.Column(db.String(64), unique=True, index=True) # 唯一约束,加索引 email = db.Column(db.String(64), unique=True, index=True) # 唯一 ...
进行替代时我们要自己补充数据查询的一些方法。假设我们在config中已经传入图库的目标位置、用户和密码,以及一个实例化好的图对象,现在我们需要先补充几个函数。(主要是为了处理当graph连接失效时的重建)
# 创建一个图连接对象 def get_a_neo4j_connect(ip_port=None, user=None, pwd=None ): return py2neo.Graph(ip_port, username=user, password=pwd) # 获取一个库中所有的节点数 def get_nodes_num(graph): the_cypher = '''match (n) return count(n) as nodenum ''' return graph.run(the_cypher).data()[0].get('nodenum') # py2neo的执行 def exe_cypher(graph = None, cypher = None): return graph.run(cypher).data() # 尝试重连一次的cypher执行(假定cypher语句不会错) def exe_cypher_1more_try(cypher=None, graph = None, ip_port=None, user=None, pwd = None): status = None query_res = None new_graph_connect = None try: # 正常执行 res = exe_cypher(graph=graph, cypher=cypher) status = True query_res = res new_graph_connect = None except: # 如果失败尝试一次重连 new_graph = get_a_neo4j_connect(ip_port=ip_port, user=user, pwd=pwd) try: res = exe_cypher(graph=new_graph, cypher=cypher) status = True query_res = res new_graph_connect = new_graph except: status = False query_res = None new_graph_connect = None return status, query_res, new_graph_connect
目前我还没碰到graph连接对象挂掉的问题,以后看是不是有必要加上这个判断。
# 查询用户(email)是否存在,如果存在则返回用户uid def query_email(graph, email): the_cypher = '''match (n:Person{email:'%s'}) return id(n) as nid''' % email # print(the_cypher) res_list = exe_cypher(graph=graph, cypher= the_cypher) if len(res_list) > 0: return res_list[0].get('nid') else: return False --- 成功的一个查询 In [64]: query_email(graph, 'test@test.com') Out[64]: 60 失败的一个查询(没有这个用户) In [75]: query_email(graph, 'testabc@testabc.com') Out[75]: False
这个id是neo4j的自动编号,可以用于查询。
match (n)
where id(n) = 60
return n
关于密码的哈希函数可以参考我的这篇文章
# 根据节点的邮件设置密码 def set_pwd_hash(graph = None , para_dict = None): # 解参数 email = para_dict['email'] original_pwd = para_dict['original_pwd'] pwd_ttl = para_dict['pwd_ttl'] salt = para_dict['salt'] pwd_hash = a_sha256_trans_salt_ttl(original_pwd, salt, ttl=pwd_ttl) the_cypher = '''merge (n:Person{email:'%s'}) on match set n.pwd_ttl_hash = '%s',n.update_time= timestamp() return n.pwd_ttl_hash as pwd_ttl_hash''' %(email,pwd_hash ) print(the_cypher) res_list = exe_cypher(graph=graph, cypher= the_cypher) if len(res_list) > 0: the_check_pwd_hash = res_list[0].get('pwd_ttl_hash') else: the_check_pwd_hash = False return pwd_hash == the_check_pwd_hash ---- para_dict ={} para_dict['email'] = 'test@test.com' para_dict['original_pwd'] = 'iamPassword' # 十年过期 para_dict['pwd_ttl'] = 315360000 # 这个可以在配置里改 para_dict['salt'] = 'iamSalt' In [82]: set_pwd_hash(graph, para_dict) merge (n:Person{email:'test@test.com'}) on match set n.pwd_ttl_hash = '74169501e67002adae4114f8b3654b247eb2404310919cd7658e1dd3497216db_1925449658',n.update_time= timestamp() return n.pwd_ttl_hash as pwd_ttl_hash Out[82]: True
# 验证密码with ttl def verify_pwd_hash(graph= None, para_dict= None): # 解参数 email = para_dict['email'] original_pwd = para_dict['original_pwd'] salt = para_dict['salt'] the_cypher = '''match (n:Person{email:'%s'}) return n.pwd_ttl_hash as pwd_ttl_hash ''' % email res_list = exe_cypher(graph=graph, cypher= the_cypher) if len(res_list) > 0: pwd_ttl_hash = res_list[0].get('pwd_ttl_hash') compara_res = b_check_hash(original_pwd,salt, pwd_ttl_hash) return compara_res=='ok' else: return False --- para_dict ={} para_dict['email'] = 'test@test.com' para_dict['original_pwd'] = 'iamPassword' # 这个可以在配置里改 para_dict['salt'] = 'iamSalt' --- In [95]: verify_pwd_hash(graph , para_dict) Out[95]: True --- 故意搞错用户 para_dict['email'] ='aaa' In [98]: verify_pwd_hash(graph , para_dict) Out[98]: False --- 故意搞错密码 In [100]: para_dict['original_pwd']='iamWrongPasswor' In [101]: verify_pwd_hash(graph , para_dict) Out[101]: False
这个应该是体现图数据库价值的地方了。
狗书中举了一个博客的例子,里面分为了管理员,普通用户等几个角色,使用二进制的位码来表示权限高低。使用图库后,我们的表达可以异常丰富。
原函数:
...
# 4 判断权限
def can(self, permissions):
return self.role is not None and self.role.auth <= permissions
...
我们重写这个函数,permissions
不在是一个简单的数值
场景:对某个视图函数,我们要求用户必须具备Com1
的admin
和manager
两个角色,并且admin
的权限值< 100。
假设当前请求的用户仍然是Andy
, 以下cypher语句进行了查询
match (n:Person{email:'test@test.com'})-[r:AdminOf]->(:Org{name:'Com1'})
where r.auth_num < 100
with n
match (n)-[:ManagerOf]->(Org{name:'Com1'})
return n.email as email
因为目前我们并没有为Andy
创建任何关系,所以目前的返回为空。
采用match(节点),merge(边)的方法创建唯一边(关系)
match到起始和终止节点后一次创建两条边
match (n:Person{email:'test@test.com'})
match (n1:Org{name:'Com1'})
with n, n1
merge (n)-[r:AdminOf]->(n1)
on create set r.auth_num =99
on match set r.auth_num = 99
merge (n)-[r1:ManagerOf]->(n1)
此时再进行权限的查询,返回了该用户的邮箱
我们可以约定,权限就是一个cypher语句
,使用当前用户的邮箱作为参数。如果有权限,查询会返回当前用户邮箱,否则就是空。
这样权限的设置几乎完全可以满足任何的需求,如果要修改只是修改cypher,而不用去动任何其他的配置。
# 校验由cypher定义的权限,具备权限时返回邮箱名 def can_graph(graph= None, para_dict = None): # 解参数 email = para_dict['email'] the_cypher = para_dict['auth_cypher'] % email res_list = exe_cypher(graph=graph, cypher= the_cypher) if len(res_list) > 0: return res_list[0]['email'] == email else: return False --- para_dict = {} para_dict['email'] = 'test@test.com' para_dict['auth_cypher'] = '''match (n:Person{email:'%s'})-[r:AdminOf]->(:Org{name:'Com1'}) where r.auth_num < 100 with n match (n)-[:ManagerOf]->(Org{name:'Com1'}) return n.email as email ''' --- In [104]: can_graph(graph, para_dict) Out[104]: True In [105]: para_dict['email'] ='aa' In [106]: can_graph(graph, para_dict) Out[106]: False
如果访问量比较大,就可以使用内存数据库来缓存查询。将查询进行哈希后存进数据库,例如对于上例的访问权限,其哈希码为
In [107]: a_sha256_trans_salt(para_dict['auth_cypher'],'iamSalt')
Out[107]: '3e2cd4f2888e8804890b6b6ac6cae0c013a5c0e88095a75089a1f882d9ea14ee'
一开始没有经过授权,redis里面没有数据,因此向flask请求,flask确认用户的权限ok后加上一个ttl送到redis中。
In [108]: a_sha256_trans_salt_ttl(para_dict['auth_cypher'],'iamSalt',ttl=3600)
Out[108]: '3e2cd4f2888e8804890b6b6ac6cae0c013a5c0e88095a75089a1f882d9ea14ee_1610103819'
ttl设为3600秒,这样一小时内该授权可以不必重新请求图库,flask只要先通过redis里的数据判断
In [110]: def b_check_hash_time(hash_a, hash_b_ttl): ...: current_ts = int(time.time()) ...: hash_a_check, ttl_tl = b_split_str_by_last_underline(hash_b_ttl) ...: # 当前时间戳如果大于ttl时间戳,那么令牌校验失败 ...: if current_ts >= int(ttl_tl): ...: return 'ttlError' ...: ...: if hash_a != hash_a_check: ...: return 'Diffenent' ...: ...: return 'ok' ...: In [111]: b_check_hash_time('3e2cd4f2888e8804890b6b6ac6cae0c013a5c0e88095a75089a1f882d9ea14ee','3e2cd4f2888e8804890b6b6ac6cae0c013a5c0e88095a75089a1f882d9ea14ee_16 ...: 10103819') Out[111]: 'ok'
扯远了… , 还是继续回到这个数据模型上
狗书里用的是序列化函数,但是在这个场景下差别不大。
... def generate_confirmation_token(self, expiration=3600): s = Serializer(current_app.config['SECRET_KEY']) return s.dumps({'confirm': self.id}) def confirm(self, token): s = Serializer(current_app.config['SECRET_KEY']) try: data = s.loads(token) except: return False if data.get('confirm') != self.id: return False self.confirmed = True db.session.add(self) return True ...
可以看到原文把用户id进行了加密,传输给用户确认。在收到用户返回的请求后从密文中解析出用户的id,做的仍然是布尔类的判断,因此用自建的加盐哈希是完全可以替代的。例如用毫秒级时间戳作为盐,结合用户的邮件名称生成一个带有效时间的哈希码。发给用户的时候就不必发有效时间戳了,只发前面的哈希码部分就可以。
和前面设置/校验密码的过程类似,模式都是根据key匹配一个节点,设置/获取这个节点的一个属性
因此这里写一个通用的函数。
授权的密码/令牌校验目前看来有:
1 用户,密码设置/校验
2 确认邮件账户的令牌设置/校验
3 重置密码的令牌设置/校验
4 httpauth的访问令牌设置/校验
通用的设置节点数据函数
# 根据key设置一个节点的一个属性;不指定salt就是随机(时间戳) def set_node_hashattr(graph = None, para_dict = None): # 盐 salt = para_dict.get('salt') or str(int(1000*time.time())) # 匹配的那个键 key_name = para_dict['key_name'] key_val = para_dict['key_val'] # 设置的那个属性 attr_name = para_dict['attr_name'] # 属性的值可以没有(令牌),如果是密码就必须要有 attr_val = para_dict.get('attr_val') or para_dict['key_val'] # ttl ttl = para_dict.get('ttl') or 3600 # 计算hash_ttl hash_ttl = a_sha256_trans_salt_ttl(attr_val, salt, ttl=ttl) # cypher语句模板 the_cypher = '''match (n:Person{%s:'%s'}) set n.%s = '%s', n.update_time= timestamp() return n.%s as %s ''' % (key_name,key_val, attr_name,hash_ttl,attr_name,attr_name) res_list = exe_cypher(graph=graph, cypher= the_cypher) if len(res_list) > 0: the_check_pwd_hash = res_list[0].get(attr_name) else: the_check_pwd_hash = False return hash_ttl == the_check_pwd_hash --- In [124]: para_dict Out[124]: {'key_name': 'email', 'key_val': 'test@test.com', 'attr_name': 'confirm_token_ttl'} 设置一个验证账户的随机token In [125]: set_node_hashattr(graph, para_dict) Out[125]: True
可以看到生成了一个带ttl的令牌。
校验某个属性(token with ttl)的函数
# 校验某个属性的ttl。 # 一种是使用固定盐进行恢复后校验(例如密码) # 一种则是直接校验令牌(不需要再哈希) def verify_node_ttl_token(graph = None, para_dict = None): # 盐 salt = para_dict.get('salt') # 匹配的那个键 key_name = para_dict['key_name'] key_val = para_dict['key_val'] # 校验的那个属性 attr_name = para_dict['attr_name'] if salt is None: current_token = para_dict.get('attr_val') else: current_token = a_sha256_trans_salt(para_dict.get('attr_val'), salt) # 获取数据库中的哈希码 the_cypher = '''match (n:Person{%s:'%s'}) return n.%s as %s ''' % (key_name, key_val, attr_name, attr_name) res_list = exe_cypher(graph=graph, cypher= the_cypher) if len(res_list) > 0: db_toben_ttl = res_list[0].get(attr_name) check_result = b_check_hash_time(current_token,db_toben_ttl) == 'ok' else: check_result = False return check_result --- 发起一个校验 In [127]: para_dict = {} ...: para_dict['key_name'] = 'email' ...: para_dict['key_val'] ='test@test.com' ...: para_dict['attr_name'] = 'confirm_token_ttl' ...: para_dict['attr_val'] = 'ae24990c7b722d8cbeb03e85bf1a4a26d5d3c7358a919530f41be45b9ec46288' 因为离创建这个随机令牌超过了3600秒(1小时)所以校验失败 In [128]: verify_node_ttl_token(graph, para_dict) Out[128]: False 重新试一次 In [130]: set_node_hashattr(graph, para_dict) Out[130]: True 新的随机令牌,可以看到和之前的毫无关系 c6a8e18c24ae6cd53ae0d0705074fad770b775870ebed3fb43145a34fa1c0f15 In [131]: para_dict = {} ...: para_dict['key_name'] = 'email' ...: para_dict['key_val'] ='test@test.com' ...: para_dict['attr_name'] = 'confirm_token_ttl' ...: para_dict['attr_val'] = 'c6a8e18c24ae6cd53ae0d0705074fad770b775870ebed3fb43145a34fa1c0f15' # 校验成功 In [132]: verify_node_ttl_token(graph, para_dict) Out[132]: True
暂时有这些就够了,置于权限的定义,完全就是n个cypher语句。
因为这里需要做的是用户授权登录,对于flask基于session的登录方式,必须要使用flask_login来处理,所以要稍微了解下flask的登录管理;对于接口的登录关系反而更简单些。
关于登录可以参考这篇文章的一些内容。我们要自己实现flask登录的用户类,必须要有四个基本方法:
通过继承flask_login里的UserMixin可以免去自己写这几种方法
class User(UserMixin): # 初始化 def __init__(self, id, email, username = None): self.id = id self.email = email self.username = username or email # 数据库查询初始化用户 @staticmethod def query_user_email(graph, email): id = query_email(graph, email) para_dict = {} para_dict['key_name'] = 'email' para_dict['key_val'] =email para_dict['attr_name'] = 'name' username = get_a_node_attr(graph,para_dict) if id: return User(id, email, username) else: return None
我们尝试使用User西面的静态方法初始化一个实例:
user = User.query_user_email(graph, 'test@test.com')
可以看到因为继承了UserMixin,所以必要的四种方法都有了。
当然,这里还没有做密码校验就直接返回了。
原来的版本大概是这样的:登录时提交wtform表单,如果用户存在且密码校验通过则为登录。
def login():
form = LoginForm()
if form.validate_on_submit():
user = Users.query.filter_by(email=form.email.data).first()
if user is not None and user.verify_password(form.password.data):
login_user(user, form.remember_me.data) # login登记了该用户
return redirect(request.args.get('next') or url_for('main.index'))
flash('无效用户名或密码')
return render_template(some_page, form=form)
所以我给用户增加密码的方法:
... # 密码校验:盐的部分可以调试完以后从current_app.config里面读,不是明文写在这里。 # 而app.config里的重要配置可以写在bash变量里,启动容器时传入(所以用户名、密码等不会明文出现在代码里) def verify_password(self,graph, form_pwd,salt): para_dict ={} para_dict['email'] = self.email para_dict['original_pwd'] = form_pwd # 这个可以在配置里改 para_dict['salt'] = salt return verify_pwd_hash(graph, para_dict) ... --- # 查找到了用户 In [165]: user = User.query_user_email(graph, 'test@test.com') # 密码校验成功 In [166]: user.verify_password(graph, 'iamPassword', 'iamSalt') Out[166]: True # 密码校验失败(输错) In [167]: user.verify_password(graph, 'WrongiamPassword', 'iamSalt') Out[167]: False
启动服务,发现报了个错,蓝图的方式需要有个user_loader函数,让用户可以用id初始化。
# 在user对象里增加方法 ... # 单独写一个由id初始化的方法 @staticmethod def init_by_id(graph, id): the_cypher = '''match (n:Person) where id(n) = %s return n.email as email, n.name as name''' % id res_list = exe_cypher(graph, the_cypher) if len(res_list) > 0: email = res_list[0].get('email') username = res_list[0].get('name') return User(id, email, username) else: return False ... 在models.py里增加一个user_loader方法 # 蓝图模式下必须要这样,否则无法运行(用户登录管理失败) @login_manager.user_loader def load_user(user_id): return User.init_by_id(graph,user_id)
然后就成功了
总共不到90行,完成了用户类数据模型的重定义。
from flask_login import UserMixin, AnonymousUserMixin from flask_login import current_user, login_user, logout_user from app import login_manager from funcs import get_nodes_num,set_pwd_hash, query_email,set_pwd_hash,verify_pwd_hash from funcs import can_graph,set_node_hashattr,verify_node_ttl_token from funcs import set_a_node_attr,get_a_node_attr from funcs import graph, exe_cypher class User(UserMixin): # 初始化 def __init__(self, id, email, username = None): self.id = id self.email = email self.username = username or email # 密码校验:盐的部分可以调试完以后从current_app.config里面读,不是明文写在这里。 # 而app.config里的重要配置可以写在bash变量里,启动容器时传入(所以用户名、密码等不会明文出现在代码里) def verify_password(self,graph, form_pwd,salt): para_dict ={} para_dict['email'] = self.email para_dict['original_pwd'] = form_pwd # 这个可以在配置里改 para_dict['salt'] = salt return verify_pwd_hash(graph, para_dict) # 使用用户的email生成一个带ttl的随机令牌 def generate_ttl_token(self, graph, token_name , ttl=3600): para_dict = {} para_dict['key_name'] = 'email' para_dict['key_val'] =self.email para_dict['attr_name'] = token_name return set_node_hashattr(graph, para_dict) # 校验一个令牌 def verified_ttl_token(self, graph, ttl_token_name, token): para_dict = {} para_dict['key_name'] = 'email' para_dict['key_val'] = self.email para_dict['attr_name'] = ttl_token_name para_dict['attr_val'] = token return verify_node_ttl_token(graph, para_dict) # 判断权限 def can(self, graph ,the_auth_cypher): para_dict = {} para_dict['email'] = self.email para_dict['auth_cypher'] = the_auth_cypher return can_graph(graph, para_dict) # 数据库查询初始化用户 @staticmethod def query_user_email(graph, email): id = query_email(graph, email) para_dict = {} para_dict['key_name'] = 'email' para_dict['key_val'] =email para_dict['attr_name'] = 'name' username = get_a_node_attr(graph,para_dict) if id: return User(id, email, username) else: return None # 单独写一个由id初始化的方法 @staticmethod def init_by_id(graph, id): the_cypher = '''match (n:Person) where id(n) = %s return n.email as email, n.name as name''' % id res_list = exe_cypher(graph, the_cypher) if len(res_list) > 0: email = res_list[0].get('email') username = res_list[0].get('name') return User(id, email, username) else: return False # 蓝图模式下必须要这样,否则无法运行(用户登录管理失败) @login_manager.user_loader def load_user(user_id): return User.init_by_id(graph,user_id)
这篇内容太多了,细节就不写了,之后好了我把结果贴上来。
这些内容比较基础,这篇内容已经太多,就不写了。
在视图函数中的限制(要求用户必须为Com1
的Admin
)
@auth.route('/test_can/', methods=['GET', 'POST'])
@login_required
def test_can():
graph = current_app.config['THE_GRAPH']
the_auth_cypher = '''match (n:Person{email:'%s'})-[r:AdminOf]->(:Org{name:'Com1'}) return n.email as email '''
if current_user.can(graph, the_auth_cypher):
flash('页面浏览授权校验通过')
return render_template('/app1/view9.html')
else:
flash('页面浏览授权校验未通过')
return redirect(url_for('main.index'))
直接访问
在图库里直接增加关系
match (n:Person{email:'abc@abc.com'})
match (n1:Org{name:'Com1'})
merge (n)-[r:AdminOf]->(n1)
return r
大致可以感觉到,这种权限控制会很方便。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。