当前位置:   article > 正文

python库之sqlparse碎片拾遗_python sqlparse

python sqlparse


1 sqlparse简介

sqlparse有几个最简单的工具:split,format,parse,分别是提取sql单个语句、格式化sql的语句以及解析sql

例:

CREATE TABLE TABLE_TO_CREATE NOLOGGING AS
SELECT	DISTINCT
	A.COLA,
	B.COLB,
	DECODE(A.DECODE_CONDITION, 1, '是', '否') DECODED,
	ROW_NUMBER() OVER(PARTITION BY A.CLASS_CONDITION ORDER BY A.RAND_CONDITION DESC) RN
FROM	FSCRM.TABLE_A A,
	(SELECT * FROM TABLE_C C WHERE C.SOMETHING='SOMETHING' AND C.NUM=1234) B 
WHERE	A.COMPARE_CONDITION=B.COMPARE_CONDITION
AND 	A.NUM NOT IN (1, 2, 3)
AND 	NOT EXISTS (SELECT D.COLD FROM TABLE_D WHERE A.COLA=D.COLD)
ORDER BY A.ORDER_CONDITION
/* COMMENTS */
;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

使用parse后,sql语句被解析成一棵树。这棵树跟常见的树不一样,因为它的父节点,是完全包含了子节点的信息。例如在上面的例子:

import sqlparse
import os
from PyQt5.QtSql import *
import re

# 获取桌面路径

desk_path = os.path.join(os.path.expanduser('~'), "Desktop")

# 文件路径
file_path = desk_path + '/manager/auto_tools_master/pyqt5/db/sample.sql'

if __name__ == '__main__':
    print(file_path)
    with open(file_path, 'r', encoding='utf8', errors='ignore') as sql_file:

        # 读取sql文件并解析,返回对象元祖
        file_parse = sqlparse.parse(sql_file.read().strip())

        # 遍历元祖对象内的sql
        line = 0
        for value in file_parse:
            line += 1
            print(str(value) + "第%s个sql" % line + '\n')
        # 查看解析内容
        for token in file_parse[0].tokens:
            print(type(token), token.ttype, token.value)


  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

处理后结果:

CREATE TABLE TABLE_TO_CREATE NOLOGGING AS
SELECT	DISTINCT
	A.COLA,
	B.COLB,
	DECODE(A.DECODE_CONDITION, 1, '是', '否') DECODED,
	ROW_NUMBER() OVER(PARTITION BY A.CLASS_CONDITION ORDER BY A.RAND_CONDITION DESC) RN
FROM	FSCRM.TABLE_A A,
	(SELECT * FROM TABLE_C C WHERE C.SOMETHING='SOMETHING' AND C.NUM=1234) B
WHERE	A.COMPARE_CONDITION=B.COMPARE_CONDITION
AND 	A.NUM NOT IN (1, 2, 3)
AND 	NOT EXISTS (SELECT D.COLD FROM TABLE_D WHERE A.COLA=D.COLD)
ORDER BY A.ORDER_CONDITION
/* COMMENTS */
;1sql





CREATE TABLE TABLE_TO_CREATE NOLOGGING AS
SELECT	DISTINCT
	A.COLA,
	B.COLB,
	DECODE(A.DECODE_CONDITION, 1, '是', '否') DECODED,
	ROW_NUMBER() OVER(PARTITION BY A.CLASS_CONDITION ORDER BY A.RAND_CONDITION DESC) RN
FROM	FSCRM.TABLE_A A,
	(SELECT * FROM TABLE_C C WHERE C.SOMETHING='SOMETHING' AND C.NUM=1234) B
WHERE	A.COMPARE_CONDITION=B.COMPARE_CONDITION
AND 	A.NUM NOT IN (1, 2, 3)
AND 	NOT EXISTS (SELECT D.COLD FROM TABLE_D WHERE A.COLA=D.COLD)
ORDER BY A.ORDER_CONDITION
/* COMMENTS */
;2sql

<class 'sqlparse.sql.Token'> Token.Keyword.DDL CREATE
<class 'sqlparse.sql.Token'> Token.Text.Whitespace  
<class 'sqlparse.sql.Token'> Token.Keyword TABLE
<class 'sqlparse.sql.Token'> Token.Text.Whitespace  
<class 'sqlparse.sql.Identifier'> None TABLE_TO_CREATE NOLOGGING
<class 'sqlparse.sql.Token'> Token.Text.Whitespace  
<class 'sqlparse.sql.Token'> Token.Keyword AS
<class 'sqlparse.sql.Token'> Token.Text.Whitespace.Newline 

<class 'sqlparse.sql.Token'> Token.Keyword.DML SELECT
<class 'sqlparse.sql.Token'> Token.Text.Whitespace 	
<class 'sqlparse.sql.Token'> Token.Keyword DISTINCT
<class 'sqlparse.sql.Token'> Token.Text.Whitespace.Newline 

<class 'sqlparse.sql.Token'> Token.Text.Whitespace 	
<class 'sqlparse.sql.IdentifierList'> None A.COLA,
	B.COLB,
	DECODE
<class 'sqlparse.sql.IdentifierList'> None (A.DECODE_CONDITION, 1, '是', '否') DECODED,
	ROW_NUMBER
