mysql ベースの binlog ロールバック ツールの例の詳細な説明
更新および削除の条件が誤って書き込まれているか、まったく書き込まれていない場合、データ操作エラーが発生し、誤操作された行レコードを復元する必要があります。この状況は実際に発生します。バックアップ ファイル + binlog を使用してテスト環境に復元し、その後データ修復を実行することもできますが、これには実際にはある程度の時間とリソースが必要です。
1 実装内容

ロールバックは 2 つのコマンドに分かれています。最初のコマンドは binglog を分析してデータベースに保存します。2 番目のコマンドはロールバック操作を実行します。
ロールバックする場合、実行スクリプトとロールバック スクリプトを統合できます。データベースに保存されているため、更新内容とロールバック内容を確認できます。
保存された分析テーブルに従って、復元するトランザクションまたは指定したテーブルを指定すると便利です。
詳細なログ出力には、分析の進行状況が表示されます。実行の進行状況。


前提条件: インスタンスが binlog を開始しており、形式はROWです。 ️ Python を使用して、mysqlbinlog の後のログ ファイルのテキスト分析と処理を実行します。処理プロセス全体で、次の 6 つの難しい点に対処する必要があります:
- 同じトランザクション 実行順序を逆に実行する必要がある
- SQLの解析とロールバック
- 同じトランザクション操作で異なるテーブル処理
- 改行、タブなどのエスケープ文字処理文字など
- タイムスタンプのデータ型パラメータ値の変換
- 負の数値の処理
- 単一トランザクションに行変更SQL操作が含まれるmax_allow
- データベース全体ではなく特定のテーブルのロールバックを実行rollback
- 2.1 トランザクション
- の開始と終了は、Xid が出現する位置に応じて判断されます。このサイクルは、ファイルの順次走査が終了するまで続きます。
2.2

同じトランザクション内で、ロールバック中に複数のテーブルと複数の行レコードが変更された場合、SQL は逆の順序でロールバックされる必要があります。抽出したSQLを逆順に保存するには?アイデアは次のとおりです:
レコードの各行の変更されたSQLが分離されます
独立したSQLが逆順に格納されます
2.3 ロールバック SQL を解析する
行レコードの列名マッチング、binlog ファイルに保存されている列番号を直接使用することはできません。WHERE 部分と SET 部分の間にキーワードや記号を追加する必要はありません。 SQLをINSERT
- UPDATE SQLに置き換える必要があります。INSERT SQLをDELETEに置き換える必要があります。列のシリアル番号を置き換える場合は、この点に注意する必要があります。列名。 各行レコードの前には、「Table_map」マークを含むレコードの行があり、この行でどのテーブルが変更されたかを示します。このプロンプトに従って、ビンログ内の列のシリアル番号をその列に置き換えることができます。名前。
2.5 エスケープ文字の処理
- binlog ファイルは、エスケープ文字列ストレージを使用して、スペース以外の空白文字を処理します。たとえば、テーブルの挿入列のレコードには改行文字が含まれていますが、実際には binlog ファイル内にあります。 、x0a は改行操作の置き換えに使用されるため、データをロールバックするプロセス中にエスケープ文字を処理する必要があります。
転送文字テーブルを以下に示します。
2.6 タイムスタンプデータ型の処理

データベースに実際に格納されるタイムスタンプの値は必須の INT 型です 関数を使用します変換する from_unixtime 。

2.7 負の値の処理

したがって、INT および VALUE が負の数値のさまざまなデータ型に遭遇した場合、undo_sql を実行する前にこの範囲の値を削除する必要があります。
2.8 単一トランザクション行レコードの合計 SQL が max_allowed_package 処理を超えています
2.9 対象を絞ったロールバック
3 使用説明
3.1 パラメータの説明

