概述
一: 需求
- 线上数据全量同步到测试环境。
二:脚本
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import datetime
import pymysql
import os
CUR_PATH = os.path.dirname(os.path.abspath(__file__))
# 需要同步的表
table_names = ['table_name_1', 'table_name_2']
source_db_config = {
'db_host': 'XXXXX',
'db_user': 'XXXX',
'db_pwd': 'XXXX',
'db': 'XCXXXCX',
'db_port': 3306,
}
out_db_config = {
'db_host': 'XXXX',
'db_user': 'XXXX',
'db_pwd': 'XXXX',
'db': 'XXXXX',
'db_port': 3306,
}
def get_datas(table_name):
"""源数据库,获取全量数据"""
db = pymysql.connect(source_db_config['db_host'], source_db_config['db_user'], source_db_config['db_pwd'],
source_db_config['db'], source_db_config['dp_port'], charset='utf8')
cursor = db.cursor(pymysql.cursors.DictCursor)
readsql = '''select * from {}'''.format(table_name)
cursor.execute(readsql)
results = cursor.fetchall()
for data in results:
yield data
cursor.close()
db.close()
def format_data(data):
"""数据格式化"""
for k, v in data.items():
if type(v) == datetime.datetime:
data[k] = "'{}'".format(v.strftime('%Y-%m-%d %H:%M:%S'))
elif type(v) == type(v) == datetime.date:
data[k] = "'{}'".format(v.strftime('%Y-%m-%d'))
elif type(v) == unicode:
data[k] = "'{}'".format(v.encode('utf-8'))
return data
def out_put_data(table_name):
write = pymysql.connect(out_db_config['db_host'], out_db_config['db_user'], out_db_config['db_pwd'],
out_db_config['db'], out_db_config['dp_port'], charset='utf8')
for data in get_datas(table_name):
write_cursor = write.cursor()
data = format_data(data)
sql = ','.join(['{}=%s'.format(item) for item in data.keys()])
temp = sql % (tuple(data.values()))
write_sql = '''insert into %s set %s on duplicate key update %s''' % (table_name, temp, temp)
try:
write_cursor.execute(write_sql)
write.commit()
except Exception as e:
print("insert error, err_msg is {}".format(e))
write.rollback()
finally:
write_cursor.close()
write.close()
if __name__ == "__main__":
start_time = time.time()
for table_name in table_names:
out_put_data(table_name=table_name)
end_time = time.time()
cost_time = end_time - start_time
print("cost_time is {}".format(cost_time))
最后
以上就是俏皮毛豆为你收集整理的Mysql全量数据同步工具的全部内容,希望文章能够帮你解决Mysql全量数据同步工具所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
发表评论 取消回复