我是靠谱客的博主 精明菠萝,最近开发中收集的这篇文章主要介绍Mysql数据库增量式备份,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

需求:

一般比较大型的项目,它的数据库的数据量可能高达几十G以上,如果每天都需要进行备份,就会消耗很大的磁盘空间,对于一些很少机会再使用的数据,可以不用每天都备份,一周备份一次就好。

思路:

既然标题说增量式备份,那就肯定不会直接mysqldump简单导出整个数据库,首先,把备份分成两种备份模式:完整备份和增量备份。

  • 完整备份:一周只备份一次,只备份要增量备份的那些数据表

  • 增量备份:每天都备份一次,这里比较复杂,下面详细说

下面举一个例子,让大家更好地理解:
假设有一个数据库叫OA,它有4个数据库表:

  • user表,数据量:100 MB
  • 签到表,数据量:10 GB
  • 活动表,数据量:10 MB
  • 会议表,数据量:100 MB

签到表的数据量太多了,对它进行增量备份,其他的表进行完整备份,下面对这个数据库进行备份:
增量式备份之前,必须先进行完整备份。
首先,进行完整备份,导出签到表的所有数据(不包括表结构),压缩并上传到七牛云,保存上传信息到本地的一个配置文件,这个配置文件记录了完整备份的上传信息和增量备份的进度。接着,进行增量备份:

  1. 导出整个数据库的结构,不包括数据,压缩、上传;
  2. 导出除了增量表之外的表的数据(不包括表结构),压缩,上传;
  3. 根据设置的查询条件,导出增量表的增量数据。

完整代码:
此备份是基于Django项目,其他框架可以进行适当的修改,QiNiu可以到七牛云下载并import进来