黄色の領域: これらの 6 つのパラメーターは、binlog ファイルの分析と保存に関連する値を提供し、分析結果を保存するデータベースのリンク方法、binlog ファイルの場所、名前を示します。結果が保存されるテーブル
青色の領域: これらの 4 つのパラメーターは、オンライン データベースのテーブル構造と一致する DB インスタンスの接続方法を提供します。必要なのはオンライン データベースと同じテーブル構造だけです。必ずマスター/スレーブ データベースである必要があります。
緑色の領域: 最も重要なオプション -a、0 は binlog ファイルの分析のみを意味し、1 はロールバック操作の実行のみを意味し、1 を実行する前に 0 を実行する必要があります。 ;
紫色のエリア: 例。
3.2 アプリケーションシナリオの説明
一定期間のデータベース全体のロールバック
一定期間のすべての SQL 操作を一定の時点までロールバックする必要があります
-
この場合、ほとんどはバックアップファイル+binlogを使うことで解決します
このスクリプトでも要件を満たすことができますが、最初に-a=0を直接実行せずに、解析結果を確認してください。整合性がある場合は、スレーブ データベースを停止してから、スレーブ データベース上でビジネス アクセスを実行し、指定した時点に復元されているかどうか、およびデータが正常であるかどうかを確認します。
特定の操作は、特定の期間中に特定のテーブルでロールバックされます
たとえば、開発はバッチ更新スクリプトを送信しましたが、すべてのテスト レベルでの検証に問題はありませんでした。オンライン実行のために送信されましたが、実行後にビジネスがテストに失敗したことが判明し、一部のフィールドが更新され、他のビジネスに影響を及ぼしました。バッチ更新されたテーブルを元の行レコードに緊急にロールバックする必要があります。
これは技術的な観点だけで対応することはできず、総合的に考える必要があります
-
この場合、タブAテーブルの変更操作をどのように見直すか?
個人的には、tabA テーブルのデータをテスト環境にダンプし、11 時から 12 時までの binlog ファイルの undo SQL を分析してから、テーブルを 12 時にロールバックする方法がより実現可能だと思います。テスト環境の 11 時の時点で、開発とビジネスはテスト環境の 11 時のデータをオンラインの既存のデータと比較して、どの行と列がオンラインでロールバックされる必要があるかを確認します。必要ない場合は、開発側がそれを送信し、SQL スクリプトがオンラインで実行されます。実際、ここで DBA が提供する役割は 1 つだけで、新しい環境でテーブル タブ A を特定の時点にロールバックするというものですが、SQL 処理の直接オンライン ロールバックは提供しません。
特定の SQL/一部の SQL のロールバック
この状況は、更新または削除で where 条件が欠落しているか、where 条件の実行エラーが発生することが比較的一般的です
-
この場合、対応する トランザクションのロールバックを実行するだけです。ロールバックのプロセスについては上記を参照してください。データベース全体のロールバック 一定の時間帯で
9:10 から 9:15 までの間にデータベース上のすべての操作をロールバックする必要があるとします:
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
线上测试环境修改set global max_allowed_packet = 1073741824
回滚数据,action=1
线上测试环境修改set global max_allowed_packet = 4194304
1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3 4 5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name | Value | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310
ログイン後にコピー3.3.2 某段时间某些表格回滚某些操作
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
3.3.3 回滚某个/些SQL
准备测试环境实例存储分析后的数据
测试环境修改set global max_allowed_packet = 1073741824
mysqlbinlog分析binlog文件
python脚本分析文件,action=0
分析帅选需要的事务,rename表格
dump 对应的表格到测试环境
回滚数据,action=1
提交给开发业务对比数据
4 python脚本
脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。
1 # -*- coding: utf-8 -*- 2 __author__ = 'xinysu' 3 __date__ = '2017/6/15 10:30' 4 5 6 7 import re 8 import os 9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16 17 import pymysql 18 from pymysql.cursors import DictCursor 19 20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h : host, the database host,which database will store the results after analysis 24 -u : user, the db user 25 -p : password, the db user's password 26 -P : port, the db port 27 28 -f : file path, the binlog file 29 -t : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30 when you want to store in `test` db and the table name is `tbevent`,then this parameter 31 is test.tbevent 32 \033[1;34;40m 33 -oh : online host, the database host,which database have the online table schema 34 -ou : online user, the db user 35 -op : online password, the db user's password 36 -oP : online port, the db port 37 \033[1;32;40m 38 -a : action, 39 0 just analyse the binlog file ,and store sql in table; 40 1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m 42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 47 -oh=192.168.9.244 -oP=3310 -u=root -op=*** 48 -a=0 49 50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 52 -oh=192.168.9.244 -oP=3310 -u=root -op=*** 53 -a=1 54 \033[0m 55 ''' 56 57 class flashback: 58 def __init__(self): 59 self.host='' 60 self.user='' 61 self.password='' 62 self.port='3306' 63 self.fpath='' 64 self.tbevent='' 65 66 self.on_host='' 67 self.on_user='' 68 self.on_password='' 69 self.on_port='3306' 70 71 self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72 73 self._get_db() # 从输入参数获取连接数据库的相关参数值 74 75 # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76 logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77 self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78 self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79 logging.info('MySQL which userd to store binlog event connection is ok') 80 81 # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82 # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83 logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84 self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85 self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86 logging.info('MySQL which userd to analyse online table schema connection is ok') 87 88 logging.info('\033[33mMySQL connection is ok\033[0m') 89 90 self.dml_sql='' 91 self.undo_sql='' 92 93 self.tbfield_where = [] 94 self.tbfield_set = [] 95 96 self.begin_time='' 97 self.db_name='' 98 self.tb_name='' 99 self.end_time=''100 self.end_pos=''101 self.sqltype=0102 103 #_get_db用于获取执行命令的输入参数104 def _get_db(self):105 logging.info('begin to assign values to parameters')106 if len(sys.argv) == 1:107 print(usage)108 sys.exit(1)109 elif sys.argv[1] == '--help':110 print(usage)111 sys.exit()112 elif len(sys.argv) > 2:113 for i in sys.argv[1:]:114 _argv = i.split('=')115 if _argv[0] == '-h':116 self.host = _argv[1]117 elif _argv[0] == '-u':118 self.user = _argv[1]119 elif _argv[0] == '-P':120 self.port = int(_argv[1])121 elif _argv[0] == '-f':122 self.fpath = _argv[1]123 elif _argv[0] == '-t':124 self.tbevent = _argv[1]125 elif _argv[0] == '-p':126 self.password = _argv[1]127 128 elif _argv[0] == '-oh':129 self.on_host = _argv[1]130 elif _argv[0] == '-ou':131 self.on_user = _argv[1]132 elif _argv[0] == '-oP':133 self.on_port = int(_argv[1])134 elif _argv[0] == '-op':135 self.on_password = _argv[1]136 137 elif _argv[0] == '-a':138 self.action = _argv[1]139 140 else:141 print(usage)142 143 #创建表格,用于存储分析后的BINLOG内容144 def create_tab(self):145 logging.info('creating table {} to store binlog event'.format(self.tbevent))146 create_tb_sql ='''147 CREATE TABLE IF NOT EXISTS {}(148 auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149 binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150 dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151 dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152 end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153 db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154 table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155 sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156 dml_sql LONGTEXT NULL COMMENT 'what sql excuted',157 undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158 PRIMARY KEY (auto_id),159 INDEX sqltype(sqltype),160 INDEX dml_start_time (dml_start_time),161 INDEX dml_end_time (dml_end_time),162 INDEX end_log_pos (end_log_pos),163 INDEX db_name (db_name),164 INDEX table_name (table_name)165 )166 COLLATE='utf8_general_ci' ENGINE=InnoDB;167 TRUNCATE TABLE {};168 169 '''.format(self.tbevent,self.tbevent)170 self.cur.execute(create_tb_sql)171 logging.info('created table {} '.format(self.tbevent))172 173 #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174 def tbschema(self,dbname,tbname):175 self.tbfield_where = []176 self.tbfield_set = []177 178 sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180 self.on_cur.execute(sql_tb)181 tbcol=self.on_cur.fetchall()182 183 i = 0184 for l in tbcol:185 #self.tbfield.append(l['Field'])186 if i==0:187 self.tbfield_where.append('`'+l['Field']+'`')188 self.tbfield_set.append('`'+l['Field']+'`')189 i+=1190 else:191 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194 # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195 def rowrecord(self,bl_line):196 try:197 if bl_line.find('Table_map:') != -1:198 l = bl_line.index('server')199 m = bl_line.index('end_log_pos')200 n = bl_line.index('Table_map')201 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207 self.tbschema(self.db_name,self.tb_name)208 except Exception:209 return 'funtion rowrecord error'210 211 def dml_tran(self,bl_line):212 try:213 214 215 if bl_line.find('Xid =') != -1:216 217 l = bl_line.index('server')218 m = bl_line.index('end_log_pos')219 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228 if self.dml_sql.startswith(' INSERT INTO '):229 self.sqltype=1230 elif self.dml_sql.startswith(' UPDATE '):231 self.sqltype=2232 elif self.dml_sql.startswith(' DELETE '):233 self.sqltype=3234 235 record_sql = ''236 undosql_desc = ''237 238 #同个事务内部的行记录修改SQL,反序存储239 for l in self.undo_sql.splitlines():240 if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241 undosql_desc = record_sql + undosql_desc242 record_sql = ''243 record_sql = record_sql + l244 else:245 record_sql = record_sql + l246 247 self.undo_sql = record_sql + undosql_desc248 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250 #处理非空格的空白特殊字符251 self.dml_sql = self.esc_code(self.dml_sql)252 self.undo_sql = self.esc_code(self.undo_sql)253 254 #单独处理 转移字符: \'255 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''") # + ';'256 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''") # + ';'257 258 if len(self.dml_sql)>500000000:259 with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260 w_f.write('begin;' + '\n')261 w_f.write(self.undo_sql)262 w_f.write('commit;' + '\n')263 self.dml_sql=''264 self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265 logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268 self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269 self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271 self.cur.execute(insert_sql)272 self.mysqlconn.commit()273 274 self.dml_sql = ''275 self.undo_sql = ''276 except Exception:277 print( 'funtion dml_tran error')278 279 280 def analyse_binlog(self):281 try:282 sqlcomma=0283 self.create_tab()284 285 with open(self.fpath,'r') as binlog_file:286 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287 logging.info('\033[36manalyzing...\033[0m')288 for bline in binlog_file:289 if bline.find('Table_map:') != -1:290 self.rowrecord(bline)291 bline=''292 elif bline.rstrip()=='### SET':293 bline = bline[3:]294 sqlcomma=1295 elif bline.rstrip()=='### WHERE':296 bline = bline[3:]297 sqlcomma = 2298 elif bline.startswith('### @'):299 len_f=len('### @')300 i=bline[len_f:].split('=')[0]301 302 #处理timestamp类型303 if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305 bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307 #处理负数存储方式308 if bline.split('=')[1].startswith('-'):309 stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310 bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312 if sqlcomma==1:313 bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314 elif sqlcomma==2:315 bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317 elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318 bline = bline[3:]319 320 elif bline.find('Xid =') != -1:321 self.dml_tran(bline)322 bline=''323 else:324 bline = ''325 326 if bline.rstrip('\n') != '':327 self.dml_sql = self.dml_sql + bline + ' '328 except Exception:329 return 'function do error'330 331 def esc_code(self,sql):332 esc={333 '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334 #'\\x27':'\'',335 '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336 }337 338 for k,v in esc.items():339 sql=sql.replace(k,v)340 return sql341 342 def binlogdesc(self):343 344 countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345 print(countsql)346 self.cur.execute(countsql)347 count_row=self.cur.fetchall()348 349 update_count=0350 insert_couont=0351 delete_count=0352 for row in count_row:353 if row['sqltype']==1:354 insert_couont=row['numbers']355 elif row['sqltype']==2:356 update_count=row['numbers']357 elif row['sqltype']==3:358 delete_count=row['numbers']359 logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361 def undosql(self,number):362 #这里会有几个问题:363 #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364 #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365 #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367 tran_num=1368 id=0369 370 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372 self.cur.execute(tran_num_sql)373 tran_rows=self.cur.fetchall()374 375 for num in tran_rows:376 tran_num=num['table_rows']377 378 logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380 while id<=tran_num:381 logging.info('doing batch : {} '.format(int(id/number)+1))382 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)383 self.cur.execute(undo_sql)384 385 undo_rows=self.cur.fetchall()386 f_sql=''387 388 for u_row in undo_rows:389 try:390 self.on_cur.execute(u_row['undo_sql'])391 self.on_mysqlconn.commit()392 except Exception:393 print('auto_id:',u_row['auto_id'])394 id+=number395 396 397 def undo_file(self,number):398 # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source399 400 tran_num=1401 id=0402 403 tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405 self.cur.execute(tran_num_sql)406 tran_rows=self.cur.fetchall()407 408 for num in tran_rows:409 tran_num=num['table_rows']410 411 logging.info('copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql')412 logging.info('\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))413 414 with open('/tmp/flashback_undosql/undo_file_flashback.sql', 'w') as w_f:415 while id<=tran_num:416 logging.info('doing batch : {} '.format(int(id/number)+1))417 undo_sql='select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418 self.cur.execute(undo_sql)419 420 undo_rows=self.cur.fetchall()421 for u_row in undo_rows:422 try:423 w_f.write('begin;' + '\n')424 w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425 w_f.write(u_row['undo_sql'] + '\n')426 w_f.write('commit;' + '\n')427 except Exception:428 print('auto_id',u_row['auto_id'])429 #time.sleep(2)430 id+=number431 432 def do(self):433 if self.action=='0':434 self.analyse_binlog()435 logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436 #self.binlogdesc()437 elif self.action=='1':438 self.undosql(10000)439 440 def closeconn(self):441 self.cur.close()442 self.on_cur.close()443 logging.info('release all db connections')444 logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447 p = flashback()448 p.do()449 p.closeconn()450 451 if __name__ == "__main__":452 main()
ログイン後にコピー
以上がmysql ベースの binlog ロールバック ツールの例の詳細な説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。

