当前位置:   article > 正文

简单云数据库API开发

简单云数据库API开发

目录

步骤一:项目目录

各位客官,让我们首先上个目录,让大家有个大体了解。

步骤二:材料准备

本次用到的一些python库:

本次用到的一些知识:

代码(注释详细,小白可入!)

接下来把代码放一下,注释写的很详细也都是基础知识。

再之后是SDK与文档,这两个接口这么能缺呢?


步骤一:项目目录

各位客官,让我们首先上个目录,让大家有个大体了解。

  1. cloudDatabase项目文件夹
  2. │ main.py 主应用入口
  3. │ operation.py 定义一些常用操作函数
  4. ├─database db数据库文件存放文件夹
  5. │ databaseInformation.db
  6. │ user.db
  7. ├─databaseManagement 管理db数据库的py程序
  8. │ │ databaseInformation.py
  9. │ │ user.py
  10. │ │ __init__.py
  11. │ │
  12. │ └─__pycache__
  13. │ databaseInformation.cpython-37.pyc
  14. │ user.cpython-37.pyc
  15. │ __init__.cpython-37.pyc
  16. ├─pythonSdk 面向Python的SDK代码目录
  17. │ main.py
  18. │ __init__.py
  19. ├─static Flask固定资源存放文件夹
  20. │ cloudDatabase.jpg
  21. top.png
  22. └─templates Flask网页模板存放文件夹
  23. index.html

步骤二:材料准备

本次用到的一些python库

主要是sqlite3,flask(具体是Flask,request)
json,time,os等等常用库就不说了。

本次用到的一些知识:

flask

  1. from flask import Flask,request,render_template
  2. app = Flask(__name__)
  3. app.debug = True
  4. @app.route('/init',methods=['POST'])
  5. def init(): # 接口初始化
  6. userId = data['userId'] # 获取userId
  7. ip = data['ip'] # 获取ip
  8. app.run(host="0.0.0.0",port=5000)

解释:

app = Flask(__name__) :

web服务器使用wsgi协议,把客户端所有的请求都转发给这个程序实例
初始化:所有的Flask都必须创建程序实例, 程序实例是Flask的对象,一般情况下用如下方法实例化 Flask类只有一个必须指定的参数,即程序主模块或者包的名字,__name__是系统变量,该变量指的是本py文件的文件名


app.debug = True:开启调试,只要更改源代码文件就会自动加载,不需要一直手动运行


@app.route('/init',methods=['POST'])
def init(): # 接口初始化

在flask中,定义路由最简便的方式,是使用程序实例的app.route装饰器,把装饰的函数注册为路由


userId = data['userId'] # 获取userId
ip = data['ip'] # 获取ip

这是获取post传参的方法


app.run(host="0.0.0.0",port=5000)

run方法启动flask集成的开发web服务器

服务器启动后,会启动轮询,等待并处理请求。轮询会一直请求,直到程序停止。

host="0.0.0.0"不填会自动加载本机ip但我们需要外网可访问所以填"0.0.0.0"会自动加载外网地址

port=5000表示监听5000端口,可以设置其他的,不过为了我吗能够顺利打开,在服务器上也要添加对应端口,这里方法我们文章后续讲解

SQLite3

  1. conn = sqlite3.connect(dbPath) # 连接数据库
  2. c = self.conn.cursor() # 新建游标
  3. sql = '''select ip from userToken''' # 查询 userToken 的 ip 列
  4. results = self.c.execute(sql) # 获取结果
  5. # 数据库将返回结果到results变量中
  6. all_ip = results.fetchall()

学习:
 sql = '''select */column from tablename'''
 "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
  sql = '''select */column from userToken where column="string"'''
  sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
  如果只是想看students表的id列,eg:select id from students 

因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>是sqlite3对象
所以需要使用fetchall函数把所有结果保存到新的变量中,也就是用fetchall()方法获取查询结果
值如下[('286.56.2',), ('5255324532',)]整体为列表在,其中每个元组值代表数据库表中的一行数据,元组中每个值则为这行数据的某列值
简单sql语句
"INSERT INTO userToken values(?,?,?,?)" 
# 添加值 其中userToken为表名 values()传参每个?(英文)是占位符代表一个值,一行数据需要几个值就写几个问号。官网提倡这样传参说用python字符串拼接不安全
"UPDATE userToken SET time=? WHERE ip=?" 
# 更新值where ip= 是要验证的条件,整体说就是当一行数据满足ip= 设定值时把这行对应的time值设置成某某
'''create table if not exists 测试用表(roollno real,name text,class real)''' 
# 创建表 create table if not exists意思是如果这个表不存在则创建能避免出错。

还有执行语句:

c.execute(sql, (ip,token, ts,power))  # 执行sql语句 sql就是上面的sql语句是字符串,后面()是传入sql语句中?所代表的参数,注意需要一个元组,只有一个参数ip时要写成(ip,)","不能丢不然不是元组

有了这点基础知识,对sql这块不了解也没关系,网上很多可视化软件,这里我推荐Navicat 15 for SQLite,其他的也行,这不是主要的。

