当前位置:   article > 正文

基于Flask实现对一张表的增删改查接口_flask 根据传入信息实现对任意表的查询接口

flask 根据传入信息实现对任意表的查询接口

1 创建数据库和表:

-- Create the database the Python code connects to (database='test').
CREATE DATABASE IF NOT EXISTS `test`;

-- Select it so the table below is created inside `test` rather than in
-- whatever database happens to be current for the session.
USE `test`;

-- One row per person served by the CRUD endpoints.
-- IF NOT EXISTS keeps the script re-runnable, matching the statement above.
CREATE TABLE IF NOT EXISTS `person` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT,  -- primary key, assigned by MySQL
  `name` varchar(100) NOT NULL,
  `telephone` varchar(100) NOT NULL,
  `sex` char(2) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

2 下载需要的python模块

flask, flask-cors, pymysql

pip install Flask
pip install flask-cors
pip install pymysql
  • 1
  • 2
  • 3

3 代码:

mysqlutil.py文件

import pymysql


def getConn(host, port, username, password, database):
    """Open and return a new pymysql connection.

    Args:
        host: Database server host name or IP address.
        port: Database server TCP port.
        username: Account used to log in (passed to pymysql as ``user``).
        password: Password for that account.
        database: Name of the database to select.

    Returns:
        The connection object produced by ``pymysql.connect``; the
        connection charset is always ``utf8``.
    """
    settings = {
        'host': host,
        'port': port,
        'user': username,
        'password': password,
        'database': database,
        'charset': 'utf8',
    }
    return pymysql.connect(**settings)


def addPerson(name, telephone, sex, conn):
    """Insert one row into the ``person`` table.

    The values are handed to the driver as query parameters instead of being
    interpolated into the SQL text, so quotes or SQL fragments in the input
    cannot change the statement.

    Args:
        name: Value for the ``name`` column.
        telephone: Value for the ``telephone`` column.
        sex: Value for the ``sex`` column.
        conn: Open DB-API connection.

    Returns:
        dict: ``{'row': n}`` where ``n`` is the cursor's ``rowcount`` after
        the attempt. Failures are rolled back and printed, not raised.
    """
    result = {}
    sql = "insert into person(name, telephone, sex) values (%s, %s, %s)"
    # Created outside the try block so that a failure to obtain a cursor is
    # reported as-is instead of being masked by an UnboundLocalError when
    # ``cursor.rowcount`` is read below.
    cursor = conn.cursor()
    try:
        cursor.execute(sql, (name, telephone, sex))
        conn.commit()
        print("add successfully!")
    except Exception as e:
        conn.rollback()
        print(e)
        print("fail to add!")
    result['row'] = cursor.rowcount
    return result


def all(conn):
    """Fetch every row of the ``person`` table.

    Args:
        conn: Open DB-API connection.

    Returns:
        list: One dict per row with the keys ``id``, ``name``, ``telephone``
        and ``sex``, filled from row positions 0 to 3. If anything raises
        while querying or converting, the error is printed and the rows
        collected so far (possibly none) are returned.
    """
    cur = conn.cursor()
    people = []
    try:
        cur.execute("select * from person")
        for record in cur.fetchall():
            people.append({
                'id': record[0],
                'name': record[1],
                'telephone': record[2],
                'sex': record[3],
            })
    except Exception as err:
        print(err)
    return people


def findById(conn, id):
    """Look up a single person by primary key.

    The id is passed to the driver as a query parameter rather than being
    formatted into the SQL string, so a crafted value cannot alter the
    statement.

    Args:
        conn: Open DB-API connection.
        id: Primary-key value to search for.

    Returns:
        dict | None: A dict with the keys ``id``, ``name``, ``telephone`` and
        ``sex`` for the first matching row, or ``None`` when no row matches
        or the query fails (the error is printed). The previous
        implementation raised ``IndexError`` in those cases.
    """
    cursor = conn.cursor()
    sql = "select * from person where id=%s"
    try:
        cursor.execute(sql, (id,))
        row = cursor.fetchone()
    except Exception as e:
        print(e)
        row = None
    if row is None:
        return None
    return {
        'id': row[0],
        'name': row[1],
        'telephone': row[2],
        'sex': row[3],
    }