ホットAIツール

Undresser.AI Undress
リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover
写真から衣服を削除するオンライン AI ツール。

Undress AI Tool
脱衣画像を無料で

Clothoff.io
AI衣類リムーバー

AI Hentai Generator
AIヘンタイを無料で生成します。

人気の記事

ホットツール

メモ帳++7.3.1
使いやすく無料のコードエディター

SublimeText3 中国語版
中国語版、とても使いやすい

ゼンドスタジオ 13.0.1
強力な PHP 統合開発環境

ドリームウィーバー CS6
ビジュアル Web 開発ツール

SublimeText3 Mac版
神レベルのコード編集ソフト(SublimeText3)

ホットトピック









Apacheはデータベースに接続するには、次の手順が必要です。データベースドライバーをインストールします。 web.xmlファイルを構成して、接続プールを作成します。 JDBCデータソースを作成し、接続設定を指定します。 JDBC APIを使用して、接続の取得、ステートメントの作成、バインディングパラメーター、クエリまたは更新の実行、結果の処理など、Javaコードのデータベースにアクセスします。

この記事では、DebianシステムのHadoopデータ処理効率を改善する方法について説明します。最適化戦略では、ハードウェアのアップグレード、オペレーティングシステムパラメーターの調整、Hadoop構成の変更、および効率的なアルゴリズムとツールの使用をカバーしています。 1.ハードウェアリソースの強化により、すべてのノードが一貫したハードウェア構成、特にCPU、メモリ、ネットワーク機器のパフォーマンスに注意を払うことが保証されます。高性能ハードウェアコンポーネントを選択することは、全体的な処理速度を改善するために不可欠です。 2。オペレーティングシステムチューニングファイル記述子とネットワーク接続:/etc/security/limits.confファイルを変更して、システムによって同時に開くことができるファイル記述子とネットワーク接続の上限を増やします。 JVMパラメーター調整:Hadoop-env.shファイルで調整します