# -*- coding: utf-8 -*-
import re
import os
import json
import logging
import requests
from django.conf import settings
from django.db import connection
from django.utils import timezone
from yz_lib.utils.qi_niu import QiNiu
from yz_lib.utils.others import sha1_file
__author__ = 'JayChen'
logger = logging.getLogger('scripts')
class YZBackUp(object):
def __init__(self, increase_models=None, force_update=False, full_backup_day=0, zip_password=None, cmd_args=None,
abandoned_models=None):
self.full_backup_day = full_backup_day
self.increase_models = increase_models if increase_models else []
self.abandoned_models = abandoned_models if abandoned_models else []
self.force_update = force_update
self.db = settings.DATABASES['default']['NAME']
self.domain = re.sub(r'W', '_', settings.SITE_DOMAIN)
self._tm = timezone.now().strftime('%Y%m%d_%H%M%S')
# 根据USE_TZ选择不同时区的时间
self._dir = os.path.join(settings.PROJECT_PATH, 'media', 'tmp', '{}_{}'.format(self.domain, self._tm))
self.zip_password = '-p{}'.format(zip_password) if zip_password else ''
self.cmd_args = cmd_args if cmd_args else ''
self.cursor = connection.cursor()
self.ignore_tables = [{"table": m['model']._meta.db_table, "query": m['query']} for m in self.increase_models]
self.abandoned_tables = [m._meta.db_table for m in self.abandoned_models]
self.config_file = os.path.join(settings.PROJECT_PATH, 'media', 'tmp', self.domain + '_backup_config.json')
os.mkdir(self._dir)
def get_tables(self) -> list:
"""
获取数据库所有表名的列表
Return:
result:
数据库所有表名的列表
"""
result = []
sql = "SELECT table_name FROM information_schema.tables WHERE table_schema='{}'".format(self.db)
self.cursor.execute(sql)
for item in self.cursor.fetchall():
result.append(item[0])
return result
def upload(self, zip_file: str, remove_after_finish=True):
"""
上传压缩文件到七牛云服务器
Args:
zip_file:
要上传的文件的路径
remove_after_finish:
文件上传后是否删除,默认为True,即删除
Return:
result:
文件上传信息
"""
qn = QiNiu()
key = sha1_file(zip_file)
logger.info('准备上传: {}'.format(zip_file))
logger.info('文件 SHA1: {}'.format(key))
ret, info = qn.upload_file(
zip_file,
key,
bucket_name=settings.QINIU_BACKUP_BUCKET if hasattr(settings, 'QINIU_BACKUP_BUCKET') else 'media'
)
logger.info('上传结果 ret: {}'.format(json.dumps(ret)))
logger.info('上传结果 info: {}'.format(info.text_body))
# 转低频存储
ret, info = qn.change_type(key, 1)
logger.info('转低频结果 ret: {}'.format(ret))
data = {'name': os.path.basename(zip_file)}
resp = requests.post('https://{}/api/v1/upload/{}/link/'.format(settings.PUBLIC_DOMAIN, key), data=data,
verify=False)
resp.raise_for_status()
if remove_after_finish:
os.remove(zip_file)
result = resp.json()
logger.info('上传结果: {}'.format(json.dumps(result, indent=4)))
if 'location' in result:
result['location'] = 'http://{}{}'.format(settings.PUBLIC_DOMAIN, result['location'])
result.pop('link')
return result
def compress_file(self, sql_file, sql_7z_file):
"""
压缩文件
Args:
sql_file:
要压缩的文件的路径
sql_7z_file:
保存压缩生成的文件的路径
Return:
sql_7z_file:
保存压缩生成的文件的路径
"""
os.system('7z a {} {} {}'.format(sql_7z_file, sql_file, self.zip_password))
os.remove(sql_file)
return sql_7z_file
def query(self, params: dict):
"""
通过API查询数据
Args:
params:
API接口查询数据的参数
Return:
data:
接口返回的json数据
"""
query_url = 'https://{}/api/v1/uploads/'.format(settings.PUBLIC_DOMAIN)
resp = requests.get(query_url, params=params,
headers={'Authorization': 'Token {}'.format(settings.PUB_API_TOKEN)})
resp.raise_for_status()
data = resp.json()
return data
def delete_dir(self, path: str):
"""
删除文件夹
Args:
path:
需要被删除的文件夹的路径
"""
files = os.listdir(path)
for f in files:
file_path = os.path.join(path, f)
if os.path.isdir(file_path):
self.delete_dir(file_path)
else:
os.remove(file_path)
def init_config_file(self, force_update=False, full_backup_day=None):
"""
初始化配置文件
Args:
force_update:
是否强制更新,默认为False
full_backup_day:
完整备份的时间,可填参数:0 1 2 3 4 5 6,分别代表周日...周六
Return:
config_data:
字典格式的配置信息
"""
if force_update and os.path.exists(self.config_file):
os.remove(self.config_file)
if not os.path.exists(self.config_file):
config_data = {
"backup_config": {
"last_full_backup_tm": "",
"full_backup_day": full_backup_day,
},
"incremental_tables_basic_data": {},
"incremental_tables_basic_data_upload_info": {}
}
with open(self.config_file, 'w') as f:
json.dump(config_data, f, ensure_ascii=False, indent=4)
else:
with open(self.config_file, 'r') as f:
config_data = json.load(f)
return config_data
def get_last_id(self, table: str, pk: str):
"""
返回最后一个id
Args:
table:
数据库表名
pk:
PK
Return:
返回最后一个pk(如果数据库表没有数据,返回 1)
"""
sql = "SELECT {} FROM `{}` ORDER BY `{}` DESC LIMIT 1".format(pk, table, pk)
self.cursor.execute(sql)
result = self.cursor.fetchall()
if result:
return result[0][0]
else:
return 1
def dump_table(self, table: str, sql_path: str, is_add=True):
"""
导出数据库表的数据到.sql文件
Args:
table:
数据库表名
sql_path:
导出的.sql文件的保存路径
is_add:
是否在文件末尾追加, 默认为True
"""
cmd = 'mysqldump -u{user} -p{password} -h{host} -P{port} -t --set-gtid-purged=OFF {other_args} {db} {table} >{add} {sql_path}'.format(
user=settings.DATABASES['default']['USER'],
password=settings.DATABASES['default']['PASSWORD'],
host=settings.DATABASES['default']['HOST'],
port=settings.DATABASES['default']['PORT'] if settings.DATABASES['default']['PORT'] else 3306,
other_args=self.cmd_args,
db=self.db,
table=table,
sql_path=sql_path,
add='>' if is_add else ''
)
os.system(cmd)
def dump_structure_only(self) -> list:
"""
只导出数据表结构,不导出数据
Return:
db_files:
导出文件上传后的信息
"""
db_files = []
tables = self.get_tables()
structure_sql_path = os.path.join(self._dir, self.domain + '_structure_' + self._tm + '.sql')
structure_7z_file = os.path.join(self._dir, self.domain + '_structure_' + self._tm + '.7z')
for table in tables:
cmd = 'mysqldump -u{user} -p{password} -h{host} -P{port} --skip-comments --set-gtid-purged=OFF {other_args} -d {db} {table} >> {sql_file}'.format(
user=settings.DATABASES['default']['USER'],
password=settings.DATABASES['default']['PASSWORD'],
host=settings.DATABASES['default']['HOST'],
port=settings.DATABASES['default']['PORT'] if settings.DATABASES['default']['PORT'] else 3306,
other_args=self.cmd_args,
db=self.db,
table=table,
sql_file=structure_sql_path,
)
os.system(cmd)
logger.info('完成导出表结构')
os.system('7z a {structure_7z_file} {structure_sql_path} {password}'.format(structure_7z_file=structure_7z_file,
structure_sql_path=structure_sql_path,
password=self.zip_password))
logger.info('{} 完成'.format(structure_7z_file))
db_files.append(self.upload(structure_7z_file))
return db_files
def dump_incremental_tables_basic_data(self, config_data: dict):
"""
导出增量表基本数据、压缩、上传七牛云
Args:
config_data:
配置信息
Return:
config_data:
配置信息
"""
sql_path = os.path.join(self._dir, self.domain + '_incremental_tables_basic_data_' + self._tm + '.sql')
sql_7z_file = os.path.join(self._dir, self.domain + '_incremental_tables_basic_data_' + self._tm + '.7z')
for item in self.ignore_tables:
self.dump_table(item['table'], sql_path)
if item["query"] == 'updated_on':
value = timezone.now().strftime('%Y-%m-%d %H:%M:%S')
else:
value = self.get_last_id(table=item['table'])
config_data['incremental_tables_basic_data'][item['table']] = {
"query": {
"key": item["query"],
"value": value
}
}
sql_7z_file = self.compress_file(sql_path, sql_7z_file)
upload_info = self.upload(sql_7z_file)
config_data['incremental_tables_basic_data_upload_info'] = upload_info
config_data['backup_config']['last_full_backup_tm'] = timezone.now().strftime('%Y-%m-%d %H:%M:%S')
return config_data
def dump_non_incremental_tables_data(self) -> str:
"""
导出非增量表数据、压缩、上传七牛云
Return:
上传七牛云后返回的数据
"""
_ignore_table = []
for item in self.ignore_tables:
_ignore_table.append(item['table'])
for table in self.abandoned_tables:
_ignore_table.append(table)
sql_path = os.path.join(self._dir, self.domain + '_non_incremental_tables_data_' + self._tm + '.sql')
sql_7z_file = os.path.join(self._dir, self.domain + '_non_incremental_tables_data_' + self._tm + '.7z')
tables = self.get_tables()
for table in tables:
if table in _ignore_table:
continue
self.dump_table(table, sql_path)
sql_7z_file = self.compress_file(sql_path, sql_7z_file)
return self.upload(sql_7z_file)
def dump_incremental_data_of_incremental_tables(self, config_data: dict) -> dict:
"""
导出增量表的增量数据、压缩、上传七牛云
Args:
config_data:
配置信息
Return:
上传七牛云后返回的数据
"""
sql_path = os.path.join(self._dir, self.domain + '_incremental_data_of_incremental_tables_' + self._tm + '.sql')
for item in self.ignore_tables:
cmd = '''mysqldump -u{user} -p{password} -h{host} -P{port} -t --set-gtid-purged=OFF --where="{query_key}>='{query_value}'" --replace {other_args} {db} {table} >> {sql_path}'''.format(
user=settings.DATABASES['default']['USER'],
password=settings.DATABASES['default']['PASSWORD'],
host=settings.DATABASES['default']['HOST'],
port=settings.DATABASES['default']['PORT'] if settings.DATABASES['default']['PORT'] else 3306,
other_args=self.cmd_args,
db=self.db,
table=item['table'],
sql_path=sql_path,
query_key=config_data['incremental_tables_basic_data'][item['table']]['query']['key'],
query_value=config_data['incremental_tables_basic_data'][item['table']]['query']['value']
)
os.system(cmd)
sql_7z_path = os.path.join(self._dir, self.domain + '_incremental_data_of_incremental_tables_' + self._tm + '.7z')
sql_7z_path = self.compress_file(sql_path, sql_7z_path)
return self.upload(sql_7z_path)
def all_backup(self, config_data: dict) -> dict:
"""
完整备份
增量表基本数据:
vps_vbio_top_incremental_tables_basic_data_20211101_011216.7z
可以选择恢复或者不恢复的数据:
vps_vbio_top_selectable_recovered_tables_20211101_011216.7z
Args:
config_data:
配置信息
Return:
db_files:
文件上传信息
"""
self.dump_incremental_tables_basic_data(config_data)
return config_data
def incr_backup(self, config_data: dict) -> list:
"""
增量备份
增量备份文件名:
数据库表结构:vps_vbio_top_structure_20211019_200055.7z
非增量表数据:vps_vbio_top_non_incremental_tables_data_20211104_011216.7z
增量表的增量数据:vps_vbio_top_incremental_data_of_incremental_tables_20211104_011216.7z
Args:
config_data:
配置信息
Return:
db_files:
文件上传信息
"""
db_files = self.dump_structure_only()
# 表结构
db_files.append(self.dump_non_incremental_tables_data())
# 非增量表数据
db_files.append(config_data["incremental_tables_basic_data_upload_info"])
# 增量表基本数据
db_files.append(self.dump_incremental_data_of_incremental_tables(config_data))
# 增量表的增量数据
return db_files
def start(self):
"""开始备份"""
config_data = self.init_config_file(force_update=self.force_update, full_backup_day=self.full_backup_day)
if config_data["backup_config"]["full_backup_day"] != timezone.now().weekday() and not self.force_update:
# 进行增量备份
db_files = self.incr_backup(config_data)
else:
# 先进行完整备份,再增量备份
logger.info("开始进行完整备份")
config_data = self.all_backup(config_data)
logger.info("完整备份结束")
logger.info("开始进行增量备份")
db_files = self.incr_backup(config_data)
logger.info("增量备份结束")
with open(self.config_file, 'w') as f:
json.dump(config_data, f, ensure_ascii=False, indent=4)
if os.path.exists(self._dir) and settings.PROJECT_PATH in self._dir:
self.delete_dir(self._dir)
return db_files
"""
备份数据库
"""
import logging
import re
import pytz
import socket
import requests
from django.conf import settings
from django.template import Template, Context
from django.utils import timezone
import YZBackUp
__author__ = 'JayChen'
logger = logging.getLogger('scripts')
def run(*args):
force_update = False
full_backup_day = 0
if len(args) >= 1 and args[0] == '1':
force_update = True
if len(args) == 2:
full_backup_day = int(args[1])
increase_models = [
{'model': Signin, 'query': 'id'},
]
db_files = YZBackUp(increase_models=increase_models,
force_update=force_update,
full_backup_day=full_backup_day).start()

最后

以上就是精明菠萝为你收集整理的Mysql数据库增量式备份的全部内容,希望文章能够帮你解决Mysql数据库增量式备份所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(34)

评论列表共有 0 条评论

立即
投稿
返回
顶部