def findByName(conn, name):
    """Look up a single person by name.

    The name is passed to the driver as a query parameter rather than being
    formatted into the SQL string, so quotes in the value cannot alter the
    statement.

    Args:
        conn: Open DB-API connection.
        name: Value to match against the ``name`` column.

    Returns:
        dict | None: A dict with the keys ``id``, ``name``, ``telephone`` and
        ``sex`` for the first matching row, or ``None`` when no row matches
        or the query fails (the error is printed). The previous
        implementation raised ``IndexError`` in those cases.
    """
    cursor = conn.cursor()
    sql = "select * from person where name=%s"
    try:
        cursor.execute(sql, (name,))
        row = cursor.fetchone()
    except Exception as e:
        print(e)
        row = None
    if row is None:
        return None
    return {
        'id': row[0],
        'name': row[1],
        'telephone': row[2],
        'sex': row[3],
    }


def deleteAll(conn):
    """Delete every row from the ``person`` table.

    Args:
        conn: Open DB-API connection.

    Returns:
        dict: ``{'row': n}`` where ``n`` is the cursor's ``rowcount`` after
        the attempt. On error the transaction is rolled back and the
        exception is printed instead of being raised.
    """
    cur = conn.cursor()
    try:
        cur.execute("delete from person")
        conn.commit()
    except Exception as err:
        conn.rollback()
        print(err)
    return {'row': cur.rowcount}


def deleteById(conn, id):
    """Delete the person with the given primary key.

    Args:
        conn: Open DB-API connection.
        id: Primary-key value; anything ``int()`` accepts.

    Returns:
        dict: ``{'row': n}`` where ``n`` is the cursor's ``rowcount`` after
        the attempt. Database errors are rolled back and printed, not raised.

    Raises:
        ValueError, TypeError: If ``id`` cannot be converted to an integer.
    """
    result = {}
    # int() keeps the numeric-only contract the old "%d" formatting enforced,
    # so arbitrary text is rejected before it reaches the database.
    person_id = int(id)
    cursor = conn.cursor()
    # Parameterized query: the driver does the quoting, not string formatting.
    sql = "delete from person where id=%s"
    try:
        cursor.execute(sql, (person_id,))
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(e)
    result['row'] = cursor.rowcount
    return result


def updateById(id, name, telephone, sex, conn):
    """Overwrite name, telephone and sex of the person with the given id.

    All values are handed to the driver as query parameters instead of being
    interpolated into the SQL text, so quotes or SQL fragments in the input
    cannot change the statement.

    Args:
        id: Primary-key value; anything ``int()`` accepts.
        name: New value for the ``name`` column.
        telephone: New value for the ``telephone`` column.
        sex: New value for the ``sex`` column.
        conn: Open DB-API connection.

    Returns:
        dict: ``{'row': n}`` where ``n`` is the cursor's ``rowcount`` after
        the attempt. Database errors are rolled back and printed, not raised.

    Raises:
        ValueError, TypeError: If ``id`` cannot be converted to an integer.
    """
    result = {}
    # int() keeps the numeric-only contract the old "%d" formatting enforced.
    person_id = int(id)
    sql = "update person set name=%s, telephone=%s, sex=%s where id=%s"
    # Created outside the try block so ``cursor.rowcount`` below never hits an
    # unbound name when obtaining the cursor fails.
    cursor = conn.cursor()
    try:
        cursor.execute(sql, (name, telephone, sex, person_id))
        conn.commit()
    except Exception as e:
        conn.rollback()
        print(e)
        # The old message said "fail to add!", copied from the insert helper.
        print("fail to update!")
    result['row'] = cursor.rowcount
    return result
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131