最后我还想说的是,本文用到的一个创建随机字符串的方法:

  1. def creatToken(self):
  2. """
  3. 生成一个指定长度的随机字符串,充当token
  4. """
  5. randomlength = 36 # 定义字符串长度
  6. random_str =''
  7. base_str ='@#$%^&*()_+|=-!~`.,?:;ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789'
  8. length =len(base_str) -1
  9. for i in range(randomlength):
  10. random_str +=base_str[random.randint(0, length)]
  11. return random_str

 不难理解random.randint创建随机数在base_str中的字符中不断随机挑选字符,组成我们需要的随机字符串。

代码(注释详细,小白可入!)

接下来把代码放一下,注释写的很详细也都是基础知识。

  1. # -*- coding: utf-8 -*-
  2. """
  3. @connect: 云数据库
  4. @Time : 2022/7/17-2022/7/18
  5. @python : python3.7 , flask
  6. @File :main.py
  7. @IDE :PyCharm
  8. @Motto:技术总是要日积月累的
  9. """
  10. '''
  11. 目录介绍:
  12. cloudDatabase项目文件夹
  13. │ main.py 主应用入口
  14. │ operation.py 定义一些常用操作函数
  15. ├─database db数据库文件存放文件夹
  16. │ databaseInformation.db
  17. │ user.db
  18. ├─databaseManagement 管理db数据库的py程序
  19. │ │ databaseInformation.py
  20. │ │ user.py
  21. │ │ __init__.py
  22. │ │
  23. │ └─__pycache__
  24. │ databaseInformation.cpython-37.pyc
  25. │ user.cpython-37.pyc
  26. │ __init__.cpython-37.pyc
  27. ├─pythonSdk 面向Python的SDK代码目录
  28. │ main.py
  29. │ __init__.py
  30. ├─static Flask固定资源存放文件夹
  31. │ cloudDatabase.jpg
  32. │ top.png
  33. └─templates Flask网页模板存放文件夹
  34. index.html
  35. '''
  36. import sqlite3
  37. import traceback
  38. import json,time,os
  39. from flask import Flask,request,render_template
  40. from databaseManagement.user import user
  41. from databaseManagement.databaseInformation import databaseInformation
  42. from operation import operation
  43. app = Flask(__name__)
  44. app.debug = True
  45. operation = operation(app.debug) # 初始化operation,获取常用操作支持以及设定常量
  46. # 正确配置测试
  47. @app.route('/',methods=['GET'])
  48. def test():
  49. return render_template('index.html')
  50. # 接口初始化
  51. @app.route('/init',methods=['POST'])
  52. def init(): # 接口初始化
  53. headers =request.headers # 获得headers
  54. # post参数处理,储存到data(字典)
  55. data = operation.dealPostData(request.data)
  56. userId = data['userId'] # 获取userId
  57. ip = data['ip'] # 获取ip
  58. # 根据账户判断权限
  59. if userId == operation.adminAccount:
  60. power = 0
  61. elif userId == '':
  62. power = 3
  63. userManagenmant = user() # 实例化user
  64. ipList = userManagenmant.get_ipList()
  65. tokenList = userManagenmant.get_tokenList()
  66. # 检测ip是否已经存在
  67. if ip in ipList:
  68. token = userManagenmant.searchToken(ip)
  69. results = userManagenmant.updateTime(ip)
  70. print('result',results)
  71. else:
  72. token = userManagenmant.creatToken()
  73. while token in tokenList:
  74. token = userManagenmant.creatToken()
  75. results = userManagenmant.addToken(ip, token,power)
  76. userManagenmant.close()
  77. if results:
  78. returnData = {"token":token,"time":time.time()}
  79. return operation.dealReturn(code = 200,returnData = returnData ,reason = '合法请求,请求成功。',tips = None)
  80. else:
  81. resultJson = '''{"result":"失败!"}'''
  82. return operation.dealReturn(code = 500,returnData = None ,reason = '服务器发生未知错误',tips = '重新尝试操作或联系管理员。')
  83. # 查询数据库信息
  84. @app.route('/information',methods=['POST'])
  85. def information(): # 查询数据库信息
  86. headers = request.headers # 获得headers
  87. # post参数处理,储存到data(字典)
  88. data = operation.dealPostData(request.data)
  89. token = data['token'] # 获取token
  90. userManagenmant = user() # 实例化user
  91. # 查询权限并检查账户
  92. power = userManagenmant.searchPower(token)
  93. if power == None:
  94. return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token失效或不存在。', tips='请检查token或重新获取token')
  95. if power == 0:
  96. try:
  97. dbManagenmant = databaseInformation() # 实例化databaseInformation
  98. dbNameList = dbManagenmant.get_databaseName()
  99. # 获取数据库文件大小
  100. availableMemory = 0
  101. for fileName in dbNameList:
  102. name = fileName + ".db"
  103. size = os.path.getsize(operation.databasePath+name)
  104. availableMemory = availableMemory + size
  105. returnData ={
  106. "databaseName":dbNameList,
  107. "totalMemory":operation.totalMemory,
  108. "availableMemory":availableMemory,
  109. }
  110. dbManagenmant.close() # 关闭数据库
  111. return operation.dealReturn(code=200, returnData=returnData, reason='合法请求,请求成功。', tips=None)
  112. except Exception as e:
  113. return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):'+e, tips='重新尝试操作或联系管理员。')
  114. else:
  115. return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token无权限执行此操作。', tips='请以管理员身份操作。')
  116. # 添加数据库
  117. @app.route('/addDatabase',methods=['POST'])
  118. def addDatabase():# 添加数据库
  119. headers = request.headers # 获得headers
  120. # post参数处理,储存到data(字典)
  121. data = operation.dealPostData(request.data)
  122. token = data['token'] # 获取token
  123. databaseName = data['databaseName'] # 获取databaseName
  124. # databaseName中.db后缀处理
  125. if '.db' in databaseName:
  126. databaseName = databaseName.replace('.db','')
  127. userManagenmant = user() # 实例化user
  128. dbManagenmant = databaseInformation() # 实例化databaseInformation
  129. # 查询权限并检查账户
  130. power = userManagenmant.searchPower(token)
  131. if power == None:
  132. return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token失效或不存在。', tips='请检查token或重新获取token')
  133. # 权限合格处理请求
  134. if power == 0:
  135. try:
  136. result = dbManagenmant.addDatabase(databaseName,operation.databasePath+databaseName+'.db')
  137. if result == '已存在':
  138. return operation.dealReturn(code=400, returnData=None, reason='该数据库已经存在。', tips='请直接使用或更换其他未使用的名称。')
  139. elif result:
  140. returnData = {
  141. "result": "success",
  142. "databaseName": databaseName,
  143. "time":time.time()
  144. }
  145. userManagenmant.close()
  146. dbManagenmant.close()
  147. return operation.dealReturn(code=200, returnData=returnData, reason='合法请求,请求成功。', tips=None)
  148. else:
  149. userManagenmant.close()
  150. dbManagenmant.close()
  151. return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):', tips='重新尝试操作或联系管理员。')
  152. except Exception as e:
  153. userManagenmant.close()
  154. dbManagenmant.close()
  155. return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):'+e, tips='重新尝试操作或联系管理员。')
  156. else:
  157. userManagenmant.close()
  158. dbManagenmant.close()
  159. return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token无权限执行此操作。', tips='请以管理员身份操作。')
  160. # 执行一条非查询的sql语句
  161. @app.route('/notQuerySqlExecute',methods=['POST'])
  162. def notQuerySqlExecute():# 执行一条非查询的sql语句
  163. headers = request.headers # 获得headers
  164. # post参数处理,储存到data(字典)
  165. data = operation.dealPostData(request.data)
  166. token = data['token'] # 获取token
  167. databaseName = data['databaseName'] # 获取操作的databaseName
  168. tableName = data['tableName'] # 获取操作的表名
  169. sqlList = data['sql'] # 获取要执行的sql语句
  170. dataList = data['data'] #获取要传入的参数
  171. # databaseName中.db后缀处理
  172. if '.db' in databaseName:
  173. databaseName = databaseName.replace('.db', '')
  174. userManagenmant = user() # 实例化user
  175. dbManagenmant = databaseInformation() # 实例化databaseInformation
  176. # 查询权限并检查账户
  177. power = userManagenmant.searchPower(token)
  178. if power == None:
  179. return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token失效或不存在。', tips='请检查token或重新获取token')
  180. # 系统关键数据库禁止修改
  181. # 初次检查表名
  182. forbidName = ['databaseInformation','user']
  183. if databaseName in forbidName:
  184. return operation.dealReturn(code=403, returnData=None, reason='不合法的操作,当前方法不支持访问核心数据库文件!', tips='请查看api文档检查语法。')
  185. # 再次遍历sql语句是否合法
  186. for fName in forbidName:
  187. for sql in sqlList:
  188. if fName in sql:
  189. return operation.dealReturn(code=403, returnData=None, reason='不合法的操作,当前方法不支持访问核心数据库文件!', tips='请查看api文档检查语法。')
  190. # 验证完毕执行sql语句
  191. try:
  192. # 执行语句
  193. conn = sqlite3.connect(operation.databasePath + databaseName + '.db') # 连接数据库
  194. # print("数据库打开成功")
  195. c = conn.cursor() # 新建游标
  196. js = 0
  197. for sql in sqlList:
  198. # print(sql)
  199. # print(tuple(dataList[js]))
  200. c.execute(sql, tuple(dataList[js]))
  201. conn.commit()
  202. js = js + 1
  203. conn.close()
  204. return operation.dealReturn(code=200, returnData=None, reason='合法请求,执行sql语句成功。', tips=None)
  205. except Exception as e:
  206. userManagenmant.close()
  207. dbManagenmant.close()
  208. return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):' + e, tips='重新尝试操作或联系管理员。')
  209. # 执行一条查询的sql语句
  210. @app.route('/querySqlExecute',methods=['POST'])
  211. def querySqlExecute():# 执行一条非查询的sql语句
  212. headers = request.headers # 获得headers
  213. # post参数处理,储存到data(字典)
  214. data = operation.dealPostData(request.data)
  215. token = data['token'] # 获取token
  216. databaseName = data['databaseName'] # 获取操作的databaseName
  217. tableName = data['tableName'] # 获取操作的表名
  218. sqlList = data['sql'] # 获取要执行的sql语句
  219. dataList = data['data'] # 获取要传入的参数
  220. # databaseName中.db后缀处理
  221. if '.db' in databaseName:
  222. databaseName = databaseName.replace('.db', '')
  223. userManagenmant = user() # 实例化user
  224. dbManagenmant = databaseInformation() # 实例化databaseInformation
  225. # 查询权限并检查账户
  226. power = userManagenmant.searchPower(token)
  227. if power == None:
  228. return operation.dealReturn(code=401, returnData=None, reason='不合法的账户,token失效或不存在。', tips='请检查token或重新获取token')
  229. # 系统关键数据库禁止修改
  230. # 初次检查表名
  231. forbidName = ['databaseInformation', 'user']
  232. if databaseName in forbidName:
  233. return operation.dealReturn(code=403, returnData=None, reason='不合法的操作,当前方法不支持访问核心数据库文件!', tips='请查看api文档检查语法。')
  234. # 再次遍历sql语句是否合法
  235. for fName in forbidName:
  236. for sql in sqlList:
  237. if fName in sql:
  238. return operation.dealReturn(code=403, returnData=None, reason='不合法的操作,当前方法不支持访问核心数据库文件!',
  239. tips='请查看api文档检查语法。')
  240. # 验证完毕执行sql语句
  241. try:
  242. returnData = []
  243. # 执行语句
  244. conn = sqlite3.connect(operation.databasePath + databaseName + '.db') # 连接数据库
  245. # print("数据库打开成功")
  246. c = conn.cursor() # 新建游标
  247. js = 0
  248. for sql in sqlList:
  249. # print(sql)
  250. # print(tuple(dataList[js]))
  251. result = c.execute(sql, tuple(dataList[js]))
  252. # 结果写入returnData
  253. returnData.append(result.fetchall())
  254. js = js + 1
  255. conn.close()
  256. return operation.dealReturn(code=200, returnData=returnData, reason='合法请求,查询语句成功。', tips=None)
  257. except Exception as e:
  258. userManagenmant.close()
  259. dbManagenmant.close()
  260. return operation.dealErro(code=500, returnData=None, reason='发生内部错误。错误信息(erro):' + e, tips='重新尝试操作或联系管理员。')
  261. if __name__ == '__main__':
  262. app.run()
  1. # -*- coding: utf-8 -*-
  2. """
  3. @connect: 一些常用操作
  4. @Time : 2022/7/17-2022/7/18
  5. @python : python3.7 , flask
  6. @File :operation.py
  7. @IDE :PyCharm
  8. @Motto:技术总是要日积月累的
  9. """
  10. import traceback
  11. import json,time,os
  12. from flask import Flask,request
  13. from databaseManagement.user import user
  14. from databaseManagement.databaseInformation import databaseInformation
  15. class operation:
  16. debug = None # 继承自falsk的app.debug属性
  17. adminAccount = None # 管理者账户
  18. databasePath = None # 数据库路径
  19. totalMemory = None #数据库设定总内存
  20. def __init__(self,debug):
  21. self.debug = debug
  22. self.userManagenmant = user() # 实例化user
  23. self.dbManagenmant = databaseInformation() # 实例化databaseInformation
  24. self.initOperation()
  25. def initOperation(self): # 初始化常量
  26. setList = self.dbManagenmant.get_settings()
  27. self.adminAccount = setList[2] # 管理者账户
  28. self.databasePath = setList[1] # 数据库路径
  29. self.totalMemory = setList[0] #数据库设定总内存
  30. def dealPostData(self,data): # 处理post数据
  31. '''
  32. post参数处理,储存到data(字典)返回
  33. :param data: 字节型文本
  34. :return:
  35. '''
  36. data = data.decode('utf-8')
  37. data = data.replace('\r', '').replace('\n', '')#.replace(' ', '')
  38. data = json.loads(data)
  39. return data
  40. # 处理未知erro
  41. def dealErro(self,code,returnData = None ,reason = None,tips = None):
  42. '''
  43. 处理未知erro
  44. :param erro:[WinError 2] 系统找不到指定的文件。: 'amadeus.db'
  45. :return:
  46. '''
  47. if self.debug:
  48. print('发生错误!erro信息:'+ str(reason))
  49. print('报告错误详细内容及位置:')
  50. traceback.print_exc()
  51. print('报告完毕,请及时处理!')
  52. return self.dealReturn(code,returnData = None ,reason = None,tips = None)
  53. else:
  54. return self.dealReturn(code,returnData = None ,reason = None,tips = None)
  55. def dealExceptionRequest(self,reason):
  56. '''
  57. 处理异常请求
  58. :param reason: "权限不足!power=%s"
  59. :return:
  60. '''
  61. if self.debug:
  62. print('获取到一个异常请求,拦截原因:\n\t%s'%reason)
  63. return json.dumps({"result":403,"reason":reason})
  64. else:
  65. return json.dumps({"result": 403, "reason": reason})
  66. def dealReturn(self,code,returnData = None ,reason = None,tips = None):
  67. '''
  68. 格式化返回信息
  69. :param code: 操作代码
  70. :param returnData: 返回数据
  71. :param reason: 返回原因
  72. :param tips: 提示操作
  73. :return: json数据
  74. '''
  75. returnJson = {
  76. 'result':code,
  77. 'return':returnData,
  78. 'reason':reason,
  79. 'tips':tips,
  80. }
  81. return json.dumps(returnJson)
  1. # -*- coding: utf-8 -*-
  2. """
  3. @connect: 查询数据库信息
  4. @Time : 2022/7/17-2022/7/18
  5. @python : python3.7 , flask
  6. @File :databaseName.py
  7. @IDE :PyCharm
  8. @Motto:技术总是要日积月累的
  9. """
  10. import sqlite3,random,time
  11. class databaseInformation:
  12. def __init__(self,dbPath = 'database/databaseInformation.db'):
  13. self.conn = sqlite3.connect(dbPath) # 连接数据库
  14. #print("数据库打开成功")
  15. self.c = self.conn.cursor() # 新建游标
  16. def get_databaseName(self): # 返回dbNameList 数据模型如下['286.56.2', '5255324532']
  17. # 学习:
  18. # sql = '''select */column from tablename'''
  19. # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
  20. # sql = '''select */column from userToken where column="string"'''
  21. # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
  22. # 如果只是想看students表的id列,eg:select id from students
  23. sql = '''select name from databaseName''' # 查询 databaseName 的 name 列
  24. results = self.c.execute(sql) # 获取结果
  25. # 数据库将返回结果到results变量中
  26. all_dbName = results.fetchall()
  27. # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
  28. # 所以需要使用fetchall函数把所有结果保存到新的变量中
  29. # 值如下[('286.56.2',), ('5255324532',)]
  30. # 加工提取ip数据列表
  31. dbNameList = []
  32. for ip in all_dbName:
  33. dbNameList.append(ip[0])
  34. return dbNameList
  35. def get_settings(self):# 返回setList 数据模型如下[5368709120, 'E:/Flask/database/', 2061360308)]对应总内存,路径,管理账号
  36. # 学习:
  37. # sql = '''select */column from tablename'''
  38. # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
  39. # sql = '''select */column from userToken where column="string"'''
  40. # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
  41. # 如果只是想看students表的id列,eg:select id from students
  42. sql = "select * from setting" # 查询 databaseName 的 name 列
  43. results = self.c.execute(sql) # 获取结果
  44. # 数据库将返回结果到results变量中
  45. settings = results.fetchall()
  46. # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
  47. # 所以需要使用fetchall函数把所有结果保存到新的变量中
  48. # 值如下[('5368709120','E:/Flask/database/','2061360308')]对应总内存,路径,管理账号
  49. # 加工提取ip数据列表
  50. setList = []
  51. for set in settings[0]:
  52. setList.append(set)
  53. return setList
  54. def addDatabase(self,name,dbPath):
  55. try:
  56. # 获取所有database的名称
  57. sql = '''select name from databaseName'''
  58. name_result = self.c.execute(sql)
  59. name_result = name_result.fetchall()
  60. nameList = []
  61. for name_exist in name_result:
  62. nameList.append(name_exist[0])
  63. # 判断所要创建的数据库文件是否已经存在
  64. if name in nameList:
  65. return '已存在'
  66. else:
  67. conn = sqlite3.connect(dbPath) # 连接数据库
  68. conn.close()
  69. sql = '''insert into databaseName values(?)'''
  70. self.c.execute(sql, (name,))
  71. self.conn.commit()
  72. return True
  73. except Exception as e:
  74. print(e)
  75. return False
  76. def close(self):
  77. # 关闭数据库
  78. self.conn.commit() # 提交结果
  79. self.conn.close() # 关闭数据库
  80. if __name__ == '__main__':
  81. a = databaseInformation('../database/databaseInformation.db')
  82. print(a.get_databaseName())
  1. # -*- coding: utf-8 -*-
  2. """
  3. @connect: 管理使用的用户
  4. @Time : 2022/7/17-2022/7/18
  5. @python : python3.7 , flask
  6. @File :user.py
  7. @IDE :PyCharm
  8. @Motto:技术总是要日积月累的
  9. """
  10. import sqlite3,random,time
  11. class user:
  12. def __init__(self,dbPath = 'database/user.db'):
  13. self.conn = sqlite3.connect(dbPath) # 连接数据库
  14. #print("数据库打开成功")
  15. self.c = self.conn.cursor() # 新建游标
  16. def get_ipList(self): # 返回ipList 数据模型如下['286.56.2', '5255324532']
  17. # 学习:
  18. # sql = '''select */column from tablename'''
  19. # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
  20. # sql = '''select */column from userToken where column="string"'''
  21. # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
  22. # 如果只是想看students表的id列,eg:select id from students
  23. sql = '''select ip from userToken''' # 查询 userToken 的 ip 列
  24. results = self.c.execute(sql) # 获取结果
  25. # 数据库将返回结果到results变量中
  26. all_ip = results.fetchall()
  27. # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
  28. # 所以需要使用fetchall函数把所有结果保存到新的变量中
  29. # 值如下[('286.56.2',), ('5255324532',)]
  30. # 加工提取ip数据列表
  31. ipList = []
  32. for ip in all_ip:
  33. ipList.append(ip[0])
  34. return ipList
  35. def get_tokenList(self): # 返回token 数据模型如下['286.56.2', '5255324532']
  36. # 学习:
  37. # sql = '''select */column from tablename'''
  38. # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
  39. # sql = '''select */column from userToken where column="string"'''
  40. # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
  41. # 如果只是想看students表的id列,eg:select id from students
  42. sql = '''select token from userToken''' # 查询 userToken 的 token 列
  43. results = self.c.execute(sql) # 获取结果
  44. # 数据库将返回结果到results变量中
  45. all_token = results.fetchall()
  46. # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
  47. # 所以需要使用fetchall函数把所有结果保存到新的变量中
  48. # 值如下[('286.56.2',), ('5255324532',)]
  49. # 加工提取ip数据列表
  50. tokenList = []
  51. for ip in all_token:
  52. tokenList.append(ip[0])
  53. return tokenList
  54. def creatToken(self):
  55. """
  56. 生成一个指定长度的随机字符串,充当token
  57. """
  58. randomlength = 36 # 定义字符串长度
  59. random_str =''
  60. base_str ='@#$%^&*()_+|=-!~`.,?:;ABCDEFGHIGKLMNOPQRSTUVWXYZabcdefghigklmnopqrstuvwxyz0123456789'
  61. length =len(base_str) -1
  62. for i in range(randomlength):
  63. random_str +=base_str[random.randint(0, length)]
  64. return random_str
  65. def addToken(self,ip,token,power): # 添加新的token
  66. sql = "INSERT INTO userToken values(?,?,?,?)" # 添加值
  67. ts = time.time()
  68. try:
  69. insertResults = self.c.execute(sql, (
  70. ip,token, ts,power)) # 执行sql语句 数据大概像这个样子(ip=string,token=string,时间戳,权限)
  71. # 数据库将返回结果到results变量中
  72. self.conn.commit() # 提交结果
  73. return True
  74. except:
  75. return insertResults
  76. def searchToken(self,ip): # 返回token 数据模型如下['286.56.2', '5255324532']
  77. '''
  78. :param ip: '192.168.0.100'
  79. :return: 'o:c8N3:9vl#bnmMsl#\\LD=%\\83`ZWvGs&dgk'
  80. '''
  81. # 学习:
  82. # sql = '''select */column from tablename'''
  83. # "*"代表所有内容。此代码我们搜寻“students”表的所有内容。
  84. # sql = '''select */column from userToken where column="string"'''
  85. # sql语句意为从表格中挑选 符合Boolean语句为真的条件 的 全部内容或某些列项 。
  86. # 如果只是想看students表的id列,eg:select id from students
  87. sql = '''select token from userToken where ip="%s"'''%ip # 查询 userToken 中 指定token
  88. results = self.c.execute(sql) # 执行sql
  89. # 数据库将返回结果到results变量中
  90. token = results.fetchall()
  91. # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
  92. # 所以需要使用fetchall函数把所有结果保存到新的变量中
  93. # 值如下[('o:c8N3:9vl#bnmMsl#\\LD=%\\83`ZWvGs&dgk',)]
  94. # 加工提取token值
  95. token = token[0][0]
  96. return token
  97. def updateTime(self,ip):
  98. '''
  99. :param ip: '192.168.0.100'
  100. :return: bool
  101. '''
  102. try:
  103. sql = "UPDATE userToken SET time=? WHERE ip=?"
  104. self.c.execute(sql,(time.time(),ip)) # 执行sql
  105. self.conn.commit() # 提交结果
  106. return True
  107. except:
  108. return False
  109. def searchPower(self,token):
  110. '''
  111. :param token: 'o:c8N3:9vl#bnmMsl#\\LD=%\\83`ZWvGs&dgk'
  112. :return: 0
  113. '''
  114. try:
  115. sql = '''select power from userToken WHERE token=?'''
  116. results = self.c.execute(sql,(token,)) # 执行sql
  117. # 数据库将返回结果到results变量中
  118. power = results.fetchall()
  119. # 因为返回的results类似于:<sqlite3.Cursor object at 0x00000257F9100AB0>
  120. # 所以需要使用fetchall函数把所有结果保存到新的变量中
  121. # 值如下[('o:c8N3:9vl#bnmMsl#\\LD=%\\83`ZWvGs&dgk',)]
  122. # 加工提取token值
  123. power = power[0][0]
  124. return power
  125. except:
  126. return None
  127. def close(self):
  128. # 关闭数据库
  129. self.conn.commit() # 提交结果
  130. self.conn.close() # 关闭数据库
  131. if __name__ == '__main__':
  132. a = user('../database/user.db')
  133. print(a.searchPower(''))