Debian Systemsでは、OpenSSLは暗号化、復号化、証明書管理のための重要なライブラリです。中間の攻撃(MITM)を防ぐために、以下の測定値をとることができます。HTTPSを使用する:すべてのネットワーク要求がHTTPの代わりにHTTPSプロトコルを使用していることを確認してください。 HTTPSは、TLS(Transport Layer Security Protocol)を使用して通信データを暗号化し、送信中にデータが盗まれたり改ざんされたりしないようにします。サーバー証明書の確認:クライアントのサーバー証明書を手動で確認して、信頼できることを確認します。サーバーは、urlsessionのデリゲート方法を介して手動で検証できます

Debian Mail ServerにSSL証明書をインストールする手順は次のとおりです。1。最初にOpenSSL Toolkitをインストールすると、OpenSSLツールキットがシステムに既にインストールされていることを確認してください。インストールされていない場合は、次のコマンドを使用してインストールできます。sudoapt-getUpdatesudoapt-getInstalopenssl2。秘密キーと証明書のリクエストを生成次に、OpenSSLを使用して2048ビットRSA秘密キーと証明書リクエスト(CSR)を生成します:Openss

DebianでHadoopログを管理すると、次の手順とベストプラクティスに従うことができます。ログ集約を有効にするログ集約を有効にします。Yarn.log-Aggregation-set yarn-site.xmlファイルでは、ログ集約を有効にします。ログ保持ポリシーの構成:yarn.log-aggregation.retain-secondsを設定して、172800秒(2日)などのログの保持時間を定義します。ログストレージパスを指定:Yarn.Nを介して

