概述
python3----mysql线程池工具封装
#!/usr/python/bin/python3
# -*- coding:utf-8 -*-
import logging
from queue import Queue
import pymysql
LOG = logging.getLogger(__name__)
class ConnectionPool(object):
def __init__(self, **kwargs):
self.size = kwargs.get('size', 10)
self.kwargs = kwargs
self.conn_queue = Queue(maxsize=self.size)
for i in range(self.size):
self.conn_queue.put(self._create_new_conn())
def _create_new_conn(self):
return pymysql.connect(host=self.kwargs.get('host', '127.0.0.1'),
user=self.kwargs.get('user'),
db=self.kwargs.get('dbname'),
passwd=self.kwargs.get('password'),
port=self.kwargs.get('port'),
connect_timeout=5)
def _put_conn(self, conn):
self.conn_queue.put(conn)
def _get_conn(self):
conn = self.conn_queue.get()
if conn is None:
self._create_new_conn()
return conn
def exec_sql(self, sql):
conn = self._get_conn()
try:
cur = conn.cursor()
cur.execute(sql)
return cur.fetchall()
except pymysql.ProgrammingError as e:
LOG.error('execute sql {{0}} error {1}'.format(sql, e))
raise e
except pymysql.OperationalError as e:
conn = self._create_new_conn()
raise e
finally:
self._put_conn(conn)
def __del__(self):
try:
while True:
conn = self.conn_queue.get_nowait()
if conn:
conn.close()
except Exception as e:
pass
if __name__ == '__main__':
mysql_tool = ConnectionPool(
host='127.0.0.1',
user='root',
password='******',
dbname = 'mysqldb',
port=3306,
size=5
)
res = mysql_tool.exec_sql('select * from t_user')
print(res)
请勿重复造轮子,拿走不谢!!!
最后
以上就是寒冷芹菜为你收集整理的python3----mysql线程池工具封装的全部内容,希望文章能够帮你解决python3----mysql线程池工具封装所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复