'
运行
  1. <!DOCTYPE html>
  2. <html lang="zh">
  3. <head>
  4. <meta charset="UTF-8">
  5. <title>云数据库API</title>
  6. <link rel="shortcut icon" href="static/cloudDatabase.jpg">
  7. </head>
  8. <body>
  9. <div align="center" id = "frame">
  10. <div align="center" id = "main">
  11. <img src="static/top.png">
  12. <div align="center" id = "connect">
  13. <p id = "title">恭喜!您的云数据库API搭建成功。</p>
  14. <p id = "text">
  15. 在线API文档地址:
  16. <a target=_blank href="https://www.apifox.cn/apidoc/shared-93936f73-c065-480b-83b1-5f87f13916ed">点击打开apifox上分享的文档</a>
  17. <br/>
  18. 管理员:
  19. <br>
  20. 管理员联系方式:
  21. <br>
  22. 开发者:盧瞳
  23. <br>
  24. 开发者联系方式:QQ:2061360308
  25. <br>
  26. 原文链接:
  27. <br>
  28. GitHUb项目地址:
  29. </p>
  30. </div>
  31. </div>
  32. </div>
  33. <style>
  34. #frame {
  35. //background-color: #fe7fac;
  36. width: document.body.clientWidth;
  37. height: document.body.clientHeight;
  38. vertical-align:middle;
  39. }
  40. #main {
  41. background-color: #66CCFF;
  42. width: 600px;
  43. height: 100%;
  44. vertical-align:middle;
  45. }
  46. img {
  47. top:50px;
  48. left:50px;
  49. width: 600px;
  50. height: 120px;
  51. float: left;
  52. }
  53. #title {
  54. font-size:28px
  55. }
  56. #text{
  57. text-align:left;
  58. }
  59. </style>
  60. </body>
  61. </html>

