我是靠谱客的博主 无心康乃馨,最近开发中收集的这篇文章主要介绍python操作mysql数据库(pymysql),觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

一、基本操作

1.安装依赖

pip install pymysql

2.基本操作

from pymysql import connect
mysql_conf = {
    "host": '127.0.0.1',
    "port": 3306,
    "user": 'root',
    "password": '1234',
    "db": 'sqlalchemy_test',
    "charset": "utf8",
    "cursorclass": pymysql.cursors.DictCursor
}
#创建Connection连接
conn = connect(**mysql_conf)
# 获得Cursor对象
cs1 = conn.cursor()
# 执行insert语句,并返回受影响的行数:添加一条数据
# 增加
count = cs1.execute('sql语句')
#打印受影响的行数
print(count)

count = cs1.execute('sql语句')
print(count)

# # 更新
# count = cs1.execute('update goods_cates set name="机械硬盘" where name="硬盘"')
# # 删除
# count = cs1.execute('delete from goods_cates where id=6')

# 提交之前的操作,如果之前已经之执行过多次的execute,那么就都进行提交
conn.commit()

# 关闭Cursor对象
cs1.close()
# 关闭Connection对象
conn.close()

注:在多任务并发的情况下,cursor不能重复使用,应该重新创建一个连接,否则会引起连接错误

二、创建数据库连接池

1.安装依赖

pip install dbutils

2.导包

from dbutils.pooled_db import PooledDB
import pymysql

3.执行流程

# 创建连接池对象
db_pool = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=0,  # 链接池中最多共享的连接数量,0和None表示不共享。这里的共享是针对多线程共享的,例如设置为3就是三个线程可以共享一个连接。默认为0,每个连接都是专享的,这里设置默认值就好了,这个参数并不常用,可以忽略。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0, # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='1234',
    database='test',
    charset='utf8'
)

# 创建连接
conn = db_pool.connection()

# 创建cursor对象
cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

# 执行sql语句
cursor.execute('select * from users')

# 获取所有的数据
result = cursor.fetchall()

# 关闭cursor对象
cursor.close()

# 关闭连接,将连接返回连接池
conn.close()

三、开启事务

注:mysql默认事务的隔离级别:可重复读

pymysql在默认是已经自动开启了事务

# mysql真正开启可重复读事务的标志是进行了select语句

# 情景1
with conn.cursor() as cursor: # 事务已经开始了,但没完全开始
	sql = "update xxx set xxx where xxx"
	affect_row = curor.execute(sql)
	
	#### 在这个节点如果往数据库里插一条数据,这里的slect仍然会查询到提交的数据 ####
	sql = "select * from xxx where xxx"
	affect_row = cursor.execute(sql)

# 情景2
with conn.cursor() as cursor: # 事务已经开始了,但没完全开始
	sql = "select * from xxx where xxx"
	affect_row = cursor.execute(sql)

	#### 在这个节点如果往数据库里插一条数据,这里的slect不会查询到提交的数据,这里事务已经完全开启####
	sql = "update xxx set xxx where xxx"
	affect_row = curor.execute(sql)


### 之所以说事务没有完全开始,是因为事务的回滚仍然有效,仍然能回滚掉该事务中未提交到数据库中的数据

四、协程与pymysql实现并发操作数据库

from gevent import monkey

monkey.patch_all()
import logging
from gevent.pool import Pool
import gevent

import pymysql
from dbutils.pooled_db import PooledDB


class CustomLogger:
    def __init__(self, platform, path_dir, log_name):
        self.log_name = log_name
        self.path_dir = str(path_dir)
        self.platform = platform
        if not os.path.exists(f"./{self.platform}_logs/{self.path_dir}"):
            os.makedirs(f"./{self.platform}_logs/{self.path_dir}", exist_ok=True)
        # 创建一个记录器对象
        _logger = logging.getLogger(self.log_name)
        # 设置日志级别
        _logger.setLevel(logging.INFO)
        # 设置日志的保存路径
        logfile = f'./{self.platform}_logs/{self.path_dir}/{self.log_name}.logs'

        # 指定文件处理器,发送日志输出到磁盘文件
        fh = logging.FileHandler(logfile, encoding='utf-8')
        fh.setLevel(logging.INFO)

        # 指定流处理器,发送日志输出
        sh = logging.StreamHandler()
        sh.setLevel(logging.INFO)

        # 创建格式化程序,指定日志的输出格式
        formatter = logging.Formatter("%(asctime)s 【%(name)s】 n- %(levelname)s: %(message)s")

        # 给文件文件处理器和流处理器指定格式化成
        fh.setFormatter(formatter)
        sh.setFormatter(formatter)

        # 添加处理程序对象
        _logger.addHandler(fh)
        _logger.addHandler(sh)
        self.get_logger = _logger

    @property
    def info(self):
        return self.get_logger.info

    @property
    def error(self):
        return self.get_logger.error

    @property
    def warning(self):
        return self.get_logger.warning


p = Pool(10)

db_pool = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=10,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=5,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=0,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=0,  # 链接池中最多共享的连接数量,0和None表示不共享。这里的共享是针对多线程共享的,例如设置为3就是三个线程可以共享一个连接。默认为0,每个连接都是专享的,这里设置默认值就好了,这个参数并不常用,可以忽略。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='1234',
    database='sqlalchemy_test',
    charset='utf8'
)


def f1(n, name, num, db_pool):
    conn = db_pool.connection()
    custom_logger = CustomLogger(name)
    custom_logger.info(f"hello world!!!!!!!!!!!!!{name}")
    with conn.cursor() as cursor:
        for i in range(n):
            update_sql = f"""update user set email={1} where id ={num}"""
            custom_logger.info(f"开始执行sql-{name}")
            cursor.execute(update_sql)
            conn.commit()
            custom_logger.info(f"结束执行sql-{name}")


task_list = [p.spawn(f1, 3, "test1", 1, db_pool),
             p.spawn(f1, 3, "test2", 2, db_pool),
             p.spawn(f1, 3, "test3", 3, db_pool)]

gevent.joinall(task_list)

最后

以上就是无心康乃馨为你收集整理的python操作mysql数据库(pymysql)的全部内容,希望文章能够帮你解决python操作mysql数据库(pymysql)所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(39)

评论列表共有 0 条评论

立即
投稿
返回
顶部