


Detaillierte Erläuterung des Beispiels eines MySQL-basierten Binlog-Rollback-Tools
Die Bedingungen für Aktualisierung und Löschung werden falsch oder gar nicht geschrieben, was zu Datenoperationsfehlern führt und die Notwendigkeit verursacht, die fehlerhaften Zeilendatensätze wiederherzustellen. Diese Situation tritt tatsächlich auf. Sie können die Sicherungsdatei + das Binlog zur Wiederherstellung in der Testumgebung verwenden und dann die Datenreparatur durchführen. Dies erfordert jedoch tatsächlich eine gewisse Zeit und Ressourcen.
1 Implementierungsinhalt

Rollback ist in zwei Befehle unterteilt: Der erste Befehl analysiert das Binglog und speichert es in der Datenbank; der zweite Befehl führt den Rollback-Vorgang aus; 🎜>Beim Rollback können das Ausführungsskript und das Rollback-Skript in der Datenbank gespeichert und der Aktualisierungsinhalt und Rollback-Inhalt angezeigt werden
Gemäß der gespeicherten Analysetabelle; Es ist praktisch, die wiederherzustellende Transaktion oder bestimmte Tabelle anzugeben.
Detaillierte Protokollausgabe, die den Analyse- und Ausführungsfortschritt anzeigt.
Binlog-Ausgabe-Screenshot analysieren (1G-Binlog-Datei analysieren)


Voraussetzung: Die Instanz hat binlog gestartet und das Format ist ROW.
Die Ausführungsreihenfolge derselben Transaktion muss in umgekehrter Reihenfolge ausgeführt werden
-
Rollback-SQL analysieren
Unterschiedliche Tabellenverarbeitung für denselben Transaktionsvorgang
Escape-Zeichenverarbeitung, z. B. Zeilenumbrüche, Tab Zeichen usw.
-
Zeitstempel-Datentyp-Parameterwertkonvertierung
Verarbeitung negativer Zahlen
A Eine einzelne Transaktion beinhaltet die Zeilenänderung. SQL-Operationen max_allow
Rollback für eine bestimmte Tabelle statt der gesamten Datenbank
Lesen Sie basierend auf der Position, an der Xid erscheint, vom Anfang der Binlog-Datei und extrahieren Sie die SQL-Anweisung, wenn Sie darauf stoßen Zuvor extrahiertes SQL wird in einer Transaktion zusammengefasst. Anschließend werden weiterhin SQL-Anweisungen extrahiert, bis die nächste XID auftritt, und dann wird das SQL dieser Transaktion in einer Transaktion zusammengefasst.

2.2
In derselben Transaktion Wenn es Änderungen an Datensätzen in mehreren Tabellen und Zeilen gibt, sollte das SQL beim Rollback in umgekehrter Reihenfolge zurückgesetzt werden. Wie speichert man also das extrahierte SQL in umgekehrter Reihenfolge? Die Idee ist wie folgt:
Das geänderte SQL jeder Datensatzzeile wird getrennt
Das unabhängige SQL wird in umgekehrter Reihenfolge gespeichert
Annahme: Die SQL-Anweisungen für Transaktionen in Vorwärtsreihenfolge werden in der Variablen dml_sql gespeichert, und die SQL-Anweisungen in umgekehrter Reihenfolge, die zurückgesetzt werden können, werden in der Variablen undo_sql gespeichert. Extrahieren Sie die SQL der Zeilendatensatzänderung der Reihe nach und speichern Sie sie in der Variablen record_sql, weisen Sie dann den Wert undo_sql =record_sql + undo_sql zu und setzen Sie dann die Variable record_sql leer. Auf diese Weise kann die Ausführung von SQL innerhalb der umgekehrten Transaktion realisiert werden .
2.3 Rollback-SQL analysieren
Überprüfen Sie zunächst den Protokollinhalt von binlog und stellen Sie fest, dass die SQL-Situation der Zeilenänderung wie folgt ist Während des Extraktionsprozesses müssen folgende Probleme beachtet werden:
Der Spaltennamenabgleich von Zeilendatensätzen und die in der Binlog-Datei gespeicherte Spaltenseriennummer können nicht direkt verwendet werden
WHERE-Teil und SET Es müssen keine Schlüsselwörter oder Symbole zwischen den Teilen UND oder Kommas hinzugefügt werden
DELETE SQL muss in INSERT umgekehrt werden
UPDATE SQL muss umgekehrt werden. Ersetzen Sie die WHERE- und SET-Teile
INSERT SQL muss in DELETE umgekehrt werden

2.4 Verarbeitung verschiedener Tabellen in derselben Transaktion
In derselben Transaktion sind Datenänderungen zulässig Dies muss beim Ersetzen von Spaltennummern durch Spaltennamen beachtet werden.
Vor jedem Zeilendatensatz befindet sich eine Datensatzzeile mit der Markierung „Table_map“, die angibt, welche Tabelle in dieser Datensatzzeile geändert wurde. Sie können dieser Eingabeaufforderung folgen, um die Spaltenseriennummer zu ersetzen im Binlog mit dem Spaltennamen.
2.5 Verarbeitung von Escape-Zeichen
Die Binlog-Datei verarbeitet Nicht-Leerzeichen-Leerzeichen und verwendet die Speicherung von Escape-Zeichenfolgen, die beispielsweise in der Tabelleneinfügungsspalte enthalten sind Tatsächlich wird x0a in der Binlog-Datei verwendet, um die Newline-Operation zu ersetzen. Daher muss beim Zurücksetzen der Daten das Escape-Zeichen verarbeitet werden.


Beachten Sie hier eines: Das Escape-Zeichen von 039 ist nicht im Funktion Es wird einheitlich in esc_code verarbeitet, aber separat verarbeitet.

Die Übertragungszeichentabelle ist unten dargestellt:

