概述
为什么mysqldumper mydumper能实现一致性备份?–源码分析
mydmper是一个针对mysql和tidb的高性能多线程备份与恢复工具,其开发人员来自mysql等。
mydumper具有以下特征
采用轻量c语言编写 #include <glib.h>
执行速度比mysqldumper快十倍(参看资料)
执行innodb表和非innodb表(是否支持事务的表都可以。不支持事务的表加表锁,innodb表开启一个事务设为RR级别)
多线程备份
多线程恢复
基于 GNU General Public Lv3协议,也就是说是开源的
原理
备份过程由一个主线程和若干worker线程。
- 主线程对备份实现加读锁 FTWRL(flush tables with read lock),阻塞dml,以建立一执行的数据快照点。记录备份点的binlog信息。
- 建立woker线程(worker 线程的个数通过- -thread 或者 - t参数决定),初始化备份任务队列,并向队列中推送数据库元数据(schema),执行non innodb表和innodb表的备份任务。
- worker负责将备份任务队列中的任务按照顺序取出并完成备份(由单独的函数完成),有一个job列表和type,顺序为遍历任务列表的顺序
- worker线程拿到备份任务后,分别与建立备份实例链接,将session事务隔离级别改为RR,用于获取一执行的时间点的数据。
- 在主线程仍持有全局读锁时开启事务快照读,实现读到的一致性的数据与主线程的时间点相同,实现备份数据的一致性。
- 按顺序从备份队列中取出备份任务,先进行non innodb表的备份,在进行innodb表备份。这样可以在完成innodb表备份后通知主线程释放读锁,尽可能减少备份对业务的影响。
(划重点)
为啥能保证备份的一致性?
主线程获取备份的mededata,执行show master status,show slave status的时刻是持有全局的s锁,这个时候worker线程执行了start trx,并设置session隔离级别为RR,就保证了worker线程与主线程事务的一致性。
这里很巧妙的运用了事务隔离级别和并发查询。
(具体的见下方我添加的代码备注)
知道了整体的备份思路和设计原理,具体的备份过程是怎样的呢?
从代码中不难发现,mydumper的记录级备份由主线程负责任务拆分,(获取所有的job),由多个worker线程完成正式的备份任务。
主线程将表分为多个chunk(根据记录数量),每个chunk作为一个备份的job。
表的数据拆分方式如下:
在确定chunk之后,先获取该字段的最大最小值,再通过explain select field from db.table来估计该表的记录数,最后根据所设置的每个任务的记录数将该表分为多个chunk(生成多个job)
==>以上实现了基于单表的不同range的并行复制。
为了快速完成备份,尽可能减少对表的锁定时间。(non innodb表影响更大。)
代码中实现如下:
最后,将备注后的源代码粘贴到这里:
/*
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
Authors: Domas Mituzas, Facebook ( domas at fb dot com )
Mark Leith, Oracle Corporation (mark dot leith at oracle dot com)
Andrew Hutchings, SkySQL (andrew at skysql dot com)
Max Bubenick, Percona RDBA (max dot bubenick at percona dot com)
*/
#define _LARGEFILE64_SOURCE
#define _FILE_OFFSET_BITS 64
#include <mysql.h>
#if defined MARIADB_CLIENT_VERSION_STR && !defined MYSQL_SERVER_VERSION
#define MYSQL_SERVER_VERSION MARIADB_CLIENT_VERSION_STR
#endif
#include <unistd.h>
#include <stdio.h>
#include <string.h>
#include <glib.h>
#include <stdlib.h>
#include <stdarg.h>
#include <errno.h>
#include <time.h>
#include <zlib.h>
#include <pcre.h>
#include <signal.h>
#include <glib/gstdio.h>
#include "config.h"
#ifdef WITH_BINLOG
#include "binlog.h"
#else
#include "mydumper.h"
#endif
#include "server_detect.h"
#include "connection.h"
#include "common.h"
#include "g_unix_signal.h"
#include <math.h>
#include "getPassword.h"
#include "logging.h"
#include "set_verbose.h"
char *regexstring = NULL; //char *字符型指针,指向一个字符;
const char DIRECTORY[] = "export";
#ifdef WITH_BINLOG
const char BINLOG_DIRECTORY[] = "binlog_snapshot";
const char DAEMON_BINLOGS[] = "binlogs";
#endif
/* Some earlier versions of MySQL do not yet define MYSQL_TYPE_JSON */
#ifndef MYSQL_TYPE_JSON
#define MYSQL_TYPE_JSON 245
#endif
static GMutex *init_mutex = NULL;
//glib里_GMutex类型
//不透明数据类型隐藏了它们内部格式或结构。在C语言中,它们就像黑盒一样。支持它们的语言不是很多。
/* Program options */
gchar *output_directory = NULL;
guint statement_size = 1000000;
guint rows_per_file = 0;
guint chunk_filesize = 0;
int longquery = 60;
int longquery_retries = 0;
int longquery_retry_interval = 60;
int build_empty_files = 0;
int skip_tz = 0;
int need_dummy_read = 0;
int need_dummy_toku_read = 0;
int compress_output = 0;
int killqueries = 0;
int detected_server = 0;
int lock_all_tables = 0;
guint snapshot_interval = 60;
gboolean daemon_mode = FALSE;
gboolean have_snapshot_cloning = FALSE;
gchar *ignore_engines = NULL;
char **ignore = NULL;
gchar *tables_list = NULL;
gchar *tidb_snapshot = NULL;
GSequence *tables_skiplist = NULL;
gchar *tables_skiplist_file = NULL;
char **tables = NULL;
GList *no_updated_tables = NULL;
#ifdef WITH_BINLOG
gboolean need_binlogs = FALSE;
gchar *binlog_directory = NULL;
gchar *daemon_binlog_directory = NULL;
#endif
gboolean no_schemas = FALSE;
gboolean no_data = FALSE;
gboolean no_locks = FALSE;
gboolean dump_triggers = FALSE;
gboolean dump_events = FALSE;
gboolean dump_routines = FALSE;
gboolean no_dump_views = FALSE;
gboolean less_locking = FALSE;
gboolean use_savepoints = FALSE;
gboolean success_on_1146 = FALSE;
gboolean no_backup_locks = FALSE;
gboolean insert_ignore = FALSE;
GList *innodb_tables = NULL;
GMutex *innodb_tables_mutex = NULL;
GList *non_innodb_table = NULL;
GMutex *non_innodb_table_mutex = NULL;
GList *table_schemas = NULL;
GMutex *table_schemas_mutex = NULL;
GList *view_schemas = NULL;
GMutex *view_schemas_mutex = NULL;
GList *schema_post = NULL;
GMutex *schema_post_mutex = NULL;
gint database_counter = 0;
gint non_innodb_table_counter = 0;
gint non_innodb_done = 0;
guint less_locking_threads = 0;
guint updated_since = 0;
guint trx_consistency_only = 0;
guint complete_insert = 0;
gchar *set_names_str=NULL;
// For daemon mode, 0 or 1
guint dump_number = 0;
guint binlog_connect_id = 0;
gboolean shutdown_triggered = FALSE;
GAsyncQueue *start_scheduled_dump;
GMainLoop *m1;
static GCond *ll_cond = NULL;
static GMutex *ll_mutex = NULL;
int errors;
static GOptionEntry entries[] = {
{"database", 'B', 0, G_OPTION_ARG_STRING, &db, "Database to dump", NULL},
{"tables-list", 'T', 0, G_OPTION_ARG_STRING, &tables_list,
"Comma delimited table list to dump (does not exclude regex option)",
NULL},
{"omit-from-file", 'O', 0, G_OPTION_ARG_STRING, &tables_skiplist_file,
"File containing a list of database.table entries to skip, one per line "
"(skips before applying regex option)",
NULL},
{"outputdir", 'o', 0, G_OPTION_ARG_FILENAME, &output_directory,
"Directory to output files to", NULL},
{"statement-size", 's', 0, G_OPTION_ARG_INT, &statement_size,
"Attempted size of INSERT statement in bytes, default 1000000", NULL},
{"rows", 'r', 0, G_OPTION_ARG_INT, &rows_per_file,
"Try to split tables into chunks of this many rows. This option turns off "
"--chunk-filesize",
NULL},
{"chunk-filesize", 'F', 0, G_OPTION_ARG_INT, &chunk_filesize,
"Split tables into chunks of this output file size. This value is in MB",
NULL},
{"compress", 'c', 0, G_OPTION_ARG_NONE, &compress_output,
"Compress output files", NULL},
{"build-empty-files", 'e', 0, G_OPTION_ARG_NONE, &build_empty_files,
"Build dump files even if no data available from table", NULL},
{"regex", 'x', 0, G_OPTION_ARG_STRING, ®exstring,
"Regular expression for 'db.table' matching", NULL},
{"ignore-engines", 'i', 0, G_OPTION_ARG_STRING, &ignore_engines,
"Comma delimited list of storage engines to ignore", NULL},
{"insert-ignore", 'N', 0, G_OPTION_ARG_NONE, &insert_ignore,
"Dump rows with INSERT IGNORE", NULL},
{"no-schemas", 'm', 0, G_OPTION_ARG_NONE, &no_schemas,
"Do not dump table schemas with the data", NULL},
{"no-data", 'd', 0, G_OPTION_ARG_NONE, &no_data, "Do not dump table data",
NULL},
{"triggers", 'G', 0, G_OPTION_ARG_NONE, &dump_triggers, "Dump triggers",
NULL},
{"events", 'E', 0, G_OPTION_ARG_NONE, &dump_events, "Dump events", NULL},
{"routines", 'R', 0, G_OPTION_ARG_NONE, &dump_routines,
"Dump stored procedures and functions", NULL},
{"no-views", 'W', 0, G_OPTION_ARG_NONE, &no_dump_views, "Do not dump VIEWs",
NULL},
{"no-locks", 'k', 0, G_OPTION_ARG_NONE, &no_locks,
"Do not execute the temporary shared read lock. WARNING: This will cause "
"inconsistent backups",
NULL},
{"no-backup-locks", 0, 0, G_OPTION_ARG_NONE, &no_backup_locks,
"Do not use Percona backup locks", NULL},
{"less-locking", 0, 0, G_OPTION_ARG_NONE, &less_locking,
"Minimize locking time on InnoDB tables.", NULL},
{"long-query-retries", 0, 0, G_OPTION_ARG_INT, &longquery_retries,
"Retry checking for long queries, default 0 (do not retry)", NULL},
{"long-query-retry-interval", 0, 0, G_OPTION_ARG_INT, &longquery_retry_interval,
"Time to wait before retrying the long query check in seconds, default 60", NULL},
{"long-query-guard", 'l', 0, G_OPTION_ARG_INT, &longquery,
"Set long query timer in seconds, default 60", NULL},
{"kill-long-queries", 'K', 0, G_OPTION_ARG_NONE, &killqueries,
"Kill long running queries (instead of aborting)", NULL},
#ifdef WITH_BINLOG
{"binlogs", 'b', 0, G_OPTION_ARG_NONE, &need_binlogs,
"Get a snapshot of the binary logs as well as dump data", NULL},
#endif
{"daemon", 'D', 0, G_OPTION_ARG_NONE, &daemon_mode, "Enable daemon mode",
NULL},
{"snapshot-interval", 'I', 0, G_OPTION_ARG_INT, &snapshot_interval,
"Interval between each dump snapshot (in minutes), requires --daemon, "
"default 60",
NULL},
{"logfile", 'L', 0, G_OPTION_ARG_FILENAME, &logfile,
"Log file name to use, by default stdout is used", NULL},
{"tz-utc", 0, G_OPTION_FLAG_REVERSE, G_OPTION_ARG_NONE, &skip_tz,
"SET TIME_ZONE='+00:00' at top of dump to allow dumping of TIMESTAMP data "
"when a server has data in different time zones or data is being moved "
"between servers with different time zones, defaults to on use "
"--skip-tz-utc to disable.",
NULL},
{"skip-tz-utc", 0, 0, G_OPTION_ARG_NONE, &skip_tz, "", NULL},
{"use-savepoints", 0, 0, G_OPTION_ARG_NONE, &use_savepoints,
"Use savepoints to reduce metadata locking issues, needs SUPER privilege",
NULL},
{"success-on-1146", 0, 0, G_OPTION_ARG_NONE, &success_on_1146,
"Not increment error count and Warning instead of Critical in case of "
"table doesn't exist",
NULL},
{"lock-all-tables", 0, 0, G_OPTION_ARG_NONE, &lock_all_tables,
"Use LOCK TABLE for all, instead of FTWRL", NULL},
{"updated-since", 'U', 0, G_OPTION_ARG_INT, &updated_since,
"Use Update_time to dump only tables updated in the last U days", NULL},
{"trx-consistency-only", 0, 0, G_OPTION_ARG_NONE, &trx_consistency_only,
"Transactional consistency only", NULL},
{"complete-insert", 0, 0, G_OPTION_ARG_NONE, &complete_insert,
"Use complete INSERT statements that include column names", NULL},
{ "set-names",0, 0, G_OPTION_ARG_STRING, &set_names_str,
"Sets the names, use it at your own risk, default binary", NULL },
{"tidb-snapshot", 'z', 0, G_OPTION_ARG_STRING, &tidb_snapshot,
"Snapshot to use for TiDB", NULL},
{NULL, 0, 0, G_OPTION_ARG_NONE, NULL, NULL, NULL}};
struct tm tval;
void dump_schema_data(MYSQL *conn, char *database, char *table, char *filename);
void dump_triggers_data(MYSQL *conn, char *database, char *table,
char *filename);
void dump_view_data(MYSQL *conn, char *database, char *table, char *filename,
char *filename2);
void dump_schema(MYSQL *conn, char *database, char *table,
struct configuration *conf);
void dump_view(char *database, char *table, struct configuration *conf);
void dump_table(MYSQL *conn, char *database, char *table,
struct configuration *conf, gboolean is_innodb);
void dump_tables(MYSQL *, GList *, struct configuration *);
void dump_schema_post(char *database, struct configuration *conf);
void restore_charset(GString *statement);
void set_charset(GString *statement, char *character_set,
char *collation_connection);
void dump_schema_post_data(MYSQL *conn, char *database, char *filename);
guint64 dump_table_data(MYSQL *, FILE *, char *, char *, char *, char *);
void dump_database(char *, struct configuration *);
void dump_database_thread(MYSQL *, char *);
void dump_create_database(char *, struct configuration *);
void dump_create_database_data(MYSQL *, char *, char *);
void get_tables(MYSQL *conn, struct configuration *);
void get_not_updated(MYSQL *conn, FILE *);
GList *get_chunks_for_table(MYSQL *, char *, char *,
struct configuration *conf);
guint64 estimate_count(MYSQL *conn, char *database, char *table, char *field,
char *from, char *to);
void dump_table_data_file(MYSQL *conn, char *database, char *table, char *where,
char *filename);
void create_backup_dir(char *directory);
gboolean write_data(FILE *, GString *);
gboolean check_regex(char *database, char *table);
gboolean check_skiplist(char *database, char *table);
int tables_skiplist_cmp(gconstpointer a, gconstpointer b, gpointer user_data);
void read_tables_skiplist(const gchar *filename);
#ifdef WITH_BINLOG
MYSQL *reconnect_for_binlog(MYSQL *thrconn);
void *binlog_thread(void *data);
#endif
void start_dump(MYSQL *conn);
MYSQL *create_main_connection();
void *exec_thread(void *data);
void write_log_file(const gchar *log_domain, GLogLevelFlags log_level,
const gchar *message, gpointer user_data);
gboolean sig_triggered(gpointer user_data) {
(void)user_data;
g_message("Shutting down gracefully");
shutdown_triggered = TRUE;
g_main_loop_quit(m1);
return FALSE;
}
void clear_dump_directory() {
GError *error = NULL;
char *dump_directory =
g_strdup_printf("%s/%d", output_directory, dump_number);
GDir *dir = g_dir_open(dump_directory, 0, &error);
if (error) {
g_critical("cannot open directory %s, %sn", dump_directory,
error->message);
errors++;
return;
}
const gchar *filename = NULL;
while ((filename = g_dir_read_name(dir))) {
gchar *path = g_build_filename(dump_directory, filename, NULL);
if (g_unlink(path) == -1) {
g_critical("error removing file %s (%d)n", path, errno);
errors++;
return;
}
g_free(path);
}
g_dir_close(dir);
g_free(dump_directory);
}
gboolean run_snapshot(gpointer *data) {
(void)data;
g_async_queue_push(start_scheduled_dump, GINT_TO_POINTER(1));
return (shutdown_triggered) ? FALSE : TRUE;
}
/* Check database.table string against regular expression */
gboolean check_regex(char *database, char *table) {
/* This is not going to be used in threads */
static pcre *re = NULL;
int rc;
int ovector[9] = {0};
const char *error;
int erroroffset;
char *p;
/* Let's compile the RE before we do anything */
if (!re) {
re = pcre_compile(regexstring, PCRE_CASELESS | PCRE_MULTILINE, &error,
&erroroffset, NULL);
if (!re) {
g_critical("Regular expression fail: %s", error);
exit(EXIT_FAILURE);
}
}
p = g_strdup_printf("%s.%s", database, table);
rc = pcre_exec(re, NULL, p, strlen(p), 0, 0, ovector, 9);
g_free(p);
return (rc > 0) ? TRUE : FALSE;
}
/* Check database.table string against skip list; returns TRUE if found */
gboolean check_skiplist(char *database, char *table) {
if (g_sequence_lookup(tables_skiplist,
g_strdup_printf("%s.%s", database, table),
tables_skiplist_cmp, NULL)) {
return TRUE;
} else {
return FALSE;
};
}
/* Comparison function for skiplist sort and lookup */
int tables_skiplist_cmp(gconstpointer a, gconstpointer b, gpointer user_data) {
/* Not using user_data, but needed for function prototype, shutting up
* compiler warnings about unused variable */
(void)user_data;
/* Any sorting function would work, as long as its usage is consistent
* between sort and lookup. strcmp should be one of the fastest. */
return strcmp(a, b);
}
/* Read the list of tables to skip from the given filename, and prepares them
* for future lookups. */
void read_tables_skiplist(const gchar *filename) {
GIOChannel *tables_skiplist_channel = NULL;
gchar *buf = NULL;
GError *error = NULL;
/* Create skiplist if it does not exist */
if (!tables_skiplist) {
tables_skiplist = g_sequence_new(NULL);
};
tables_skiplist_channel = g_io_channel_new_file(filename, "r", &error);
/* Error opening/reading the file? bail out. */
if (!tables_skiplist_channel) {
g_critical("cannot read/open file %s, %sn", filename, error->message);
errors++;
return;
};
/* Read lines, push them to the list */
do {
g_io_channel_read_line(tables_skiplist_channel, &buf, NULL, NULL, NULL);
if (buf) {
g_strchomp(buf);
g_sequence_append(tables_skiplist, buf);
};
} while (buf);
g_io_channel_shutdown(tables_skiplist_channel, FALSE, NULL);
/* Sort the list, so that lookups work */
g_sequence_sort(tables_skiplist, tables_skiplist_cmp, NULL);
g_message("Omit list file contains %d tables to skipn",
g_sequence_get_length(tables_skiplist));
return;
}
/* Write some stuff知识?? we know about snapshot快照, before it changes */
//获取mysql源库的备份时刻的binlog信息并写入到metedata
void write_snapshot_info(MYSQL *conn, FILE *file) {
MYSQL_RES *master = NULL, *slave = NULL, *mdb = NULL;
MYSQL_FIELD *fields;
MYSQL_ROW row;
char *masterlog = NULL;
char *masterpos = NULL;
char *mastergtid = NULL;
char *connname = NULL;
char *slavehost = NULL;
char *slavelog = NULL;
char *slavepos = NULL;
char *slavegtid = NULL;
guint isms;
guint i;
mysql_query(conn, "SHOW MASTER STATUS");
master = mysql_store_result(conn);
if (master && (row = mysql_fetch_row(master))) {
masterlog = row[0];
masterpos = row[1];
/* Oracle/Percona GTID */
if (mysql_num_fields(master) == 5) {
mastergtid = row[4];
} else {
/* Let's try with MariaDB 10.x */
/* Use gtid_binlog_pos due to issue with gtid_current_pos with galera
* cluster, gtid_binlog_pos works as well with normal mariadb server
* https://jira.mariadb.org/browse/MDEV-10279 */
mysql_query(conn, "SELECT @@gtid_binlog_pos");
mdb = mysql_store_result(conn);
if (mdb && (row = mysql_fetch_row(mdb))) {
mastergtid = row[0];
}
}
}
if (masterlog) {
fprintf(file, "SHOW MASTER STATUS:ntLog: %sntPos: %sntGTID:%snn",
masterlog, masterpos, mastergtid);
g_message("Written master status");
//mydumper日志 ** Message: Written master status
}
//这里不管源库是否是备库,都会执行
isms = 0;
mysql_query(conn, "SELECT @@default_master_connection");
MYSQL_RES *rest = mysql_store_result(conn);
if (rest != NULL && mysql_num_rows(rest)) {
/*MySQL C API 中的mysql_num_rows() 返回结果集中的行数
C语言mysql接口学习资料参考
https://blog.csdn.net/qq_40421919/article/details/93355219*/
mysql_free_result(rest);
g_message("Multisource slave detected."); //当前数据库角色为slave,且有多个master
isms = 1;
}
if (isms)
mysql_query(conn, "SHOW ALL SLAVES STATUS"); //mysql没有这个命令吧
else
mysql_query(conn, "SHOW SLAVE STATUS"); //这里应该是mysql ????
//get 返回的每值得到下面几个参数
slave = mysql_store_result(conn);
while (slave && (row = mysql_fetch_row(slave))) {
fields = mysql_fetch_fields(slave);
for (i = 0; i < mysql_num_fields(slave); i++) {
if (isms && !strcasecmp("connection_name", fields[i].name))
connname = row[i];
if (!strcasecmp("exec_master_log_pos", fields[i].name)) {
slavepos = row[i];
} else if (!strcasecmp("relay_master_log_file", fields[i].name)) {
slavelog = row[i];
} else if (!strcasecmp("master_host", fields[i].name)) {
slavehost = row[i];
} else if (!strcasecmp("Executed_Gtid_Set", fields[i].name) ||
!strcasecmp("Gtid_Slave_Pos", fields[i].name)) {
slavegtid = row[i];
}
}
//如果是slave 也拿到slave到复制info 这个可以在metedata中查看
if (slavehost) {
fprintf(file, "SHOW SLAVE STATUS:");
if (isms)
fprintf(file, "ntConnection name: %s", connname);
fprintf(file, "ntHost: %sntLog: %sntPos: %sntGTID:%snn",
slavehost, slavelog, slavepos, slavegtid);
g_message("Written slave status");
}
}
//文件落盘
fflush(file);
if (master)
mysql_free_result(master);
if (slave)
mysql_free_result(slave);
if (mdb)
mysql_free_result(mdb);
}
//创建--thread参数指定个数的thread,并行获取mysql数据
void *process_queue(struct thread_data *td) {
struct configuration *conf = td->conf;
// mysql_init is not thread safe, especially in Connector/C
g_mutex_lock(init_mutex);
MYSQL *thrconn = mysql_init(NULL);
g_mutex_unlock(init_mutex);
configure_connection(thrconn, "mydumper");
if (!mysql_real_connect(thrconn, hostname, username, password, NULL, port,
socket_path, 0)) {
g_critical("Failed to connect to database: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
} else {
g_message("Thread %d connected using MySQL connection ID %lu",
td->thread_id, mysql_thread_id(thrconn));
}
//** Message: Thread 11 connected using MySQL connection ID 265
//bool use_savepoints
if (use_savepoints && mysql_query(thrconn, "SET SQL_LOG_BIN = 0")) {
g_critical("Failed to disable binlog for the thread: %s",
mysql_error(thrconn));
exit(EXIT_FAILURE);
}
//以下几步见general log 设置一个比较大的session级别的超时时间,设置RR级别开始快照读。 类似于single transcation
//这一步是针对mysql的
if ((detected_server == SERVER_TYPE_MYSQL) &&
mysql_query(thrconn, "SET SESSION wait_timeout = 2147483")) {
g_warning("Failed to increase wait_timeout: %s", mysql_error(thrconn));
}
if (mysql_query(thrconn,
"SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ")) {
g_critical("Failed to set isolation level: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
}
if (mysql_query(thrconn,
"START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */")) {
g_critical("Failed to start consistent snapshot: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
}
if (!skip_tz && mysql_query(thrconn, "/*!40103 SET TIME_ZONE='+00:00' */")) {
g_critical("Failed to set time zone: %s", mysql_error(thrconn));
}
//这一部分是针对tidb的
if (detected_server == SERVER_TYPE_TIDB) {
// Worker threads must set their tidb_snapshot in order to be safe
// Because no locking has been used.
gchar *query =
g_strdup_printf("SET SESSION tidb_snapshot = '%s'", tidb_snapshot);
if (mysql_query(thrconn, query)) {
g_critical("Failed to set tidb_snapshot: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
}
g_free(query);
g_message("Thread %d set to tidb_snapshot '%s'", td->thread_id,
tidb_snapshot);
}
/* Unfortunately version before 4.1.8 did not support consistent snapshot 4.1.8之前的版本不支持一致的快照
* transaction starts, so we cheat */
if (need_dummy_read) {
mysql_query(thrconn,
"SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.mydumperdummy");
MYSQL_RES *res = mysql_store_result(thrconn);
if (res)
mysql_free_result(res);
}
if (need_dummy_toku_read) {
mysql_query(thrconn,
"SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.tokudbdummy");
MYSQL_RES *res = mysql_store_result(thrconn);
if (res)
mysql_free_result(res);
}
mysql_query(thrconn, set_names_str);
g_async_queue_push(conf->ready, GINT_TO_POINTER(1));
struct job *job = NULL;
struct table_job *tj = NULL;
struct dump_database_job *ddj = NULL;
struct create_database_job *cdj = NULL;
struct schema_job *sj = NULL;
struct view_job *vj = NULL;
struct schema_post_job *sp = NULL;
#ifdef WITH_BINLOG
struct binlog_job *bj = NULL;
#endif
/* if less locking we need to wait until that threads finish
progressively waking up these threads */
if (less_locking) {
g_mutex_lock(ll_mutex);
while (less_locking_threads >= td->thread_id) {
g_cond_wait(ll_cond, ll_mutex);
}
g_mutex_unlock(ll_mutex);
}
for (;;) {
//while(true)比for( ; ; )多了几步操作,先将括号中的结果给寄存器,然后寄存进行比较,若比较结果为0则跳转到循环的结束执行其他操作
//理解为where true
GTimeVal tv;
//GTimeVal *end_time) { gpointer retval; //判断是否有数据在队列中,如果没有就要执行if语句相应的睡眠等待,直到被写进程唤醒
g_get_current_time(&tv);
g_time_val_add(&tv, 1000 * 1000 * 1);
job = (struct job *)g_async_queue_pop(conf->queue);
if (shutdown_triggered && (job->type != JOB_SHUTDOWN)) {
continue;
}
//根据job的类型执行不同的预定义的job,case的这些内容也取决于传入参数 弄清楚这里savepoint的含义
switch (job->type) {
case JOB_DUMP:
tj = (struct table_job *)job->job_data;
if (tj->where)
//这里是拆分了表了吗?? 什么时候有where
g_message("Thread %d dumping data for `%s`.`%s` where %s",
td->thread_id, tj->database, tj->table, tj->where);
else
g_message("Thread %d dumping data for `%s`.`%s`", td->thread_id,
tj->database, tj->table);
if (use_savepoints && mysql_query(thrconn, "SAVEPOINT mydumper")) {
g_critical("Savepoint failed: %s", mysql_error(thrconn));
}
dump_table_data_file(thrconn, tj->database, tj->table, tj->where,
tj->filename);
if (use_savepoints &&
mysql_query(thrconn, "ROLLBACK TO SAVEPOINT mydumper")) {
g_critical("Rollback to savepoint failed: %s", mysql_error(thrconn));
}
if (tj->database)
g_free(tj->database);
if (tj->table)
g_free(tj->table);
if (tj->where)
g_free(tj->where);
if (tj->filename)
g_free(tj->filename);
g_free(tj);
g_free(job);
break;
case JOB_DUMP_NON_INNODB:
tj = (struct table_job *)job->job_data;
if (tj->where)
g_message("Thread %d dumping data for `%s`.`%s` where %s",
td->thread_id, tj->database, tj->table, tj->where);
else
g_message("Thread %d dumping data for `%s`.`%s`", td->thread_id,
tj->database, tj->table);
if (use_savepoints && mysql_query(thrconn, "SAVEPOINT mydumper")) {
g_critical("Savepoint failed: %s", mysql_error(thrconn));
}
dump_table_data_file(thrconn, tj->database, tj->table, tj->where,
tj->filename);
if (use_savepoints &&
mysql_query(thrconn, "ROLLBACK TO SAVEPOINT mydumper")) {
g_critical("Rollback to savepoint failed: %s", mysql_error(thrconn));
}
if (tj->database)
g_free(tj->database);
if (tj->table)
g_free(tj->table);
if (tj->where)
g_free(tj->where);
if (tj->filename)
g_free(tj->filename);
g_free(tj);
g_free(job);
if (g_atomic_int_dec_and_test(&non_innodb_table_counter) &&
g_atomic_int_get(&non_innodb_done)) {
g_async_queue_push(conf->unlock_tables, GINT_TO_POINTER(1));
}
break;
case JOB_DUMP_DATABASE:
ddj = (struct dump_database_job *)job->job_data;
g_message("Thread %d dumping db information for `%s`", td->thread_id,
ddj->database);
dump_database_thread(thrconn, ddj->database);
if (ddj->database)
g_free(ddj->database);
g_free(ddj);
g_free(job);
if (g_atomic_int_dec_and_test(&database_counter)) {
g_async_queue_push(conf->ready_database_dump, GINT_TO_POINTER(1));
}
break;
case JOB_CREATE_DATABASE:
cdj = (struct create_database_job *)job->job_data;
g_message("Thread %d dumping schema create for `%s`", td->thread_id,
cdj->database);
dump_create_database_data(thrconn, cdj->database, cdj->filename);
if (cdj->database)
g_free(cdj->database);
if (cdj->filename)
g_free(cdj->filename);
g_free(cdj);
g_free(job);
break;
case JOB_SCHEMA:
sj = (struct schema_job *)job->job_data;
g_message("Thread %d dumping schema for `%s`.`%s`", td->thread_id,
sj->database, sj->table);
dump_schema_data(thrconn, sj->database, sj->table, sj->filename);
if (sj->database)
g_free(sj->database);
if (sj->table)
g_free(sj->table);
if (sj->filename)
g_free(sj->filename);
g_free(sj);
g_free(job);
break;
case JOB_VIEW:
vj = (struct view_job *)job->job_data;
g_message("Thread %d dumping view for `%s`.`%s`", td->thread_id,
vj->database, vj->table);
dump_view_data(thrconn, vj->database, vj->table, vj->filename,
vj->filename2);
if (vj->database)
g_free(vj->database);
if (vj->table)
g_free(vj->table);
if (vj->filename)
g_free(vj->filename);
if (vj->filename2)
g_free(vj->filename2);
g_free(vj);
g_free(job);
break;
case JOB_TRIGGERS:
sj = (struct schema_job *)job->job_data;
g_message("Thread %d dumping triggers for `%s`.`%s`", td->thread_id,
sj->database, sj->table);
dump_triggers_data(thrconn, sj->database, sj->table, sj->filename);
if (sj->database)
g_free(sj->database);
if (sj->table)
g_free(sj->table);
if (sj->filename)
g_free(sj->filename);
g_free(sj);
g_free(job);
break;
case JOB_SCHEMA_POST:
sp = (struct schema_post_job *)job->job_data;
g_message("Thread %d dumping SP and VIEWs for `%s`", td->thread_id,
sp->database);
dump_schema_post_data(thrconn, sp->database, sp->filename);
if (sp->database)
g_free(sp->database);
if (sp->filename)
g_free(sp->filename);
g_free(sp);
g_free(job);
break;
#ifdef WITH_BINLOG
case JOB_BINLOG:
thrconn = reconnect_for_binlog(thrconn);
g_message(
"Thread %d connected using MySQL connection ID %lu (in binlog mode)",
td->thread_id, mysql_thread_id(thrconn));
bj = (struct binlog_job *)job->job_data;
g_message("Thread %d dumping binary log file %s", td->thread_id,
bj->filename);
get_binlog_file(thrconn, bj->filename, binlog_directory,
bj->start_position, bj->stop_position, FALSE);
if (bj->filename)
g_free(bj->filename);
g_free(bj);
g_free(job);
break;
#endif
case JOB_SHUTDOWN:
g_message("Thread %d shutting down", td->thread_id);
if (thrconn)
mysql_close(thrconn);
g_free(job);
mysql_thread_end();
return NULL;
break;
default:
g_critical("Something very bad happened!");
exit(EXIT_FAILURE);
}
}
if (thrconn)
mysql_close(thrconn);
mysql_thread_end();
return NULL;
}
void *process_queue_less_locking(struct thread_data *td) {
struct configuration *conf = td->conf;
// mysql_init is not thread safe, especially in Connector/C
g_mutex_lock(init_mutex);
MYSQL *thrconn = mysql_init(NULL);
g_mutex_unlock(init_mutex);
configure_connection(thrconn, "mydumper");
if (!mysql_real_connect(thrconn, hostname, username, password, NULL, port,
socket_path, 0)) {
g_critical("Failed to connect to database: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
} else {
g_message("Thread %d connected using MySQL connection ID %lu",
td->thread_id, mysql_thread_id(thrconn));
}
if ((detected_server == SERVER_TYPE_MYSQL) &&
mysql_query(thrconn, "SET SESSION wait_timeout = 2147483")) {
g_warning("Failed to increase wait_timeout: %s", mysql_error(thrconn));
}
if (!skip_tz && mysql_query(thrconn, "/*!40103 SET TIME_ZONE='+00:00' */")) {
g_critical("Failed to set time zone: %s", mysql_error(thrconn));
}
mysql_query(thrconn, set_names_str);
g_async_queue_push(conf->ready_less_locking, GINT_TO_POINTER(1));
struct job *job = NULL;
struct table_job *tj = NULL;
struct tables_job *mj = NULL;
struct dump_database_job *ddj = NULL;
struct create_database_job *cdj = NULL;
struct schema_job *sj = NULL;
struct view_job *vj = NULL;
struct schema_post_job *sp = NULL;
#ifdef WITH_BINLOG
struct binlog_job *bj = NULL;
#endif
GList *glj;
int first = 1;
GString *query = g_string_new(NULL);
GString *prev_table = g_string_new(NULL);
GString *prev_database = g_string_new(NULL);
for (;;) {
GTimeVal tv;
g_get_current_time(&tv);
g_time_val_add(&tv, 1000 * 1000 * 1);
job = (struct job *)g_async_queue_pop(conf->queue_less_locking);
if (shutdown_triggered && (job->type != JOB_SHUTDOWN)) {
continue;
}
switch (job->type) {
case JOB_LOCK_DUMP_NON_INNODB: //non innodb表直接对表加只读锁定
mj = (struct tables_job *)job->job_data;
for (glj = mj->table_job_list; glj != NULL; glj = glj->next) {
tj = (struct table_job *)glj->data;
if (first) {
g_string_printf(query, "LOCK TABLES `%s`.`%s` READ LOCAL",
tj->database, tj->table);
first = 0;
} else {
if (g_ascii_strcasecmp(prev_database->str, tj->database) ||
g_ascii_strcasecmp(prev_table->str, tj->table)) {
g_string_append_printf(query, ", `%s`.`%s` READ LOCAL",
tj->database, tj->table);
}
}
g_string_printf(prev_table, "%s", tj->table);
g_string_printf(prev_database, "%s", tj->database);
}
first = 1;
if (mysql_query(thrconn, query->str)) {
g_critical("Non Innodb lock tables fail: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
}
if (g_atomic_int_dec_and_test(&non_innodb_table_counter) &&
g_atomic_int_get(&non_innodb_done)) {
g_async_queue_push(conf->unlock_tables, GINT_TO_POINTER(1));
}
for (glj = mj->table_job_list; glj != NULL; glj = glj->next) {
tj = (struct table_job *)glj->data;
if (tj->where)
g_message("Thread %d dumping data for `%s`.`%s` where %s",
td->thread_id, tj->database, tj->table, tj->where);
else
g_message("Thread %d dumping data for `%s`.`%s`", td->thread_id,
tj->database, tj->table);
dump_table_data_file(thrconn, tj->database, tj->table, tj->where,
tj->filename);
if (tj->database)
g_free(tj->database);
if (tj->table)
g_free(tj->table);
if (tj->where)
g_free(tj->where);
if (tj->filename)
g_free(tj->filename);
g_free(tj);
}
mysql_query(thrconn, "UNLOCK TABLES /* Non Innodb */");
g_list_free(mj->table_job_list);
g_free(mj);
g_free(job);
break;
case JOB_DUMP_DATABASE:
ddj = (struct dump_database_job *)job->job_data;
g_message("Thread %d dumping db information for `%s`", td->thread_id,
ddj->database);
dump_database_thread(thrconn, ddj->database);
if (ddj->database)
g_free(ddj->database);
g_free(ddj);
g_free(job);
if (g_atomic_int_dec_and_test(&database_counter)) {
g_async_queue_push(conf->ready_database_dump, GINT_TO_POINTER(1));
}
break;
case JOB_CREATE_DATABASE:
cdj = (struct create_database_job *)job->job_data;
g_message("Thread %d dumping schema create for `%s`", td->thread_id,
cdj->database);
dump_create_database_data(thrconn, cdj->database, cdj->filename);
if (cdj->database)
g_free(cdj->database);
if (cdj->filename)
g_free(cdj->filename);
g_free(cdj);
g_free(job);
break;
case JOB_SCHEMA:
sj = (struct schema_job *)job->job_data;
g_message("Thread %d dumping schema for `%s`.`%s`", td->thread_id,
sj->database, sj->table);
dump_schema_data(thrconn, sj->database, sj->table, sj->filename);
if (sj->database)
g_free(sj->database);
if (sj->table)
g_free(sj->table);
if (sj->filename)
g_free(sj->filename);
g_free(sj);
g_free(job);
break;
case JOB_VIEW:
vj = (struct view_job *)job->job_data;
g_message("Thread %d dumping view for `%s`.`%s`", td->thread_id,
sj->database, sj->table);
dump_view_data(thrconn, vj->database, vj->table, vj->filename,
vj->filename2);
if (vj->database)
g_free(vj->database);
if (vj->table)
g_free(vj->table);
if (vj->filename)
g_free(vj->filename);
if (vj->filename2)
g_free(vj->filename2);
g_free(vj);
g_free(job);
break;
case JOB_TRIGGERS:
sj = (struct schema_job *)job->job_data;
g_message("Thread %d dumping triggers for `%s`.`%s`", td->thread_id,
sj->database, sj->table);
dump_triggers_data(thrconn, sj->database, sj->table, sj->filename);
if (sj->database)
g_free(sj->database);
if (sj->table)
g_free(sj->table);
if (sj->filename)
g_free(sj->filename);
g_free(sj);
g_free(job);
break;
case JOB_SCHEMA_POST:
sp = (struct schema_post_job *)job->job_data;
g_message("Thread %d dumping SP and VIEWs for `%s`", td->thread_id,
sp->database);
dump_schema_post_data(thrconn, sp->database, sp->filename);
if (sp->database)
g_free(sp->database);
if (sp->filename)
g_free(sp->filename);
g_free(sp);
g_free(job);
break;
#ifdef WITH_BINLOG
case JOB_BINLOG:
thrconn = reconnect_for_binlog(thrconn);
g_message(
"Thread %d connected using MySQL connection ID %lu (in binlog mode)",
td->thread_id, mysql_thread_id(thrconn));
bj = (struct binlog_job *)job->job_data;
g_message("Thread %d dumping binary log file %s", td->thread_id,
bj->filename);
get_binlog_file(thrconn, bj->filename, binlog_directory,
bj->start_position, bj->stop_position, FALSE);
if (bj->filename)
g_free(bj->filename);
g_free(bj);
g_free(job);
break;
#endif
case JOB_SHUTDOWN:
g_message("Thread %d shutting down", td->thread_id);
g_mutex_lock(ll_mutex);
less_locking_threads--;
g_cond_broadcast(ll_cond);
g_mutex_unlock(ll_mutex);
g_string_free(query, TRUE);
g_string_free(prev_table, TRUE);
g_string_free(prev_database, TRUE);
if (thrconn)
mysql_close(thrconn);
g_free(job);
mysql_thread_end();
return NULL;
break;
default:
g_critical("Something very bad happened!");
exit(EXIT_FAILURE);
}
}
if (thrconn)
mysql_close(thrconn);
mysql_thread_end();
return NULL;
}
#ifdef WITH_BINLOG
MYSQL *reconnect_for_binlog(MYSQL *thrconn) {
if (thrconn) {
mysql_close(thrconn);
}
g_mutex_lock(init_mutex);
thrconn = mysql_init(NULL);
g_mutex_unlock(init_mutex);
configure_connection(thrconn, "mydumper");
int timeout = 1;
mysql_options(thrconn, MYSQL_OPT_READ_TIMEOUT, (const char *)&timeout);
if (!mysql_real_connect(thrconn, hostname, username, password, NULL, port,
socket_path, 0)) {
g_critical("Failed to re-connect to database: %s", mysql_error(thrconn));
exit(EXIT_FAILURE);
}
return thrconn;
}
#endif
//主函数在这里
int main(int argc, char *argv[]) {
GError *error = NULL;
GOptionContext *context;
g_thread_init(NULL);
init_mutex = g_mutex_new();
innodb_tables_mutex = g_mutex_new();
non_innodb_table_mutex = g_mutex_new();
table_schemas_mutex = g_mutex_new();
view_schemas_mutex = g_mutex_new();
schema_post_mutex = g_mutex_new();
ll_mutex = g_mutex_new();
ll_cond = g_cond_new();
context = g_option_context_new("multi-threaded MySQL dumping");
GOptionGroup *main_group =
g_option_group_new("main", "Main Options", "Main Options", NULL, NULL);
g_option_group_add_entries(main_group, entries);
g_option_group_add_entries(main_group, common_entries);
g_option_context_set_main_group(context, main_group);
gchar ** tmpargv=g_strdupv(argv);
int tmpargc=argc;
if (!g_option_context_parse(context, &tmpargc, &tmpargv, &error)) {
g_print("option parsing failed: %s, try --helpn", error->message);
exit(EXIT_FAILURE);
}
g_option_context_free(context);
if (password != NULL){
for(int i=1; i < argc; i++){
gchar * p= g_strstr_len(argv[i],-1,password);
if (p != NULL){
strncpy(p, "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", strlen(password));
}
}
}
// prompt for password if it's NULL
if (sizeof(password) == 0 || (password == NULL && askPassword)) {
password = passwordPrompt();
}
if (set_names_str){
gchar *tmp_str=g_strdup_printf("/*!40101 SET NAMES %s*/",set_names_str);
set_names_str=tmp_str;
} else
set_names_str=g_strdup("/*!40101 SET NAMES binary*/");
// printf("your password is %s and the size is %d
// n",password,sizeof(password));
if (program_version) {
g_print("mydumper %s, built against MySQL %sn", VERSION,
MYSQL_VERSION_STR);
exit(EXIT_SUCCESS);
}
set_verbose(verbose);
time_t t;
time(&t);
localtime_r(&t, &tval);
// rows chunks have precedence over chunk_filesize row chunk 优先于 chunk_filesize
if (rows_per_file > 0 && chunk_filesize > 0) {
chunk_filesize = 0;
g_warning("--chunk-filesize disabled by --rows option");
}
// until we have an unique option on lock types we need to ensure this
//直到我们对锁类型有一个唯一的选择,我们需要确保这一点
//说白了 就是选一个唯一的索引,第一肯定是主键,没有主键的话找唯一索引。唯一索引不存在找区分度最高的列。如果还是没有,根据表级别并行备份
if (no_locks || trx_consistency_only) //事务仅一致性??
less_locking = 0;
/* savepoints workaround to avoid metadata locking issues
doesnt work for chuncks */
if (rows_per_file && use_savepoints) {
use_savepoints = FALSE;
g_warning("--use-savepoints disabled by --rows");
}
// clarify binlog coordinates with trx_consistency_only
if (trx_consistency_only)
g_warning("Using trx_consistency_only, binlog coordinates will not be "
"accurate if you are writing to non transactional tables.");
if (!output_directory)
output_directory = g_strdup_printf(
"%s-%04d%02d%02d-%02d%02d%02d", DIRECTORY, tval.tm_year + 1900,
tval.tm_mon + 1, tval.tm_mday, tval.tm_hour, tval.tm_min, tval.tm_sec);
create_backup_dir(output_directory);
if (daemon_mode) {
pid_t pid, sid;
pid = fork();
if (pid < 0)
exit(EXIT_FAILURE);
else if (pid > 0)
exit(EXIT_SUCCESS);
umask(0);
sid = setsid();
if (sid < 0)
exit(EXIT_FAILURE);
char *dump_directory = g_strdup_printf("%s/0", output_directory);
create_backup_dir(dump_directory);
g_free(dump_directory);
dump_directory = g_strdup_printf("%s/1", output_directory);
create_backup_dir(dump_directory);
g_free(dump_directory);
#ifdef WITH_BINLOG
daemon_binlog_directory =
g_strdup_printf("%s/%s", output_directory, DAEMON_BINLOGS);
create_backup_dir(daemon_binlog_directory);
#endif
}
#ifdef WITH_BINLOG
if (need_binlogs) {
binlog_directory =
g_strdup_printf("%s/%s", output_directory, BINLOG_DIRECTORY);
create_backup_dir(binlog_directory);
}
#endif
/* Give ourselves an array of engines to ignore */
if (ignore_engines)
ignore = g_strsplit(ignore_engines, ",", 0);
/* Give ourselves an array of tables to dump */
if (tables_list)
tables = g_strsplit(tables_list, ",", 0);
/* Process list of tables to omit if specified */
if (tables_skiplist_file)
read_tables_skiplist(tables_skiplist_file);
if (daemon_mode) {
GError *terror;
#ifdef WITH_BINLOG
GThread *bthread =
g_thread_create(binlog_thread, GINT_TO_POINTER(1), FALSE, &terror);
if (bthread == NULL) {
g_critical("Could not create binlog thread: %s", terror->message);
g_error_free(terror);
exit(EXIT_FAILURE);
}
#endif
start_scheduled_dump = g_async_queue_new();
GThread *ethread =
g_thread_create(exec_thread, GINT_TO_POINTER(1), FALSE, &terror);
if (ethread == NULL) {
g_critical("Could not create exec thread: %s", terror->message);
g_error_free(terror);
exit(EXIT_FAILURE);
}
// Run initial snapshot
run_snapshot(NULL);
#if GLIB_MINOR_VERSION < 14
g_timeout_add(snapshot_interval * 60 * 1000, (GSourceFunc)run_snapshot,
NULL);
#else
g_timeout_add_seconds(snapshot_interval * 60, (GSourceFunc)run_snapshot,
NULL);
#endif
guint sigsource = g_unix_signal_add(SIGINT, sig_triggered, NULL);
sigsource = g_unix_signal_add(SIGTERM, sig_triggered, NULL);
m1 = g_main_loop_new(NULL, TRUE);
g_main_loop_run(m1);
g_source_remove(sigsource);
} else {
MYSQL *conn = create_main_connection();
start_dump(conn);
}
// sleep(5);
mysql_thread_end();
mysql_library_end();
g_free(output_directory);
g_strfreev(ignore);
g_strfreev(tables);
if (logoutfile) {
fclose(logoutfile);
}
exit(errors ? EXIT_FAILURE : EXIT_SUCCESS);
}
MYSQL *create_main_connection() {
MYSQL *conn;
conn = mysql_init(NULL);
configure_connection(conn, "mydumper");
if (!mysql_real_connect(conn, hostname, username, password, db, port,
socket_path, 0)) {
g_critical("Error connecting to database: %s", mysql_error(conn));
exit(EXIT_FAILURE);
}
detected_server = detect_server(conn);
if ((detected_server == SERVER_TYPE_MYSQL) &&
mysql_query(conn, "SET SESSION wait_timeout = 2147483")) {
g_warning("Failed to increase wait_timeout: %s", mysql_error(conn));
}
if ((detected_server == SERVER_TYPE_MYSQL) &&
mysql_query(conn, "SET SESSION net_write_timeout = 2147483")) {
g_warning("Failed to increase net_write_timeout: %s", mysql_error(conn));
}
switch (detected_server) {
case SERVER_TYPE_MYSQL:
g_message("Connected to a MySQL server");
break;
case SERVER_TYPE_DRIZZLE:
g_message("Connected to a Drizzle server");
break;
case SERVER_TYPE_TIDB:
g_message("Connected to a TiDB server");
break;
default:
g_critical("Cannot detect server type");
exit(EXIT_FAILURE);
break;
}
return conn;
}
void *exec_thread(void *data) {
(void)data;
while (1) {
g_async_queue_pop(start_scheduled_dump);
clear_dump_directory();
MYSQL *conn = create_main_connection();
start_dump(conn);
// start_dump already closes mysql
// mysql_close(conn);
mysql_thread_end();
// Don't switch the symlink on shutdown because the dump is probably
// incomplete.
if (!shutdown_triggered) {
const char *dump_symlink_source = (dump_number == 0) ? "0" : "1";
char *dump_symlink_dest =
g_strdup_printf("%s/last_dump", output_directory);
// We don't care if this fails
g_unlink(dump_symlink_dest);
if (symlink(dump_symlink_source, dump_symlink_dest) == -1) {
g_critical("error setting last good dump symlink %s, %d",
dump_symlink_dest, errno);
}
g_free(dump_symlink_dest);
dump_number = (dump_number == 1) ? 0 : 1;
}
}
return NULL;
}
#ifdef WITH_BINLOG
void *binlog_thread(void *data) {
(void)data;
MYSQL_RES *master = NULL;
MYSQL_ROW row;
MYSQL *conn;
conn = mysql_init(NULL);
if (defaults_file != NULL) {
mysql_options(conn, MYSQL_READ_DEFAULT_FILE, defaults_file);
}
mysql_options(conn, MYSQL_READ_DEFAULT_GROUP, "mydumper");
if (!mysql_real_connect(conn, hostname, username, password, db, port,
socket_path, 0)) {
g_critical("Error connecting to database: %s", mysql_error(conn));
exit(EXIT_FAILURE);
}
mysql_query(conn, "SHOW MASTER STATUS");
master = mysql_store_result(conn);
if (master && (row = mysql_fetch_row(master))) {
MYSQL *binlog_connection = NULL;
binlog_connection = reconnect_for_binlog(binlog_connection);
binlog_connect_id = mysql_thread_id(binlog_connection);
guint64 start_position = g_ascii_strtoull(row[1], NULL, 10);
gchar *filename = g_strdup(row[0]);
mysql_free_result(master);
mysql_close(conn);
g_message(
"Continuous binlog thread connected using MySQL connection ID %lu",
mysql_thread_id(binlog_connection));
get_binlog_file(binlog_connection, filename, daemon_binlog_directory,
start_position, 0, TRUE);
g_free(filename);
mysql_close(binlog_connection);
} else {
mysql_free_result(master);
mysql_close(conn);
}
g_message("Continuous binlog thread shutdown");
mysql_thread_end();
return NULL;
}
#endif
void start_dump(MYSQL *conn) {
struct configuration conf = {1, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 0};
char *p;
char *p2;
char *p3;
char *u;
guint64 nits[num_threads];
GList *nitl[num_threads];
int tn = 0;
guint64 min = 0;
time_t t;
struct db_table *dbt;
struct schema_post *sp;
guint n;
FILE *nufile = NULL;
guint have_backup_locks = 0;
for (n = 0; n < num_threads; n++) {
nits[n] = 0;
nitl[n] = NULL;
}
if (daemon_mode)
p = g_strdup_printf("%s/%d/metadata.partial", output_directory,
dump_number);
else
p = g_strdup_printf("%s/metadata.partial", output_directory);
p2 = g_strndup(p, (unsigned)strlen(p) - 8);
FILE *mdfile = g_fopen(p, "w");
if (!mdfile) {
g_critical("Couldn't write metadata file (%d)", errno);
exit(EXIT_FAILURE);
}
if (updated_since > 0) {
if (daemon_mode)
u = g_strdup_printf("%s/%d/not_updated_tables", output_directory,
dump_number);
else
u = g_strdup_printf("%s/not_updated_tables", output_directory);
nufile = g_fopen(u, "w");
if (!nufile) {
g_critical("Couldn't write not_updated_tables file (%d)", errno);
exit(EXIT_FAILURE);
}
get_not_updated(conn, nufile);
}
/* We check SHOW PROCESSLIST, and if there're queries
larger than preset value, we terminate the process.
This avoids stalling whole server with flush */
if (!no_locks) {
while (TRUE) {
int longquery_count = 0;
if (mysql_query(conn, "SHOW PROCESSLIST")) {
g_warning("Could not check PROCESSLIST, no long query guard enabled: %s",
mysql_error(conn));
break;
} else {
MYSQL_RES *res = mysql_store_result(conn);
MYSQL_ROW row;
/* Just in case PROCESSLIST output column order changes */
MYSQL_FIELD *fields = mysql_fetch_fields(res);
guint i;
int tcol = -1, ccol = -1, icol = -1, ucol = -1;
for (i = 0; i < mysql_num_fields(res); i++) {
if (!strcasecmp(fields[i].name, "Command"))
ccol = i;
else if (!strcasecmp(fields[i].name, "Time"))
tcol = i;
else if (!strcasecmp(fields[i].name, "Id"))
icol = i;
else if (!strcasecmp(fields[i].name, "User"))
ucol = i;
}
if ((tcol < 0) || (ccol < 0) || (icol < 0)) {
g_critical("Error obtaining information from processlist");
exit(EXIT_FAILURE);
}
while ((row = mysql_fetch_row(res))) {
if (row[ccol] && strcmp(row[ccol], "Query"))
continue;
if (row[ucol] && !strcmp(row[ucol], "system user"))
continue;
if (row[tcol] && atoi(row[tcol]) > longquery) {
if (killqueries) {
if (mysql_query(conn,
p3 = g_strdup_printf("KILL %lu", atol(row[icol])))) {
g_warning("Could not KILL slow query: %s", mysql_error(conn));
longquery_count++;
} else {
g_warning("Killed a query that was running for %ss", row[tcol]);
}
g_free(p3);
} else {
longquery_count++;
}
}
}
mysql_free_result(res);
if (longquery_count == 0)
break;
else {
if (longquery_retries == 0) {
g_critical("There are queries in PROCESSLIST running longer than "
"%us, aborting dump,nt"
"use --long-query-guard to change the guard value, kill "
"queries (--kill-long-queries) or use ntdifferent "
"server for dump",
longquery);
exit(EXIT_FAILURE);
}
longquery_retries--;
g_warning("There are queries in PROCESSLIST running longer than "
"%us, retrying in %u seconds (%u left).",
longquery, longquery_retry_interval, longquery_retries);
sleep(longquery_retry_interval);
}
}
}
}
if (!no_locks && (detected_server != SERVER_TYPE_TIDB)) {
// Percona Server 8 removed LOCK BINLOG so backup locks is useless for
// mydumper now and we need to fail back to FTWRL
mysql_query(conn, "SELECT @@version_comment, @@version");
MYSQL_RES *res2 = mysql_store_result(conn);
MYSQL_ROW ver;
while ((ver = mysql_fetch_row(res2))) {
if (g_str_has_prefix(ver[0], "Percona") &&
g_str_has_prefix(ver[1], "8.")) {
g_message("Disabling Percona Backup Locks for Percona Server 8");
no_backup_locks = 1;
}
}
mysql_free_result(res2);
// Percona Backup Locks
if (!no_backup_locks) {
mysql_query(conn, "SELECT @@have_backup_locks");
MYSQL_RES *rest = mysql_store_result(conn);
if (rest != NULL && mysql_num_rows(rest)) {
mysql_free_result(rest);
g_message("Using Percona Backup Locks");
have_backup_locks = 1;
}
}
if (have_backup_locks) {
if (mysql_query(conn, "LOCK TABLES FOR BACKUP")) {
g_critical("Couldn't acquire LOCK TABLES FOR BACKUP, snapshots will "
"not be consistent: %s",
mysql_error(conn));
errors++;
}
if (mysql_query(conn, "LOCK BINLOG FOR BACKUP")) {
g_critical("Couldn't acquire LOCK BINLOG FOR BACKUP, snapshots will "
"not be consistent: %s",
mysql_error(conn));
errors++;
}
} else if (lock_all_tables) {
// LOCK ALL TABLES
GString *query = g_string_sized_new(16777216);
gchar *dbtb = NULL;
gchar **dt = NULL;
GList *tables_lock = NULL;
GList *iter = NULL;
guint success = 0;
guint retry = 0;
guint lock = 1;
int i = 0;
if (db) {
g_string_printf(
query,
"SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TABLES "
"WHERE TABLE_SCHEMA = '%s' AND TABLE_TYPE ='BASE TABLE' AND NOT "
"(TABLE_SCHEMA = 'mysql' AND (TABLE_NAME = 'slow_log' OR "
"TABLE_NAME = 'general_log'))",
db);
} else if (tables) {
for (i = 0; tables[i] != NULL; i++) {
dt = g_strsplit(tables[i], ".", 0);
dbtb = g_strdup_printf("`%s`.`%s`", dt[0], dt[1]);
tables_lock = g_list_prepend(tables_lock, dbtb);
}
tables_lock = g_list_reverse(tables_lock);
} else {
g_string_printf(
query,
"SELECT TABLE_SCHEMA, TABLE_NAME FROM information_schema.TABLES "
"WHERE TABLE_TYPE ='BASE TABLE' AND TABLE_SCHEMA NOT IN "
"('information_schema', 'performance_schema', 'data_dictionary') "
"AND NOT (TABLE_SCHEMA = 'mysql' AND (TABLE_NAME = 'slow_log' OR "
"TABLE_NAME = 'general_log'))");
}
if (tables_lock == NULL) {
if (mysql_query(conn, query->str)) {
g_critical("Couldn't get table list for lock all tables: %s",
mysql_error(conn));
errors++;
} else {
MYSQL_RES *res = mysql_store_result(conn);
MYSQL_ROW row;
while ((row = mysql_fetch_row(res))) {
lock = 1;
if (tables) {
int table_found = 0;
for (i = 0; tables[i] != NULL; i++)
if (g_ascii_strcasecmp(tables[i], row[1]) == 0)
table_found = 1;
if (!table_found)
lock = 0;
}
if (lock && tables_skiplist_file && check_skiplist(row[0], row[1]))
continue;
if (lock && regexstring && !check_regex(row[0], row[1]))
continue;
if (lock) {
dbtb = g_strdup_printf("`%s`.`%s`", row[0], row[1]);
tables_lock = g_list_prepend(tables_lock, dbtb);
}
}
tables_lock = g_list_reverse(tables_lock);
}
}
// Try three times to get the lock, this is in case of tmp tables
// disappearing
while (!success && retry < 4) {
n = 0;
for (iter = tables_lock; iter != NULL; iter = iter->next) {
if (n == 0) {
g_string_printf(query, "LOCK TABLE %s READ", (char *)iter->data);
n = 1;
} else {
g_string_append_printf(query, ", %s READ", (char *)iter->data);
}
}
if (mysql_query(conn, query->str)) {
gchar *failed_table = NULL;
gchar **tmp_fail;
tmp_fail = g_strsplit(mysql_error(conn), "'", 0);
tmp_fail = g_strsplit(tmp_fail[1], ".", 0);
failed_table = g_strdup_printf("`%s`.`%s`", tmp_fail[0], tmp_fail[1]);
for (iter = tables_lock; iter != NULL; iter = iter->next) {
if (strcmp(iter->data, failed_table) == 0) {
tables_lock = g_list_remove(tables_lock, iter->data);
}
}
g_free(tmp_fail);
g_free(failed_table);
} else {
success = 1;
}
retry += 1;
}
if (!success) {
g_critical("Lock all tables fail: %s", mysql_error(conn));
exit(EXIT_FAILURE);
}
g_free(query->str);
g_list_free(tables_lock);
} else {
if (mysql_query(conn, "FLUSH TABLES WITH READ LOCK")) {
g_critical("Couldn't acquire global lock, snapshots will not be "
"consistent: %s",
mysql_error(conn));
errors++;
}
}
} else if (detected_server == SERVER_TYPE_TIDB) {
g_message("Skipping locks because of TiDB");
if (!tidb_snapshot) {
// Generate a @@tidb_snapshot to use for the worker threads since
// the tidb-snapshot argument was not specified when starting mydumper
if (mysql_query(conn, "SHOW MASTER STATUS")) {
g_critical("Couldn't generate @@tidb_snapshot: %s", mysql_error(conn));
exit(EXIT_FAILURE);
} else {
MYSQL_RES *result = mysql_store_result(conn);
MYSQL_ROW row = mysql_fetch_row(
result); /* There should never be more than one row */
tidb_snapshot = g_strdup(row[1]);
mysql_free_result(result);
}
}
// Need to set the @@tidb_snapshot for the master thread
gchar *query =
g_strdup_printf("SET SESSION tidb_snapshot = '%s'", tidb_snapshot);
g_message("Set to tidb_snapshot '%s'", tidb_snapshot);
if (mysql_query(conn, query)) {
g_critical("Failed to set tidb_snapshot: %s", mysql_error(conn));
exit(EXIT_FAILURE);
}
g_free(query);
} else {
g_warning("Executing in no-locks mode, snapshot will not be consistent");
}
if (mysql_get_server_version(conn) < 40108) {
mysql_query(
conn,
"CREATE TABLE IF NOT EXISTS mysql.mydumperdummy (a INT) ENGINE=INNODB");
need_dummy_read = 1;
}
// tokudb do not support consistent snapshot
mysql_query(conn, "SELECT @@tokudb_version");
MYSQL_RES *rest = mysql_store_result(conn);
if (rest != NULL && mysql_num_rows(rest)) {
mysql_free_result(rest);
g_message("TokuDB detected, creating dummy table for CS");
mysql_query(
conn,
"CREATE TABLE IF NOT EXISTS mysql.tokudbdummy (a INT) ENGINE=TokuDB");
need_dummy_toku_read = 1;
}
// Do not start a transaction when lock all tables instead of FTWRL,
// since it can implicitly release read locks we hold
if (!lock_all_tables) {
mysql_query(conn, "START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */");
}
if (need_dummy_read) {
mysql_query(conn,
"SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.mydumperdummy");
MYSQL_RES *res = mysql_store_result(conn);
if (res)
mysql_free_result(res);
}
if (need_dummy_toku_read) {
mysql_query(conn,
"SELECT /*!40001 SQL_NO_CACHE */ * FROM mysql.tokudbdummy");
MYSQL_RES *res = mysql_store_result(conn);
if (res)
mysql_free_result(res);
}
time(&t);
localtime_r(&t, &tval);
fprintf(mdfile, "Started dump at: %04d-%02d-%02d %02d:%02d:%02dn",
tval.tm_year + 1900, tval.tm_mon + 1, tval.tm_mday, tval.tm_hour,
tval.tm_min, tval.tm_sec);
g_message("Started dump at: %04d-%02d-%02d %02d:%02d:%02dn",
tval.tm_year + 1900, tval.tm_mon + 1, tval.tm_mday, tval.tm_hour,
tval.tm_min, tval.tm_sec);
if (detected_server == SERVER_TYPE_MYSQL) {
mysql_query(conn, set_names_str);
write_snapshot_info(conn, mdfile);
}
GThread **threads = g_new(GThread *, num_threads * (less_locking + 1));
struct thread_data *td =
g_new(struct thread_data, num_threads * (less_locking + 1));
if (less_locking) {
conf.queue_less_locking = g_async_queue_new();
conf.ready_less_locking = g_async_queue_new();
less_locking_threads = num_threads;
for (n = num_threads; n < num_threads * 2; n++) {
td[n].conf = &conf;
td[n].thread_id = n + 1;
threads[n] = g_thread_create((GThreadFunc)process_queue_less_locking,
&td[n], TRUE, NULL);
g_async_queue_pop(conf.ready_less_locking);
}
g_async_queue_unref(conf.ready_less_locking);
}
conf.queue = g_async_queue_new();
conf.ready = g_async_queue_new();
conf.unlock_tables = g_async_queue_new();
conf.ready_database_dump = g_async_queue_new();
for (n = 0; n < num_threads; n++) {
td[n].conf = &conf;
td[n].thread_id = n + 1;
threads[n] =
g_thread_create((GThreadFunc)process_queue, &td[n], TRUE, NULL);
g_async_queue_pop(conf.ready);
}
g_async_queue_unref(conf.ready);
if (trx_consistency_only) {
g_message("Transactions started, unlocking tables");
mysql_query(conn, "UNLOCK TABLES /* trx-only */");
if (have_backup_locks)
mysql_query(conn, "UNLOCK BINLOG");
}
if (db) {
dump_database(db, &conf);
if (!no_schemas)
dump_create_database(db, &conf);
} else if (tables) {
get_tables(conn, &conf);
} else {
MYSQL_RES *databases;
MYSQL_ROW row;
if (mysql_query(conn, "SHOW DATABASES") ||
!(databases = mysql_store_result(conn))) {
g_critical("Unable to list databases: %s", mysql_error(conn));
exit(EXIT_FAILURE);
}
while ((row = mysql_fetch_row(databases))) {
if (!strcasecmp(row[0], "information_schema") ||
!strcasecmp(row[0], "performance_schema") ||
(!strcasecmp(row[0], "data_dictionary")))
continue;
dump_database(row[0], &conf);
/* Checks PCRE expressions on 'database' string */
if (!no_schemas && (regexstring == NULL || check_regex(row[0], NULL)))
dump_create_database(row[0], &conf);
}
mysql_free_result(databases);
}
g_async_queue_pop(conf.ready_database_dump);
g_async_queue_unref(conf.ready_database_dump);
g_list_free(no_updated_tables);
if (!non_innodb_table) {
g_async_queue_push(conf.unlock_tables, GINT_TO_POINTER(1));
}
non_innodb_table = g_list_reverse(non_innodb_table);
if (less_locking) {
GList *iter;
for (iter = non_innodb_table; iter != NULL; iter = iter->next) {
dbt = (struct db_table *)iter->data;
tn = 0;
min = nits[0];
for (n = 1; n < num_threads; n++) {
if (nits[n] < min) {
min = nits[n];
tn = n;
}
}
nitl[tn] = g_list_prepend(nitl[tn], dbt);
nits[tn] += dbt->datalength;
}
nitl[tn] = g_list_reverse(nitl[tn]);
for (n = 0; n < num_threads; n++) {
if (nits[n] > 0) {
g_atomic_int_inc(&non_innodb_table_counter);
dump_tables(conn, nitl[n], &conf);
g_list_free(nitl[n]);
}
}
g_list_free(non_innodb_table);
if (g_atomic_int_get(&non_innodb_table_counter))
g_atomic_int_inc(&non_innodb_done);
else
g_async_queue_push(conf.unlock_tables, GINT_TO_POINTER(1));
for (n = 0; n < num_threads; n++) {
struct job *j = g_new0(struct job, 1);
j->type = JOB_SHUTDOWN;
g_async_queue_push(conf.queue_less_locking, j);
}
} else {
GList *iter;
for (iter = non_innodb_table; iter != NULL; iter = iter->next) {
dbt = (struct db_table *)iter->data;
dump_table(conn, dbt->database, dbt->table, &conf, FALSE);
g_atomic_int_inc(&non_innodb_table_counter);
}
g_list_free(non_innodb_table);
g_atomic_int_inc(&non_innodb_done);
}
innodb_tables = g_list_reverse(innodb_tables);
GList *iter;
for (iter = innodb_tables; iter != NULL; iter = iter->next) {
dbt = (struct db_table *)iter->data;
dump_table(conn, dbt->database, dbt->table, &conf, TRUE);
}
g_list_free(innodb_tables);
table_schemas = g_list_reverse(table_schemas);
for (iter = table_schemas; iter != NULL; iter = iter->next) {
dbt = (struct db_table *)iter->data;
dump_schema(conn, dbt->database, dbt->table, &conf);
g_free(dbt->table);
g_free(dbt->database);
g_free(dbt);
}
g_list_free(table_schemas);
view_schemas = g_list_reverse(view_schemas);
for (iter = view_schemas; iter != NULL; iter = iter->next) {
dbt = (struct db_table *)iter->data;
dump_view(dbt->database, dbt->table, &conf);
g_free(dbt->table);
g_free(dbt->database);
g_free(dbt);
}
g_list_free(view_schemas);
schema_post = g_list_reverse(schema_post);
for (iter = schema_post; iter != NULL; iter = iter->next) {
sp = (struct schema_post *)iter->data;
dump_schema_post(sp->database, &conf);
g_free(sp->database);
g_free(sp);
}
g_list_free(schema_post);
if (!no_locks && !trx_consistency_only) {
g_async_queue_pop(conf.unlock_tables);
g_message("Non-InnoDB dump complete, unlocking tables");
mysql_query(conn, "UNLOCK TABLES /* FTWRL */");
if (have_backup_locks)
mysql_query(conn, "UNLOCK BINLOG");
}
#ifdef WITH_BINLOG
if (need_binlogs) {
get_binlogs(conn, &conf);
}
#endif
// close main connection
mysql_close(conn);
if (less_locking) {
for (n = num_threads; n < num_threads * 2; n++) {
g_thread_join(threads[n]);
}
g_async_queue_unref(conf.queue_less_locking);
}
for (n = 0; n < num_threads; n++) {
struct job *j = g_new0(struct job, 1);
j->type = JOB_SHUTDOWN;
g_async_queue_push(conf.queue, j);
}
for (n = 0; n < num_threads; n++) {
g_thread_join(threads[n]);
}
g_async_queue_unref(conf.queue);
g_async_queue_unref(conf.unlock_tables);
time(&t);
localtime_r(&t, &tval);
fprintf(mdfile, "Finished dump at: %04d-%02d-%02d %02d:%02d:%02dn",
tval.tm_year + 1900, tval.tm_mon + 1, tval.tm_mday, tval.tm_hour,
tval.tm_min, tval.tm_sec);
fclose(mdfile);
if (updated_since > 0)
fclose(nufile);
g_rename(p, p2);
g_free(p);
g_free(p2);
g_message("Finished dump at: %04d-%02d-%02d %02d:%02d:%02dn",
tval.tm_year + 1900, tval.tm_mon + 1, tval.tm_mday, tval.tm_hour,
tval.tm_min, tval.tm_sec);
g_free(td);
g_free(threads);
}
void dump_create_database(char *database, struct configuration *conf) {
struct job *j = g_new0(struct job, 1);
struct create_database_job *cdj = g_new0(struct create_database_job, 1);
j->job_data = (void *)cdj;
cdj->database = g_strdup(database);
j->conf = conf;
j->type = JOB_CREATE_DATABASE;
if (daemon_mode)
cdj->filename =
g_strdup_printf("%s/%d/%s-schema-create.sql%s", output_directory,
dump_number, database, (compress_output ? ".gz" : ""));
else
cdj->filename =
g_strdup_printf("%s/%s-schema-create.sql%s", output_directory, database,
(compress_output ? ".gz" : ""));
g_async_queue_push(conf->queue, j);
return;
}
void dump_create_database_data(MYSQL *conn, char *database, char *filename) {
void *outfile = NULL;
char *query = NULL;
MYSQL_RES *result = NULL;
MYSQL_ROW row;
if (!compress_output)
outfile = g_fopen(filename, "w");
else
outfile = (void *)gzopen(filename, "w");
if (!outfile) {
g_critical("Error: DB: %s Could not create output file %s (%d)", database,
filename, errno);
errors++;
return;
}
GString *statement = g_string_sized_new(statement_size);
query = g_strdup_printf("SHOW CREATE DATABASE IF NOT EXISTS `%s`", database);
if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
if (success_on_1146 && mysql_errno(conn) == 1146) {
g_warning("Error dumping create database (%s): %s", database,
mysql_error(conn));
} else {
g_critical("Error dumping create database (%s): %s", database,
mysql_error(conn));
errors++;
}
g_free(query);
return;
}
/* There should never be more than one row */
row = mysql_fetch_row(result);
g_string_append(statement, row[1]);
g_string_append(statement, ";n");
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write create database for %s", database);
errors++;
}
g_free(query);
if (!compress_output)
fclose((FILE *)outfile);
else
gzclose((gzFile)outfile);
g_string_free(statement, TRUE);
if (result)
mysql_free_result(result);
return;
}
void get_not_updated(MYSQL *conn, FILE *file) {
MYSQL_RES *res = NULL;
MYSQL_ROW row;
gchar *query =
g_strdup_printf("SELECT CONCAT(TABLE_SCHEMA,'.',TABLE_NAME) FROM "
"information_schema.TABLES WHERE TABLE_TYPE = 'BASE "
"TABLE' AND UPDATE_TIME < NOW() - INTERVAL %d DAY",
updated_since);
mysql_query(conn, query);
g_free(query);
res = mysql_store_result(conn);
while ((row = mysql_fetch_row(res))) {
no_updated_tables = g_list_prepend(no_updated_tables, row[0]);
fprintf(file, "%sn", row[0]);
}
no_updated_tables = g_list_reverse(no_updated_tables);
fflush(file);
}
gboolean detect_generated_fields(MYSQL *conn, char *database, char *table) {
MYSQL_RES *res = NULL;
MYSQL_ROW row;
gboolean result = FALSE;
gchar *query = g_strdup_printf(
"select COLUMN_NAME from information_schema.COLUMNS where "
"TABLE_SCHEMA='%s' and TABLE_NAME='%s' and extra like '%%GENERATED%%' and extra not like '%%DEFAULT_GENERATED%%'",
database, table);
mysql_query(conn, query);
g_free(query);
res = mysql_store_result(conn);
if ((row = mysql_fetch_row(res))) {
result = TRUE;
}
mysql_free_result(res);
return result;
}
GString *get_insertable_fields(MYSQL *conn, char *database, char *table) {
MYSQL_RES *res = NULL;
MYSQL_ROW row;
GString *field_list = g_string_new("");
gchar *query =
g_strdup_printf("select COLUMN_NAME from information_schema.COLUMNS "
"where TABLE_SCHEMA='%s' and TABLE_NAME='%s' and extra "
"not like '%%VIRTUAL GENERATED%%' and extra not like '%%STORED GENERATED%%'",
database, table);
mysql_query(conn, query);
g_free(query);
res = mysql_store_result(conn);
gboolean first = TRUE;
while ((row = mysql_fetch_row(res))) {
if (first) {
first = FALSE;
} else {
g_string_append(field_list, ",");
}
gchar *tb = g_strdup_printf("`%s`", row[0]);
g_string_append(field_list, tb);
g_free(tb);
}
mysql_free_result(res);
return field_list;
}
/* Heuristic chunks building - based on estimates, produces list of ranges for
datadumping WORK IN PROGRESS
*/
GList *get_chunks_for_table(MYSQL *conn, char *database, char *table,
struct configuration *conf) {
GList *chunks = NULL;
MYSQL_RES *indexes = NULL, *minmax = NULL, *total = NULL;
MYSQL_ROW row;
char *field = NULL;
int showed_nulls = 0;
/* first have to pick index, in future should be able to preset in
* configuration too */
gchar *query = g_strdup_printf("SHOW INDEX FROM `%s`.`%s`", database, table);
mysql_query(conn, query);
g_free(query);
indexes = mysql_store_result(conn);
/*选一个唯一的索引,第一肯定是主键,没有主键的话找唯一索引。唯一索引不存在找区分度最高的列。如果还是没有,根据表级别并行备份*/
while ((row = mysql_fetch_row(indexes))) {
if (!strcmp(row[2], "PRIMARY") && (!strcmp(row[3], "1"))) {
/* Pick first column in PK, cardinality doesn't matter */
field = row[4];
break;
}
}
/* If no PK found, try using first UNIQUE index */
if (!field) {
mysql_data_seek(indexes, 0);
while ((row = mysql_fetch_row(indexes))) {
if (!strcmp(row[1], "0") && (!strcmp(row[3], "1"))) {
/* Again, first column of any unique index */
field = row[4];
break;
}
}
}
/* Still unlucky? Pick any high-cardinality index */
if (!field && conf->use_any_index) {
guint64 max_cardinality = 0;
guint64 cardinality = 0;
mysql_data_seek(indexes, 0);
while ((row = mysql_fetch_row(indexes))) {
if (!strcmp(row[3], "1")) {
if (row[6])
cardinality = strtoul(row[6], NULL, 10);
if (cardinality > max_cardinality) {
field = row[4];
max_cardinality = cardinality;
}
}
}
}
/* Oh well, no chunks today - no suitable index */
if (!field)
goto cleanup;
/*在确定了chunk划分字段后,先获取该子段的最大值和最小值*/
/* Get minimum/maximum */
mysql_query(conn, query = g_strdup_printf(
"SELECT %s MIN(`%s`),MAX(`%s`) FROM `%s`.`%s`",
(detected_server == SERVER_TYPE_MYSQL)
? "/*!40001 SQL_NO_CACHE */"
: "",
field, field, database, table));
g_free(query);
minmax = mysql_store_result(conn);
if (!minmax)
goto cleanup;
row = mysql_fetch_row(minmax);
MYSQL_FIELD *fields = mysql_fetch_fields(minmax);
/* Check if all values are NULL */
if (row[0] == NULL)
goto cleanup;
char *min = row[0];
char *max = row[1];
/* Got total number of rows, skip chunk logic if estimates are low */
guint64 rows = estimate_count(conn, database, table, field, NULL, NULL);
if (rows <= rows_per_file)
goto cleanup;
/* This is estimate, not to use as guarantee! Every chunk would have eventual
* adjustments */
guint64 estimated_chunks = rows / rows_per_file;
guint64 estimated_step, nmin, nmax, cutoff;
/* Support just bigger INTs for now, very dumb, no verify approach */
switch (fields[0].type) {
case MYSQL_TYPE_LONG:
case MYSQL_TYPE_LONGLONG:
case MYSQL_TYPE_INT24:
case MYSQL_TYPE_SHORT:
/* static stepping */
nmin = strtoul(min, NULL, 10);
nmax = strtoul(max, NULL, 10);
estimated_step = (nmax - nmin) / estimated_chunks + 1;
cutoff = nmin;
while (cutoff <= nmax) {
chunks = g_list_prepend(
chunks,
g_strdup_printf("%s%s%s%s(`%s` >= %llu AND `%s` < %llu)",
!showed_nulls ? "`" : "",
!showed_nulls ? field : "",
!showed_nulls ? "`" : "",
!showed_nulls ? " IS NULL OR " : "", field,
(unsigned long long)cutoff, field,
(unsigned long long)(cutoff + estimated_step)));
cutoff += estimated_step;
showed_nulls = 1;
}
chunks = g_list_reverse(chunks);
default:
goto cleanup;
}
cleanup:
if (indexes)
mysql_free_result(indexes);
if (minmax)
mysql_free_result(minmax);
if (total)
mysql_free_result(total);
return chunks;
}
/* Try to get EXPLAIN'ed estimates of row in resultset */
/*通过执行explain select field from db.table来估计该表的记录数,根据这个记录个数拆分多个chunk*/
guint64 estimate_count(MYSQL *conn, char *database, char *table, char *field,
char *from, char *to) {
char *querybase, *query;
int ret;
g_assert(conn && database && table);
querybase = g_strdup_printf("EXPLAIN SELECT `%s` FROM `%s`.`%s`",
(field ? field : "*"), database, table);
if (from || to) {
g_assert(field != NULL);
char *fromclause = NULL, *toclause = NULL;
char *escaped;
if (from) {
escaped = g_new(char, strlen(from) * 2 + 1);
mysql_real_escape_string(conn, escaped, from, strlen(from));
fromclause = g_strdup_printf(" `%s` >= "%s" ", field, escaped);
g_free(escaped);
}
if (to) {
escaped = g_new(char, strlen(to) * 2 + 1);
mysql_real_escape_string(conn, escaped, from, strlen(from));
toclause = g_strdup_printf(" `%s` <= "%s"", field, escaped);
g_free(escaped);
}
query = g_strdup_printf("%s WHERE `%s` %s %s", querybase,
(from ? fromclause : ""),
((from && to) ? "AND" : ""), (to ? toclause : ""));
if (toclause)
g_free(toclause);
if (fromclause)
g_free(fromclause);
ret = mysql_query(conn, query);
g_free(querybase);
g_free(query);
} else {
ret = mysql_query(conn, querybase);
g_free(querybase);
}
if (ret) {
g_warning("Unable to get estimates for %s.%s: %s", database, table,
mysql_error(conn));
}
MYSQL_RES *result = mysql_store_result(conn);
MYSQL_FIELD *fields = mysql_fetch_fields(result);
guint i;
for (i = 0; i < mysql_num_fields(result); i++) {
if (!strcmp(fields[i].name, "rows"))
break;
}
MYSQL_ROW row = NULL;
guint64 count = 0;
if (result)
row = mysql_fetch_row(result);
if (row && row[i])
count = strtoul(row[i], NULL, 10);
if (result)
mysql_free_result(result);
return (count);
}
void create_backup_dir(char *new_directory) {
if (g_mkdir(new_directory, 0700) == -1) {
if (errno != EEXIST) {
g_critical("Unable to create `%s': %s", new_directory, g_strerror(errno));
exit(EXIT_FAILURE);
}
}
}
void dump_database(char *database, struct configuration *conf) {
g_atomic_int_inc(&database_counter);
struct job *j = g_new0(struct job, 1);
struct dump_database_job *ddj = g_new0(struct dump_database_job, 1);
j->job_data = (void *)ddj;
ddj->database = g_strdup(database);
j->conf = conf;
j->type = JOB_DUMP_DATABASE;
if (less_locking)
g_async_queue_push(conf->queue_less_locking, j);
else
g_async_queue_push(conf->queue, j);
return;
}
void dump_database_thread(MYSQL *conn, char *database) {
char *query;
mysql_select_db(conn, database);
if (detected_server == SERVER_TYPE_MYSQL ||
detected_server == SERVER_TYPE_TIDB)
query = g_strdup("SHOW TABLE STATUS");
else
query =
g_strdup_printf("SELECT TABLE_NAME, ENGINE, TABLE_TYPE as COMMENT FROM "
"DATA_DICTIONARY.TABLES WHERE TABLE_SCHEMA='%s'",
database);
if (mysql_query(conn, (query))) {
g_critical("Error: DB: %s - Could not execute query: %s", database,
mysql_error(conn));
errors++;
g_free(query);
return;
}
MYSQL_RES *result = mysql_store_result(conn);
MYSQL_FIELD *fields = mysql_fetch_fields(result);
guint i;
int ecol = -1, ccol = -1;
for (i = 0; i < mysql_num_fields(result); i++) {
if (!strcasecmp(fields[i].name, "Engine"))
ecol = i;
else if (!strcasecmp(fields[i].name, "Comment"))
ccol = i;
}
if (!result) {
g_critical("Could not list tables for %s: %s", database, mysql_error(conn));
errors++;
return;
}
MYSQL_ROW row;
while ((row = mysql_fetch_row(result))) {
int dump = 1;
int is_view = 0;
/* We now do care about views!
num_fields>1 kicks in only in case of 5.0 SHOW FULL TABLES or SHOW
TABLE STATUS row[1] == NULL if it is a view in 5.0 'SHOW TABLE STATUS'
row[1] == "VIEW" if it is a view in 5.0 'SHOW FULL TABLES'
*/
if ((detected_server == SERVER_TYPE_MYSQL) &&
(row[ccol] == NULL || !strcmp(row[ccol], "VIEW")))
is_view = 1;
/* Check for broken tables, i.e. mrg with missing source tbl */
if (!is_view && row[ecol] == NULL) {
g_warning("Broken table detected, please review: %s.%s", database,
row[0]);
dump = 0;
}
/* Skip ignored engines, handy for avoiding Merge, Federated or Blackhole
* :-) dumps */
if (dump && ignore && !is_view) {
for (i = 0; ignore[i] != NULL; i++) {
if (g_ascii_strcasecmp(ignore[i], row[ecol]) == 0) {
dump = 0;
break;
}
}
}
/* Skip views */
if (is_view && no_dump_views)
dump = 0;
if (!dump)
continue;
/* In case of table-list option is enabled, check if table is part of the
* list */
if (tables) {
int table_found = 0;
for (i = 0; tables[i] != NULL; i++)
if (g_ascii_strcasecmp(tables[i], row[0]) == 0)
table_found = 1;
if (!table_found)
dump = 0;
}
if (!dump)
continue;
/* Special tables */
if (g_ascii_strcasecmp(database, "mysql") == 0 &&
(g_ascii_strcasecmp(row[0], "general_log") == 0 ||
g_ascii_strcasecmp(row[0], "slow_log") == 0 ||
g_ascii_strcasecmp(row[0], "innodb_index_stats") == 0 ||
g_ascii_strcasecmp(row[0], "innodb_table_stats") == 0)) {
dump = 0;
continue;
}
/* Checks skip list on 'database.table' string */
if (tables_skiplist && check_skiplist(database, row[0]))
continue;
/* Checks PCRE expressions on 'database.table' string */
if (regexstring && !check_regex(database, row[0]))
continue;
/* Check if the table was recently updated */
if (no_updated_tables && !is_view) {
GList *iter;
for (iter = no_updated_tables; iter != NULL; iter = iter->next) {
if (g_ascii_strcasecmp(
iter->data, g_strdup_printf("%s.%s", database, row[0])) == 0) {
g_message("NO UPDATED TABLE: %s.%s", database, row[0]);
dump = 0;
}
}
}
if (!dump)
continue;
/* Green light! */
struct db_table *dbt = g_new(struct db_table, 1);
dbt->database = g_strdup(database);
dbt->table = g_strdup(row[0]);
if (!row[6])
dbt->datalength = 0;
else
dbt->datalength = g_ascii_strtoull(row[6], NULL, 10);
// if is a view we care only about schema
if (!is_view) {
// with trx_consistency_only we dump all as innodb_tables
if (!no_data) {
if (row[ecol] != NULL && g_ascii_strcasecmp("MRG_MYISAM", row[ecol])) {
if (trx_consistency_only ||
(row[ecol] != NULL && !g_ascii_strcasecmp("InnoDB", row[ecol]))) {
g_mutex_lock(innodb_tables_mutex);
innodb_tables = g_list_prepend(innodb_tables, dbt);
g_mutex_unlock(innodb_tables_mutex);
} else if (row[ecol] != NULL &&
!g_ascii_strcasecmp("TokuDB", row[ecol])) {
g_mutex_lock(innodb_tables_mutex);
innodb_tables = g_list_prepend(innodb_tables, dbt);
g_mutex_unlock(innodb_tables_mutex);
} else {
g_mutex_lock(non_innodb_table_mutex);
non_innodb_table = g_list_prepend(non_innodb_table, dbt);
g_mutex_unlock(non_innodb_table_mutex);
}
}
}
if (!no_schemas) {
g_mutex_lock(table_schemas_mutex);
table_schemas = g_list_prepend(table_schemas, dbt);
g_mutex_unlock(table_schemas_mutex);
}
} else {
if (!no_schemas) {
g_mutex_lock(view_schemas_mutex);
view_schemas = g_list_prepend(view_schemas, dbt);
g_mutex_unlock(view_schemas_mutex);
}
}
}
mysql_free_result(result);
// Store Procedures and Events
// As these are not attached to tables we need to define when we need to dump
// or not Having regex filter make this hard because we dont now if a full
// schema is filtered or not Also I cant decide this based on tables from a
// schema being dumped So I will use only regex to dump or not SP and EVENTS I
// only need one match to dump all
int post_dump = 0;
if (dump_routines) {
// SP
query = g_strdup_printf("SHOW PROCEDURE STATUS WHERE Db = '%s'", database);
if (mysql_query(conn, (query))) {
g_critical("Error: DB: %s - Could not execute query: %s", database,
mysql_error(conn));
errors++;
g_free(query);
return;
}
result = mysql_store_result(conn);
while ((row = mysql_fetch_row(result)) && !post_dump) {
/* Checks skip list on 'database.sp' string */
if (tables_skiplist && check_skiplist(database, row[1]))
continue;
/* Checks PCRE expressions on 'database.sp' string */
if (regexstring && !check_regex(database, row[1]))
continue;
post_dump = 1;
}
if (!post_dump) {
// FUNCTIONS
query = g_strdup_printf("SHOW FUNCTION STATUS WHERE Db = '%s'", database);
if (mysql_query(conn, (query))) {
g_critical("Error: DB: %s - Could not execute query: %s", database,
mysql_error(conn));
errors++;
g_free(query);
return;
}
result = mysql_store_result(conn);
while ((row = mysql_fetch_row(result)) && !post_dump) {
/* Checks skip list on 'database.sp' string */
if (tables_skiplist_file && check_skiplist(database, row[1]))
continue;
/* Checks PCRE expressions on 'database.sp' string */
if (regexstring && !check_regex(database, row[1]))
continue;
post_dump = 1;
}
}
mysql_free_result(result);
}
if (dump_events && !post_dump) {
// EVENTS
query = g_strdup_printf("SHOW EVENTS FROM `%s`", database);
if (mysql_query(conn, (query))) {
g_critical("Error: DB: %s - Could not execute query: %s", database,
mysql_error(conn));
errors++;
g_free(query);
return;
}
result = mysql_store_result(conn);
while ((row = mysql_fetch_row(result)) && !post_dump) {
/* Checks skip list on 'database.sp' string */
if (tables_skiplist_file && check_skiplist(database, row[1]))
continue;
/* Checks PCRE expressions on 'database.sp' string */
if (regexstring && !check_regex(database, row[1]))
continue;
post_dump = 1;
}
mysql_free_result(result);
}
if (post_dump) {
struct schema_post *sp = g_new(struct schema_post, 1);
sp->database = g_strdup(database);
schema_post = g_list_prepend(schema_post, sp);
}
g_free(query);
return;
}
void get_tables(MYSQL *conn, struct configuration *conf) {
gchar **dt = NULL;
char *query = NULL;
guint i, x;
for (x = 0; tables[x] != NULL; x++) {
dt = g_strsplit(tables[x], ".", 0);
query =
g_strdup_printf("SHOW TABLE STATUS FROM %s LIKE '%s'", dt[0], dt[1]);
if (mysql_query(conn, (query))) {
g_critical("Error: DB: %s - Could not execute query: %s", dt[0],
mysql_error(conn));
errors++;
return;
}
MYSQL_RES *result = mysql_store_result(conn);
MYSQL_FIELD *fields = mysql_fetch_fields(result);
guint ecol = -1;
guint ccol = -1;
for (i = 0; i < mysql_num_fields(result); i++) {
if (!strcasecmp(fields[i].name, "Engine"))
ecol = i;
else if (!strcasecmp(fields[i].name, "Comment"))
ccol = i;
}
if (!result) {
g_warning("Could not list table for %s.%s: %s", dt[0], dt[1],
mysql_error(conn));
errors++;
return;
}
MYSQL_ROW row;
while ((row = mysql_fetch_row(result))) {
int is_view = 0;
if ((detected_server == SERVER_TYPE_MYSQL) &&
(row[ccol] == NULL || !strcmp(row[ccol], "VIEW")))
is_view = 1;
/* Green light! */
struct db_table *dbt = g_new(struct db_table, 1);
dbt->database = g_strdup(dt[0]);
dbt->table = g_strdup(dt[1]);
if (!row[6])
dbt->datalength = 0;
else
dbt->datalength = g_ascii_strtoull(row[6], NULL, 10);
if (!is_view) {
if (trx_consistency_only) {
dump_table(conn, dbt->database, dbt->table, conf, TRUE);
} else if (!g_ascii_strcasecmp("InnoDB", row[ecol])) {
g_mutex_lock(innodb_tables_mutex);
innodb_tables = g_list_prepend(innodb_tables, dbt);
g_mutex_unlock(innodb_tables_mutex);
} else if (!g_ascii_strcasecmp("TokuDB", row[ecol])) {
g_mutex_lock(innodb_tables_mutex);
innodb_tables = g_list_prepend(innodb_tables, dbt);
g_mutex_unlock(innodb_tables_mutex);
} else {
g_mutex_lock(non_innodb_table_mutex);
non_innodb_table = g_list_prepend(non_innodb_table, dbt);
g_mutex_unlock(non_innodb_table_mutex);
}
if (!no_schemas) {
g_mutex_lock(table_schemas_mutex);
table_schemas = g_list_prepend(table_schemas, dbt);
g_mutex_unlock(table_schemas_mutex);
}
} else {
if (!no_schemas) {
g_mutex_lock(view_schemas_mutex);
view_schemas = g_list_prepend(view_schemas, dbt);
g_mutex_unlock(view_schemas_mutex);
}
}
}
}
g_free(query);
}
void set_charset(GString *statement, char *character_set,
char *collation_connection) {
g_string_printf(statement,
"SET @PREV_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT;n");
g_string_append(statement,
"SET @PREV_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS;n");
g_string_append(statement,
"SET @PREV_COLLATION_CONNECTION=@@COLLATION_CONNECTION;n");
g_string_append_printf(statement, "SET character_set_client = %s;n",
character_set);
g_string_append_printf(statement, "SET character_set_results = %s;n",
character_set);
g_string_append_printf(statement, "SET collation_connection = %s;n",
collation_connection);
}
void restore_charset(GString *statement) {
g_string_append(statement,
"SET character_set_client = @PREV_CHARACTER_SET_CLIENT;n");
g_string_append(statement,
"SET character_set_results = @PREV_CHARACTER_SET_RESULTS;n");
g_string_append(statement,
"SET collation_connection = @PREV_COLLATION_CONNECTION;n");
}
void dump_schema_post_data(MYSQL *conn, char *database, char *filename) {
void *outfile;
char *query = NULL;
MYSQL_RES *result = NULL;
MYSQL_RES *result2 = NULL;
MYSQL_ROW row;
MYSQL_ROW row2;
gchar **splited_st = NULL;
if (!compress_output)
outfile = g_fopen(filename, "w");
else
outfile = (void *)gzopen(filename, "w");
if (!outfile) {
g_critical("Error: DB: %s Could not create output file %s (%d)", database,
filename, errno);
errors++;
return;
}
GString *statement = g_string_sized_new(statement_size);
if (dump_routines) {
// get functions
query = g_strdup_printf("SHOW FUNCTION STATUS WHERE Db = '%s'", database);
if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
if (success_on_1146 && mysql_errno(conn) == 1146) {
g_warning("Error dumping functions from %s: %s", database,
mysql_error(conn));
} else {
g_critical("Error dumping functions from %s: %s", database,
mysql_error(conn));
errors++;
}
g_free(query);
return;
}
while ((row = mysql_fetch_row(result))) {
set_charset(statement, row[8], row[9]);
g_string_append_printf(statement, "DROP FUNCTION IF EXISTS `%s`;n",
row[1]);
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write stored procedure data for %s.%s", database,
row[1]);
errors++;
return;
}
g_string_set_size(statement, 0);
query =
g_strdup_printf("SHOW CREATE FUNCTION `%s`.`%s`", database, row[1]);
mysql_query(conn, query);
result2 = mysql_store_result(conn);
row2 = mysql_fetch_row(result2);
g_string_printf(statement, "%s", row2[2]);
splited_st = g_strsplit(statement->str, ";n", 0);
g_string_printf(statement, "%s", g_strjoinv("; n", splited_st));
g_string_append(statement, ";n");
restore_charset(statement);
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write function data for %s.%s", database, row[1]);
errors++;
return;
}
g_string_set_size(statement, 0);
}
// get sp
query = g_strdup_printf("SHOW PROCEDURE STATUS WHERE Db = '%s'", database);
if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
if (success_on_1146 && mysql_errno(conn) == 1146) {
g_warning("Error dumping stored procedures from %s: %s", database,
mysql_error(conn));
} else {
g_critical("Error dumping stored procedures from %s: %s", database,
mysql_error(conn));
errors++;
}
g_free(query);
return;
}
while ((row = mysql_fetch_row(result))) {
set_charset(statement, row[8], row[9]);
g_string_append_printf(statement, "DROP PROCEDURE IF EXISTS `%s`;n",
row[1]);
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write stored procedure data for %s.%s", database,
row[1]);
errors++;
return;
}
g_string_set_size(statement, 0);
query =
g_strdup_printf("SHOW CREATE PROCEDURE `%s`.`%s`", database, row[1]);
mysql_query(conn, query);
result2 = mysql_store_result(conn);
row2 = mysql_fetch_row(result2);
g_string_printf(statement, "%s", row2[2]);
splited_st = g_strsplit(statement->str, ";n", 0);
g_string_printf(statement, "%s", g_strjoinv("; n", splited_st));
g_string_append(statement, ";n");
restore_charset(statement);
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write stored procedure data for %s.%s", database,
row[1]);
errors++;
return;
}
g_string_set_size(statement, 0);
}
}
// get events
if (dump_events) {
query = g_strdup_printf("SHOW EVENTS FROM `%s`", database);
if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
if (success_on_1146 && mysql_errno(conn) == 1146) {
g_warning("Error dumping events from %s: %s", database,
mysql_error(conn));
} else {
g_critical("Error dumping events from %s: %s", database,
mysql_error(conn));
errors++;
}
g_free(query);
return;
}
while ((row = mysql_fetch_row(result))) {
set_charset(statement, row[12], row[13]);
g_string_append_printf(statement, "DROP EVENT IF EXISTS `%s`;n", row[1]);
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write stored procedure data for %s.%s", database,
row[1]);
errors++;
return;
}
query = g_strdup_printf("SHOW CREATE EVENT `%s`.`%s`", database, row[1]);
mysql_query(conn, query);
result2 = mysql_store_result(conn);
// DROP EVENT IF EXISTS event_name
row2 = mysql_fetch_row(result2);
g_string_printf(statement, "%s", row2[3]);
splited_st = g_strsplit(statement->str, ";n", 0);
g_string_printf(statement, "%s", g_strjoinv("; n", splited_st));
g_string_append(statement, ";n");
restore_charset(statement);
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write event data for %s.%s", database, row[1]);
errors++;
return;
}
g_string_set_size(statement, 0);
}
}
g_free(query);
if (!compress_output)
fclose((FILE *)outfile);
else
gzclose((gzFile)outfile);
g_string_free(statement, TRUE);
g_strfreev(splited_st);
if (result)
mysql_free_result(result);
if (result2)
mysql_free_result(result2);
return;
}
void dump_triggers_data(MYSQL *conn, char *database, char *table,
char *filename) {
void *outfile;
char *query = NULL;
MYSQL_RES *result = NULL;
MYSQL_RES *result2 = NULL;
MYSQL_ROW row;
MYSQL_ROW row2;
gchar **splited_st = NULL;
if (!compress_output)
outfile = g_fopen(filename, "w");
else
outfile = (void *)gzopen(filename, "w");
if (!outfile) {
g_critical("Error: DB: %s Could not create output file %s (%d)", database,
filename, errno);
errors++;
return;
}
GString *statement = g_string_sized_new(statement_size);
// get triggers
query = g_strdup_printf("SHOW TRIGGERS FROM `%s` LIKE '%s'", database, table);
if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
if (success_on_1146 && mysql_errno(conn) == 1146) {
g_warning("Error dumping triggers (%s.%s): %s", database, table,
mysql_error(conn));
} else {
g_critical("Error dumping triggers (%s.%s): %s", database, table,
mysql_error(conn));
errors++;
}
g_free(query);
return;
}
while ((row = mysql_fetch_row(result))) {
set_charset(statement, row[8], row[9]);
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write triggers data for %s.%s", database, table);
errors++;
return;
}
g_string_set_size(statement, 0);
query = g_strdup_printf("SHOW CREATE TRIGGER `%s`.`%s`", database, row[0]);
mysql_query(conn, query);
result2 = mysql_store_result(conn);
row2 = mysql_fetch_row(result2);
g_string_append_printf(statement, "%s", row2[2]);
splited_st = g_strsplit(statement->str, ";n", 0);
g_string_printf(statement, "%s", g_strjoinv("; n", splited_st));
g_string_append(statement, ";n");
restore_charset(statement);
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write triggers data for %s.%s", database, table);
errors++;
return;
}
g_string_set_size(statement, 0);
}
g_free(query);
if (!compress_output)
fclose((FILE *)outfile);
else
gzclose((gzFile)outfile);
g_string_free(statement, TRUE);
g_strfreev(splited_st);
if (result)
mysql_free_result(result);
if (result2)
mysql_free_result(result2);
return;
}
void dump_schema_data(MYSQL *conn, char *database, char *table,
char *filename) {
void *outfile;
char *query = NULL;
MYSQL_RES *result = NULL;
MYSQL_ROW row;
if (!compress_output)
outfile = g_fopen(filename, "w");
else
outfile = (void *)gzopen(filename, "w");
if (!outfile) {
g_critical("Error: DB: %s Could not create output file %s (%d)", database,
filename, errno);
errors++;
return;
}
GString *statement = g_string_sized_new(statement_size);
if (detected_server == SERVER_TYPE_MYSQL) {
g_string_printf(statement,"%s;n",set_names_str);
g_string_append(statement, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;nn");
if (!skip_tz) {
g_string_append(statement, "/*!40103 SET TIME_ZONE='+00:00' */;n");
}
} else if (detected_server == SERVER_TYPE_TIDB) {
if (!skip_tz) {
g_string_printf(statement, "/*!40103 SET TIME_ZONE='+00:00' */;n");
}
} else {
g_string_printf(statement, "SET FOREIGN_KEY_CHECKS=0;n");
}
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write schema data for %s.%s", database, table);
errors++;
return;
}
query = g_strdup_printf("SHOW CREATE TABLE `%s`.`%s`", database, table);
if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
if (success_on_1146 && mysql_errno(conn) == 1146) {
g_warning("Error dumping schemas (%s.%s): %s", database, table,
mysql_error(conn));
} else {
g_critical("Error dumping schemas (%s.%s): %s", database, table,
mysql_error(conn));
errors++;
}
g_free(query);
return;
}
g_string_set_size(statement, 0);
/* There should never be more than one row */
row = mysql_fetch_row(result);
g_string_append(statement, row[1]);
g_string_append(statement, ";n");
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write schema for %s.%s", database, table);
errors++;
}
g_free(query);
if (!compress_output)
fclose((FILE *)outfile);
else
gzclose((gzFile)outfile);
g_string_free(statement, TRUE);
if (result)
mysql_free_result(result);
return;
}
void dump_view_data(MYSQL *conn, char *database, char *table, char *filename,
char *filename2) {
void *outfile, *outfile2;
char *query = NULL;
MYSQL_RES *result = NULL;
MYSQL_ROW row;
GString *statement = g_string_sized_new(statement_size);
mysql_select_db(conn, database);
if (!compress_output) {
outfile = g_fopen(filename, "w");
outfile2 = g_fopen(filename2, "w");
} else {
outfile = (void *)gzopen(filename, "w");
outfile2 = (void *)gzopen(filename2, "w");
}
if (!outfile || !outfile2) {
g_critical("Error: DB: %s Could not create output file (%d)", database,
errno);
errors++;
return;
}
if (detected_server == SERVER_TYPE_MYSQL) {
g_string_printf(statement,"%s;n",set_names_str);
}
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write schema data for %s.%s", database, table);
errors++;
return;
}
g_string_append_printf(statement, "DROP TABLE IF EXISTS `%s`;n", table);
g_string_append_printf(statement, "DROP VIEW IF EXISTS `%s`;n", table);
if (!write_data((FILE *)outfile2, statement)) {
g_critical("Could not write schema data for %s.%s", database, table);
errors++;
return;
}
// we create tables as workaround
// for view dependencies
query = g_strdup_printf("SHOW FIELDS FROM `%s`.`%s`", database, table);
if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
if (success_on_1146 && mysql_errno(conn) == 1146) {
g_warning("Error dumping schemas (%s.%s): %s", database, table,
mysql_error(conn));
} else {
g_critical("Error dumping schemas (%s.%s): %s", database, table,
mysql_error(conn));
errors++;
}
g_free(query);
return;
}
g_free(query);
g_string_set_size(statement, 0);
g_string_append_printf(statement, "CREATE TABLE `%s`(n", table);
row = mysql_fetch_row(result);
g_string_append_printf(statement, "`%s` int", row[0]);
while ((row = mysql_fetch_row(result))) {
g_string_append(statement, ",n");
g_string_append_printf(statement, "`%s` int", row[0]);
}
g_string_append(statement, "n);n");
if (result)
mysql_free_result(result);
if (!write_data((FILE *)outfile, statement)) {
g_critical("Could not write view schema for %s.%s", database, table);
errors++;
}
// real view
query = g_strdup_printf("SHOW CREATE VIEW `%s`.`%s`", database, table);
if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
if (success_on_1146 && mysql_errno(conn) == 1146) {
g_warning("Error dumping schemas (%s.%s): %s", database, table,
mysql_error(conn));
} else {
g_critical("Error dumping schemas (%s.%s): %s", database, table,
mysql_error(conn));
errors++;
}
g_free(query);
return;
}
g_string_set_size(statement, 0);
/* There should never be more than one row */
row = mysql_fetch_row(result);
set_charset(statement, row[2], row[3]);
g_string_append(statement, row[1]);
g_string_append(statement, ";n");
restore_charset(statement);
if (!write_data((FILE *)outfile2, statement)) {
g_critical("Could not write schema for %s.%s", database, table);
errors++;
}
g_free(query);
if (!compress_output) {
fclose((FILE *)outfile);
fclose((FILE *)outfile2);
} else {
gzclose((gzFile)outfile);
gzclose((gzFile)outfile2);
}
g_string_free(statement, TRUE);
if (result)
mysql_free_result(result);
return;
}
void dump_table_data_file(MYSQL *conn, char *database, char *table, char *where,
char *filename) {
void *outfile = NULL;
if (!compress_output)
outfile = g_fopen(filename, "w");
else
outfile = (void *)gzopen(filename, "w");
if (!outfile) {
g_critical("Error: DB: %s TABLE: %s Could not create output file %s (%d)",
database, table, filename, errno);
errors++;
return;
}
guint64 rows_count =
dump_table_data(conn, (FILE *)outfile, database, table, where, filename);
if (!rows_count)
g_message("Empty table %s.%s", database, table);
}
void dump_schema(MYSQL *conn, char *database, char *table,
struct configuration *conf) {
struct job *j = g_new0(struct job, 1);
struct schema_job *sj = g_new0(struct schema_job, 1);
j->job_data = (void *)sj;
sj->database = g_strdup(database);
sj->table = g_strdup(table);
j->conf = conf;
j->type = JOB_SCHEMA;
if (daemon_mode)
sj->filename = g_strdup_printf("%s/%d/%s.%s-schema.sql%s", output_directory,
dump_number, database, table,
(compress_output ? ".gz" : ""));
else
sj->filename =
g_strdup_printf("%s/%s.%s-schema.sql%s", output_directory, database,
table, (compress_output ? ".gz" : ""));
g_async_queue_push(conf->queue, j);
if (dump_triggers) {
char *query = NULL;
MYSQL_RES *result = NULL;
query =
g_strdup_printf("SHOW TRIGGERS FROM `%s` LIKE '%s'", database, table);
if (mysql_query(conn, query) || !(result = mysql_store_result(conn))) {
g_critical("Error Checking triggers for %s.%s. Err: %s", database, table,
mysql_error(conn));
errors++;
} else {
if (mysql_num_rows(result)) {
struct job *t = g_new0(struct job, 1);
struct schema_job *st = g_new0(struct schema_job, 1);
t->job_data = (void *)st;
st->database = g_strdup(database);
st->table = g_strdup(table);
t->conf = conf;
t->type = JOB_TRIGGERS;
if (daemon_mode)
st->filename = g_strdup_printf(
"%s/%d/%s.%s-schema-triggers.sql%s", output_directory,
dump_number, database, table, (compress_output ? ".gz" : ""));
else
st->filename = g_strdup_printf("%s/%s.%s-schema-triggers.sql%s",
output_directory, database, table,
(compress_output ? ".gz" : ""));
g_async_queue_push(conf->queue, t);
}
}
g_free(query);
if (result) {
mysql_free_result(result);
}
}
return;
}
void dump_view(char *database, char *table, struct configuration *conf) {
struct job *j = g_new0(struct job, 1);
struct view_job *vj = g_new0(struct view_job, 1);
j->job_data = (void *)vj;
vj->database = g_strdup(database);
vj->table = g_strdup(table);
j->conf = conf;
j->type = JOB_VIEW;
if (daemon_mode) {
vj->filename = g_strdup_printf("%s/%d/%s.%s-schema.sql%s", output_directory,
dump_number, database, table,
(compress_output ? ".gz" : ""));
vj->filename2 = g_strdup_printf("%s/%d/%s.%s-schema-view.sql%s",
output_directory, dump_number, database,
table, (compress_output ? ".gz" : ""));
} else {
vj->filename =
g_strdup_printf("%s/%s.%s-schema.sql%s", output_directory, database,
table, (compress_output ? ".gz" : ""));
vj->filename2 =
g_strdup_printf("%s/%s.%s-schema-view.sql%s", output_directory,
database, table, (compress_output ? ".gz" : ""));
}
g_async_queue_push(conf->queue, j);
return;
}
void dump_schema_post(char *database, struct configuration *conf) {
struct job *j = g_new0(struct job, 1);
struct schema_post_job *sp = g_new0(struct schema_post_job, 1);
j->job_data = (void *)sp;
sp->database = g_strdup(database);
j->conf = conf;
j->type = JOB_SCHEMA_POST;
if (daemon_mode) {
sp->filename =
g_strdup_printf("%s/%d/%s-schema-post.sql%s", output_directory,
dump_number, database, (compress_output ? ".gz" : ""));
} else {
sp->filename = g_strdup_printf("%s/%s-schema-post.sql%s", output_directory,
database, (compress_output ? ".gz" : ""));
}
g_async_queue_push(conf->queue, j);
return;
}
void dump_table(MYSQL *conn, char *database, char *table,
struct configuration *conf, gboolean is_innodb) {
GList *chunks = NULL;
if (rows_per_file)
chunks = get_chunks_for_table(conn, database, table, conf);
if (chunks) {
int nchunk = 0;
GList *iter;
for (iter = chunks; iter != NULL; iter = iter->next) {
struct job *j = g_new0(struct job, 1);
struct table_job *tj = g_new0(struct table_job, 1);
j->job_data = (void *)tj;
tj->database = g_strdup(database);
tj->table = g_strdup(table);
j->conf = conf;
j->type = is_innodb ? JOB_DUMP : JOB_DUMP_NON_INNODB;
if (daemon_mode)
tj->filename = g_strdup_printf(
"%s/%d/%s.%s.%05d.sql%s", output_directory, dump_number, database,
table, nchunk, (compress_output ? ".gz" : ""));
else
tj->filename =
g_strdup_printf("%s/%s.%s.%05d.sql%s", output_directory, database,
table, nchunk, (compress_output ? ".gz" : ""));
tj->where = (char *)iter->data;
if (!is_innodb && nchunk)
g_atomic_int_inc(&non_innodb_table_counter);
g_async_queue_push(conf->queue, j);
nchunk++;
}
g_list_free(chunks);
} else {
struct job *j = g_new0(struct job, 1);
struct table_job *tj = g_new0(struct table_job, 1);
j->job_data = (void *)tj;
tj->database = g_strdup(database);
tj->table = g_strdup(table);
j->conf = conf;
j->type = is_innodb ? JOB_DUMP : JOB_DUMP_NON_INNODB;
if (daemon_mode)
tj->filename = g_strdup_printf(
"%s/%d/%s.%s%s.sql%s", output_directory, dump_number, database, table,
(chunk_filesize ? ".00001" : ""), (compress_output ? ".gz" : ""));
else
tj->filename = g_strdup_printf(
"%s/%s.%s%s.sql%s", output_directory, database, table,
(chunk_filesize ? ".00001" : ""), (compress_output ? ".gz" : ""));
g_async_queue_push(conf->queue, j);
return;
}
}
void dump_tables(MYSQL *conn, GList *noninnodb_tables_list,
struct configuration *conf) {
struct db_table *dbt;
GList *chunks = NULL;
struct job *j = g_new0(struct job, 1);
struct tables_job *tjs = g_new0(struct tables_job, 1);
j->conf = conf;
j->type = JOB_LOCK_DUMP_NON_INNODB;
j->job_data = (void *)tjs;
GList *iter;
for (iter = noninnodb_tables_list; iter != NULL; iter = iter->next) {
dbt = (struct db_table *)iter->data;
if (rows_per_file)
chunks = get_chunks_for_table(conn, dbt->database, dbt->table, conf);
if (chunks) {
int nchunk = 0;
GList *citer;
for (citer = chunks; citer != NULL; citer = citer->next) {
struct table_job *tj = g_new0(struct table_job, 1);
tj->database = g_strdup_printf("%s", dbt->database);
tj->table = g_strdup_printf("%s", dbt->table);
if (daemon_mode)
tj->filename =
g_strdup_printf("%s/%d/%s.%s.%05d.sql%s", output_directory,
dump_number, dbt->database, dbt->table, nchunk,
(compress_output ? ".gz" : ""));
else
tj->filename = g_strdup_printf(
"%s/%s.%s.%05d.sql%s", output_directory, dbt->database,
dbt->table, nchunk, (compress_output ? ".gz" : ""));
tj->where = (char *)citer->data;
tjs->table_job_list = g_list_prepend(tjs->table_job_list, tj);
nchunk++;
}
g_list_free(chunks);
} else {
struct table_job *tj = g_new0(struct table_job, 1);
tj->database = g_strdup_printf("%s", dbt->database);
tj->table = g_strdup_printf("%s", dbt->table);
if (daemon_mode)
tj->filename = g_strdup_printf("%s/%d/%s.%s%s.sql%s", output_directory,
dump_number, dbt->database, dbt->table,
(chunk_filesize ? ".00001" : ""),
(compress_output ? ".gz" : ""));
else
tj->filename = g_strdup_printf(
"%s/%s.%s%s.sql%s", output_directory, dbt->database, dbt->table,
(chunk_filesize ? ".00001" : ""), (compress_output ? ".gz" : ""));
tj->where = NULL;
tjs->table_job_list = g_list_prepend(tjs->table_job_list, tj);
}
}
tjs->table_job_list = g_list_reverse(tjs->table_job_list);
g_async_queue_push(conf->queue_less_locking, j);
}
/* Do actual data chunk reading/writing magic */
guint64 dump_table_data(MYSQL *conn, FILE *file, char *database, char *table,
char *where, char *filename) {
guint i;
guint fn = 1;
guint st_in_file = 0;
guint num_fields = 0;
guint64 num_rows = 0;
guint64 num_rows_st = 0;
MYSQL_RES *result = NULL;
char *query = NULL;
gchar *fcfile = NULL;
gchar *filename_prefix = NULL;
/* Buffer for escaping field values */
GString *escaped = g_string_sized_new(3000);
fcfile = g_strdup(filename);
if (chunk_filesize) {
gchar **split_filename = g_strsplit(filename, ".00001.sql", 0);
filename_prefix = g_strdup(split_filename[0]);
g_strfreev(split_filename);
}
gboolean has_generated_fields =
detect_generated_fields(conn, database, table);
/* Ghm, not sure if this should be statement_size - but default isn't too big
* for now */
GString *statement = g_string_sized_new(statement_size);
GString *statement_row = g_string_sized_new(0);
GString *select_fields;
if (has_generated_fields) {
select_fields = get_insertable_fields(conn, database, table);
} else {
select_fields = g_string_new("*");
}
/* Poor man's database code */
query = g_strdup_printf(
"SELECT %s %s FROM `%s`.`%s` %s %s",
(detected_server == SERVER_TYPE_MYSQL) ? "/*!40001 SQL_NO_CACHE */" : "",
select_fields->str, database, table, where ? "WHERE" : "",
where ? where : "");
g_string_free(select_fields, TRUE);
if (mysql_query(conn, query) || !(result = mysql_use_result(conn))) {
// ERROR 1146
if (success_on_1146 && mysql_errno(conn) == 1146) {
g_warning("Error dumping table (%s.%s) data: %s ", database, table,
mysql_error(conn));
} else {
g_critical("Error dumping table (%s.%s) data: %s ", database, table,
mysql_error(conn));
errors++;
}
goto cleanup;
}
num_fields = mysql_num_fields(result);
MYSQL_FIELD *fields = mysql_fetch_fields(result);
MYSQL_ROW row;
g_string_set_size(statement, 0);
/* Poor man's data dump code */
while ((row = mysql_fetch_row(result))) {
gulong *lengths = mysql_fetch_lengths(result);
num_rows++;
if (!statement->len) {
if (!st_in_file) {
if (detected_server == SERVER_TYPE_MYSQL) {
g_string_printf(statement,"%s;n",set_names_str);
g_string_append(statement, "/*!40014 SET FOREIGN_KEY_CHECKS=0*/;n");
if (!skip_tz) {
g_string_append(statement, "/*!40103 SET TIME_ZONE='+00:00' */;n");
}
} else if (detected_server == SERVER_TYPE_TIDB) {
if (!skip_tz) {
g_string_printf(statement, "/*!40103 SET TIME_ZONE='+00:00' */;n");
}
} else {
g_string_printf(statement, "SET FOREIGN_KEY_CHECKS=0;n");
}
if (!write_data(file, statement)) {
g_critical("Could not write out data for %s.%s", database, table);
goto cleanup;
}
}
if (complete_insert || has_generated_fields) {
if (insert_ignore) {
g_string_printf(statement, "INSERT IGNORE INTO `%s` (", table);
} else {
g_string_printf(statement, "INSERT INTO `%s` (", table);
}
for (i = 0; i < num_fields; ++i) {
if (i > 0) {
g_string_append_c(statement, ',');
}
g_string_append_printf(statement, "`%s`", fields[i].name);
}
g_string_append(statement, ") VALUES");
} else {
if (insert_ignore) {
g_string_printf(statement, "INSERT IGNORE INTO `%s` VALUES", table);
} else {
g_string_printf(statement, "INSERT INTO `%s` VALUES", table);
}
}
num_rows_st = 0;
}
if (statement_row->len) {
g_string_append(statement, statement_row->str);
g_string_set_size(statement_row, 0);
num_rows_st++;
}
g_string_append(statement_row, "n(");
for (i = 0; i < num_fields; i++) {
/* Don't escape safe formats, saves some time */
if (!row[i]) {
g_string_append(statement_row, "NULL");
} else if (fields[i].flags & NUM_FLAG) {
g_string_append(statement_row, row[i]);
} else {
/* We reuse buffers for string escaping, growing is expensive just at
* the beginning */
g_string_set_size(escaped, lengths[i] * 2 + 1);
mysql_real_escape_string(conn, escaped->str, row[i], lengths[i]);
if (fields[i].type == MYSQL_TYPE_JSON)
g_string_append(statement_row, "CONVERT(");
g_string_append_c(statement_row, '"');
g_string_append(statement_row, escaped->str);
g_string_append_c(statement_row, '"');
if (fields[i].type == MYSQL_TYPE_JSON)
g_string_append(statement_row, " USING UTF8MB4)");
}
if (i < num_fields - 1) {
g_string_append_c(statement_row, ',');
} else {
g_string_append_c(statement_row, ')');
/* INSERT statement is closed before over limit */
if (statement->len + statement_row->len + 1 > statement_size) {
if (num_rows_st == 0) {
g_string_append(statement, statement_row->str);
g_string_set_size(statement_row, 0);
g_warning("Row bigger than statement_size for %s.%s", database,
table);
}
g_string_append(statement, ";n");
if (!write_data(file, statement)) {
g_critical("Could not write out data for %s.%s", database, table);
goto cleanup;
} else {
st_in_file++;
if (chunk_filesize &&
st_in_file * (guint)ceil((float)statement_size / 1024 / 1024) >
chunk_filesize) {
fn++;
g_free(fcfile);
fcfile = g_strdup_printf("%s.%05d.sql%s", filename_prefix, fn,
(compress_output ? ".gz" : ""));
if (!compress_output) {
fclose((FILE *)file);
file = g_fopen(fcfile, "w");
} else {
gzclose((gzFile)file);
file = (void *)gzopen(fcfile, "w");
}
st_in_file = 0;
}
}
g_string_set_size(statement, 0);
} else {
if (num_rows_st)
g_string_append_c(statement, ',');
g_string_append(statement, statement_row->str);
num_rows_st++;
g_string_set_size(statement_row, 0);
}
}
}
}
if (mysql_errno(conn)) {
g_critical("Could not read data from %s.%s: %s", database, table,
mysql_error(conn));
errors++;
}
if (statement_row->len > 0) {
/* this last row has not been written out */
if (statement->len > 0) {
/* strange, should not happen */
g_string_append(statement, statement_row->str);
} else {
if (complete_insert) {
if (insert_ignore) {
g_string_printf(statement, "INSERT IGNORE INTO `%s` (", table);
} else {
g_string_printf(statement, "INSERT INTO `%s` (", table);
}
for (i = 0; i < num_fields; ++i) {
if (i > 0) {
g_string_append_c(statement, ',');
}
g_string_append_printf(statement, "`%s`", fields[i].name);
}
g_string_append(statement, ") VALUES");
} else {
if (insert_ignore) {
g_string_printf(statement, "INSERT IGNORE INTO `%s` VALUES", table);
} else {
g_string_printf(statement, "INSERT INTO `%s` VALUES", table);
}
}
g_string_append(statement, statement_row->str);
}
}
if (statement->len > 0) {
g_string_append(statement, ";n");
if (!write_data(file, statement)) {
g_critical(
"Could not write out closing newline for %s.%s, now this is sad!",
database, table);
goto cleanup;
}
st_in_file++;
}
cleanup:
g_free(query);
g_string_free(escaped, TRUE);
g_string_free(statement, TRUE);
g_string_free(statement_row, TRUE);
if (result) {
mysql_free_result(result);
}
if (file) {
if (!compress_output) {
fclose((FILE *)file);
} else {
gzclose((gzFile)file);
}
}
if (!st_in_file && !build_empty_files) {
// dropping the useless file
if (remove(fcfile)) {
g_warning("Failed to remove empty file : %sn", fcfile);
}
} else if (chunk_filesize && fn == 1) {
g_free(fcfile);
fcfile = g_strdup_printf("%s.sql%s", filename_prefix,
(compress_output ? ".gz" : ""));
g_rename(filename, fcfile);
}
g_free(filename_prefix);
g_free(fcfile);
return num_rows;
}
gboolean write_data(FILE *file, GString *data) {
size_t written = 0;
ssize_t r = 0;
while (written < data->len) {
if (!compress_output)
r = write(fileno(file), data->str + written, data->len);
else
r = gzwrite((gzFile)file, data->str + written, data->len);
if (r < 0) {
g_critical("Couldn't write data to a file: %s", strerror(errno));
errors++;
return FALSE;
}
written += r;
}
return TRUE;
}
// chunk 拆分规则
// if 有索引(first,要有一个索引)
// if 存在 pk
// 优先选择主键索引(主键索引可能是多个列,复合主键的场景)
// else
// if 存在唯一索引
// 优先选择第一个唯一索引
// else
// 选择区分度最高的索引
// else(无索引)
// 按照表的级别,不拆分chunk,并行复制(会慢一点)
// /*计算出row个数然后拆分chunk,select * from db.table where key [range] */
// /*所以如果我是程序开发者,为了快速查找数据,肯定是要走索引啊*/
最后
以上就是魔幻月饼为你收集整理的为什么mysqldumper mydumper能实现一致性备份?--源码分析为什么mysqldumper mydumper能实现一致性备份?–源码分析的全部内容,希望文章能够帮你解决为什么mysqldumper mydumper能实现一致性备份?--源码分析为什么mysqldumper mydumper能实现一致性备份?–源码分析所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复