Python异步 操作MySQL 连接池,打破数据库瓶颈!提升性能
引言:
这AsyncMysql 类实现的功能是通过异步方式连接、查询、操作 MySQL 数据库,利用 aiomysql 库提供的异步支持来提升数据库操作的效率,尤其适用于需要高并发访问数据库的场景。
准备(mysql数据库信息):
D盘或者其他路径 创建一个文件Database.yaml内容如下:
host: 127.0.0.1
port: 3306
user: sa
password: 密码1
database: mysql
封装代码如下:
import asyncio # 导入 asyncio 库,用于异步编程
import aiomysql # 导入 aiomysql 库,用于异步 MySQL 操作
from Public_Config.PublicConfig import ConfigYaml # 导入配置文件模块,获取 MySQL 配置信息
class AsyncMysql:
def __init__(self):
# 获取 MySQL 配置信息
mysql = self.load_config(r'D:\Database.yaml') # 从配置文件中读取 MySQL 配置
self.host = mysql.get('host') # 主机地址
self.port = int(mysql.get('port')) # 端口,转为整数
self.user = mysql.get('user') # 用户名
self.password = mysql.get('password') # 密码
self.database = mysql('database') # 数据库名
self.pool = None # 初始化连接池为 None,稍后创建连接池
def load_config(self, config_file): #todo 加载配置文件的方法
with open(config_file, 'r') as file: #todo 以只读模式打开配置文件
return yaml.safe_load(file) #todo 使用 yaml.safe_load 解析 YAML 文件内容
async def create_pool(self):
""" 创建 MySQL 连接池 """
try:
# 创建 MySQL 连接池,设置最小连接数 minsize 和最大连接数 maxsize
self.pool = await aiomysql.create_pool(
host=self.host, # 主机
port=self.port, # 端口
user=self.user, # 用户名
password=self.password, # 密码
db=self.database, # 数据库名
autocommit=True, # 自动提交事务
minsize=2, # 设置最小连接数
maxsize=10 # 设置最大连接数
)
except Exception as e:
# 如果创建连接池失败,捕获异常并输出错误信息
print(f"Failed to create MySQL connection pool: {str(e)}")
async def execute_query(self, query):
""" 执行增删改操作 """
try:
# 从连接池中获取一个连接对象
async with self.pool.acquire() as conn:
# 使用连接对象创建游标,用于执行 SQL 查询
async with conn.cursor() as cursor:
await cursor.execute(query) # 执行 SQL 查询
return True # 返回执行成功标志
except Exception as e:
# 如果执行 SQL 失败,捕获异常并输出错误信息
print(f"Failed to execute query: {query}")
print(f"Error: {str(e)}")
return False # 执行失败时返回 False
async def execute_select(self, query):
""" 执行查询操作 """
try:
# 从连接池中获取一个连接对象
async with self.pool.acquire() as conn:
# 使用连接对象创建游标,用于执行 SQL 查询
async with conn.cursor() as cursor:
await cursor.execute(query) # 执行 SQL 查询
results = await cursor.fetchall() # 获取所有查询结果
return results # 返回查询结果
except Exception as e:
# 如果执行查询失败,捕获异常并输出错误信息
print(f"Failed to execute select query: {query}")
print(f"Error: {str(e)}")
return None # 执行失败时返回 None
async def close_pool(self):
""" 关闭 MySQL 连接池 """
if self.pool:
self.pool.close() # 关闭连接池
await self.pool.wait_closed() # 等待连接池完全关闭
print("MySQL connection pool closed.") # 输出连接池已关闭的信息
# 测试异步操作
async def main():
mysql = AsyncMysql() # 创建 AsyncMysql 实例
await mysql.create_pool() # 异步创建连接池
# 定义 SQL 查询语句
query1 = "SELECT * FROM USERLOGINLOG LIMIT 5" # 查询 USERLOGINLOG 表的前 5 条记录
query2 = "SELECT COUNT(*) FROM USERLOGINLOG" # 查询 USERLOGINLOG 表的记录总数
# 使用 asyncio.gather 同时执行多个查询操作
results = await asyncio.gather(
mysql.execute_select(query1), # 执行第一个查询
mysql.execute_select(query2) # 执行第二个查询
)
# 打印查询结果
print("Query 1 results:", results[0]) # 输出第一个查询的结果
print("Query 2 results:", results[1]) # 输出第二个查询的结果
# 关闭连接池
await mysql.close_pool() # 异步关闭连接池
# 运行异步任务
if __name__ == '__main__':
asyncio.run(main()) # 启动并运行 main 异步任务
功能总结
该AsyncMysql 类主要用于处理 MySQL 数据库的连接池管理和异步数据库操作,具有以下特点:
- 连接池管理:通过异步创建和关闭数据库连接池,提高连接的复用性和操作效率。
- 异步操作:使用 aiomysql 和 asyncio 实现非阻塞的数据库操作,可以同时执行多个查询或操作,不会阻塞主程序,适用于高并发环境。
- 增删改查操作支持:支持执行数据库的增删改查操作,且能处理 SQL 查询结果。
- 自动提交事务:每次执行 SQL 操作时,自动提交事务,避免需要手动调用 commit()。
应用场景
这个类适用于以下几种情况:
- 高并发场景:比如需要频繁访问数据库的 Web 应用或服务,异步操作可以有效提高数据库操作的并发性,提升性能。
- 大规模数据查询和操作:尤其是在需要大量数据库查询或更新操作的场景,使用连接池可以减少连接的建立和销毁开销,提升整体效率。
- 异步 Web 框架的数据库访问:比如在使用 FastAPI、Sanic 等异步框架时,通过 aiomysql 可以方便地执行数据库操作,保持程序的高效运行。