2.6 Verarbeitung des Zeitstempel-Datentyps
Der tatsächlich in der Datenbank gespeicherte Zeitstempelwert ist vom Typ INT und muss mit der Funktion from_unixtime konvertiert werden.
Erstellen Sie eine Testtabelle tbtest mit nur einer Zeitstempelspalte. Sehen Sie sich nach dem Speichern des Werts den Inhalt des Binlogs an. Der spezifische Screenshot lautet wie folgt:

Bei der Verarbeitung von Zeilendatensätzen muss der Wert des Zeitstempels verarbeitet und die Funktionskonvertierung from_unixtime hinzugefügt werden.
2.7 Verarbeitung negativer Werte
Dies wurde nicht berücksichtigt, als ich den Code zum ersten Mal schrieb. Bei umfangreichen Tests wurde festgestellt, dass alle ganzzahligen Datentypen beim Speichern negativer Zahlen einen maximalen Bereichswert speichern. Der Mechanismus, wie Binlog damit umgeht, ist nicht ganz klar. Der Test läuft wie folgt ab:

Wenn also verschiedene Datentypen wie INT und VALUE auf eine negative Zahl stoßen, muss dieser Bereichswert vorher entfernt werden Ausführung Führen Sie undo_sql aus.
2.8 Die Gesamt-SQL eines einzelnen Transaktionszeilendatensatzes übersteigt die max_allowed_package-Verarbeitung
Nach der Analyse des Binlogs werden zwei SQL-Typen gespeichert, einer davon ist der geänderte SQL der Zeile Datensatz, d. h. dml_sql; der andere ist Zeilendatensatz-Rollback, d. h. undo_sql. Aus dem Code ist ersichtlich, dass es sich bei der Spalte, in der diese beiden SQLs gespeichert sind, um Langtext handelt, der bis zu 4 GB Inhalt speichern kann. Die Paketgröße einer einzelnen Sitzung in MySQL ist jedoch begrenzt. Die Standardgröße beträgt 4 MB und das Maximum beträgt 1 GB. Legen Sie daher vor der Verwendung dieses Skripts die Datenbankinstanz fest, in der die Binlog-Datei gespeichert wird die Online-Datenbankinstanz:
set global max_allowed_packet = 1073741824; #Denken Sie daran, es später zu ändern
Was ist, wenn es funktioniert? Dann kann das Rollback nur in Abschnitten durchgeführt werden. Führen Sie zunächst ein Rollback zu dieser großen Transaktion durch und führen Sie dann das Rollback fort. Dieser Teil kann nicht mit pymysql oder der Quelldatei ausgeführt werden, daher ist dies möglich nur manuell erfolgen. Bitten Sie einige fähige Leute, diesen Logikcode zu ändern! ! !
2.9 Gezieltes Rollback
Unter der Annahme, dass es keinen klaren Zeitpunkt für die Fehloperation gibt, gibt es nur ein Intervall, und in diesem Intervall gibt es dann andere Tabellenoperationen Zu diesem Zeitpunkt müssen Sie beim Analysieren der Binlog-Datei die Option --database hinzufügen und zunächst die Binlog-Datei in derselben Datenbank auswählen.
Die Verarbeitung hier besteht darin, dml_sql und undo_sql dieses Intervalls in der Datenbanktabelle zu speichern und dann die Transaktionen zu löschen, die nicht zurückgesetzt werden müssen, sowie die verbleibenden Transaktionen, die zurückgesetzt werden müssen. Führen Sie den Rollback-Vorgang erneut durch.
3 Gebrauchsanweisung
3.1 Parameterbeschreibung
Dieses Skript hat etwas mehr Parameter. Sie können spezifische Anweisungen mit --help anzeigen.

Ich verwende gerne verschiedene Farben, um Parameter zu klassifizieren (Blingbling ist bunt, es sieht so interessant und energiegeladen aus), deshalb werde ich diese Parameter durch erklären Farbe .
Gelber Bereich: Diese 6 Parameter liefern die relevanten Werte zum Analysieren und Speichern der Binlog-Datei und geben die Verknüpfungsmethode der Datenbank an, in der die Analyseergebnisse gespeichert werden, sowie den Speicherort Die Binlog-Datei und die Methode zum Speichern der Ergebnisse Muss nur die gleiche Tabellenstruktur wie die Online-Bibliothek haben, nicht unbedingt Es muss eine Master-Slave-Bibliothek sein
Grüner Bereich: Die wichtigste Option -a, 0 bedeutet nur; Analyse der Binlog-Datei, 1 bedeutet, dass nur Rollback-Vorgänge ausgeführt werden, 0 muss zuerst ausgeführt werden
Lila Bereich: Beispiel.
-
3.2 Beschreibung des Anwendungsszenarios
Rollback der gesamten Datenbank für einen bestimmten Zeitraum
- Alle SQL-Vorgänge in einem bestimmten Zeitraum müssen auf einen bestimmten Zeitpunkt zurückgesetzt werden
In diesem Fall verwenden die meisten von ihnen Backups Dateien + Binlog-Lösung
Aber dieses Skript kann es auch erfüllen, aber bitte führen Sie es nicht direkt online aus. Überprüfen Sie zuerst die Analyseergebnisse, um festzustellen, ob sie konsistent sind. Wenn ja, stoppen Sie einen bestimmten Vorgang aus der Datenbank, führen Sie ihn in der Slave-Datenbank aus und entwickeln Sie schließlich den Geschäftszugriff, um zu überprüfen, ob er zum angegebenen Zeitpunkt wiederhergestellt wurde und ob die Daten normal sind.
-
Einige Tabellen haben bestimmte Vorgänge während eines bestimmten Zeitraums zurückgesetzt
-
Zum Beispiel hat die Entwicklung einen Stapel übermittelt Beim Aktualisieren des Skripts wurden alle Teststufen als in Ordnung überprüft und zur Online-Ausführung übermittelt. Nach der Ausführung wurde jedoch festgestellt, dass ein Unternehmen im Test fehlte, was dazu führte, dass einige Felder aktualisiert wurden und sich nun auf andere Unternehmen auswirkte die stapelaktualisierten Tabellen dringend auf die ursprünglichen Zeilen zurückzusetzen
Dies kann nicht rein technisch gehandhabt werden, sondern muss umfassend betrachtet werden
-

