概述
python对postgresql数据库进行批量高效操作
文章目录
- python对postgresql数据库进行批量高效操作
- `executemany`与`execute_batch`、`execute_values`对比
- 批量操作测试
- 创建测试数据表
- 创建数据库操作基类
- 创建数据库具体的操作类
- 数据库操作测试代码
这里使用psycopg2库对postgresql进行批量的增删改查操作,批量操作可以使用 executemany、 execute_batch和 execute_values方法。其中 executemany的效率最差,不建议使用, execute_batch次之, execute_values效率最高。
executemany
与execute_batch
、execute_values
对比
-
executemany(query, vars_list)
对序列vars_list中找到的所有参数元组或映射执行数据库操作(查询或命令),执行速度不比使用循环的execute快。
- sql:要执行的查询。它必须包含一个%s占位符,该占位符将被值列表替换。示例:“插入mytable(id,f1,f2)值%s”。
- var_list:参数列表。
-
execute_batch(cur, sql, argslist, page_size=100)
以较少的服务器往返次数执行语句组,针对argslist中的所有参数集(序列或映射),执行sql数次。
-
cur:用于执行查询的光标。
-
sql:要执行的查询。它必须包含一个%s占位符,该占位符将被值列表替换。示例:“插入mytable(id,f1,f2)值%s”。
-
argslist:序列或字典序列,其中包含要发送给查询的参数。
-
page_size:每个语句中包含的argslist项的最大数量。如果有更多项,函数将执行多个语句。
-
-
execute_values(cur, sql, argslist, template=None, page_size=100, fetch=False)
使用具有一系列参数的VALUES执行语句。
-
cur:用于执行查询的光标。
-
sql:要执行的查询。它必须包含一个%s占位符,该占位符将被值列表替换。示例:“插入mytable(id,f1,f2)值%s”。
-
argslist:序列或字典序列,其中包含要发送给查询的参数。类型和内容必须与template一致。
-
template:要合并到argslist中的每个项目以构成查询的代码段。如果argslist项是序列,则它应该包含位置占位符(例如,(%s,%s,%s,%s)”,或“(%s,%s,42)”如果存在常量值…)。如果argslist项是映射,则它应该包含命名占位符(例如“(%id)s、%(f1)s、42)”。如果未指定,则假设参数是序列,并使用简单的位置模板(即(%s,%s,…)并使用argslist中第一个元素嗅探的占位符数量。
-
page_size:每个语句中包含的argslist项的最大数量。如果有更多项,函数将执行多个语句。
-
fetch:如果为True,则将查询结果返回到列表中(如在fetchall()中)。对于带有返回子句的查询有用。
-
批量操作测试
创建测试数据表
创建一个简单的user_account表。
-- DROP TABLE public.user_account;
CREATE TABLE public.user_account
(
id integer NOT NULL DEFAULT nextval('user_account_id_seq'::regclass),
name character varying(30) COLLATE pg_catalog."default",
fullname character varying COLLATE pg_catalog."default",
CONSTRAINT user_account_pkey PRIMARY KEY (id)
)
创建数据库操作基类
DBBase类封装了数据批量增删改查的方法。execute_by_execute_values
封装了execute_values
方法,execute_by_execute_batch
封装了execute_batch
方法,execute
是普通的执行方法,select
为查询方法。
from typing import Union, List
import psycopg2
import psycopg2.extras
class DBBase:
def __init__(self, database: str, user: str, password: str, port: int, host: str):
self.conn = psycopg2.connect(database=database, user=user, password=password, port=port, host=host)
self.conn.autocommit = False
self.cur = self.conn.cursor()
def close(self):
self.conn.close()
def select(self, sql: str, params: dict = None):
if params:
self.cur.execute(sql, params)
else:
self.cur.execute(sql)
return self.cur.fetchall()
def execute(self, sql: str, params: dict = None):
try:
if params:
self.cur.execute(sql, params)
else:
self.cur.execute(sql)
self.conn.commit()
except Exception as e:
self.conn.rollback()
raise e
def execute_by_execute_values(self, sql: str, data: Union[list, set], template: str = None, page_size: int = 100,
fetch: bool = False):
"""
使用execute_values批量插入、更新,效率比execute_batch和executemany高
"""
try:
psycopg2.extras.execute_values(self.cur, sql, data, template=template, page_size=page_size, fetch=fetch)
self.conn.commit()
except Exception as e:
self.conn.rollback()
raise e
def execute_by_execute_batch(self, sql: str, data: List[dict], page_size: int = 100):
try:
psycopg2.extras.execute_batch(self.cur, sql, data, page_size=page_size)
self.conn.commit()
except Exception as e:
self.conn.rollback()
raise e
创建数据库具体的操作类
DBTest类包含了几种批量操作的方法。
-
批量插入数据的方法:
create_users
、create_users2
和create_users3
。其中create_users
和create_users2
使用execute_by_execute_values
方式,create_users2
还使用了模板,create_users3
则使用execute_by_execute_batch
方式。 -
多ID查询的方法:
get_user2
-
批量删除的方法:
delete_users
-
批量更新的方法:
update_users
class DBTest(DBBase):
def __init__(self, database: str, user: str, password: str, port: int, host: str):
DBBase.__init__(self, database, user, password, port, host)
def create_users(self, data: Union[list, set]):
"""
批量创建user
:param data: user info, 列表或集合 [('user1', 'user1_fullname'),('user2', 'user2_fullname')]
"""
sql = """
INSERT INTO user_account ("name", fullname) VALUES %s
"""
self.execute_by_execute_values(sql, data)
def create_users2(self, data: Union[list, set], fullname: str = "template"):
"""
使用模板批量创建user,即创建的user的fullname都相同
:param fullname: 固定的fullname
:param data: user info, 列表或集合 [('user1',),('user2', )]
"""
sql = """
INSERT INTO user_account ("name", fullname) VALUES %s
"""
self.execute_by_execute_values(sql, data, template=f"(%s, '{fullname}')")
def create_users3(self, data: List[dict]):
"""
批量创建user
:param data: user info, 列表字典 [{'name': 'user1', 'fullname': 'user1'}, {'name': 'user2', 'fullname': 'user2'}]
"""
sql = """
INSERT INTO user_account ("name", fullname) VALUES (%(name)s, %(fullname)s)
"""
self.execute_by_execute_batch(sql, data)
def get_user(self, user_id: int):
"""
通过ID获取user信息
:param user_id: user ID
:return: user info
"""
sql = """
SELECT id, "name", fullname FROM user_account WHERE id = %(user_id)s
"""
return self.select(sql, params={"user_id": user_id})
def get_users(self, users_id: tuple):
"""
通过多个user ID获取user信息
:param users_id: user ID
:return: user info
"""
sql = """
SELECT id, "name", fullname FROM user_account WHERE id IN %(users_id)s
"""
return self.select(sql, params={"users_id": users_id})
def delete_users(self, users_id: tuple):
"""
批量删除user
:param users_id: users ID,元祖或字符,如(1, 2)
"""
sql = """
DELETE FROM user_account WHERE id IN %(users_id)s
"""
self.execute(sql, params={"users_id": users_id})
def update_users(self, data: Union[list, set]):
"""
批量更新user
:param data: user info
"""
sql = """
UPDATE user_account SET "name" = data.name, fullname = data.fullname
FROM (VALUES %s)
AS data (user_id, name, fullname)
WHERE id = data.user_id
"""
self.execute_by_execute_values(sql, data)
数据库操作测试代码
下面代码实现增删改查的批量操作。
同时,对比了execute_batch
和execute_values
的1000000条数据的插入时间(不同计算机时间不同),从结果可见execute_values
的速度更快。
from database import DBTest
def create_users():
# users_data = {('user1', 'user1fullname'), ('user2', 'user2fullname'), ('user3', 'user3fullname')}
users_data = [('user1', 'user1fullname')] * 1000000
start1 = timeit.default_timer()
db.create_users(data=users_data)
end1 = timeit.default_timer()
print(end1 - start1)
# 28.867767100000002
users_data = [{'name': 'user1', 'fullname': 'user1'}] * 1000000
start2 = timeit.default_timer()
db.create_users3(data=users_data)
end2 = timeit.default_timer()
print(end2 - start2)
# 87.6504971
def create_user2():
users_data = {('user1',), ('user2',), ('user3',)}
db.create_users2(data=users_data, fullname="test_user")
def get_user():
print(db.get_user(1))
print(db.get_users((10, 11, 12)))
def update_users():
users_data = {(1, 'user1', 'user1fullname'), (2, 'user2', 'user2fullname'), (3, 'user3', 'user3fullname')}
db.update_users(data=users_data)
def delete_users():
users_data = (7,)
db.delete_users(users_id=users_data)
def main():
create_users()
# get_user()
# create_user2()
# update_users()
# delete_users()
if __name__ == '__main__':
db = DBTest(database='test', user='postgres', password='postgre1234', port=5432, host='localhost')
main()
db.close()
提示:这里仅仅比较了psycopg2库的三种方式,还有其他更高效的批量操作。
最后
以上就是顺利舞蹈为你收集整理的python对postgresql数据库进行批量高效操作python对postgresql数据库进行批量高效操作的全部内容,希望文章能够帮你解决python对postgresql数据库进行批量高效操作python对postgresql数据库进行批量高效操作所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复