testmysql.py 文件:

import mysqlutil

def getConn():
    """Return a connection to the local ``test`` database.

    Delegates to ``mysqlutil.getConn`` with a fixed host IP, port, database
    user name, database password and database name.
    """
    settings = dict(
        host='127.0.0.1',
        port=3306,
        username='root',
        password='root',
        database='test',
    )
    return mysqlutil.getConn(**settings)
  • 1
  • 2
  • 3
  • 4
  • 5

app.py文件:

import json
from flask import Flask
from flask import request
from flask_cors import CORS
import testmysql
import mysqlutil
import json

# Flask application object; the route decorators below register handlers on it.
app = Flask(__name__)
# Work around cross-origin (CORS) restrictions for callers on other origins.
CORS(app)

@app.route("/")
def hello_world():
    """Serve a static HTML greeting at the site root."""
    greeting = "<h1>Hello, World!</h1>"
    return greeting

@app.route("/person/all")
def findAllPerson():
    """Return every person as a JSON-encoded array.

    Opens a fresh connection for the request and always closes it, even if
    the query helper raises.
    """
    conn = testmysql.getConn()
    try:
        people = mysqlutil.all(conn)
    finally:
        # Without this the connection leaked whenever the helper raised.
        conn.close()
    return json.dumps(people)

@app.route("/person/id/<id>")
def findPersonById(id):
    """Return the person with the given id as a JSON-encoded value.

    Args:
        id: The ``<id>`` URL segment, passed to ``mysqlutil.findById`` as-is.

    Opens a fresh connection for the request and always closes it, even if
    the lookup raises.
    """
    conn = testmysql.getConn()
    try:
        person = mysqlutil.findById(conn, id)
    finally:
        # Without this the connection leaked whenever the lookup raised.
        conn.close()
    return json.dumps(person)

@app.route("/person/delete/id/<id>")
def deletePersonById(id):
    """Delete the person with the given id and return the JSON-encoded result.

    Args:
        id: The ``<id>`` URL segment; must be convertible with ``int()``.

    The id is converted before a connection is opened, and the connection is
    always closed, so a malformed id or a failing helper cannot leak it.
    """
    # Convert first: a ValueError here used to leave an open connection behind.
    person_id = int(id)
    conn = testmysql.getConn()
    try:
        result = mysqlutil.deleteById(conn, person_id)
    finally:
        conn.close()
    return json.dumps(result)

@app.route("/person/add", methods=['POST'])
def addPerson():
    """Create a person from the JSON request body.

    Reads the ``name``, ``telephone`` and ``sex`` keys of the posted JSON and
    returns the JSON-encoded result of ``mysqlutil.addPerson``. The
    connection is always closed, even if the insert helper raises.
    """
    print("addperson")
    payload = request.json
    name = payload['name']
    telephone = payload['telephone']
    sex = payload['sex']
    conn = testmysql.getConn()
    try:
        result = mysqlutil.addPerson(name, telephone, sex, conn)
    finally:
        # Without this the connection leaked whenever the helper raised.
        conn.close()
    return json.dumps(result)

@app.route("/person/update", methods=["POST"])
def updatePerson():
    """Update a person from the JSON request body.

    Reads the ``id``, ``name``, ``telephone`` and ``sex`` keys of the posted
    JSON and returns the JSON-encoded result of ``mysqlutil.updateById``.
    The id is converted before a connection is opened, and the connection is
    always closed, so a malformed id or a failing helper cannot leak it.
    """
    payload = request.json
    # Convert first: a ValueError here used to leave an open connection behind.
    person_id = int(payload['id'])
    name = payload['name']
    telephone = payload['telephone']
    sex = payload['sex']
    conn = testmysql.getConn()
    try:
        result = mysqlutil.updateById(person_id, name, telephone, sex, conn)
    finally:
        conn.close()
    return json.dumps(result)

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/小舞很执着/article/detail/784651
推荐阅读
相关标签
  

闽ICP备14008679号