再之后是SDK与文档,这两个接口这么能缺呢?

文档这里我推荐Apifox他们的软件很好用界面也简洁易懂,功能也齐全,这里放本文的在线文档链接,有兴趣的同学可以点开翻一翻。文档地址

SDK也就是把上面接口用requests库做个包装,我简单写了一些代码:

  1. # -*- coding: utf-8 -*-
  2. """
  3. @connect: 面向python的云数据库SDK
  4. @Time : 2022/1/18 11:36
  5. @Auth : 技术空间
  6. @File :main.py
  7. @IDE :PyCharm
  8. @Motto:技术总是要日积月累的
  9. """
  10. import time
  11. import requests,json,socket
  12. from urllib import parse
  13. from pythonSdk.operation import operation
  14. class clouldDatabaseSDK:
  15. operation = operation() # 引入一些常用方法
  16. # 请求头
  17. header = {
  18. "Accept": "*/*",
  19. "Accept-Encoding": "gzip, deflate, br",
  20. "Accept-Language": "zh-CN,zh;q=0.9",
  21. "Connection": "keep-alive",
  22. "Content-Type": "application/json",
  23. "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/92.0.4515.159 Safari/537.36"
  24. }
  25. severUrl = None # 服务器地址
  26. ip = None # 请求用的本机ip地址
  27. # 获取本机IP
  28. hostname = socket.gethostname()
  29. ip = socket.gethostbyname(hostname)
  30. adminAccount = None # 管理者账户
  31. token = None # 服务器返回零时token
  32. def __init__(self,serverUrl,adminAccount=None):
  33. '''
  34. 初始化SDK
  35. :param url:服务器请求地址
  36. :param adminAccount:账户,没有不填用于验证管理者权限的
  37. '''
  38. self.severUrl = serverUrl
  39. self.adminAccount = adminAccount
  40. # 调用初始化接口获取token
  41. post_json = json.dumps({'userId': self.adminAccount, 'ip': self.ip})
  42. print(post_json)
  43. # 处理返回内容
  44. requestReturn = operation.dealRequestReturn(self.operation,requests.post(parse.urljoin(self.severUrl,"init"), data=post_json, headers=self.header))
  45. # 验证返回内容
  46. if requestReturn.statusCode == 200:
  47. self.token = requestReturn.returnData['token']
  48. else:
  49. print(requestReturn.requestInformation)
  50. def queryDBInformation(self):
  51. '''
  52. 查询数据库信息,需管理员权限
  53. :return:
  54. '''
  55. post_json = json.dumps({'token': self.token})
  56. # 处理返回内容
  57. requestReturn = operation.dealRequestReturn(operation,requests.post(parse.urljoin(self.severUrl, "information"), data=post_json,
  58. headers=self.header))
  59. # 验证返回内容
  60. if requestReturn.statusCode == 200:
  61. return requestReturn
  62. else:
  63. print(requestReturn.requestInformation)
  64. def addDatabase(self,databaseName):
  65. '''
  66. 添加数据库,需管理员权限
  67. :param databaseName:要添加的数据库名称
  68. :return:
  69. '''
  70. post_json = json.dumps({'token': self.token,"databaseName":databaseName,"time":time.time()})
  71. # 处理返回内容
  72. requestReturn = operation.dealRequestReturn(operation,requests.post(parse.urljoin(self.severUrl, "/addDatabase"), data=post_json,
  73. headers=self.header))
  74. # 验证返回内容
  75. if requestReturn.statusCode == 200:
  76. return requestReturn
  77. else:
  78. print(requestReturn.requestInformation)
  79. def notQuerySqlExecute(self,databaseName,tableName,sql,data):
  80. '''
  81. 执行一条非查询的sql语句
  82. :param databaseName: 要操作的数据库名称
  83. :param tableName: 要操作的表名
  84. :param sql: sql语句列表 --> []
  85. :param data: 传入sql语句中参数列表 --> []
  86. :return:
  87. '''
  88. post_json = json.dumps({'token': self.token,"databaseName":databaseName,"tableName":tableName,"sql":sql,"data":data})
  89. # 处理返回内容
  90. requestReturn = operation.dealRequestReturn(operation,requests.post(parse.urljoin(self.severUrl, "/notQuerySqlExecute"), data=post_json,
  91. headers=self.header))
  92. # 验证返回内容
  93. if requestReturn.statusCode == 200:
  94. return requestReturn
  95. else:
  96. print(requestReturn.requestInformation)
  97. def querySqlExecute(self,databaseName,tableName,sql,data):
  98. '''
  99. 执行一条查询的sql语句
  100. :param databaseName: 要操作的数据库名称
  101. :param tableName: 要操作的表名
  102. :param sql: sql语句列表 --> []
  103. :param data: 传入sql语句中参数列表 --> []
  104. :return:
  105. '''
  106. post_json = json.dumps({'token': self.token,"databaseName":databaseName,"tableName":tableName,"sql":sql,"data":data})
  107. # 处理返回内容
  108. requestReturn = operation.dealRequestReturn(operation,requests.post(parse.urljoin(self.severUrl, "/querySqlExecute"), data=post_json,
  109. headers=self.header))
  110. # 验证返回内容
  111. if requestReturn.statusCode == 200:
  112. return requestReturn
  113. else:
  114. print(requestReturn.requestInformation)
  115. if __name__ == '__main__':
  116. # a=clouldDatabaseSDK(serverUrl='http://101.42.163.114:5000/',adminAccount='2061360308')
  117. a = clouldDatabaseSDK(serverUrl='http://192.168.0.101:5000', adminAccount='2061360308')
  118. #a.addDatabase('测试用数据库')
  119. '''create table if not exists 测试用表(roollno real,name text,class real)''',
  120. insertQuary = a.notQuerySqlExecute('测试用数据库','测试用表',
  121. ['''create table if not exists 测试用表(roollno real,name text,class real)''',
  122. '''insert into 测试用表 values(?,?,?)''',
  123. '''insert into 测试用表 values(?,?,?)''',
  124. ],
  125. [
  126. [],
  127. [0,1,2],
  128. ['roollno','name','class']
  129. ]
  130. )
  131. selectQuary = a.querySqlExecute('测试用数据库', '测试用表',
  132. ['''select * from 测试用表''',
  133. ],
  134. [
  135. [],
  136. ]
  137. )
  138. query = a.queryDBInformation()
  139. #print('返回:',query.statusCode,query.returnData)
  140. print(insertQuary.statusCode,insertQuary.returnData)
  141. print(selectQuary.statusCode,selectQuary.returnData)
  1. # -*- coding: utf-8 -*-
  2. """
  3. @connect: 一些常用操作
  4. @Time : 2022/7/17-2022/7/18
  5. @python : python3.7 , flask
  6. @File :operation.py
  7. @IDE :PyCharm
  8. @Motto:技术总是要日积月累的
  9. """
  10. import requests,json,socket
  11. from urllib import parse
  12. class operation:
  13. def __init__(self):
  14. pass
  15. def dealRequestReturn(self,object):
  16. '''
  17. 处理请求返回
  18. :param object: requests.post返回对象
  19. :return: 字典,数组等json.loads数据
  20. '''
  21. requestReturn = object.text # 获取返回json内容
  22. print(requestReturn)
  23. requestReturn = json.loads(requestReturn) # 解析json
  24. #requestReturn = json.loads('{"result": 200, "return": {"token": "", "time": 1658243312.8069735}, "reason": "\u5408\u6cd5\u8bf7\u6c42\uff0c\u8bf7\u6c42\u6210\u529f\u3002", "tips": null}') # 解析json
  25. statusCode = requestReturn['result']
  26. returnData = requestReturn['return']
  27. if statusCode == 200:
  28. requestInformation = '请求成功!\n状态码:' + str(requestReturn['result']) + '\n原因:' + str(requestReturn[
  29. 'reason']) + '\n提示:' + str(requestReturn['tips'])
  30. else:
  31. requestInformation = '请求失败!\n状态码:' + str(requestReturn['result']) + '\n原因:' + str(requestReturn[
  32. 'reason']) + '\n提示:' + str(requestReturn['tips'])
  33. #构建类对象返回
  34. class requestReturn():
  35. '''
  36. 请求返回内容对象
  37. :object statusCode: 状态码
  38. :object returnData:返回的数据
  39. :object requestInformation:返回的提示
  40. '''
  41. def __init__(self,statusCode,returnData,requestInformation):
  42. self.statusCode = statusCode
  43. self.returnData = returnData
  44. self.requestInformation = requestInformation
  45. requestReturn = requestReturn(statusCode,returnData,requestInformation)
  46. return requestReturn
'
运行

代码ok了,想要真正让接口在服务器上跑起来,还需要一些配置:

首先host=“0.0.0.0”这个我们支前准备好了,还有就是在服务器上将你的端口号加进去,因为服务器厂商太多这里不做详细解说,有需要可以百度“Flask公网访问”,这里放一个腾讯云设置方法的教程地址

文章结束,感谢  收藏+关注,希望本文对你有所启发。

具体的源码和SDK我打包放这里了,源码地址

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家小花儿/article/detail/889537
推荐阅读
相关标签
  

闽ICP备14008679号