赞
踩
目录
- cloudDatabase项目文件夹
- │ main.py 主应用入口
- │ operation.py 定义一些常用操作函数
- │
- ├─database db数据库文件存放文件夹
- │ databaseInformation.db
- │ user.db
- │
- ├─databaseManagement 管理db数据库的py程序
- │ │ databaseInformation.py
- │ │ user.py
- │ │ __init__.py
- │ │
- │ └─__pycache__
- │ databaseInformation.cpython-37.pyc
- │ user.cpython-37.pyc
- │ __init__.cpython-37.pyc
- │
- ├─pythonSdk 面向Python的SDK代码目录
- │ main.py
- │ __init__.py
- │
- ├─static Flask固定资源存放文件夹
- │ cloudDatabase.jpg
- │ top.png
- │
- └─templates Flask网页模板存放文件夹
- index.html
主要是sqlite3,flask(具体是Flask,request)
json,time,os等等常用库就不说了。
- from flask import Flask,request,render_template
-
- app = Flask(__name__)
- app.debug = True
-
- @app.route('/init',methods=['POST'])
- def init(): # 接口初始化
- userId = data['userId'] # 获取userId
- ip = data['ip'] # 获取ip
-
- app.run(host="0.0.0.0",port=5000)
解释:
app = Flask(__name__) :
web服务器使用wsgi协议,把客户端所有的请求都转发给这个程序实例
初始化:所有的Flask都必须创建程序实例, 程序实例是Flask的对象,一般情况下用如下方法实例化 Flask类只有一个必须指定的参数,即程序主模块或者包的名字,__name__是系统变量,该变量指的是本py文件的文件名
app.debug = True:开启调试,只要更改源代码文件就会自动加载,不需要一直手动运行
@app.route('/init',methods=['POST'])
def init(): # 接口初始化
在flask中,定义路由最简便的方式,是使用程序实例的app.route装饰器,把装饰的函数注册为路由
userId = data['userId'] # 获取userId
ip = data['ip'] # 获取ip
这是获取post传参的方法
app.run(host="0.0.0.0",port=5000)
run方法启动flask集成的开发web服务器
服务器启动后,会启动轮询,等待并处理请求。轮询会一直请求,直到程序停止。
host="0.0.0.0"不填会自动加载本机ip但我们需要外网可访问所以填"0.0.0.0"会自动加载外网地址
port=5000表示监听5000端口,可以设置其他的,不过为了我吗能够顺利打开,在服务器上也要添加对应端口,这里方法我们文章后续讲解
SQLite3
- conn = sqlite3.connect(dbPath) # 连接数据库
- c = self.conn.cursor() # 新建游标
- sql = '''select ip from userToken''' # 查询 userToken 的 ip 列
- results = self.c.execute(sql) # 获取结果
- # 数据库将返回结果到results变量中
- all_ip = results.fetchall()
学习:
sql = '''select */column from tablename'''
"*"代表所有内容。此代码我们搜寻“students”表的所有内容。
sql = '''select */column from userToken where column="string"'''
sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
如果只是想看students表的id列,eg:select id from students
因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>是sqlite3对象 所以需要使用fetchall函数把所有结果保存到新的变量中,也就是用fetchall()方法获取查询结果 值如下[('286.56.2',), ('5255324532',)]整体为列表在,其中每个元组值代表数据库表中的一行数据,元组中每个值则为这行数据的某列值
简单sql语句 "INSERT INTO userToken values(?,?,?,?)" # 添加值 其中userToken为表名 values()传参每个?(英文)是占位符代表一个值,一行数据需要几个值就写几个问号。官网提倡这样传参说用python字符串拼接不安全
"UPDATE userToken SET time=? WHERE ip=?" # 更新值where ip= 是要验证的条件,整体说就是当一行数据满足ip= 设定值时把这行对应的time值设置成某某
'''create table if not exists 测试用表(roollno real,name text,class real)''' # 创建表 create table if not exists意思是如果这个表不存在则创建能避免出错。
还有执行语句:
c.execute(sql, (ip,token, ts,power)) # 执行sql语句 sql就是上面的sql语句是字符串,后面()是传入sql语句中?所代表的参数,注意需要一个元组,只有一个参数ip时要写成(ip,)","不能丢不然不是元组
有了这点基础知识,对sql这块不了解也没关系,网上很多可视化软件,这里我推荐Navicat 15 for SQLite,其他的也行,这不是主要的。
最后我还想说的是,本文用到的一个创建随机字符串的方法:
- def creatToken(self):
- """
- 生成一个指定长度的随机字符串,充当token
- """
- randomlength = 36 # 定义字符串长度
-
- random_str =''
- base_str ='@#$%^&*()_+|=-!~`.,?:;ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789'
- length =len(base_str) -1
- for i in range(randomlength):
- random_str +=base_str[random.randint(0, length)]
-
- return random_str
不难理解random.randint创建随机数在base_str中的字符中不断随机挑选字符,组成我们需要的随机字符串。
- # -*- coding: utf-8 -*-
- """
- @connect: 云数据库
- @Time : 2022/7/17-2022/7/18
- @python : python3.7 , flask
- @File :main.py
- @IDE :PyCharm
- @Motto:技术总是要日积月累的
- """
- '''
- 目录介绍:
- cloudDatabase项目文件夹
- │ main.py 主应用入口
- │ operation.py 定义一些常用操作函数
- │
- ├─database db数据库文件存放文件夹
- │ databaseInformation.db
- │ user.db
- │
- ├─databaseManagement 管理db数据库的py程序
- │ │ databaseInformation.py
- │ │ user.py
- │ │ __init__.py
- │ │
- │ └─__pycache__
- │ databaseInformation.cpython-37.pyc
- │ user.cpython-37.pyc
- │ __init__.cpython-37.pyc
- │
- ├─pythonSdk 面向Python的SDK代码目录
- │ main.py
- │ __init__.py
- │
- ├─static Flask固定资源存放文件夹
- │ cloudDatabase.jpg
- │ top.png
- │
- └─templates Flask网页模板存放文件夹
- index.html
- '''
- import sqlite3
- import traceback
- import json,time,os
- from flask import Flask,request,render_template
- from databaseManagement.user import user
- from databaseManagement.databaseInformation import databaseInformation
- from operation import operation
-
- app = Flask(__name__)
- app.debug = True
-
- operation = operation(app.debug) # 初始化operation,获取常用操作支持以及设定常量
-
- # 正确配置测试
- @app.route('/',methods=['GET'])
- def test():
- return render_template('index.html')
-
- # 接口初始化
- @app.route('/init',methods=['POST'])
- def init(): # 接口初始化
- headers =request.headers # 获得headers
-
- # post参数处理,储存到data(字典)
- data = operation.dealPostData(request.data)
-
- userId = data['userId'] # 获取userId
- ip = data['ip'] # 获取ip
- # 根据账户判断权限
- if userId == operation.adminAccount:
- power = 0
- elif userId == '':
- power = 3
-
- userManagenmant = user() # 实例化user
- ipList = userManagenmant.get_ipList()
- tokenList = userManagenmant.get_tokenList()
-
- # 检测ip是否已经存在
- if ip in ipList:
- token = userManagenmant.searchToken(ip)
- results = userManagenmant.updateTime(ip)
- print('result',results)
- else:
- token = userManagenmant.creatToken()
- while token in tokenList:
- token = userManagenmant.creatToken()
- results = userManagenmant.addToken(ip, token,power)
-
- userManagenmant.close()
-
- if results:
- returnData = {"token":token,"time":time.time()}
- return operation.dealReturn(code = 200,returnData = returnData ,reason = '合法请求,请求成功。',tips = None)
- else:
- resultJson = '''{"result":"失败!"}'''
- return operation.dealReturn(code = 500,returnData = None ,reason = '服务器发生未知错误',tips = '重新尝试操作或联系管理员。')
-
-
- # 查询数据库信息
- @app.route('/information',methods=['POST'])
- def information(): # 查询数据库信息
- headers = request.headers # 获得headers
-
- # post参数处理,储存到data(字典)
- data = operation.dealPostData(request.data)
-
- token = data['token'] # 获取token
-
- userManagenmant = user() # 实例化user
-
- # 查询权限并检查账户
- power = userManagenmant.searchPower(token)
- if power == None:
- return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token失效或不存在。', tips='请检查token或重新获取token')
-
- if power == 0:
- try:
- dbManagenmant = databaseInformation() # 实例化databaseInformation
- dbNameList = dbManagenmant.get_databaseName()
-
- # 获取数据库文件大小
- availableMemory = 0
- for fileName in dbNameList:
- name = fileName + ".db"
- size = os.path.getsize(operation.databasePath+name)
- availableMemory = availableMemory + size
-
- returnData ={
- "databaseName":dbNameList,
- "totalMemory":operation.totalMemory,
- "availableMemory":availableMemory,
- }
-
- dbManagenmant.close() # 关闭数据库
- return operation.dealReturn(code=200, returnData=returnData, reason='合法请求,请求成功。', tips=None)
- except Exception as e:
- return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):'+e, tips='重新尝试操作或联系管理员。')
- else:
- return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token无权限执行此操作。', tips='请以管理员身份操作。')
-
- # 添加数据库
- @app.route('/addDatabase',methods=['POST'])
- def addDatabase():# 添加数据库
- headers = request.headers # 获得headers
-
- # post参数处理,储存到data(字典)
- data = operation.dealPostData(request.data)
-
- token = data['token'] # 获取token
- databaseName = data['databaseName'] # 获取databaseName
- # databaseName中.db后缀处理
- if '.db' in databaseName:
- databaseName = databaseName.replace('.db','')
-
- userManagenmant = user() # 实例化user
- dbManagenmant = databaseInformation() # 实例化databaseInformation
-
- # 查询权限并检查账户
- power = userManagenmant.searchPower(token)
- if power == None:
- return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token失效或不存在。', tips='请检查token或重新获取token')
-
- # 权限合格处理请求
- if power == 0:
- try:
- result = dbManagenmant.addDatabase(databaseName,operation.databasePath+databaseName+'.db')
- if result == '已存在':
- return operation.dealReturn(code=400, returnData=None, reason='该数据库已经存在。', tips='请直接使用或更换其他未使用的名称。')
- elif result:
- returnData = {
- "result": "success",
- "databaseName": databaseName,
- "time":time.time()
- }
-
- userManagenmant.close()
- dbManagenmant.close()
- return operation.dealReturn(code=200, returnData=returnData, reason='合法请求,请求成功。', tips=None)
- else:
- userManagenmant.close()
- dbManagenmant.close()
- return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):', tips='重新尝试操作或联系管理员。')
- except Exception as e:
- userManagenmant.close()
- dbManagenmant.close()
- return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):'+e, tips='重新尝试操作或联系管理员。')
-
- else:
- userManagenmant.close()
- dbManagenmant.close()
- return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token无权限执行此操作。', tips='请以管理员身份操作。')
-
-
- # 执行一条非查询的sql语句
- @app.route('/notQuerySqlExecute',methods=['POST'])
- def notQuerySqlExecute():# 执行一条非查询的sql语句
- headers = request.headers # 获得headers
-
- # post参数处理,储存到data(字典)
- data = operation.dealPostData(request.data)
-
- token = data['token'] # 获取token
- databaseName = data['databaseName'] # 获取操作的databaseName
- tableName = data['tableName'] # 获取操作的表名
- sqlList = data['sql'] # 获取要执行的sql语句
- dataList = data['data'] #获取要传入的参数
-
- # databaseName中.db后缀处理
- if '.db' in databaseName:
- databaseName = databaseName.replace('.db', '')
-
- userManagenmant = user() # 实例化user
- dbManagenmant = databaseInformation() # 实例化databaseInformation
-
- # 查询权限并检查账户
- power = userManagenmant.searchPower(token)
- if power == None:
- return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token失效或不存在。', tips='请检查token或重新获取token')
-
- # 系统关键数据库禁止修改
- # 初次检查表名
- forbidName = ['databaseInformation','user']
- if databaseName in forbidName:
- return operation.dealReturn(code=403, returnData=None, reason='不合法的操作,当前方法不支持访问核心数据库文件!', tips='请查看api文档检查语法。')
- # 再次遍历sql语句是否合法
- for fName in forbidName:
- for sql in sqlList:
- if fName in sql:
- return operation.dealReturn(code=403, returnData=None, reason='不合法的操作,当前方法不支持访问核心数据库文件!', tips='请查看api文档检查语法。')
-
- # 验证完毕执行sql语句
- try:
- # 执行语句
- conn = sqlite3.connect(operation.databasePath + databaseName + '.db') # 连接数据库
- # print("数据库打开成功")
- c = conn.cursor() # 新建游标
- js = 0
- for sql in sqlList:
- # print(sql)
- # print(tuple(dataList[js]))
- c.execute(sql, tuple(dataList[js]))
- conn.commit()
- js = js + 1
- conn.close()
-
- return operation.dealReturn(code=200, returnData=None, reason='合法请求,执行sql语句成功。', tips=None)
- except Exception as e:
- userManagenmant.close()
- dbManagenmant.close()
- return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):' + e, tips='重新尝试操作或联系管理员。')
-
-
- # 执行一条查询的sql语句
- @app.route('/querySqlExecute',methods=['POST'])
- def querySqlExecute():# 执行一条非查询的sql语句
- headers = request.headers # 获得headers
-
- # post参数处理,储存到data(字典)
- data = operation.dealPostData(request.data)
-
- token = data['token'] # 获取token
- databaseName = data['databaseName'] # 获取操作的databaseName
- tableName = data['tableName'] # 获取操作的表名
- sqlList = data['sql'] # 获取要执行的sql语句
- dataList = data['data'] # 获取要传入的参数
-
- # databaseName中.db后缀处理
- if '.db' in databaseName:
- databaseName = databaseName.replace('.db', '')
-
- userManagenmant = user() # 实例化user
- dbManagenmant = databaseInformation() # 实例化databaseInformation
-
- # 查询权限并检查账户
- power = userManagenmant.searchPower(token)
- if power == None:
- return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token失效或不存在。', tips='请检查token或重新获取token')
-
- # 系统关键数据库禁止修改
- # 初次检查表名
- forbidName = ['databaseInformation', 'user']
- if databaseName in forbidName:
- return operation.dealReturn(code=403, returnData=None, reason='不合法的操作,当前方法不支持访问核心数据库文件!', tips='请查看api文档检查语法。')
- # 再次遍历sql语句是否合法
- for fName in forbidName:
- for sql in sqlList:
- if fName in sql:
- return operation.dealReturn(code=403, returnData=None, reason='不合法的操作,当前方法不支持访问核心数据库文件!',
- tips='请查看api文档检查语法。')
-
- # 验证完毕执行sql语句
- try:
- returnData = []
- # 执行语句
- conn = sqlite3.connect(operation.databasePath + databaseName + '.db') # 连接数据库
- # print("数据库打开成功")
- c = conn.cursor() # 新建游标
- js = 0
- for sql in sqlList:
- # print(sql)
- # print(tuple(dataList[js]))
- result = c.execute(sql, tuple(dataList[js]))
- # 结果写入returnData
- returnData.append(result.fetchall())
- js = js + 1
- conn.close()
-
- return operation.dealReturn(code=200, returnData=returnData, reason='合法请求,查询语句成功。', tips=None)
- except Exception as e:
- userManagenmant.close()
- dbManagenmant.close()
- return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):' + e, tips='重新尝试操作或联系管理员。')
-
-
-
- if __name__ == '__main__':
- app.run()
- # -*- coding: utf-8 -*-
- """
- @connect: 一些常用操作
- @Time : 2022/7/17-2022/7/18
- @python : python3.7 , flask
- @File :operation.py
- @IDE :PyCharm
- @Motto:技术总是要日积月累的
- """
-
- import traceback
- import json,time,os
- from flask import Flask,request
- from databaseManagement.user import user
- from databaseManagement.databaseInformation import databaseInformation
-
- class operation:
- debug = None # 继承自falsk的app.debug属性
- adminAccount = None # 管理者账户
- databasePath = None # 数据库路径
- totalMemory = None #数据库设定总内存
-
- def __init__(self,debug):
- self.debug = debug
-
- self.userManagenmant = user() # 实例化user
- self.dbManagenmant = databaseInformation() # 实例化databaseInformation
-
- self.initOperation()
-
- def initOperation(self): # 初始化常量
- setList = self.dbManagenmant.get_settings()
- self.adminAccount = setList[2] # 管理者账户
- self.databasePath = setList[1] # 数据库路径
- self.totalMemory = setList[0] #数据库设定总内存
-
- def dealPostData(self,data): # 处理post数据
- '''
- post参数处理,储存到data(字典)返回
- :param data: 字节型文本
- :return:
- '''
- data = data.decode('utf-8')
- data = data.replace('\r', '').replace('\n', '')#.replace(' ', '')
- data = json.loads(data)
-
- return data
-
- # 处理未知erro
- def dealErro(self,code,returnData = None ,reason = None,tips = None):
- '''
- 处理未知erro
- :param erro:[WinError 2] 系统找不到指定的文件。: 'amadeus.db'
- :return:
- '''
- if self.debug:
- print('发生错误!erro信息:'+ str(reason))
- print('报告错误详细内容及位置:')
- traceback.print_exc()
- print('报告完毕,请及时处理!')
- return self.dealReturn(code,returnData = None ,reason = None,tips = None)
- else:
- return self.dealReturn(code,returnData = None ,reason = None,tips = None)
-
- def dealExceptionRequest(self,reason):
- '''
- 处理异常请求
- :param reason: "权限不足!power=%s"
- :return:
- '''
- if self.debug:
- print('获取到一个异常请求,拦截原因:\n\t%s'%reason)
- return json.dumps({"result":403,"reason":reason})
- else:
- return json.dumps({"result": 403, "reason": reason})
-
- def dealReturn(self,code,returnData = None ,reason = None,tips = None):
- '''
- 格式化返回信息
- :param code: 操作代码
- :param returnData: 返回数据
- :param reason: 返回原因
- :param tips: 提示操作
- :return: json数据
- '''
- returnJson = {
- 'result':code,
- 'return':returnData,
- 'reason':reason,
- 'tips':tips,
- }
- return json.dumps(returnJson)
-
- # -*- coding: utf-8 -*-
- """
- @connect: 查询数据库信息
- @Time : 2022/7/17-2022/7/18
- @python : python3.7 , flask
- @File :databaseName.py
- @IDE :PyCharm
- @Motto:技术总是要日积月累的
- """
- import sqlite3,random,time
-
- class databaseInformation:
- def __init__(self,dbPath = 'database/databaseInformation.db'):
- self.conn = sqlite3.connect(dbPath) # 连接数据库
- #print("数据库打开成功")
- self.c = self.conn.cursor() # 新建游标
-
- def get_databaseName(self): # 返回dbNameList 数据模型如下['286.56.2', '5255324532']
- # 学习:
- # sql = '''select */column from tablename'''
- # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
- # sql = '''select */column from userToken where column="string"'''
- # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
- # 如果只是想看students表的id列,eg:select id from students
-
- sql = '''select name from databaseName''' # 查询 databaseName 的 name 列
- results = self.c.execute(sql) # 获取结果
- # 数据库将返回结果到results变量中
- all_dbName = results.fetchall()
- # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
- # 所以需要使用fetchall函数把所有结果保存到新的变量中
- # 值如下[('286.56.2',), ('5255324532',)]
-
- # 加工提取ip数据列表
- dbNameList = []
- for ip in all_dbName:
- dbNameList.append(ip[0])
-
-
- return dbNameList
-
- def get_settings(self):# 返回setList 数据模型如下[5368709120, 'E:/Flask/database/', 2061360308)]对应总内存,路径,管理账号
- # 学习:
- # sql = '''select */column from tablename'''
- # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
- # sql = '''select */column from userToken where column="string"'''
- # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
- # 如果只是想看students表的id列,eg:select id from students
-
- sql = "select * from setting" # 查询 databaseName 的 name 列
- results = self.c.execute(sql) # 获取结果
- # 数据库将返回结果到results变量中
- settings = results.fetchall()
- # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
- # 所以需要使用fetchall函数把所有结果保存到新的变量中
- # 值如下[('5368709120','E:/Flask/database/','2061360308')]对应总内存,路径,管理账号
-
- # 加工提取ip数据列表
- setList = []
- for set in settings[0]:
- setList.append(set)
-
- return setList
-
- def addDatabase(self,name,dbPath):
- try:
- # 获取所有database的名称
- sql = '''select name from databaseName'''
- name_result = self.c.execute(sql)
- name_result = name_result.fetchall()
- nameList = []
- for name_exist in name_result:
- nameList.append(name_exist[0])
- # 判断所要创建的数据库文件是否已经存在
- if name in nameList:
- return '已存在'
- else:
- conn = sqlite3.connect(dbPath) # 连接数据库
- conn.close()
- sql = '''insert into databaseName values(?)'''
- self.c.execute(sql, (name,))
- self.conn.commit()
- return True
- except Exception as e:
- print(e)
- return False
-
-
-
- def close(self):
- # 关闭数据库
- self.conn.commit() # 提交结果
- self.conn.close() # 关闭数据库
-
- if __name__ == '__main__':
- a = databaseInformation('../database/databaseInformation.db')
- print(a.get_databaseName())
- # -*- coding: utf-8 -*-
- """
- @connect: 管理使用的用户
- @Time : 2022/7/17-2022/7/18
- @python : python3.7 , flask
- @File :user.py
- @IDE :PyCharm
- @Motto:技术总是要日积月累的
- """
- import sqlite3,random,time
-
- class user:
- def __init__(self,dbPath = 'database/user.db'):
- self.conn = sqlite3.connect(dbPath) # 连接数据库
- #print("数据库打开成功")
- self.c = self.conn.cursor() # 新建游标
-
-
- def get_ipList(self): # 返回ipList 数据模型如下['286.56.2', '5255324532']
- # 学习:
- # sql = '''select */column from tablename'''
- # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
- # sql = '''select */column from userToken where column="string"'''
- # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
- # 如果只是想看students表的id列,eg:select id from students
-
- sql = '''select ip from userToken''' # 查询 userToken 的 ip 列
- results = self.c.execute(sql) # 获取结果
- # 数据库将返回结果到results变量中
- all_ip = results.fetchall()
- # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
- # 所以需要使用fetchall函数把所有结果保存到新的变量中
- # 值如下[('286.56.2',), ('5255324532',)]
-
- # 加工提取ip数据列表
- ipList = []
- for ip in all_ip:
- ipList.append(ip[0])
-
-
- return ipList
-
- def get_tokenList(self): # 返回token 数据模型如下['286.56.2', '5255324532']
- # 学习:
- # sql = '''select */column from tablename'''
- # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
- # sql = '''select */column from userToken where column="string"'''
- # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
- # 如果只是想看students表的id列,eg:select id from students
-
- sql = '''select token from userToken''' # 查询 userToken 的 token 列
- results = self.c.execute(sql) # 获取结果
- # 数据库将返回结果到results变量中
- all_token = results.fetchall()
- # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
- # 所以需要使用fetchall函数把所有结果保存到新的变量中
- # 值如下[('286.56.2',), ('5255324532',)]
-
- # 加工提取ip数据列表
- tokenList = []
- for ip in all_token:
- tokenList.append(ip[0])
-
- return tokenList
- def creatToken(self):
- """
- 生成一个指定长度的随机字符串,充当token
- """
- randomlength = 36 # 定义字符串长度
-
- random_str =''
- base_str ='@#$%^&*()_+|=-!~`.,?:;ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789'
- length =len(base_str) -1
- for i in range(randomlength):
- random_str +=base_str[random.randint(0, length)]
-
- return random_str
-
-
-
- def addToken(self,ip,token,power): # 添加新的token
- sql = "INSERT INTO userToken values(?,?,?,?)" # 添加值
-
- ts = time.time()
- try:
- insertResults = self.c.execute(sql, (
- ip,token, ts,power)) # 执行sql语句 数据大概像这个样子(ip=string,token=string,时间戳,权限)
- # 数据库将返回结果到results变量中
- self.conn.commit() # 提交结果
-
- return True
- except:
- return insertResults
-
- def searchToken(self,ip): # 返回token 数据模型如下['286.56.2', '5255324532']
- '''
- :param ip: '192.168.0.100'
- :return: 'o:c8N3:9vl#bnmMsl#\\LD=%\\83`ZWvGs&dgk'
- '''
-
- # 学习:
- # sql = '''select */column from tablename'''
- # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
- # sql = '''select */column from userToken where column="string"'''
- # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
- # 如果只是想看students表的id列,eg:select id from students
-
- sql = '''select token from userToken where ip="%s"'''%ip # 查询 userToken 中 指定token
- results = self.c.execute(sql) # 执行sql
- # 数据库将返回结果到results变量中
- token = results.fetchall()
- # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
- # 所以需要使用fetchall函数把所有结果保存到新的变量中
- # 值如下[('o:c8N3:9vl#bnmMsl#\\LD=%\\83`ZWvGs&dgk',)]
-
- # 加工提取token值
- token = token[0][0]
-
- return token
-
- def updateTime(self,ip):
- '''
- :param ip: '192.168.0.100'
- :return: bool
- '''
-
- try:
- sql = "UPDATE userToken SET time=? WHERE ip=?"
- self.c.execute(sql,(time.time(),ip)) # 执行sql
- self.conn.commit() # 提交结果
-
- return True
- except:
- return False
-
- def searchPower(self,token):
- '''
- :param token: 'o:c8N3:9vl#bnmMsl#\\LD=%\\83`ZWvGs&dgk'
- :return: 0
- '''
-
- try:
- sql = '''select power from userToken WHERE token=?'''
- results = self.c.execute(sql,(token,)) # 执行sql
-
- # 数据库将返回结果到results变量中
- power = results.fetchall()
- # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
- # 所以需要使用fetchall函数把所有结果保存到新的变量中
- # 值如下[('o:c8N3:9vl#bnmMsl#\\LD=%\\83`ZWvGs&dgk',)]
-
- # 加工提取token值
- power = power[0][0]
-
- return power
- except:
- return None
-
- def close(self):
- # 关闭数据库
- self.conn.commit() # 提交结果
- self.conn.close() # 关闭数据库
-
- if __name__ == '__main__':
- a = user('../database/user.db')
- print(a.searchPower(''))
'运行
- <!DOCTYPE html>
- <html lang="zh">
- <head>
- <meta charset="UTF-8">
- <title>云数据库API</title>
- <link rel="shortcut icon" href="static/cloudDatabase.jpg">
- </head>
- <body>
- <div align="center" id = "frame">
- <div align="center" id = "main">
- <img src="static/top.png">
- <div align="center" id = "connect">
- <p id = "title">恭喜!您的云数据库API搭建成功。</p>
- <p id = "text">
- 在线API文档地址:
- <a target=_blank href="https://www.apifox.cn/apidoc/shared-93936f73-c065-480b-83b1-5f87f13916ed">点击打开apifox上分享的文档</a>
- <br/>
- 管理员:
- <br>
- 管理员联系方式:
- <br>
- 开发者:盧瞳
- <br>
- 开发者联系方式:QQ:2061360308
- <br>
- 原文链接:
- <br>
- GitHUb项目地址:
- </p>
- </div>
- </div>
- </div>
- <style>
- #frame {
- //background-color: #fe7fac;
- width: document.body.clientWidth;
- height: document.body.clientHeight;
- vertical-align:middle;
- }
- #main {
- background-color: #66CCFF;
- width: 600px;
- height: 100%;
- vertical-align:middle;
- }
- img {
- top:50px;
- left:50px;
- width: 600px;
- height: 120px;
- float: left;
- }
- #title {
- font-size:28px
- }
- #text{
- text-align:left;
- }
- </style>
- </body>
- </html>
文档这里我推荐Apifox他们的软件很好用界面也简洁易懂,功能也齐全,这里放本文的在线文档链接,有兴趣的同学可以点开翻一翻。文档地址
SDK也就是把上面接口用requests库做个包装,我简单写了一些代码:
- # -*- coding: utf-8 -*-
- """
- @connect: 面向python的云数据库SDK
- @Time : 2022/1/18 11:36
- @Auth : 技术空间
- @File :main.py
- @IDE :PyCharm
- @Motto:技术总是要日积月累的
- """
- import time
-
- import requests,json,socket
- from urllib import parse
- from pythonSdk.operation import operation
-
- class clouldDatabaseSDK:
- operation = operation() # 引入一些常用方法
- # 请求头
- header = {
- "Accept": "*/*",
- "Accept-Encoding": "gzip, deflate, br",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Connection": "keep-alive",
- "Content-Type": "application/json",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36"
- }
- severUrl = None # 服务器地址
- ip = None # 请求用的本机ip地址
- # 获取本机IP
- hostname = socket.gethostname()
- ip = socket.gethostbyname(hostname)
-
- adminAccount = None # 管理者账户
- token = None # 服务器返回零时token
-
- def __init__(self,serverUrl,adminAccount=None):
- '''
- 初始化SDK
- :param url:服务器请求地址
- :param adminAccount:账户,没有不填用于验证管理者权限的
- '''
- self.severUrl = serverUrl
- self.adminAccount = adminAccount
-
- # 调用初始化接口获取token
- post_json = json.dumps({'userId': self.adminAccount, 'ip': self.ip})
- print(post_json)
-
- # 处理返回内容
- requestReturn = operation.dealRequestReturn(self.operation,requests.post(parse.urljoin(self.severUrl,"init"), data=post_json, headers=self.header))
- # 验证返回内容
- if requestReturn.statusCode == 200:
- self.token = requestReturn.returnData['token']
- else:
- print(requestReturn.requestInformation)
-
- def queryDBInformation(self):
- '''
- 查询数据库信息,需管理员权限
- :return:
- '''
- post_json = json.dumps({'token': self.token})
-
- # 处理返回内容
- requestReturn = operation.dealRequestReturn(operation,requests.post(parse.urljoin(self.severUrl, "information"), data=post_json,
- headers=self.header))
- # 验证返回内容
- if requestReturn.statusCode == 200:
-
- return requestReturn
- else:
- print(requestReturn.requestInformation)
-
- def addDatabase(self,databaseName):
- '''
- 添加数据库,需管理员权限
- :param databaseName:要添加的数据库名称
- :return:
- '''
- post_json = json.dumps({'token': self.token,"databaseName":databaseName,"time":time.time()})
-
- # 处理返回内容
- requestReturn = operation.dealRequestReturn(operation,requests.post(parse.urljoin(self.severUrl, "/addDatabase"), data=post_json,
- headers=self.header))
- # 验证返回内容
- if requestReturn.statusCode == 200:
- return requestReturn
- else:
- print(requestReturn.requestInformation)
-
- def notQuerySqlExecute(self,databaseName,tableName,sql,data):
- '''
- 执行一条非查询的sql语句
- :param databaseName: 要操作的数据库名称
- :param tableName: 要操作的表名
- :param sql: sql语句列表 --> []
- :param data: 传入sql语句中参数列表 --> []
- :return:
- '''
-
- post_json = json.dumps({'token': self.token,"databaseName":databaseName,"tableName":tableName,"sql":sql,"data":data})
-
- # 处理返回内容
- requestReturn = operation.dealRequestReturn(operation,requests.post(parse.urljoin(self.severUrl, "/notQuerySqlExecute"), data=post_json,
- headers=self.header))
- # 验证返回内容
- if requestReturn.statusCode == 200:
- return requestReturn
- else:
- print(requestReturn.requestInformation)
-
- def querySqlExecute(self,databaseName,tableName,sql,data):
- '''
- 执行一条查询的sql语句
- :param databaseName: 要操作的数据库名称
- :param tableName: 要操作的表名
- :param sql: sql语句列表 --> []
- :param data: 传入sql语句中参数列表 --> []
- :return:
- '''
-
- post_json = json.dumps({'token': self.token,"databaseName":databaseName,"tableName":tableName,"sql":sql,"data":data})
-
- # 处理返回内容
- requestReturn = operation.dealRequestReturn(operation,requests.post(parse.urljoin(self.severUrl, "/querySqlExecute"), data=post_json,
- headers=self.header))
- # 验证返回内容
- if requestReturn.statusCode == 200:
- return requestReturn
- else:
- print(requestReturn.requestInformation)
-
- if __name__ == '__main__':
- # a=clouldDatabaseSDK(serverUrl='http://101.42.163.114:5000/',adminAccount='2061360308')
- a = clouldDatabaseSDK(serverUrl='http://192.168.0.101:5000', adminAccount='2061360308')
-
- #a.addDatabase('测试用数据库')
- '''create table if not exists 测试用表(roollno real,name text,class real)''',
- insertQuary = a.notQuerySqlExecute('测试用数据库','测试用表',
- ['''create table if not exists 测试用表(roollno real,name text,class real)''',
- '''insert into 测试用表 values(?,?,?)''',
- '''insert into 测试用表 values(?,?,?)''',
- ],
- [
- [],
- [0,1,2],
- ['roollno','name','class']
- ]
- )
- selectQuary = a.querySqlExecute('测试用数据库', '测试用表',
- ['''select * from 测试用表''',
- ],
- [
- [],
- ]
- )
- query = a.queryDBInformation()
- #print('返回:',query.statusCode,query.returnData)
- print(insertQuary.statusCode,insertQuary.returnData)
- print(selectQuary.statusCode,selectQuary.returnData)
- # -*- coding: utf-8 -*-
- """
- @connect: 一些常用操作
- @Time : 2022/7/17-2022/7/18
- @python : python3.7 , flask
- @File :operation.py
- @IDE :PyCharm
- @Motto:技术总是要日积月累的
- """
-
- import requests,json,socket
- from urllib import parse
-
- class operation:
- def __init__(self):
- pass
- def dealRequestReturn(self,object):
- '''
- 处理请求返回
- :param object: requests.post返回对象
- :return: 字典,数组等json.loads数据
- '''
- requestReturn = object.text # 获取返回json内容
- print(requestReturn)
- requestReturn = json.loads(requestReturn) # 解析json
- #requestReturn = json.loads('{"result": 200, "return": {"token": "", "time": 1658243312.8069735}, "reason": "\u5408\u6cd5\u8bf7\u6c42\uff0c\u8bf7\u6c42\u6210\u529f\u3002", "tips": null}') # 解析json
- statusCode = requestReturn['result']
- returnData = requestReturn['return']
- if statusCode == 200:
- requestInformation = '请求成功!\n状态码:' + str(requestReturn['result']) + '\n原因:' + str(requestReturn[
- 'reason']) + '\n提示:' + str(requestReturn['tips'])
- else:
- requestInformation = '请求失败!\n状态码:' + str(requestReturn['result']) + '\n原因:' + str(requestReturn[
- 'reason']) + '\n提示:' + str(requestReturn['tips'])
-
- #构建类对象返回
- class requestReturn():
- '''
- 请求返回内容对象
- :object statusCode: 状态码
- :object returnData:返回的数据
- :object requestInformation:返回的提示
- '''
- def __init__(self,statusCode,returnData,requestInformation):
- self.statusCode = statusCode
- self.returnData = returnData
- self.requestInformation = requestInformation
- requestReturn = requestReturn(statusCode,returnData,requestInformation)
- return requestReturn
'运行
代码ok了,想要真正让接口在服务器上跑起来,还需要一些配置:
首先host=“0.0.0.0”这个我们支前准备好了,还有就是在服务器上将你的端口号加进去,因为服务器厂商太多这里不做详细解说,有需要可以百度“Flask公网访问”,这里放一个腾讯云设置方法的教程地址
文章结束,感谢 收藏+关注,希望本文对你有所启发。
具体的源码和SDK我打包放这里了,源码地址。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。