<class 'sqlparse.sql.Identifier'> None () OVER
<class 'sqlparse.sql.Identifier'> None (PARTITION BY A.CLASS_CONDITION ORDER BY A.RAND_CONDITION DESC) RN
<class 'sqlparse.sql.Token'> Token.Text.Whitespace.Newline 

<class 'sqlparse.sql.Token'> Token.Keyword FROM
<class 'sqlparse.sql.Token'> Token.Text.Whitespace 	
<class 'sqlparse.sql.IdentifierList'> None FSCRM.TABLE_A A,
	(SELECT * FROM TABLE_C C WHERE C.SOMETHING='SOMETHING' AND C.NUM=1234) B
<class 'sqlparse.sql.Token'> Token.Text.Whitespace.Newline 

<class 'sqlparse.sql.Where'> None WHERE	A.COMPARE_CONDITION=B.COMPARE_CONDITION
AND 	A.NUM NOT IN (1, 2, 3)
AND 	NOT EXISTS (SELECT D.COLD FROM TABLE_D WHERE A.COLA=D.COLD)

<class 'sqlparse.sql.Token'> Token.Keyword ORDER BY
<class 'sqlparse.sql.Token'> Token.Text.Whitespace  
<class 'sqlparse.sql.Identifier'> None A.ORDER_CONDITION
/* COMMENTS */

<class 'sqlparse.sql.Token'> Token.Punctuation ;

Process finished with exit code 0

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77

parse语句返回的是一个tuple,就算只有一个元素,也是一个tuple,所以我们在使用的时候需要加上下标[0]

引用: https://blog.csdn.net/qq_39607437/article/details/79620383

import sqlparse
import os
import re
import sys
from PyQt5.QtWidgets import QMainWindow, QWidget, QApplication, QMessageBox, QTableView, QHBoxLayout
from PyQt5.QtSql import QSqlDatabase, QSqlQuery, QSqlTableModel

# 获取桌面路径

desk_path = os.path.join(os.path.expanduser('~'), "Desktop")

# 文件路径
file_path = desk_path + '/manager/auto_tools_master/pyqt5/db/sample.sql'


class sqlDemo(QWidget):
    def __init__(self):
        super().__init__()
        self.initUI()

    def initUI(self):
        self.setWindowTitle('demo')
        self.resize(600, 600)
        self.layout = QHBoxLayout()
        self.setLayout(self.layout)

        # ------数据库操作------
        # 1连接数据库
        self.connect_db()
        # 操作数据库
        self.opreate_db()
        # 关闭数据库
        self.close_db()

    def connect_db(self):
        self.db = QSqlDatabase.addDatabase('QSQLITE')
        self.db.setDatabaseName('./test.db')
        # d=self.db.open()
        # print(d)       #显示True 表示连接成功
        if not self.db.open():
            QMessageBox.critical(self, 'database error', self.db.lastError.text(), QMessageBox.Yes | QMessageBox.No,
                                 QMessageBox.Yes)

    def opreate_db(self):
        query = QSqlQuery()
        # #【建立表】
        # query.exec_("create table test(ID int primary key,name varchar(20),url varchar(100))")
        # #【插入数据】
        # query.exec_("insert into test values(1000,'tom','http://www.xx.com')")
        # query.exec_("insert into test values(1001,'marray','http://www.xx.com')")
        # query.exec_("insert into test values(1002,'jack','http://www.xx.com')")
        # #【查询数据】
        query.exec_("select * from test ")
        # 备注:在执行exec_()查询时指针会放在记录集中第一个记录之上所以需要调用next()
        while query.next():
            id = query.value(0)
            name = query.value(1)
            url = query.value(2)
            print('id:' + str(id) + '----name:' + name + '---url:' + url)
        # #【删除数据】
        value = query.exec_("delete from test where ID=1001")
        # print(value) #为true表示删除成功
        if value:
            QMessageBox.information(self, 'delet data', '删除成功', QMessageBox.Ok | QMessageBox.No, QMessageBox.Ok)
        else:
            QMessageBox.information(self, 'delet data', '删除失败', QMessageBox.Ok | QMessageBox.No, QMessageBox.Ok)

        # #【修改数据】
        query.exec_("update test set name='bbb' where id=1002")

    # 关闭数据库
    def close_db(self):
        self.db.close()


if __name__ == '__main__':
    # 测试sqlparse
    print(file_path)
    with open(file_path, 'r', encoding='utf8', errors='ignore') as sql_file:

        # 读取sql文件并解析,返回对象元祖
        file_parse = sqlparse.parse(sql_file.read().strip())

        # 遍历元祖对象内的sql
        line = 0
        for value in file_parse:
            line += 1
            # print(str(value) + "第%s个sql" % line + '\n')
            pass  # TODO:XXX
        # 查看解析内容
        for token in file_parse[0].tokens:
            # print(type(token), token.ttype, token.value)
            pass  # TODO:XXX

    # 测试 PyQt5
    app = QApplication(sys.argv)
    demo = sqlDemo()
    demo.show()
    sys.exit(app.exec_())

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100

引用:https://blog.csdn.net/chengmo123/article/details/95303605

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/从前慢现在也慢/article/detail/540976
推荐阅读
相关标签
  

闽ICP备14008679号