赞
踩
-- Schema for the person CRUD demo: one database (`test`) holding one table (`person`).
CREATE DATABASE IF NOT EXISTS `test`;
-- Select the database explicitly; otherwise the table below is created in
-- whatever database the session happens to be using (or the statement fails
-- when no database is selected).
USE `test`;
-- NOTE: mysqlutil.py reads rows from `select *` by position
-- (id, name, telephone, sex), so the column order below matters.
CREATE TABLE IF NOT EXISTS `person` (
`id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(100) NOT NULL,
`telephone` varchar(100) NOT NULL,
`sex` char(2) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
依赖的库:flask、flask-cors、pymysql
# Web framework imported by app.py.
pip install Flask
# Flask extension that provides CORS(app), used by app.py for cross-origin requests.
pip install flask-cors
# MySQL client library imported by mysqlutil.py.
pip install pymysql
mysqlutil.py 文件:
"""Small CRUD helpers for the ``person`` table, built on PyMySQL.

Every statement is executed with driver-side parameter binding
(``cursor.execute(sql, params)``) instead of string interpolation, so
caller-supplied values can never change the structure of the SQL.
"""
import pymysql

# Column order of the ``person`` table as returned by ``select *``.
_PERSON_COLUMNS = ('id', 'name', 'telephone', 'sex')


def getConn(host, port, username, password, database):
    """Open and return a new PyMySQL connection using the utf8 charset."""
    return pymysql.connect(host=host, port=port, user=username,
                           password=password, database=database,
                           charset='utf8')


def _row_to_person(row):
    """Map one positional result row onto a dict keyed by column name."""
    return dict(zip(_PERSON_COLUMNS, row))


def _fetch_people(conn, sql, params=None):
    """Run a SELECT and return the matching rows as a list of person dicts.

    Errors are printed and swallowed (the module's original best-effort
    behaviour); in that case an empty list is returned.
    """
    people = []
    try:
        # The context manager closes the cursor even when execute() fails.
        with conn.cursor() as cursor:
            cursor.execute(sql, params)
            people = [_row_to_person(row) for row in cursor.fetchall()]
    except Exception as e:
        print(e)
    return people


def _execute_write(conn, sql, params=None, success_message=None,
                   failure_message=None):
    """Run an INSERT/UPDATE/DELETE, commit it, and report the affected rows.

    On any error the transaction is rolled back and the error is printed.
    Returns ``{'row': n}`` where ``n`` is ``cursor.rowcount``, or ``-1`` when
    no cursor could be created at all.
    """
    rowcount = -1
    cursor = None
    try:
        cursor = conn.cursor()
        cursor.execute(sql, params)
        conn.commit()
        if success_message:
            print(success_message)
    except Exception as e:
        conn.rollback()
        print(e)
        if failure_message:
            print(failure_message)
    finally:
        # Guarded: the cursor may never have been created if conn.cursor()
        # itself raised.
        if cursor is not None:
            rowcount = cursor.rowcount
            cursor.close()
    return {'row': rowcount}


def addPerson(name, telephone, sex, conn):
    """Insert a person and return ``{'row': affected_row_count}``."""
    return _execute_write(
        conn,
        "insert into person(name, telephone, sex) values (%s, %s, %s)",
        (name, telephone, sex),
        success_message="add successfully!",
        failure_message="fail to add!",
    )


def all(conn):
    """Return every person as a list of dicts.

    NOTE: the name shadows the ``all`` builtin inside this module; it is kept
    because callers use ``mysqlutil.all``.
    """
    return _fetch_people(conn, "select * from person")


def findById(conn, id):
    """Return the person with the given id as a dict, or ``None`` if absent."""
    people = _fetch_people(conn, "select * from person where id=%s", (id,))
    return people[0] if people else None


def findByName(conn, name):
    """Return the first person with the given name, or ``None`` if absent."""
    people = _fetch_people(conn, "select * from person where name=%s", (name,))
    return people[0] if people else None


def deleteAll(conn):
    """Delete every person and return ``{'row': affected_row_count}``."""
    return _execute_write(conn, "delete from person")


def deleteById(conn, id):
    """Delete the person with the given id; return ``{'row': affected_row_count}``.

    Raises:
        TypeError, ValueError: if ``id`` cannot be converted to an integer.
    """
    # int() keeps the integer-only guarantee the original "%d" format gave.
    return _execute_write(conn, "delete from person where id=%s", (int(id),))


def updateById(id, name, telephone, sex, conn):
    """Update the person with the given id; return ``{'row': affected_row_count}``.

    Raises:
        TypeError, ValueError: if ``id`` cannot be converted to an integer.
    """
    return _execute_write(
        conn,
        "update person set name=%s, telephone=%s, sex=%s where id=%s",
        (name, telephone, sex, int(id)),
        failure_message="fail to update!",
    )
testmysql.py 文件:
import mysqlutil
def getConn(host='127.0.0.1', port=3306, username='root', password='root',
            database='test'):
    """Return a database connection for the given host, port, credentials and database name.

    All parameters are optional; called with no arguments this connects
    exactly as before (127.0.0.1:3306, user ``root``, database ``test``).
    The defaults are hard-coded development values, so pass explicit
    arguments anywhere else.
    """
    return mysqlutil.getConn(host=host, port=port, username=username,
                             password=password, database=database)
app.py 文件:
"""Flask application exposing JSON CRUD endpoints for the ``person`` table."""
import json
from contextlib import closing

from flask import Flask
from flask import abort
from flask import request
from flask_cors import CORS

import mysqlutil
import testmysql

app = Flask(__name__)
# Enable CORS on every route so a front end on another origin can call the API.
CORS(app)


def _json_fields(*names):
    """Return the named fields of the JSON request body, in order.

    Aborts with HTTP 400 when the body is not a JSON object or a field is
    missing, instead of failing later with an unhandled exception.
    """
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        abort(400, description="request body must be a JSON object")
    missing = [name for name in names if name not in payload]
    if missing:
        abort(400, description="missing field(s): " + ", ".join(missing))
    return [payload[name] for name in names]


def _as_int(value):
    """Convert ``value`` to int, aborting with HTTP 400 if that is impossible."""
    try:
        return int(value)
    except (TypeError, ValueError):
        abort(400, description="id must be an integer")


@app.route("/")
def hello_world():
    """Return a static greeting page."""
    return "<h1>Hello, World!</h1>"


@app.route("/person/all")
def findAllPerson():
    """Return all persons as a JSON array."""
    # closing() guarantees conn.close() runs even if the query raises.
    with closing(testmysql.getConn()) as conn:
        people = mysqlutil.all(conn)
    return json.dumps(people)


@app.route("/person/id/<id>")
def findPersonById(id):
    """Return the person with the given id as JSON."""
    with closing(testmysql.getConn()) as conn:
        person = mysqlutil.findById(conn, id)
    return json.dumps(person)


@app.route("/person/delete/id/<id>")
def deletePersonById(id):
    """Delete the person with the given id and return the affected-row count as JSON."""
    # Validate before opening a connection so bad input costs nothing.
    person_id = _as_int(id)
    with closing(testmysql.getConn()) as conn:
        result = mysqlutil.deleteById(conn, person_id)
    return json.dumps(result)


@app.route("/person/add", methods=['POST'])
def addPerson():
    """Create a person from the JSON body (``name``, ``telephone``, ``sex``)."""
    print("addperson")
    name, telephone, sex = _json_fields('name', 'telephone', 'sex')
    with closing(testmysql.getConn()) as conn:
        result = mysqlutil.addPerson(name, telephone, sex, conn)
    return json.dumps(result)


@app.route("/person/update", methods=["POST"])
def updatePerson():
    """Update a person from the JSON body (``id``, ``name``, ``telephone``, ``sex``)."""
    id, name, telephone, sex = _json_fields('id', 'name', 'telephone', 'sex')
    person_id = _as_int(id)
    with closing(testmysql.getConn()) as conn:
        result = mysqlutil.updateById(person_id, name, telephone, sex, conn)
    return json.dumps(result)
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。