Wie kann in diesem Fall der Änderungsvorgang der Tab-A-Tabelle überprüft werden?
Ich persönlich halte diese Methode für praktikabler. Speichern Sie die Daten der tabA-Tabelle in der Testumgebung und analysieren Sie dann die Binlog-Datei undo sql von 11 bis 12 Uhr. Anschließend wird die Tabelle in der Testumgebung auf 11 Uhr zurückgesetzt. Anschließend vergleichen die Entwicklung und das Unternehmen die 11-Uhr-Daten in der Testumgebung mit den vorhandenen Daten online, um festzustellen, welche Zeilen und Spalten zurückgesetzt werden müssen Wenn dies nicht erforderlich ist, entwickeln und übermitteln Sie das SQL-Skript und führen Sie es dann online aus. Tatsächlich bietet DBA hier nur eine Rolle, nämlich das Zurücksetzen der Tabellenregisterkarte A auf einen bestimmten Zeitpunkt in einer neuen Umgebung, bietet jedoch keine
direkte - Online-SQL-Rollback-Verarbeitung .
Ein bestimmtes/einiges SQL zurücksetzen
-
Diese Situation kommt relativ häufig vor A Beim Löschen fehlt die Where-Bedingung oder der Ausführungsfehler der Where-Bedingung.
Suchen Sie in diesem Fall die entsprechende Transaktion und führen Sie den Rollback-Vorgang wie oben beschrieben durch. Ich bin so schüchtern und ängstlich
-