DebianシステムのZookeeperバージョンのアップグレードは、以下の手順に従うことができます。1。アップグレード前に既存の構成とデータをバックアップすると、既存のZookeeper構成ファイルとデータディレクトリをバックアップすることを強くお勧めします。 sudocp-r/var/lib/zookeeper/var/lib/zookeeper_backupsudocp/etc/zookeeper/conf/zoo.cfg/etc/zookeeper/conf/zookeeper/z

このガイドでは、Debian SystemsでSyslogの使用方法を学ぶように導きます。 Syslogは、ロギングシステムとアプリケーションログメッセージのLinuxシステムの重要なサービスです。管理者がシステムアクティビティを監視および分析して、問題を迅速に特定および解決するのに役立ちます。 1. syslogの基本的な知識Syslogのコア関数には以下が含まれます。複数のログ出力形式とターゲットの場所(ファイルやネットワークなど)をサポートします。リアルタイムのログ表示およびフィルタリング機能を提供します。 2。syslog(rsyslogを使用)をインストールして構成するDebianシステムは、デフォルトでrsyslogを使用します。次のコマンドでインストールできます:sudoaptupdatesud

DebianでのHadoopデータの改善は、次の方法で達成できます。バランスの取れたハードウェアリソース:HDFSクラスターの各DataNodeノードのハードウェアリソース(CPU、メモリ、ディスク容量など)が互いに類似していることを確認してください。データライティング戦略の最適化:Nodeの負荷条件とバランスの取れたデータ分布を実現するための利用可能なリソースに基づいてストレージのDataNodeノードを動的に選択するなど、HDFSデータライティング戦略を合理的に構成します。バランサーツールの使用:HDを活用します
