概述
一、基本操作
1.安装依赖
pip install pymysql
2.基本操作
from pymysql import connect
mysql_conf = {
"host": '127.0.0.1',
"port": 3306,
"user": 'root',
"password": '1234',
"db": 'sqlalchemy_test',
"charset": "utf8",
"cursorclass": pymysql.cursors.DictCursor
}
#创建Connection连接
conn = connect(**mysql_conf)
# 获得Cursor对象
cs1 = conn.cursor()
# 执行insert语句,并返回受影响的行数:添加一条数据
# 增加
count = cs1.execute('sql语句')
#打印受影响的行数
print(count)
count = cs1.execute('sql语句')
print(count)
# # 更新
# count = cs1.execute('update goods_cates set name="机械硬盘" where name="硬盘"')
# # 删除
# count = cs1.execute('delete from goods_cates where id=6')
# 提交之前的操作,如果之前已经之执行过多次的execute,那么就都进行提交
conn.commit()
# 关闭Cursor对象
cs1.close()
# 关闭Connection对象
conn.close()
注:在多任务并发的情况下,cursor不能重复使用,应该重新创建一个连接,否则会引起连接错误
二、创建数据库连接池
1.安装依赖
pip install dbutils
2.导包
from dbutils.pooled_db import PooledDB
import pymysql
3.执行流程
# 创建连接池对象
db_pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5, # 链接池中最多闲置的链接,0和None不限制
maxshared=0, # 链接池中最多共享的连接数量,0和None表示不共享。这里的共享是针对多线程共享的,例如设置为3就是三个线程可以共享一个连接。默认为0,每个连接都是专享的,这里设置默认值就好了,这个参数并不常用,可以忽略。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='1234',
database='test',
charset='utf8'
)
# 创建连接
conn = db_pool.connection()
# 创建cursor对象
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# 执行sql语句
cursor.execute('select * from users')
# 获取所有的数据
result = cursor.fetchall()
# 关闭cursor对象
cursor.close()
# 关闭连接,将连接返回连接池
conn.close()
三、开启事务
注:mysql默认事务的隔离级别:可重复读
pymysql在默认是已经自动开启了事务
# mysql真正开启可重复读事务的标志是进行了select语句
# 情景1
with conn.cursor() as cursor: # 事务已经开始了,但没完全开始
sql = "update xxx set xxx where xxx"
affect_row = curor.execute(sql)
#### 在这个节点如果往数据库里插一条数据,这里的slect仍然会查询到提交的数据 ####
sql = "select * from xxx where xxx"
affect_row = cursor.execute(sql)
# 情景2
with conn.cursor() as cursor: # 事务已经开始了,但没完全开始
sql = "select * from xxx where xxx"
affect_row = cursor.execute(sql)
#### 在这个节点如果往数据库里插一条数据,这里的slect不会查询到提交的数据,这里事务已经完全开启####
sql = "update xxx set xxx where xxx"
affect_row = curor.execute(sql)
### 之所以说事务没有完全开始,是因为事务的回滚仍然有效,仍然能回滚掉该事务中未提交到数据库中的数据
四、协程与pymysql实现并发操作数据库
from gevent import monkey
monkey.patch_all()
import logging
from gevent.pool import Pool
import gevent
import pymysql
from dbutils.pooled_db import PooledDB
class CustomLogger:
def __init__(self, platform, path_dir, log_name):
self.log_name = log_name
self.path_dir = str(path_dir)
self.platform = platform
if not os.path.exists(f"./{self.platform}_logs/{self.path_dir}"):
os.makedirs(f"./{self.platform}_logs/{self.path_dir}", exist_ok=True)
# 创建一个记录器对象
_logger = logging.getLogger(self.log_name)
# 设置日志级别
_logger.setLevel(logging.INFO)
# 设置日志的保存路径
logfile = f'./{self.platform}_logs/{self.path_dir}/{self.log_name}.logs'
# 指定文件处理器,发送日志输出到磁盘文件
fh = logging.FileHandler(logfile, encoding='utf-8')
fh.setLevel(logging.INFO)
# 指定流处理器,发送日志输出
sh = logging.StreamHandler()
sh.setLevel(logging.INFO)
# 创建格式化程序,指定日志的输出格式
formatter = logging.Formatter("%(asctime)s 【%(name)s】 n- %(levelname)s: %(message)s")
# 给文件文件处理器和流处理器指定格式化成
fh.setFormatter(formatter)
sh.setFormatter(formatter)
# 添加处理程序对象
_logger.addHandler(fh)
_logger.addHandler(sh)
self.get_logger = _logger
@property
def info(self):
return self.get_logger.info
@property
def error(self):
return self.get_logger.error
@property
def warning(self):
return self.get_logger.warning
p = Pool(10)
db_pool = PooledDB(
creator=pymysql, # 使用链接数据库的模块
maxconnections=10, # 连接池允许的最大连接数,0和None表示不限制连接数
mincached=5, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=0, # 链接池中最多闲置的链接,0和None不限制
maxshared=0, # 链接池中最多共享的连接数量,0和None表示不共享。这里的共享是针对多线程共享的,例如设置为3就是三个线程可以共享一个连接。默认为0,每个连接都是专享的,这里设置默认值就好了,这个参数并不常用,可以忽略。
blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='1234',
database='sqlalchemy_test',
charset='utf8'
)
def f1(n, name, num, db_pool):
conn = db_pool.connection()
custom_logger = CustomLogger(name)
custom_logger.info(f"hello world!!!!!!!!!!!!!{name}")
with conn.cursor() as cursor:
for i in range(n):
update_sql = f"""update user set email={1} where id ={num}"""
custom_logger.info(f"开始执行sql-{name}")
cursor.execute(update_sql)
conn.commit()
custom_logger.info(f"结束执行sql-{name}")
task_list = [p.spawn(f1, 3, "test1", 1, db_pool),
p.spawn(f1, 3, "test2", 2, db_pool),
p.spawn(f1, 3, "test3", 3, db_pool)]
gevent.joinall(task_list)
最后
以上就是无心康乃馨为你收集整理的python操作mysql数据库(pymysql)的全部内容,希望文章能够帮你解决python操作mysql数据库(pymysql)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复