3.3 Testfall
3.3. 1 Vollständiges Datenbank-Rollback für einen bestimmten Zeitraum
Angenommen, Sie müssen alle Vorgänge der Datenbank zwischen 9:10 und 9:15 Uhr zurücksetzen:
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
线上测试环境修改set global max_allowed_packet = 1073741824
回滚数据,action=1
线上测试环境修改set global max_allowed_packet = 4194304
1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3 4 5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name | Value | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent
27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions
42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310
Nach dem Login kopieren
3.3.2 某段时间某些表格回滚某些操作
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
3.3.3 回滚某个/些SQL
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
4 python脚本
脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。
1 # -*- coding: utf-8 -*- 2 __author__ = 'xinysu' 3 __date__ = '2017/6/15 10:30' 4 5 6 7 import re 8 import os 9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16 17 import pymysql 18 from pymysql.cursors import DictCursor 19 20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h : host, the database host,which database will store the results after analysis 24 -u : user, the db user 25 -p : password, the db user's password 26 -P : port, the db port 27 28 -f : file path, the binlog file 29 -t : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30 when you want to store in `test` db and the table name is `tbevent`,then this parameter
31 is test.tbevent 32 \033[1;34;40m 33 -oh : online host, the database host,which database have the online table schema 34 -ou : online user, the db user 35 -op : online password, the db user's password 36 -oP : online port, the db port 37 \033[1;32;40m 38 -a : action,
39 0 just analyse the binlog file ,and store sql in table;
40 1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m
42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
47 -oh=192.168.9.244 -oP=3310 -u=root -op=***
48 -a=0 49 50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent
52 -oh=192.168.9.244 -oP=3310 -u=root -op=***
53 -a=1 54 \033[0m
55 ''' 56 57 class flashback: 58 def __init__(self): 59 self.host='' 60 self.user='' 61 self.password='' 62 self.port='3306' 63 self.fpath='' 64 self.tbevent='' 65 66 self.on_host='' 67 self.on_user='' 68 self.on_password='' 69 self.on_port='3306' 70 71 self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72 73 self._get_db() # 从输入参数获取连接数据库的相关参数值 74 75 # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76 logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77 self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78 self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79 logging.info('MySQL which userd to store binlog event connection is ok') 80 81 # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82 # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83 logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84 self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85 self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86 logging.info('MySQL which userd to analyse online table schema connection is ok') 87 88 logging.info('\033[33mMySQL connection is ok\033[0m') 89 90 self.dml_sql='' 91 self.undo_sql='' 92 93 self.tbfield_where = [] 94 self.tbfield_set = [] 95 96 self.begin_time='' 97 self.db_name='' 98 self.tb_name='' 99 self.end_time=''100 self.end_pos=''101 self.sqltype=0102 103 #_get_db用于获取执行命令的输入参数104 def _get_db(self):105 logging.info('begin to assign values to parameters')106 if len(sys.argv) == 1:107 print(usage)108 sys.exit(1)109 elif sys.argv[1] == '--help':110 print(usage)111 sys.exit()112 elif len(sys.argv) > 2:113 for i in sys.argv[1:]:114 _argv = i.split('=')115 if _argv[0] == '-h':116 self.host = _argv[1]117 elif _argv[0] == '-u':118 self.user = _argv[1]119 elif _argv[0] == '-P':120 self.port = int(_argv[1])121 elif _argv[0] == '-f':122 self.fpath = _argv[1]123 elif _argv[0] == '-t':124 self.tbevent = _argv[1]125 elif _argv[0] == '-p':126 self.password = _argv[1]127 128 elif _argv[0] == '-oh':129 self.on_host = _argv[1]130 elif _argv[0] == '-ou':131 self.on_user = _argv[1]132 elif _argv[0] == '-oP':133 self.on_port = int(_argv[1])134 elif _argv[0] == '-op':135 self.on_password = _argv[1]136 137 elif _argv[0] == '-a':138 self.action = _argv[1]139 140 else:141 print(usage)142 143 #创建表格,用于存储分析后的BINLOG内容144 def create_tab(self):145 logging.info('creating table {} to store binlog event'.format(self.tbevent))146 create_tb_sql ='''147 CREATE TABLE IF NOT EXISTS {}(148 auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149 binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150 dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151 dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152 end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153 db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154 table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155 sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156 dml_sql LONGTEXT NULL COMMENT 'what sql excuted',157 undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158 PRIMARY KEY (auto_id),159 INDEX sqltype(sqltype),160 INDEX dml_start_time (dml_start_time),161 INDEX dml_end_time (dml_end_time),162 INDEX end_log_pos (end_log_pos),163 INDEX db_name (db_name),164 INDEX table_name (table_name)165 )166 COLLATE='utf8_general_ci' ENGINE=InnoDB;167 TRUNCATE TABLE {};168 169 '''.format(self.tbevent,self.tbevent)170 self.cur.execute(create_tb_sql)171 logging.info('created table {} '.format(self.tbevent))172 173 #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174 def tbschema(self,dbname,tbname):175 self.tbfield_where = []176 self.tbfield_set = []177 178 sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180 self.on_cur.execute(sql_tb)181 tbcol=self.on_cur.fetchall()182 183 i = 0184 for l in tbcol:185 #self.tbfield.append(l['Field'])186 if i==0:187 self.tbfield_where.append('`'+l['Field']+'`')188 self.tbfield_set.append('`'+l['Field']+'`')189 i+=1190 else:191 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194 # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195 def rowrecord(self,bl_line):196 try:197 if bl_line.find('Table_map:') != -1:198 l = bl_line.index('server')199 m = bl_line.index('end_log_pos')200 n = bl_line.index('Table_map')201 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207 self.tbschema(self.db_name,self.tb_name)208 except Exception:209 return 'funtion rowrecord error'210 211 def dml_tran(self,bl_line):212 try:213 214 215 if bl_line.find('Xid =') != -1:216 217 l = bl_line.index('server')218 m = bl_line.index('end_log_pos')219 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228 if self.dml_sql.startswith(' INSERT INTO '):229 self.sqltype=1230 elif self.dml_sql.startswith(' UPDATE '):231 self.sqltype=2232 elif self.dml_sql.startswith(' DELETE '):233 self.sqltype=3234 235 record_sql = ''236 undosql_desc = ''237 238 #同个事务内部的行记录修改SQL,反序存储239 for l in self.undo_sql.splitlines():240 if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241 undosql_desc = record_sql + undosql_desc242 record_sql = ''243 record_sql = record_sql + l244 else:245 record_sql = record_sql + l246 247 self.undo_sql = record_sql + undosql_desc248 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250 #处理非空格的空白特殊字符251 self.dml_sql = self.esc_code(self.dml_sql)252 self.undo_sql = self.esc_code(self.undo_sql)253 254 #单独处理 转移字符: \'255 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''") # + ';'256 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''") # + ';'257 258 if len(self.dml_sql)>500000000:259 with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260 w_f.write('begin;' + '\n')261 w_f.write(self.undo_sql)262 w_f.write('commit;' + '\n')263 self.dml_sql=''264 self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265 logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268 self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269 self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271 self.cur.execute(insert_sql)272 self.mysqlconn.commit()273 274 self.dml_sql = ''275 self.undo_sql = ''276 except Exception:277 print( 'funtion dml_tran error')278 279 280 def analyse_binlog(self):281 try:282 sqlcomma=0283 self.create_tab()284 285 with open(self.fpath,'r') as binlog_file:286 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287 logging.info('\033[36manalyzing...\033[0m')288 for bline in binlog_file:289 if bline.find('Table_map:') != -1:290 self.rowrecord(bline)291 bline=''292 elif bline.rstrip()=='### SET':293 bline = bline[3:]294 sqlcomma=1295 elif bline.rstrip()=='### WHERE':296 bline = bline[3:]297 sqlcomma = 2298 elif bline.startswith('### @'):299 len_f=len('### @')300 i=bline[len_f:].split('=')[0]301 302 #处理timestamp类型303 if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305 bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307 #处理负数存储方式308 if bline.split('=')[1].startswith('-'):309 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310 bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312 if sqlcomma==1:313 bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314 elif sqlcomma==2:315 bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317 elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318 bline = bline[3:]319 320 elif bline.find('Xid =') != -1:321 self.dml_tran(bline)322 bline=''323 else:324 bline = ''325 326 if bline.rstrip('\n') != '':327 self.dml_sql = self.dml_sql + bline + ' '328 except Exception:329 return 'function do error'330 331 def esc_code(self,sql):332 esc={333 '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334 #'\\x27':'\'',335 '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336 }337 338 for k,v in esc.items():339 sql=sql.replace(k,v)340 return sql341 342 def binlogdesc(self):343 344 countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345 print(countsql)346 self.cur.execute(countsql)347 count_row=self.cur.fetchall()348 349 update_count=0350 insert_couont=0351 delete_count=0352 for row in count_row:353 if row['sqltype']==1:354 insert_couont=row['numbers']355 elif row['sqltype']==2:356 update_count=row['numbers']357 elif row['sqltype']==3:358 delete_count=row['numbers']359 logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361 def undosql(self,number):362 #这里会有几个问题:363 #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364 #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365 #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367 tran_num=1368 id=0369 370 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372 self.cur.execute(tran_num_sql)373 tran_rows=self.cur.fetchall()374 375 for num in tran_rows:376 tran_num=num['table_rows']377 378 logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380 while id<=tran_num:381 logging.info('doing batch : {} '.format(int(id/number)+1))382 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)383 self.cur.execute(undo_sql)384 385 undo_rows=self.cur.fetchall()386 f_sql=''387 388 for u_row in undo_rows:389 try:390 self.on_cur.execute(u_row['undo_sql'])391 self.on_mysqlconn.commit()392 except Exception:393 print('auto_id:',u_row['auto_id'])394 id+=number395 396 397 def undo_file(self,number):398 # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source399 400 tran_num=1401 id=0402 403 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405 self.cur.execute(tran_num_sql)406 tran_rows=self.cur.fetchall()407 408 for num in tran_rows:409 tran_num=num['table_rows']410 411 logging.info('copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql')412 logging.info('\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))413 414 with open('/tmp/flashback_undosql/undo_file_flashback.sql', 'w') as w_f:415 while id<=tran_num:416 logging.info('doing batch : {} '.format(int(id/number)+1))417 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418 self.cur.execute(undo_sql)419 420 undo_rows=self.cur.fetchall()421 for u_row in undo_rows:422 try:423 w_f.write('begin;' + '\n')424 w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425 w_f.write(u_row['undo_sql'] + '\n')426 w_f.write('commit;' + '\n')427 except Exception:428 print('auto_id',u_row['auto_id'])429 #time.sleep(2)430 id+=number431 432 def do(self):433 if self.action=='0':434 self.analyse_binlog()435 logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436 #self.binlogdesc()437 elif self.action=='1':438 self.undosql(10000)439 440 def closeconn(self):441 self.cur.close()442 self.on_cur.close()443 logging.info('release all db connections')444 logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447 p = flashback()448 p.do()449 p.closeconn()450 451 if __name__ == "__main__":452 main()
Nach dem Login kopieren
Das geänderte SQL jeder Datensatzzeile wird getrennt
Das unabhängige SQL wird in umgekehrter Reihenfolge gespeichert
Der Spaltennamenabgleich von Zeilendatensätzen und die in der Binlog-Datei gespeicherte Spaltenseriennummer können nicht direkt verwendet werden
WHERE-Teil und SET Es müssen keine Schlüsselwörter oder Symbole zwischen den Teilen UND oder Kommas hinzugefügt werden
DELETE SQL muss in INSERT umgekehrt werden
UPDATE SQL muss umgekehrt werden. Ersetzen Sie die WHERE- und SET-Teile
INSERT SQL muss in DELETE umgekehrt werden






Gelber Bereich: Diese 6 Parameter liefern die relevanten Werte zum Analysieren und Speichern der Binlog-Datei und geben die Verknüpfungsmethode der Datenbank an, in der die Analyseergebnisse gespeichert werden, sowie den Speicherort Die Binlog-Datei und die Methode zum Speichern der Ergebnisse Muss nur die gleiche Tabellenstruktur wie die Online-Bibliothek haben, nicht unbedingt Es muss eine Master-Slave-Bibliothek sein
Grüner Bereich: Die wichtigste Option -a, 0 bedeutet nur; Analyse der Binlog-Datei, 1 bedeutet, dass nur Rollback-Vorgänge ausgeführt werden, 0 muss zuerst ausgeführt werden
Lila Bereich: Beispiel.
In diesem Fall verwenden die meisten von ihnen Backups Dateien + Binlog-Lösung
Aber dieses Skript kann es auch erfüllen, aber bitte führen Sie es nicht direkt online aus. Überprüfen Sie zuerst die Analyseergebnisse, um festzustellen, ob sie konsistent sind. Wenn ja, stoppen Sie einen bestimmten Vorgang aus der Datenbank, führen Sie ihn in der Slave-Datenbank aus und entwickeln Sie schließlich den Geschäftszugriff, um zu überprüfen, ob er zum angegebenen Zeitpunkt wiederhergestellt wurde und ob die Daten normal sind.
- Einige Tabellen haben bestimmte Vorgänge während eines bestimmten Zeitraums zurückgesetzt
Dies kann nicht rein technisch gehandhabt werden, sondern muss umfassend betrachtet werden
-
Wie kann in diesem Fall der Änderungsvorgang der Tab-A-Tabelle überprüft werden?
Ich persönlich halte diese Methode für praktikabler. Speichern Sie die Daten der tabA-Tabelle in der Testumgebung und analysieren Sie dann die Binlog-Datei undo sql von 11 bis 12 Uhr. Anschließend wird die Tabelle in der Testumgebung auf 11 Uhr zurückgesetzt. Anschließend vergleichen die Entwicklung und das Unternehmen die 11-Uhr-Daten in der Testumgebung mit den vorhandenen Daten online, um festzustellen, welche Zeilen und Spalten zurückgesetzt werden müssen Wenn dies nicht erforderlich ist, entwickeln und übermitteln Sie das SQL-Skript und führen Sie es dann online aus. Tatsächlich bietet DBA hier nur eine Rolle, nämlich das Zurücksetzen der Tabellenregisterkarte A auf einen bestimmten Zeitpunkt in einer neuen Umgebung, bietet jedoch keine
direkte - Online-SQL-Rollback-Verarbeitung .
Suchen Sie in diesem Fall die entsprechende Transaktion und führen Sie den Rollback-Vorgang wie oben beschrieben durch. Ich bin so schüchtern und ängstlich
-
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
线上测试环境修改set global max_allowed_packet = 1073741824
回滚数据,action=1
线上测试环境修改set global max_allowed_packet = 4194304
1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3 4 5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name | Value | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310
3.3.2 某段时间某些表格回滚某些操作
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
3.3.3 回滚某个/些SQL
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
4 python脚本
脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。
1 # -*- coding: utf-8 -*- 2 __author__ = 'xinysu' 3 __date__ = '2017/6/15 10:30' 4 5 6 7 import re 8 import os 9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16 17 import pymysql 18 from pymysql.cursors import DictCursor 19 20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h : host, the database host,which database will store the results after analysis 24 -u : user, the db user 25 -p : password, the db user's password 26 -P : port, the db port 27 28 -f : file path, the binlog file 29 -t : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30 when you want to store in `test` db and the table name is `tbevent`,then this parameter 31 is test.tbevent 32 \033[1;34;40m 33 -oh : online host, the database host,which database have the online table schema 34 -ou : online user, the db user 35 -op : online password, the db user's password 36 -oP : online port, the db port 37 \033[1;32;40m 38 -a : action, 39 0 just analyse the binlog file ,and store sql in table; 40 1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m 42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 47 -oh=192.168.9.244 -oP=3310 -u=root -op=*** 48 -a=0 49 50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 52 -oh=192.168.9.244 -oP=3310 -u=root -op=*** 53 -a=1 54 \033[0m 55 ''' 56 57 class flashback: 58 def __init__(self): 59 self.host='' 60 self.user='' 61 self.password='' 62 self.port='3306' 63 self.fpath='' 64 self.tbevent='' 65 66 self.on_host='' 67 self.on_user='' 68 self.on_password='' 69 self.on_port='3306' 70 71 self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72 73 self._get_db() # 从输入参数获取连接数据库的相关参数值 74 75 # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76 logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77 self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78 self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79 logging.info('MySQL which userd to store binlog event connection is ok') 80 81 # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82 # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83 logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84 self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85 self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86 logging.info('MySQL which userd to analyse online table schema connection is ok') 87 88 logging.info('\033[33mMySQL connection is ok\033[0m') 89 90 self.dml_sql='' 91 self.undo_sql='' 92 93 self.tbfield_where = [] 94 self.tbfield_set = [] 95 96 self.begin_time='' 97 self.db_name='' 98 self.tb_name='' 99 self.end_time=''100 self.end_pos=''101 self.sqltype=0102 103 #_get_db用于获取执行命令的输入参数104 def _get_db(self):105 logging.info('begin to assign values to parameters')106 if len(sys.argv) == 1:107 print(usage)108 sys.exit(1)109 elif sys.argv[1] == '--help':110 print(usage)111 sys.exit()112 elif len(sys.argv) > 2:113 for i in sys.argv[1:]:114 _argv = i.split('=')115 if _argv[0] == '-h':116 self.host = _argv[1]117 elif _argv[0] == '-u':118 self.user = _argv[1]119 elif _argv[0] == '-P':120 self.port = int(_argv[1])121 elif _argv[0] == '-f':122 self.fpath = _argv[1]123 elif _argv[0] == '-t':124 self.tbevent = _argv[1]125 elif _argv[0] == '-p':126 self.password = _argv[1]127 128 elif _argv[0] == '-oh':129 self.on_host = _argv[1]130 elif _argv[0] == '-ou':131 self.on_user = _argv[1]132 elif _argv[0] == '-oP':133 self.on_port = int(_argv[1])134 elif _argv[0] == '-op':135 self.on_password = _argv[1]136 137 elif _argv[0] == '-a':138 self.action = _argv[1]139 140 else:141 print(usage)142 143 #创建表格,用于存储分析后的BINLOG内容144 def create_tab(self):145 logging.info('creating table {} to store binlog event'.format(self.tbevent))146 create_tb_sql ='''147 CREATE TABLE IF NOT EXISTS {}(148 auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149 binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150 dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151 dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152 end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153 db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154 table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155 sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156 dml_sql LONGTEXT NULL COMMENT 'what sql excuted',157 undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158 PRIMARY KEY (auto_id),159 INDEX sqltype(sqltype),160 INDEX dml_start_time (dml_start_time),161 INDEX dml_end_time (dml_end_time),162 INDEX end_log_pos (end_log_pos),163 INDEX db_name (db_name),164 INDEX table_name (table_name)165 )166 COLLATE='utf8_general_ci' ENGINE=InnoDB;167 TRUNCATE TABLE {};168 169 '''.format(self.tbevent,self.tbevent)170 self.cur.execute(create_tb_sql)171 logging.info('created table {} '.format(self.tbevent))172 173 #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174 def tbschema(self,dbname,tbname):175 self.tbfield_where = []176 self.tbfield_set = []177 178 sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180 self.on_cur.execute(sql_tb)181 tbcol=self.on_cur.fetchall()182 183 i = 0184 for l in tbcol:185 #self.tbfield.append(l['Field'])186 if i==0:187 self.tbfield_where.append('`'+l['Field']+'`')188 self.tbfield_set.append('`'+l['Field']+'`')189 i+=1190 else:191 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194 # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195 def rowrecord(self,bl_line):196 try:197 if bl_line.find('Table_map:') != -1:198 l = bl_line.index('server')199 m = bl_line.index('end_log_pos')200 n = bl_line.index('Table_map')201 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207 self.tbschema(self.db_name,self.tb_name)208 except Exception:209 return 'funtion rowrecord error'210 211 def dml_tran(self,bl_line):212 try:213 214 215 if bl_line.find('Xid =') != -1:216 217 l = bl_line.index('server')218 m = bl_line.index('end_log_pos')219 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228 if self.dml_sql.startswith(' INSERT INTO '):229 self.sqltype=1230 elif self.dml_sql.startswith(' UPDATE '):231 self.sqltype=2232 elif self.dml_sql.startswith(' DELETE '):233 self.sqltype=3234 235 record_sql = ''236 undosql_desc = ''237 238 #同个事务内部的行记录修改SQL,反序存储239 for l in self.undo_sql.splitlines():240 if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241 undosql_desc = record_sql + undosql_desc242 record_sql = ''243 record_sql = record_sql + l244 else:245 record_sql = record_sql + l246 247 self.undo_sql = record_sql + undosql_desc248 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250 #处理非空格的空白特殊字符251 self.dml_sql = self.esc_code(self.dml_sql)252 self.undo_sql = self.esc_code(self.undo_sql)253 254 #单独处理 转移字符: \'255 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''") # + ';'256 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''") # + ';'257 258 if len(self.dml_sql)>500000000:259 with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260 w_f.write('begin;' + '\n')261 w_f.write(self.undo_sql)262 w_f.write('commit;' + '\n')263 self.dml_sql=''264 self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265 logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268 self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269 self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271 self.cur.execute(insert_sql)272 self.mysqlconn.commit()273 274 self.dml_sql = ''275 self.undo_sql = ''276 except Exception:277 print( 'funtion dml_tran error')278 279 280 def analyse_binlog(self):281 try:282 sqlcomma=0283 self.create_tab()284 285 with open(self.fpath,'r') as binlog_file:286 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287 logging.info('\033[36manalyzing...\033[0m')288 for bline in binlog_file:289 if bline.find('Table_map:') != -1:290 self.rowrecord(bline)291 bline=''292 elif bline.rstrip()=='### SET':293 bline = bline[3:]294 sqlcomma=1295 elif bline.rstrip()=='### WHERE':296 bline = bline[3:]297 sqlcomma = 2298 elif bline.startswith('### @'):299 len_f=len('### @')300 i=bline[len_f:].split('=')[0]301 302 #处理timestamp类型303 if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305 bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307 #处理负数存储方式308 if bline.split('=')[1].startswith('-'):309 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310 bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312 if sqlcomma==1:313 bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314 elif sqlcomma==2:315 bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317 elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318 bline = bline[3:]319 320 elif bline.find('Xid =') != -1:321 self.dml_tran(bline)322 bline=''323 else:324 bline = ''325 326 if bline.rstrip('\n') != '':327 self.dml_sql = self.dml_sql + bline + ' '328 except Exception:329 return 'function do error'330 331 def esc_code(self,sql):332 esc={333 '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334 #'\\x27':'\'',335 '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336 }337 338 for k,v in esc.items():339 sql=sql.replace(k,v)340 return sql341 342 def binlogdesc(self):343 344 countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345 print(countsql)346 self.cur.execute(countsql)347 count_row=self.cur.fetchall()348 349 update_count=0350 insert_couont=0351 delete_count=0352 for row in count_row:353 if row['sqltype']==1:354 insert_couont=row['numbers']355 elif row['sqltype']==2:356 update_count=row['numbers']357 elif row['sqltype']==3:358 delete_count=row['numbers']359 logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361 def undosql(self,number):362 #这里会有几个问题:363 #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364 #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365 #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367 tran_num=1368 id=0369 370 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372 self.cur.execute(tran_num_sql)373 tran_rows=self.cur.fetchall()374 375 for num in tran_rows:376 tran_num=num['table_rows']377 378 logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380 while id<=tran_num:381 logging.info('doing batch : {} '.format(int(id/number)+1))382 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)383 self.cur.execute(undo_sql)384 385 undo_rows=self.cur.fetchall()386 f_sql=''387 388 for u_row in undo_rows:389 try:390 self.on_cur.execute(u_row['undo_sql'])391 self.on_mysqlconn.commit()392 except Exception:393 print('auto_id:',u_row['auto_id'])394 id+=number395 396 397 def undo_file(self,number):398 # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source399 400 tran_num=1401 id=0402 403 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405 self.cur.execute(tran_num_sql)406 tran_rows=self.cur.fetchall()407 408 for num in tran_rows:409 tran_num=num['table_rows']410 411 logging.info('copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql')412 logging.info('\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))413 414 with open('/tmp/flashback_undosql/undo_file_flashback.sql', 'w') as w_f:415 while id<=tran_num:416 logging.info('doing batch : {} '.format(int(id/number)+1))417 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418 self.cur.execute(undo_sql)419 420 undo_rows=self.cur.fetchall()421 for u_row in undo_rows:422 try:423 w_f.write('begin;' + '\n')424 w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425 w_f.write(u_row['undo_sql'] + '\n')426 w_f.write('commit;' + '\n')427 except Exception:428 print('auto_id',u_row['auto_id'])429 #time.sleep(2)430 id+=number431 432 def do(self):433 if self.action=='0':434 self.analyse_binlog()435 logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436 #self.binlogdesc()437 elif self.action=='1':438 self.undosql(10000)439 440 def closeconn(self):441 self.cur.close()442 self.on_cur.close()443 logging.info('release all db connections')444 logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447 p = flashback()448 p.do()449 p.closeconn()450 451 if __name__ == "__main__":452 main()
Das obige ist der detaillierte Inhalt vonDetaillierte Erläuterung des Beispiels eines MySQL-basierten Binlog-Rollback-Tools. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Heiße KI -Werkzeuge

Undresser.AI Undress
KI-gestützte App zum Erstellen realistischer Aktfotos

AI Clothes Remover
Online-KI-Tool zum Entfernen von Kleidung aus Fotos.

Undress AI Tool
Ausziehbilder kostenlos

Clothoff.io
KI-Kleiderentferner

AI Hentai Generator
Erstellen Sie kostenlos Ai Hentai.

Heißer Artikel

Heiße Werkzeuge

Notepad++7.3.1
Einfach zu bedienender und kostenloser Code-Editor

SublimeText3 chinesische Version
Chinesische Version, sehr einfach zu bedienen

Senden Sie Studio 13.0.1
Leistungsstarke integrierte PHP-Entwicklungsumgebung

Dreamweaver CS6
Visuelle Webentwicklungstools

SublimeText3 Mac-Version
Codebearbeitungssoftware auf Gottesniveau (SublimeText3)

Heiße Themen



Apache verbindet eine Verbindung zu einer Datenbank erfordert die folgenden Schritte: Installieren Sie den Datenbanktreiber. Konfigurieren Sie die Datei web.xml, um einen Verbindungspool zu erstellen. Erstellen Sie eine JDBC -Datenquelle und geben Sie die Verbindungseinstellungen an. Verwenden Sie die JDBC -API, um über den Java -Code auf die Datenbank zuzugreifen, einschließlich Verbindungen, Erstellen von Anweisungen, Bindungsparametern, Ausführung von Abfragen oder Aktualisierungen und Verarbeitungsergebnissen.

In diesem Artikel wird erläutert, wie die Effizienz der Hadoop -Datenverarbeitung auf Debian -Systemen verbessert werden kann. Optimierungsstrategien decken Hardware -Upgrades, Parameteranpassungen des Betriebssystems, Änderungen der Hadoop -Konfiguration und die Verwendung effizienter Algorithmen und Tools ab. 1. Hardware -Ressourcenverstärkung stellt sicher, dass alle Knoten konsistente Hardwarekonfigurationen aufweisen, insbesondere die Aufmerksamkeit auf die Leistung von CPU-, Speicher- und Netzwerkgeräten. Die Auswahl von Hochleistungs-Hardwarekomponenten ist wichtig, um die Gesamtverarbeitungsgeschwindigkeit zu verbessern. 2. Betriebssystem -Tunes -Dateideskriptoren und Netzwerkverbindungen: Ändern Sie die Datei /etc/security/limits.conf, um die Obergrenze der Dateideskriptoren und Netzwerkverbindungen zu erhöhen, die gleichzeitig vom System geöffnet werden dürfen. JVM-Parameteranpassung: Einstellen in der Hadoop-env.sh-Datei einstellen

Die Schritte zur Installation eines SSL -Zertifikats auf dem Debian Mail -Server sind wie folgt: 1. Installieren Sie zuerst das OpenSSL -Toolkit und stellen Sie sicher, dass das OpenSSL -Toolkit bereits in Ihrem System installiert ist. Wenn nicht installiert, können Sie den folgenden Befehl installieren: sudoapt-getupdatesudoapt-getinstallopenssl2. Generieren Sie den privaten Schlüssel und die Zertifikatanforderung als nächst

In Debian Systems ist OpenSSL eine wichtige Bibliothek für Verschlüsselung, Entschlüsselung und Zertifikatverwaltung. Um einen Mann-in-the-Middle-Angriff (MITM) zu verhindern, können folgende Maßnahmen ergriffen werden: Verwenden Sie HTTPS: Stellen Sie sicher, dass alle Netzwerkanforderungen das HTTPS-Protokoll anstelle von HTTP verwenden. HTTPS verwendet TLS (Transport Layer Security Protocol), um Kommunikationsdaten zu verschlüsseln, um sicherzustellen, dass die Daten während der Übertragung nicht gestohlen oder manipuliert werden. Überprüfen Sie das Serverzertifikat: Überprüfen Sie das Serverzertifikat im Client manuell, um sicherzustellen, dass es vertrauenswürdig ist. Der Server kann manuell durch die Delegate -Methode der URLSession überprüft werden

Das Upgrade der Zookeeper -Version auf Debian -System kann die folgenden Schritte ausführen: 1. Wenn Sie die vorhandenen Konfiguration und Daten vor einem Upgrade unterstützen, wird dringend empfohlen, die vorhandenen Zookeeper -Konfigurationsdateien und Datenverzeichnisse zu sichern. sudocp-r/var/lib/zookeeper/var/lib/zookeper_backupsudocp/etc/zookeper/conf/zoo.cfg/etc/zookeeper/conf/zookeeper/z

Wenn Sie Hadoop-Protokolle auf Debian verwalten, können Sie die folgenden Schritte und Best Practices befolgen: Protokollaggregation Aktivieren Sie die Protokollaggregation: Set Garn.log-Aggregation-Enable in true in der Datei marn-site.xml, um die Protokollaggregation zu aktivieren. Konfigurieren von Protokoll-Retentionsrichtlinien: Setzen Sie Garn.log-Aggregation.Retain-Sekunden, um die Retentionszeit des Protokolls zu definieren, z. B. 172800 Sekunden (2 Tage). Log Speicherpfad angeben: über Garn.n

In diesem Leitfaden werden Sie erfahren, wie Sie Syslog in Debian -Systemen verwenden. Syslog ist ein Schlüsseldienst in Linux -Systemen für Protokollierungssysteme und Anwendungsprotokollnachrichten. Es hilft den Administratoren, die Systemaktivitäten zu überwachen und zu analysieren, um Probleme schnell zu identifizieren und zu lösen. 1. Grundkenntnisse über syslog Die Kernfunktionen von Syslog umfassen: zentrales Sammeln und Verwalten von Protokollnachrichten; Unterstützung mehrerer Protokoll -Ausgabesformate und Zielorte (z. B. Dateien oder Netzwerke); Bereitstellung von Echtzeit-Protokoll- und Filterfunktionen. 2. Installieren und Konfigurieren von Syslog (mit Rsyslog) Das Debian -System verwendet standardmäßig Rsyslog. Sie können es mit dem folgenden Befehl installieren: sudoaptupdatesud

Verbesserung der Hadoop -Datenlokalisierung auf Debian kann durch folgende Methoden erreicht werden: ausgewogene Hardware -Ressourcen: Stellen Sie sicher, dass die Hardware -Ressourcen (wie CPU, Speicher, Speicherkapazität usw.) jedes Datanode -Knotens im HDFS -Cluster zueinander ähneln, um offensichtliche Leistungsengpässe zu vermeiden. Optimieren Sie die Strategie zum Datenschreiben: Konfigurieren Sie die HDFS -Datenschreibstrategie vernünftigerweise, z. Verwenden von Balancer -Tools: HD nutzen
