赞
踩
安装 MySQL 模块
在终端输入命令:pip install pymysql
在文件中引入模块
import pymysql
用于建立与数据库的连接
调用 connect() 方法
conn = connect(参数列表)
执行 SQL 语句
调用 Connection 对象的 cursor() 方法
cursor = conn.cursor()
rowcount 只读属性,表示最近一次 execute() 执行后受影响的行数
connection 获得当前连接对象
import pymysql # 引入 MySQL 模块 try: # 建立连接 connect conn = pymysql.connect(host='localhost',port=3306,db='school',user='root',passwd='123456',charset='utf8') print(conn) # <pymysql.connections.Connection object at 0x000001F4BFFB9FF0> # 准备执行的语句,开启游标 cs = conn.cursor() # 插入数据,execute() 写执行的语法 students(sname):表名(字段名) count1 = cs.execute("insert into students(sname) values('王五')") # 返回受影响的行数 print(count1) # 1 # 提交 conn.commit() # 关闭连接 cs.close() conn.close() except Exception as e: print(e)
import pymysql # 引入 MySQL 模块 try: # 建立连接 connect conn = pymysql.connect(host='localhost',port=3306,db='school',user='root',passwd='123456',charset='utf8') print(conn) # <pymysql.connections.Connection object at 0x000001F4BFFB9FF0> # 准备执行的语句,开启游标 cs = conn.cursor() # 修改数据 count2 = cs.execute("update students set sname='赵六' where id = 4") print(count2) # 1 # 提交 conn.commit() # 关闭连接 cs.close() conn.close() except Exception as e: print(e)
import pymysql # 引入 MySQL 模块 try: # 建立连接 connect conn = pymysql.connect(host='localhost',port=3306,db='school',user='root',passwd='123456',charset='utf8') print(conn) # <pymysql.connections.Connection object at 0x000001F4BFFB9FF0> # 准备执行的语句,开启游标 cs = conn.cursor() # 删除数据 count3 = cs.execute("delete from students where id=4") print(count3) # 1 # 提交 conn.commit() # 关闭连接 cs.close() conn.close() except Exception as e: print(e)
import pymysql # 引入 MySQL 模块 try: # 建立连接 connect conn = pymysql.connect(host='localhost',port=3306,db='school',user='root',passwd='123456',charset='utf8') print(conn) # <pymysql.connections.Connection object at 0x000001F4BFFB9FF0> # 准备执行的语句,开启游标 cs = conn.cursor() # 从用户获取学生姓名输入 sname = input("请输入学生姓名:") # 准备 SQL 参数列表,这里只包含学生姓名 params = [sname] # 插入数据 count = cs.execute('insert into students(sname) values(%s)', params) print(count) # 1 # 提交 conn.commit() # 关闭连接 cs.close() conn.close() except Exception as e: print(e)
import pymysql # 引入 MySQL 模块 try: # 建立连接 connect conn = pymysql.connect(host='localhost',port=3306,db='school',user='root',passwd='123456',charset='utf8') print(conn) # <pymysql.connections.Connection object at 0x000001F4BFFB9FF0> # 准备执行的语句,开启游标 cs = conn.cursor() # 查询一行数据 cs.execute("select * from students") # fetchone() 执行查询语句时,获取查询结果集的第一个行数据,返回一个元组 result = cs.fetchone() print(result) # (1, '张三') # 提交 conn.commit() # 关闭连接 cs.close() conn.close() except Exception as e: print(e)
import pymysql # 引入 MySQL 模块 try: # 建立连接 connect conn = pymysql.connect(host='localhost',port=3306,db='school',user='root',passwd='123456',charset='utf8') print(conn) # <pymysql.connections.Connection object at 0x000001F4BFFB9FF0> # 准备执行的语句,开启游标 cs = conn.cursor() # 查询多行数据 cs.execute("select * from students") # fetchall() 执行查询时,获取结果集的所有行,一行构成一个元组,再将这些元组装入一个元组返回 result1 = cs.fetchall() print(result1) # ((1, '张三'), (2, '李四')) # 提交 conn.commit() # 关闭连接 cs.close() conn.close() except Exception as e: print(e)
封装查询功能
# 导入 pymysql 模块,用于连接 MySQL 数据库 import pymysql # 创建 SelectMysql 类 class SelectMysql: def __init__(self): # 配置数据库连接信息,包括主机、端口、用户名、密码、数据库名 db_config = { 'host': '127.0.0.1', # 主机 'port': 3306, # 端口 'user': 'root', # 用户名 'password': '123456', # 密码 'db': 'school', # 数据库名 } # 连接 MySQL 数据库 self.conn = pymysql.Connect(**db_config) # 创建游标 self.cur = self.conn.cursor() # 定义查询方法,传入 SQL 语句和可选的返回记录数 def select_database(self, sql , one=None): try: # 执行数据库操作,执行传入的 SQL 语句 self.cur.execute(sql) except Exception: # 当 SQL 语句错误时回退事务 self.conn.rollback() else: # fetchall(self):接收全部的返回结果行 # fetchmany(self, size=None):接收 size 条返回结果行 # 如果 size 的值大于返回的结果行的数量,则会返回 cursor.arraysize 条数据 if not one==None: # 如果传入了返回记录数 one,则获取指定数量的记录 print(self.cur.fetchmany(int(one))) else: # 否则获取一条记录 print(self.cur.fetchone()) # 提交事务 self.conn.commit() finally: # 关闭游标 self.cur.close() # 关闭数据库连接 self.conn.close() # 创建 SelectMysql 的实例 mysqltest mysqltest = SelectMysql() # 准备查询语句 sql = "select * from students" # 调用 select_database 方法执行查询,参数 5 表示获取 5 条记录 mysqltest.select_database(sql, 5)
封装增删改查功能
# 导入 pymysql 模块,用于连接 MySQL 数据库 import pymysql # 创建 MySQLHelper 类 class MySQLHelper: # 初始化方法,用于创建数据库连接和游标 def __init__(self, host, port, user, password, db): # 参数说明: # host: 数据库主机地址 # port: 数据库端口号 # user: 数据库用户名 # password: 数据库密码 # db: 数据库名称 self.conn = pymysql.connect( # 创建数据库连接 host=host, # 数据库主机地址 port=port, # 数据库端口号 user=user, # 数据库用户名 password=password, # 数据库密码 db=db, # 数据库名称 charset='utf8' # 字符编码设置为 utf8 ) self.cur = self.conn.cursor() # 创建游标 # 执行查询语句方法 def execute_query(self, sql, params=None, fetch_one=False): try: # 参数说明: # sql: SQL 查询语句 # params: SQL 查询语句中的参数,可以为 None # fetch_one: 是否只获取一条记录,默认为 False self.cur.execute(sql, params) # 执行 SQL 查询语句 if fetch_one: # 如果 fetch_one 为 True result = self.cur.fetchone() # 获取一条记录 else: result = self.cur.fetchall() # 获取所有记录 return result # 返回查询结果 except Exception as e: print(f"Error: {e}") # 捕获异常并打印错误信息 return None # 执行更新语句方法 def execute_update(self, sql, params=None): try: # 参数说明: # sql: SQL 更新语句 # params: SQL 更新语句中的参数,可以为 None self.cur.execute(sql, params) # 执行 SQL 更新语句 self.conn.commit() # 提交事务,使更新生效 return self.cur.rowcount # 返回受影响的行数 except Exception as e: self.conn.rollback() # 回滚事务,以保证数据的一致性 print(f"Error: {e}") # 捕获异常并打印错误信息 return -1 # 返回错误标志 def close(self): # 关闭方法,用于关闭游标和数据库连接 self.cur.close() # 关闭游标 self.conn.close() # 关闭数据库连接 # 使用示例 if __name__ == "__main__": # 初始化 MySQLHelper 实例 helper = MySQLHelper( host='127.0.0.1', port=3306, user='root', password='123456', db='school' ) # 查询示例 select_sql = "select * from students where id = %s" # 定义 SQL 查询语句,%s 是占位符 result = helper.execute_query(select_sql, (1,)) # 执行查询,参数 (1,) 传入了占位符的值 print(result) # 打印查询结果 # 更新示例 update_sql = "update students set sname = %s where id = %s" # 定义 SQL 更新语句,两个占位符 %s rows_affected = helper.execute_update(update_sql, ('NewName', 1)) # 执行更新,参数 ('NewName', 1) 传入了占位符的值 print(f"受影响的行数: {rows_affected}") # 打印受影响的行数 # 插入示例 insert_sql = "insert into students (sname) values (%s)" # 定义 SQL 插入语句,一个占位符 %s rows_inserted = helper.execute_update(insert_sql, ('NewStudent',)) # 执行插入,参数 ('NewStudent',) 传入了占位符的值 print(f"插入的行数: {rows_inserted}") # 打印插入的行数 # 删除示例 delete_sql = "delete from students where id = %s" # 定义 SQL 删除语句,%s 是占位符 rows_deleted = helper.execute_update(delete_sql, (1,)) # 执行删除,参数 (1,) 传入了占位符的值 print(f"删除的行数: {rows_deleted}") # 打印删除的行数 # 关闭数据库连接 helper.close()
安装 Redis 模块
在终端输入命令:pip install redis
在文件中引入模块
import redis
# 导入 redis 模块,用于连接 Redis 数据库 import redis # 创建一个 Redis 连接实例,连接本地的 Redis 服务器,decode_responses 参数用于指定返回的结果为字符串类型 red = redis.StrictRedis(host='127.0.0.1', decode_responses=True) # 打印 red.keys() 的类型,通常返回的是一个列表 print(type(red.keys())) # 使用 red.keys() 获取所有的键名并打印 print(red.keys()) # string # # 使用 red.set() 方法设置键为 'name' 的值为 'haha' # red.set('name', 'haha') # 使用 red.set() 方法再次设置键 'name' 的值为 'heihei',会覆盖之前的值 red.set('name', 'heihei') # 使用 red.get() 方法获取键 'name' 的值 ret = red.get('name') print(ret) # # 使用 red.append() 方法向键 'name' 的值后追加字符串 ' lee' # red.append('name', ' lee') # # 使用 red.type() 方法获取键 'name' 的数据类型 # print(red.type('name')) # # 使用 red.delete() 方法删除键 'name' # red.delete('name') # # 再次尝试获取键 'name' 的值,返回 None,因为已经被删除 # print(red.get('name')) # # 使用 red.exists() 方法判断键 'name' 是否存在,返回 False,因为已经被删除 # print(red.exists('name')) # # 使用 red.mset() 方法同时设置多个键值对 # red.mset({'abc': 123, 'bcd': 234}) # # 使用 red.mget() 方法获取多个键的值 # print(red.mget('abc', 'bcd')) # 使用 red.keys() 方法获取所有的键名并打印 print(red.keys()) # 使用 red.rename() 方法将键 'name' 重命名为 'new_ex_name' red.rename('name', 'new_ex_name') # 再次使用 red.keys() 方法获取所有的键名并打印 print(red.keys()) # 使用 red.ttl() 方法获取键 'name' 的剩余过期时间 print(red.ttl('name')) # 设置过期时间 # 使用 red.set() 方法设置键 'name' 的值为 'qiye',并指定过期时间为10秒 red.set('name', 'qiye', ex=10) # 使用 red.get() 方法获取键 'name' 的值 print(red.get("name")) # list # 使用 red.lpush() 方法向列表 'list_name' 左侧插入元素 red.lpush("mylist", 10, 20, 20) # 使用 red.lrange() 方法获取列表 'list_name' 中的所有元素 print(red.lrange("mylist", 0, -1)) # 使用 red.rpush() 方法向列表 'list_name' 右侧插入元素 red.rpush("mylist", 40, 50) # 使用 red.lrange() 方法再次获取列表 'list_name' 中的所有元素 print(red.lrange("mylist", 0, -1)) # 使用 red.llen() 方法获取列表 'list_name' 的长度 print(red.llen('mylist')) # 使用 red.linsert() 方法在列表 'list_name' 中,在元素40之前插入字符串'蓝桥' red.linsert("mylist", "before", 40, "蓝桥") # 使用 red.lrange() 方法获取列表 'list_name' 中的所有元素 print(red.lrange("mylist", 0, -1)) # 使用 red.lrem() 方法从列表 'list_name' 中删除指定值10,最多删除1次 red.lrem("mylist", 1, 10) # 使用 red.lrange() 方法获取列表 'list_name' 中的所有元素 print(red.lrange("mylist", 0, -1)) # 使用 red.lpop() 方法从列表 'list_name' 的左侧弹出一个元素 print(red.lpop("mylist")) # 使用 red.lindex() 方法获取列表 'list_name' 中索引为1的元素 print(red.lindex("mylist", 1)) # 哈希 # 使用 red.hset() 方法设置哈希表 'hash_name' 中的键 'name' 的值为 'qiye' red.hset('hash_name', 'name', 'qiye') # 使用 red.hget() 方法获取哈希表 'hash_name' 中键 'name' 的值 print(red.hget('hash_name', 'name')) # set # 使用 red.sadd() 方法向集合 'myset' 中添加元素 'a'、'b'、'c' red.sadd("myset", "a", "b", "c") # 使用 red.sadd() 方法向集合 'yourset' 中添加元素 'b'、'c'、'd' red.sadd("yourset", "b", "c", "d") # 使用 red.smembers() 方法获取集合 'myset' 中的所有成员 print(red.smembers('myset')) # 使用 red.srem() 方法从集合 'myset' 中删除元素'3' red.srem('set_name', 3) # 使用 red.scard() 方法获取集合 'myset' 的基数(元素个数) print(red.scard('myset')) # 使用 red.sdiff() 方法获取集合 'myset' 与 'yourset' 的差集 print(red.sdiff("myset", "yourset")) # 使用 red.sinter() 方法获取集合 'myset' 与 'yourset' 的交集 print(red.sinter("myset", "yourset")) # 使用 red.sismember() 方法判断元素 'a' 是否存在于集合 'myset' 中 print(red.sismember("myset", "a")) # 使用 red.sismember() 方法判断元素 'a' 是否存在于集合 'yourset' 中 print(red.sismember("yourset", "a")) # zset # 使用 red.zadd() 方法向有序集合 'zzset_name' 中添加元素,同时指定元素的分值 red.zadd("zzset_name", {"a": 1, "b": 2, "c": 3, "d": 4}) # 使用 red.zrange() 方法获取有序集合 'zzset_name' 中的所有元素,并按照分值升序排列 print(red.zrange('zzset_name', 0, -1)) # 使用 red.zcard() 方法获取有序集合 'zzset_name' 的基数(元素个数) print(red.zcard('zzset_name')) # 使用 red.zcount() 方法获取有序集合 'zzset_name' 中分值在3到4之间的元素个数 print(red.zcount('zzset_name', 3, 4)) # 使用 red.zscore() 方法获取有序集合 'zzset_name' 中元素 'b' 的分值 print(red.zscore('zzset_name', 'b')) # 使用 red.zrem() 方法从有序集合 'zzset_name' 中删除元素 'a' red.zrem('zzset_name', 'a') # 使用 red.zrange() 方法获取有序集合 'zzset_name' 中的所有元素 print(red.zrange('zzset_name', 0, -1))
# 导入 redis 模块,用于连接 Redis 数据库 import redis # 创建 RedisHelper 类 class RedisHelper: def __init__(self, host='localhost', port=6379, db=0): """ 初始化 Redis 连接 Args: host (str): Redis 服务器主机名,默认为 'localhost' port (int): Redis 服务器端口,默认为 6379 db (int): Redis 数据库索引,默认为 0 """ self.conn = redis.StrictRedis(host=host, port=port, db=db, decode_responses=True) def add(self, key, value): """ 向 Redis 中添加一个键值对 Args: key (str): 要添加的键 value (str): 要添加的值 """ self.conn.set(key, value) def update(self, key, new_value): """ 修改 Redis 中指定键的值 Args: key (str): 要修改的键 new_value (str): 新的值 """ if self.conn.exists(key): self.conn.set(key, new_value) else: print(f"键 '{key}' 不存在于 Redis 中。") def delete(self, key): """ 从 Redis 中删除指定键 Args: key (str): 要删除的键 """ if self.conn.exists(key): self.conn.delete(key) else: print(f"键 '{key}' 不存在于 Redis 中。") def get(self, key): """ 从 Redis 中获取指定键的值 Args: key (str): 要获取的键 Returns: str: 键对应的值,如果键不存在则返回 None """ return self.conn.get(key) # 使用示例 if __name__ == "__main__": helper = RedisHelper() # 添加键值对 helper.add("name", "Alice") # 查询键值对 result = helper.get("name") print(result) # 输出: Alice # 修改键值对 helper.update("name", "Bob") # 查询修改后的值 result = helper.get("name") print(result) # 输出: Bob # 删除键值对 helper.delete("name") # 查询删除后的值 result = helper.get("name") print(result) # 输出: None
是一种轻量级的数据交换格式,常用于数据的序列化和跨语言传输。
JSON 数据是一个键值对的集合,类似于 Python 字典。
一个 JSON 对象可以包含零个或多个键值对,每个键值对由一个键(字符串)和一个值(可以是字符串、数字、布尔值、对象、数组、null)组成。
JSON 中的值可以是嵌套的,从而构成复杂的数据结构。
{
"name": "John",
"age": 30,
"isStudent": false,
"address": {
"street": "123 Main St",
"city": "New York"
},
"hobbies": ["Reading", "Music", "Travel"],
"isMarried": null
}
# 导入 Python 的 JSON 模块 import json # 创建一个 Python 字典 dict_a 包含三个键值对 dict_a = {'name':'qiye', 'age': None, 'isMan': True} # 使用 json.dumps 将 Python 字典 dict_a 转换为 JSON 格式的字符串,并存储在 json_a 中 json_a = json.dumps(dict_a) # 打印输出 json_a 的类型和内容 print(type(json_a), json_a) # 使用 json.loads 将 JSON 字符串 json_a 转换回 Python 字典,并存储在 new_dict_a 中 new_dict_a = json.loads(json_a) # 打印输出 new_dict_a 的类型和内容 print(type(new_dict_a), new_dict_a) # 使用 with 打开一个文件 'tt.json' 以供写入操作,确保操作结束后文件会自动关闭 with open('tt.json', 'w') as f: # 使用 json.dump 将 Python 字典 dict_a 写入文件 f,以 JSON 格式保存 json.dump(dict_a, f) # 使用 with 打开文件 'tt.json' 以供读取操作,确保操作结束后文件会自动关闭 with open('tt.json', 'r') as f1: # 使用 json.load 从文件 f1 中读取 JSON 数据并解析为 Python 字典,存储在 new_dict_b 中 new_dict_b = json.load(f1) # 打印输出 new_dict_b print(new_dict_b)
记录学习过程,欢迎讨论交流,尊重原创,转载请注明出处~
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。