目录
1 实现内容
2 原理
2.1 事务的开始与结束
2.4 同事务不同表格处理
2.5 转义字符处理
2.6 timestamp数据类型处理
2.7 负数值处理
2.8 单个事务行记录总SQL超过max_allowed_package处理
2.9 针对性回滚
3.1 参数说明
3.2 应用场景说明
3.3 测试案例
3.3.1 全库回滚某段时间
3.3.2 某段时间某些表格回滚某些操作
3.3.3 回滚某个/些SQL
4 python脚本
首页 数据库 mysql教程 mysql基于binlog回滚工具实例详解

mysql基于binlog回滚工具实例详解

Jun 21, 2017 am 10:27 AM
binlog mysql 基于 工具

 

    update、delete的条件写错甚至没有写,导致数据操作错误,需要恢复被误操作的行记录。这种情形,其实时有发生,可以选择用备份文件+binlog来恢复到测试环境,然后再做数据修复,但是这样其实需要耗费一定的时间跟资源。

    其实,如果binlog format为row,binlog文件中是会详细记录每一个事务涉及到操作,并把每一个事务影响到行记录均存储起来,能否给予binlog 文件来反解析数据库的行记录变动情况呢?
    业界已有不少相关的脚本及工具,但是随着MySQL版本的更新、binlog记录内容的变化以及需求不一致,大多数脚本不太适合个人目前的使用需求,所以开始着手编写 mysql的 flash back脚本。
 


 
    如果转载,请注明博文来源: www.cnblogs.com/xinysu/   ,版权归 博客园 苏家小萝卜 所有。望各位支持!
 


 
     仅在MySQL 5.6/5.7版本测试,python运行环境需要安装pymysql模块。

1 实现内容

    根据binlog文件,对某个\些事务、某段时间某些表、某段时间全库 做回滚操作,实现闪回功能。工具处理过程中,会把binlog中的事务修改的行记录存储到表格中去,通过 dml_sql 列,可以查看每一个事务内部的所有行记录变更情况,通过 undo_sql 查看回滚的SQL内容。如下图,然后再根据表格内容做回滚操作。 
      
    那么这个脚本有哪些优点呢?
  • 回滚分为2个命令:第一个命令 分析binglog并存储进入数据库;第二个命令 执行回滚操作;

  • 回滚的时候,可以把执行脚本跟回滚脚本统一存放到数据库中,可以查看 更新内容以及回滚内容;

  • 根据存储的分析表格,方便指定事务或者指定表格来来恢复;

  • 详细的日志输出,说明分析进度跟执行进度。

    分析binlog的输出截图(分析1G的binlog文件)
   
 
   回滚数据库的输出截图:
   

2 原理

    前提:实例启动了binlog并且格式为ROW。
    
    使用python对mysqlbinlog后的log文件进行文本分析处理,在整个处理过程中,有以下6个疑难点需要处理:
  1. 判断一个事务的开始跟结束

  2. 同一个事务的执行顺序需要反序执行

  3. 解析回滚SQL

  4. 同一个事务操作不同表格处理

  5. 转义字符处理,比如换行符、tab符等等

  6. timestamp数据类型参数值转换

  7. 负数处理

  8. 单个事务涉及到行修改SQL操作了 max_allow

  9. 针对某个表格做回滚,而不是全库回滚

    

2.1 事务的开始与结束

    按照Xid出现的位置来判断,从binlog文件的最开始开始读取,遇到SQL语句则提取出来,直到遇到Xid,统一把之前提取出来的SQL汇总为一个事务,然后继续提取SQL语句,直到遇到下一个Xid,再把这个事务的SQL汇总成一个事务,一直这样循环,直至文件顺序遍历结束。
   

 

2.2 事务内部反序处理

    同一个事务中,如果有多个表格多行记录发生变更,在回滚的时候,应该反序回滚SQL,那么,如何将提取出来的SQL反序存储呢?思路如下:
  • 每行记录的修改SQL独立出来

  • 将独立出来的SQL反序存储

   假设正序的事务SQL语句存储在变量 dml_sql 中,反序后的可以回滚的SQL存储在变量 undo_sql中。按顺序把行记录修改的SQL抽取出来 存储到变量 record_sql 中去,然后 赋值 undo_sql =record_sql + undo_sql ,再置空 record_sql 变量,如此,便可实现反序事务内部的执行SQL。

2.3 解析回滚SQL

    首先,查看binlog的日志内容,发现行修改的SQL情形如下,提取过程中需要注意这几个问题:
  • 行记录的列名配对,binlog file存储的列序号,不能直接使用

  • WHERE部分 跟 SET部分 之间并无关键字或者符号,需要添加 AND 或者 逗号

  • DELETE SQL 需要反转为 INSERT

  • UPDATE SQL 需要把WHERE 跟 SET的部分进行替换

  • INSERT SQL需要反转为 DELETE

2.4 同事务不同表格处理

    同一个事务中,允许对不同表格进行数据修改,这点在列名替换列序号的时候,需要留意处理。
    每一个的行记录前有一行记录,含有 'Table_map' 标识,会说明这一行当行记录是修改哪个表格,可以根据这个提示,来替换binlog里边的列序号为列名。

2.5 转义字符处理

    binlog文件在对非空格的空白字符处理,采用转义字符字符串存储,比如,在表格insert一列记录含换行符,而实际上在binlog文件中,是使用了 \x0a 替换了 换行操作,所以在回滚数据的过程中,需要对转义字符做处理。
   

   

 

    
    这里注意一个地方,039的转义字符是没有在函数 esc_code 中统一处理,而是单独做另外处理。
   

 

    转移字符表相见下图:
   

 

2.6 timestamp数据类型处理

    timestamp实际在数据库中的存储值是 INT类型,需要使用 函数 from_unixtime转换。
    建立测试表格tbtest,只有一列timestamp的列,存储值后查看binlog的内容,具体截图如下:
   

 

    在处理行记录的时候,要对timestamp的value做处理,添加from_unixtime函数转换。

2.7 负数值处理

    这个一开始写代码的时候,并没有考虑到。大量测试的过程中发现,所有整型的数据类型,在存储负数的时候,都会存入一个最大范围值。binlog在处理这块的机制有些不是很了解。测试如下:
   

 

   所以当遇到INT的各种数据类型并且VALUE为负数的时候,需要把 这个范围值去除,才能执行执行undo_sql。

2.8 单个事务行记录总SQL超过max_allowed_package处理

    分析binlog后存储两种sql类型,一种是行记录的修改SQL,即 dml_sql;一种是 行记录的回滚sql,即 undo_sql。从代码可知,存储这两个sql的列是longtext,最大可存储4G的内容。但是 MySQL中单个会话的包大小是有限制的,限制的参数为 max_allowed_packet,默认大小为 4Mb,最大为1G,所以这个脚本使用前,请手动设置 存储binlog file的数据库实例以及线上的数据库实例这个参数:
set global max_allowed_packet = 1073741824; #记得后续修改回来
    
    万一操作了呢?那么回滚只能分段来回滚,先回滚到 这个大事务,然后单独执行这个大事务,紧接着继续回滚,这部分不能使用pymysql嗲用source 文件执行,所以只能手动做这个操作。 求高能人士修改这个逻辑代码!!!

2.9 针对性回滚

    假设误操作的没有明确的时间点,只有一个区间,而这个区间还有其他的表格操作,那么这个时候,需要在分析binlog文件的时候,添加--database选项,先帅选到同一个数据库中binlog文件中。
    这里的处理是将这段区间的dml_sql跟undo_sql都存储到数据库表格中,然后再删除不需要回滚的事务,剩余需要回滚的事务。再执行回滚操作。

3 使用说明

3.1 参数说明

    这个脚本的参数稍微多些,可以 --help 查看具体说明。
   

 

    本人喜欢用各种颜色来分类参数(blingbling五颜六色,看着多有趣多精神),所以,按颜色来说明这些参数。
  • 黄色区域:这6个参数,提供的是 分析并存储binlog file的相关值,说明存储分析结果的数据库的链接方式、binlog文件的位置以及存储结果的表格名字;

  • 蓝色区域:这4个参数,提供 与线上数据库表结构一致的DB实例连接方式,仅需跟线上一模一样的表结构,不一定需要是主从库;

  • 绿色区域:最最重要的选项 -a,0代表仅分析binlog文件,1代表仅执行回滚操作,必须先执行0才可以执行1;

  • 紫色区域:举例说明。

3.2 应用场景说明

  • 全库回滚某段时间

    • 需要回滚某个时间段的所有SQL操作,回滚到某一个时间点

    • 这种情况下呢,大多数是使用备份文件+binlog解决

    • 但是这个脚本也可以满足,但请勿直接在线上操作,先 -a=0,看下分析结果,是否符合,符合的话,停掉某个从库,再在从库上执行,最后开发业务接入检查是否恢复到指定时间点,数据是否正常。

  • 某段时间某些表格回滚某些操作

    • 比如,开发提交了一个批量更新脚本,各个测试层面验证没有问题,提交线上执行,但是执行后,发现有个业务漏测试,导致某些字段更新后影响到其他业务,现在需要紧急 把被批量更新的表格回滚到原先的行记录

    • 这个并不能单纯从技术角度来处理,要综合考虑

      •  

      • 这种情况下,如何回顾tab A表格的修改操作呢?

      • 个人觉得,这种方式比较行得通,dump tabA表格的数据到测试环境,然后再分析 binlog file 从11点-12点的undo sql,接着在测试环境回滚该表格到11点这个时刻,紧接着,由开发跟业务对比测试环境11点的数据跟线上现有的数据中,看下是哪些行哪些列需要在线上进行回滚,哪些是不需要的,然后开发再提交SQL脚本,再在线上执行。其实,这里边,DBA仅提供一个角色,就是把 表格 tab A 在一个新的环境上,回滚到某个时间点,但是不提供直接线上回滚SQL的处理。

  • 回滚某个/些SQL

    • 这种情况比较常见,某个update某个delete缺少where条件或者where条件执行错误

    • 这种情况下,找到对应的事务,执行回滚即可,回滚流程请参考上面一说,对的,我就是这么胆小怕事 

       

3.3 测试案例

3.3.1 全库回滚某段时间

假设需要回滚9点10分到9点15分间数据库的所有操作:
  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 线上测试环境修改set global max_allowed_packet = 1073741824

  • 回滚数据,action=1

  • 线上测试环境修改set global max_allowed_packet = 4194304

 1 --测试环境(请安装pymysql):IP: 192.168.9.242,PORT:3310 ,数据库:flashback,表格:tbevent 2 --具有线上表结构的db:IP:192.168.9.243 PORT:3310 3  4  5 mysql> show global variables like 'max_allowed_packet'; 6 +--------------------+----------+ 7 | Variable_name      | Value    | 8 +--------------------+----------+ 9 | max_allowed_packet | 16777216 |10 +--------------------+----------+11 1 row in set (0.00 sec)12 13 mysql> set global max_allowed_packet = 1073741824;14 Query OK, 0 rows affected (0.00 sec)15 16 [root@sutest244 ~]# mysqlbinlog --start-datetime='2017-06-19 09:00:00' --stop-datetime='2017-06-19 10:00:00' --base64-output=decode-rows -v ~/data/mysql/data/mysql-bin.007335 > /tmp/binlog.log17 18 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=019 2017-06-19 10:59:39,041 INFO begin to assign values to parameters20 2017-06-19 10:59:39,041 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent21 2017-06-19 10:59:39,049 INFO MySQL which userd to store binlog event connection is ok22 2017-06-19 10:59:39,050 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331023 2017-06-19 10:59:39,054 INFO MySQL which userd to analyse online table schema connection is ok24 2017-06-19 10:59:39,054 INFO MySQL connection is ok25 2017-06-19 10:59:39,055 INFO creating table flashback.tbevent to store binlog event26 2017-06-19 10:59:39,058 INFO created table flashback.tbevent 
27 2017-06-19 10:59:39,060 INFO begining to analyze the binlog file ,this may be take a long time !!!28 2017-06-19 10:59:39,061 INFO analyzing...29 2017-06-19 11:49:53,781 INFO finished to analyze the binlog file !!!30 2017-06-19 11:49:53,782 INFO release all db connections31 2017-06-19 11:49:53,782 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310 32 33 34 [root@sutest242 pycharm]# python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=**** -f=/tmp/binlog.log -t=flashback.tbevent -oh=192.168.9.244 -oP=3310 -u=root -op=**** -a=135 2017-06-19 16:30:20,633 INFO begin to assign values to parameters36 2017-06-19 16:30:20,635 INFO assign values to parameters is done:host=127.0.0.1,user=root,password=***,port=3310,fpath=/tmp/binlog.log,tbevent=flashback.tbevent37 2017-06-19 16:30:20,865 INFO MySQL which userd to store binlog event connection is ok38 2017-06-19 16:30:20,866 INFO assign values to online mysql parameters is done:host=192.168.9.244,user=,password=***,port=331039 2017-06-19 16:30:20,871 INFO MySQL which userd to analyse online table schema connection is ok40 2017-06-19 16:30:20,871 INFO MySQL connection is ok41 2017-06-19 16:30:21,243 INFO There has 347868 transactions ,need 35 batchs ,each batche doing 10000 transactions 
42 2017-06-19 16:30:21,243 INFO doing batch : 1 43 2017-06-19 16:31:01,182 INFO doing batch : 2 44 2017-06-19 16:31:16,909 INFO doing batch : 3 45 -------省空间忽略不截图--------------46 2017-06-19 16:41:11,287 INFO doing batch : 34 47 2017-06-19 16:41:25,577 INFO doing batch : 35 48 2017-06-19 16:41:44,629 INFO release all db connections49 2017-06-19 16:41:44,630 INFO All done,check the flashback.tbevent which stored binlog event on host 127.0.0.1 , port 3310
登录后复制

3.3.2 某段时间某些表格回滚某些操作

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

3.3.3 回滚某个/些SQL

  • 准备测试环境实例存储分析后的数据 

  • 测试环境修改set global max_allowed_packet = 1073741824

  • mysqlbinlog分析binlog文件

  • python脚本分析文件,action=0

  • 分析帅选需要的事务,rename表格

  • dump 对应的表格到测试环境

  • 回滚数据,action=1

  • 提交给开发业务对比数据

4 python脚本

     脚本会不定期修复bug,若是感兴趣,可以往github下载: 中的 mysql_xinysu_flashback 。

  1 # -*- coding: utf-8 -*-  2 __author__ = 'xinysu'  3 __date__ = '2017/6/15 10:30'  4   5   6   7 import re  8 import os  9 import sys 10 import datetime 11 import time 12 import logging 13 import importlib 14 importlib.reload(logging) 15 logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ') 16  17 import pymysql 18 from pymysql.cursors import DictCursor 19  20 usage='''\nusage: python [script's path] [option] 21 ALL options need to assign: 22 \033[1;33;40m 23 -h    : host, the database host,which database will store the results after analysis 24 -u    : user, the db user 25 -p    : password, the db user's password 26 -P    : port, the db port 27  28 -f    : file path, the binlog file 29 -t    : table name, the table name to store the results after analysis , {dbname}.{tbname}, 30         when you want to store in `test` db and the table name is `tbevent`,then this parameter 
 31         is test.tbevent 32 \033[1;34;40m 33 -oh   : online host, the database host,which database have the online table schema 34 -ou   : online user, the db user 35 -op   : online password, the db user's password 36 -oP   : online port, the db port 37 \033[1;32;40m 38 -a    : action, 
 39         0 just analyse the binlog file ,and store sql in table; 
 40         1 after execute self.dotype=0, execute the undo_sql in the table 41 \033[0m  
 42 --help: help document 43 \033[1;35;40m 44 Example: 45 analysize binlog: 46 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 47                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 48                        -a=0 49  50 flash back: 51 python su_flashback.py -h=127.0.0.1 -P=3310 -u=root -p=*** -f=/tmp/binlog.log -t=flashback.tbevent 
 52                        -oh=192.168.9.244 -oP=3310 -u=root -op=*** 
 53                        -a=1 54 \033[0m                        
 55 ''' 56  57 class flashback: 58     def __init__(self): 59         self.host='' 60         self.user='' 61         self.password='' 62         self.port='3306' 63         self.fpath='' 64         self.tbevent='' 65  66         self.on_host='' 67         self.on_user='' 68         self.on_password='' 69         self.on_port='3306' 70  71         self.action=0 # 0 just analyse the binlog file ,and store sql in table;1 after execute self.dotype=0, execute the undo_sql in the table 72  73         self._get_db() # 从输入参数获取连接数据库的相关参数值 74  75         # 连接数据库,该数据库是用来存储binlog文件分析后的内容 76         logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tbevent={}'.format(self.host,self.user,self.port,self.fpath,self.tbevent)) 77         self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8') 78         self.cur = self.mysqlconn.cursor(cursor=DictCursor) 79         logging.info('MySQL which userd to store binlog event connection is ok') 80  81         # 连接数据库,该数据库的表结构必须跟binlogfile基于对数据库表结构一致 82         # 该数据库用于提供 binlog file 文件中涉及到表结构分析 83         logging.info('assign values to online mysql parameters is done:host={},user={},password=***,port={}'.format(self.on_host, self.on_user, self.on_port)) 84         self.on_mysqlconn = pymysql.connect(host=self.on_host, user=self.on_user, password=self.on_password, port=self.on_port,charset='utf8') 85         self.on_cur = self.on_mysqlconn.cursor(cursor=DictCursor) 86         logging.info('MySQL which userd to analyse online table schema connection is ok') 87  88         logging.info('\033[33mMySQL connection is ok\033[0m') 89  90         self.dml_sql='' 91         self.undo_sql='' 92  93         self.tbfield_where = [] 94         self.tbfield_set = [] 95  96         self.begin_time='' 97         self.db_name='' 98         self.tb_name='' 99         self.end_time=''100         self.end_pos=''101         self.sqltype=0102 103     #_get_db用于获取执行命令的输入参数104     def _get_db(self):105         logging.info('begin to assign values to parameters')106         if len(sys.argv) == 1:107             print(usage)108             sys.exit(1)109         elif sys.argv[1] == '--help':110             print(usage)111             sys.exit()112         elif len(sys.argv) > 2:113             for i in sys.argv[1:]:114                 _argv = i.split('=')115                 if _argv[0] == '-h':116                     self.host = _argv[1]117                 elif _argv[0] == '-u':118                     self.user = _argv[1]119                 elif _argv[0] == '-P':120                     self.port = int(_argv[1])121                 elif _argv[0] == '-f':122                     self.fpath = _argv[1]123                 elif _argv[0] == '-t':124                     self.tbevent = _argv[1]125                 elif _argv[0] == '-p':126                     self.password = _argv[1]127 128                 elif _argv[0] == '-oh':129                     self.on_host = _argv[1]130                 elif _argv[0] == '-ou':131                     self.on_user = _argv[1]132                 elif _argv[0] == '-oP':133                     self.on_port = int(_argv[1])134                 elif _argv[0] == '-op':135                     self.on_password = _argv[1]136 137                 elif _argv[0] == '-a':138                     self.action = _argv[1]139 140                 else:141                     print(usage)142 143     #创建表格,用于存储分析后的BINLOG内容144     def create_tab(self):145         logging.info('creating table {} to store binlog event'.format(self.tbevent))146         create_tb_sql ='''147         CREATE TABLE IF NOT EXISTS {}(148             auto_id INT(10) UNSIGNED NOT NULL AUTO_INCREMENT,149             binlog_name VARCHAR(100) NOT NULL COMMENT 'the binlog file path and name',150             dml_start_time DATETIME NOT NULL COMMENT 'when to start this transaction ',151             dml_end_time DATETIME NOT NULL COMMENT 'when to finish this transaction ',152             end_log_pos BIGINT NOT NULL COMMENT 'the log position for finish this transaction',153             db_name VARCHAR(100) NOT NULL COMMENT 'which database happened this transaction ',154             table_name VARCHAR(200) NOT NULL COMMENT 'which table happened this transaction ',155             sqltype INT NOT NULL COMMENT '1 is insert,2 is update,3 is delete',156             dml_sql LONGTEXT NULL  COMMENT 'what sql excuted',157             undo_sql LONGTEXT NULL COMMENT 'rollback sql, this sql used for flashback',158             PRIMARY KEY (auto_id),159             INDEX sqltype(sqltype),160             INDEX dml_start_time (dml_start_time),161             INDEX dml_end_time (dml_end_time),162             INDEX end_log_pos (end_log_pos),163             INDEX db_name (db_name),164             INDEX table_name (table_name)165         )166         COLLATE='utf8_general_ci' ENGINE=InnoDB;167         TRUNCATE TABLE {};168 169         '''.format(self.tbevent,self.tbevent)170         self.cur.execute(create_tb_sql)171         logging.info('created table {} '.format(self.tbevent))172 173     #获取表格的列顺序对应的列名,并处理where set的时候,列与列之间的连接字符串是逗号还是 and174     def tbschema(self,dbname,tbname):175         self.tbfield_where = []176         self.tbfield_set = []177 178         sql_tb='desc {}.{}'.format(self.db_name,self.tb_name)179 180         self.on_cur.execute(sql_tb)181         tbcol=self.on_cur.fetchall()182 183         i = 0184         for l in tbcol:185             #self.tbfield.append(l['Field'])186             if i==0:187                 self.tbfield_where.append('`'+l['Field']+'`')188                 self.tbfield_set.append('`'+l['Field']+'`')189                 i+=1190             else:191                 self.tbfield_where.append('/*where*/ and /*where*/' + '`'+l['Field']+'`')192                 self.tbfield_set.append( '/*set*/ , /*set*/'+'`'+l['Field']+'`' )193 194     # 一个事务记录一行,若binlog file中的行记录包含 Table_map,则为事务的开始记录195     def rowrecord(self,bl_line):196         try:197             if bl_line.find('Table_map:') != -1:198                 l = bl_line.index('server')199                 m = bl_line.index('end_log_pos')200                 n = bl_line.index('Table_map')201                 begin_time = bl_line[:l:].rstrip(' ').replace('#', '20')202 203                 self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]204                 self.db_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[0]205                 self.tb_name = bl_line[n::].split(' ')[1].replace('`', '').split('.')[1]206 207                 self.tbschema(self.db_name,self.tb_name)208         except Exception:209             return 'funtion rowrecord error'210 211     def dml_tran(self,bl_line):212         try:213 214 215             if bl_line.find('Xid =') != -1:216 217                 l = bl_line.index('server')218                 m = bl_line.index('end_log_pos')219                 end_time = bl_line[:l:].rstrip(' ').replace('#', '20')220                 self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]221                 self.end_pos = int(bl_line[m::].split(' ')[1])222 223 224 225                 self.undo_sql = self.dml_sql.replace(' INSERT INTO', ';DELETE FROM_su').replace(' UPDATE ',';UPDATE').replace(' DELETE FROM', ';INSERT INTO').replace(';DELETE FROM_su', ';DELETE FROM').replace('WHERE', 'WHERE_marksu').replace('SET', 'WHERE').replace('WHERE_marksu', 'SET').replace('/*set*/ , /*set*/', ' and ').replace('/*where*/ and /*where*/',' , ')226                 self.dml_sql=self.dml_sql.replace('/*set*/ , /*set*/', ' , ').replace('/*where*/ and /*where*/',' and ')227 228                 if self.dml_sql.startswith(' INSERT INTO '):229                     self.sqltype=1230                 elif self.dml_sql.startswith(' UPDATE '):231                     self.sqltype=2232                 elif self.dml_sql.startswith(' DELETE '):233                     self.sqltype=3234 235                 record_sql = ''236                 undosql_desc = ''237 238                 #同个事务内部的行记录修改SQL,反序存储239                 for l in self.undo_sql.splitlines():240                     if l.startswith(' ;UPDATE') or l.startswith(' ;INSERT') or l.startswith(' ;DELETE'):241                         undosql_desc = record_sql + undosql_desc242                         record_sql = ''243                         record_sql = record_sql + l244                     else:245                         record_sql = record_sql + l246 247                 self.undo_sql = record_sql + undosql_desc248                 self.undo_sql = self.undo_sql.lstrip()[1:]+';'249 250                 #处理非空格的空白特殊字符251                 self.dml_sql = self.esc_code(self.dml_sql)252                 self.undo_sql = self.esc_code(self.undo_sql)253 254                 #单独处理 转移字符: \'255                 self.dml_sql = self.dml_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'256                 self.undo_sql = self.undo_sql.replace("'", "''").replace('\\x27',"''''")  # + ';'257 258                 if len(self.dml_sql)>500000000:259                     with open('/tmp/flashback_undosql/'+str(self.end_pos)+'.sql', 'w') as w_f:260                         w_f.write('begin;' + '\n')261                         w_f.write(self.undo_sql)262                         w_f.write('commit;' + '\n')263                     self.dml_sql=''264                     self.undo_sql='/tmp/flashback_undosql/'+str(self.end_pos)+'.sql'265                     logging.info("the size of this transaction is more than 500Mb ,the file location : {}".format(self.undo_file))266 267                 insert_sql = "INSERT INTO {}(binlog_name,dml_start_time,dml_end_time,end_log_pos,db_name,table_name,sqltype,dml_sql,undo_sql) select  '{}','{}','{}','{}','{}','{}',{},'{}','{}'".format(268                     self.tbevent, self.fpath, self.begin_time, self.end_time, self.end_pos,269                     self.db_name, self.tb_name, self.sqltype, self.dml_sql, self.undo_sql)270 271                 self.cur.execute(insert_sql)272                 self.mysqlconn.commit()273 274                 self.dml_sql = ''275                 self.undo_sql = ''276         except Exception:277             print( 'funtion dml_tran error')278 279 280     def analyse_binlog(self):281         try:282             sqlcomma=0283             self.create_tab()284 285             with open(self.fpath,'r') as binlog_file:286                 logging.info('\033[36mbegining to analyze the binlog file ,this may be take a long time !!!\033[0m')287                 logging.info('\033[36manalyzing...\033[0m')288                 for bline in binlog_file:289                     if bline.find('Table_map:') != -1:290                         self.rowrecord(bline)291                         bline=''292                     elif bline.rstrip()=='### SET':293                         bline = bline[3:]294                         sqlcomma=1295                     elif bline.rstrip()=='### WHERE':296                         bline = bline[3:]297                         sqlcomma = 2298                     elif bline.startswith('###   @'):299                         len_f=len('###   @')300                         i=bline[len_f:].split('=')[0]301 302                         #处理timestamp类型303                         if bline[8+len(i):].split(' ')[2] == 'TIMESTAMP(0)':304                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')305                             bline = bline.split('=')[0] + '=from_unixtime(' + bline[:stop_pos].split('=')[1] + ')'306 307                         #处理负数存储方式308                         if bline.split('=')[1].startswith('-'):309                             stop_pos = bline.find(' /* TIMESTAMP(0) meta=')310                             bline = bline.split('=')[0] + '=' + bline.split('=')[1].split(' ')[0]+'\n'311 312                         if sqlcomma==1:313                             bline = self.tbfield_set[int(i) - 1]+bline[(len_f+len(i)):]314                         elif sqlcomma==2:315                             bline = self.tbfield_where[int(i) - 1] + bline[(len_f+len(i)):]316 317                     elif bline.startswith('### DELETE') or bline.startswith('### INSERT') or bline.startswith('### UPDATE'):318                         bline = bline[3:]319 320                     elif bline.find('Xid =') != -1:321                         self.dml_tran(bline)322                         bline=''323                     else:324                         bline = ''325 326                     if bline.rstrip('\n') != '':327                         self.dml_sql = self.dml_sql + bline + ' '328         except Exception:329             return 'function do error'330 331     def esc_code(self,sql):332         esc={333              '\\x07':'\a','\\x08':'\b','\\x0c':'\f','\\x0a':'\n','\\x0d':'\r','\\x09':'\t','\\x0b':'\v','\\x5c':'\\',334             #'\\x27':'\'',335             '\\x22':'\"','\\x3f':'\?','\\x00':'\0'336              }337 338         for k,v in esc.items():339             sql=sql.replace(k,v)340         return sql341 342     def binlogdesc(self):343 344         countsql='select sqltype , count(*) numbers from {} group by sqltype order by sqltype '.format(self.tbevent)345         print(countsql)346         self.cur.execute(countsql)347         count_row=self.cur.fetchall()348 349         update_count=0350         insert_couont=0351         delete_count=0352         for row in count_row:353             if row['sqltype']==1:354                 insert_couont=row['numbers']355             elif row['sqltype']==2:356                 update_count=row['numbers']357             elif row['sqltype']==3:358                 delete_count=row['numbers']359         logging.info('\033[1;35mTotal transactions number is {}: {} inserts, {} updates, {} deletes !\033[0m(all number is accurate, the other is approximate value) \033[0m'.format(insert_couont+update_count+delete_count,insert_couont,update_count,delete_count))360 361     def undosql(self,number):362         #这里会有几个问题:363         #1 如果一共有几十万甚至更多的事务操作,那么这个python脚本,极为占用内存,有可能执行错误;364         #2 如果单个事务中,涉及修改的行数高达几十万行,其binlog file 达好几G,这里也会有内存损耗问题;365         #所以,针对第一点,这里考虑对超多事务进行一个分批执行处理,每个批次处理number个事务,避免一次性把所有事务放到python中;但是第2点,目前暂未处理366 367         tran_num=1368         id=0369 370         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)371 372         self.cur.execute(tran_num_sql)373         tran_rows=self.cur.fetchall()374 375         for num in tran_rows:376             tran_num=num['table_rows']377 378         logging.info('\033[32mThere has {} transactions ,need {} batchs ,each batche doing {} transactions \033[0m'.format(tran_num,int(tran_num/number)+1,number))379 380         while id<=tran_num:381             logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))382             undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;&#39;.format(self.tbevent,tran_num-(id+number),tran_num-id)383             self.cur.execute(undo_sql)384 385             undo_rows=self.cur.fetchall()386             f_sql=&#39;&#39;387 388             for u_row in undo_rows:389                 try:390                     self.on_cur.execute(u_row[&#39;undo_sql&#39;])391                     self.on_mysqlconn.commit()392                 except Exception:393                     print(&#39;auto_id:&#39;,u_row[&#39;auto_id&#39;])394             id+=number395 396 397     def undo_file(self,number):398         # 也可以选择私用undo_file将undo_sql导入到文件中,然后再source399 400         tran_num=1401         id=0402 403         tran_num_sql="select count(*) table_rows from {}".format(self.tbevent)404 405         self.cur.execute(tran_num_sql)406         tran_rows=self.cur.fetchall()407 408         for num in tran_rows:409             tran_num=num[&#39;table_rows&#39;]410 411         logging.info(&#39;copy undo_sql to undo file on : /tmp/flashback_undosql/undo_file_flashback.sql&#39;)412         logging.info(&#39;\033[32mThere has {} transactions ,need {} batchs to copy ,each batche doing {} transactions \033[0m&#39;.format(tran_num,int(tran_num/number)+1,number))413 414         with open(&#39;/tmp/flashback_undosql/undo_file_flashback.sql&#39;, &#39;w&#39;) as w_f:415             while id<=tran_num:416                 logging.info(&#39;doing batch : {} &#39;.format(int(id/number)+1))417                 undo_sql=&#39;select auto_id,undo_sql from {} where auto_id > {} and auto_id <= {} order by auto_id desc;'.format(self.tbevent,tran_num-(id+number),tran_num-id)418                 self.cur.execute(undo_sql)419 420                 undo_rows=self.cur.fetchall()421                 for u_row in undo_rows:422                     try:423                         w_f.write('begin;' + '\n')424                         w_f.write('# auto_id'+str(u_row['auto_id']) + '\n')425                         w_f.write(u_row['undo_sql'] + '\n')426                         w_f.write('commit;' + '\n')427                     except Exception:428                         print('auto_id',u_row['auto_id'])429                     #time.sleep(2)430                 id+=number431 432     def do(self):433         if self.action=='0':434             self.analyse_binlog()435             logging.info('\033[36mfinished to analyze the binlog file !!!\033[0m')436             #self.binlogdesc()437         elif self.action=='1':438             self.undosql(10000)439 440     def closeconn(self):441         self.cur.close()442         self.on_cur.close()443         logging.info('release all db connections')444         logging.info('\033[33mAll done,check the {} which stored binlog event on host {} , port {} \033[0m'.format(self.tbevent,self.host,self.port))445 446 def main():447     p = flashback()448     p.do()449     p.closeconn()450 451 if __name__ == "__main__":452     main()
登录后复制

 

以上是mysql基于binlog回滚工具实例详解的详细内容。更多信息请关注PHP中文网其他相关文章!

本站声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
1 个月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
1 个月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
1 个月前 By 尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.聊天命令以及如何使用它们
1 个月前 By 尊渡假赌尊渡假赌尊渡假赌

热工具

记事本++7.3.1

记事本++7.3.1

好用且免费的代码编辑器

SublimeText3汉化版

SublimeText3汉化版

中文版,非常好用

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具

SublimeText3 Mac版

SublimeText3 Mac版

神级代码编辑软件(SublimeText3)

apache怎么连接数据库 apache怎么连接数据库 Apr 13, 2025 pm 01:03 PM

Apache 连接数据库需要以下步骤:安装数据库驱动程序。配置 web.xml 文件以创建连接池。创建 JDBC 数据源,指定连接设置。从 Java 代码中使用 JDBC API 访问数据库,包括获取连接、创建语句、绑定参数、执行查询或更新以及处理结果。

Debian如何提升Hadoop数据处理速度 Debian如何提升Hadoop数据处理速度 Apr 13, 2025 am 11:54 AM

本文探讨如何在Debian系统上提升Hadoop数据处理效率。优化策略涵盖硬件升级、操作系统参数调整、Hadoop配置修改以及高效算法和工具的运用。一、硬件资源强化确保所有节点硬件配置一致,尤其关注CPU、内存和网络设备性能。选择高性能硬件组件对于提升整体处理速度至关重要。二、操作系统调优文件描述符和网络连接数:修改/etc/security/limits.conf文件,增加系统允许同时打开的文件描述符和网络连接数上限。JVM参数调整:在hadoop-env.sh文件中调整

Debian邮件服务器SSL证书安装方法 Debian邮件服务器SSL证书安装方法 Apr 13, 2025 am 11:39 AM

在Debian邮件服务器上安装SSL证书的步骤如下:1.安装OpenSSL工具包首先,确保你的系统上已经安装了OpenSSL工具包。如果没有安装,可以使用以下命令进行安装:sudoapt-getupdatesudoapt-getinstallopenssl2.生成私钥和证书请求接下来,使用OpenSSL生成一个2048位的RSA私钥和一个证书请求(CSR):openss

如何在Debian上升级Zookeeper版本 如何在Debian上升级Zookeeper版本 Apr 13, 2025 am 10:42 AM

在Debian系统上升级Zookeeper版本,可以按照以下步骤进行:1.备份现有配置和数据在进行任何升级之前,强烈建议备份现有的Zookeeper配置文件和数据目录。sudocp-r/var/lib/zookeeper/var/lib/zookeeper_backupsudocp/etc/zookeeper/conf/zoo.cfg/etc/zookeeper/conf/z

Debian Hadoop日志管理怎么做 Debian Hadoop日志管理怎么做 Apr 13, 2025 am 10:45 AM

在Debian上管理Hadoop日志,可以遵循以下步骤和最佳实践:日志聚合启用日志聚合:在yarn-site.xml文件中设置yarn.log-aggregation-enable为true,以启用日志聚合功能。配置日志保留策略:设置yarn.log-aggregation.retain-seconds来定义日志的保留时间,例如保留172800秒(2天)。指定日志存储路径:通过yarn.n

Debian OpenSSL如何防止中间人攻击 Debian OpenSSL如何防止中间人攻击 Apr 13, 2025 am 10:30 AM

在Debian系统中,OpenSSL是一个重要的库,用于加密、解密和证书管理。为了防止中间人攻击(MITM),可以采取以下措施:使用HTTPS:确保所有网络请求使用HTTPS协议,而不是HTTP。HTTPS使用TLS(传输层安全协议)加密通信数据,确保数据在传输过程中不会被窃取或篡改。验证服务器证书:在客户端手动验证服务器证书,确保其可信。可以通过URLSession的委托方法来手动验证服务器

Debian syslog如何学习 Debian syslog如何学习 Apr 13, 2025 am 11:51 AM

本指南将指导您学习如何在Debian系统中使用Syslog。Syslog是Linux系统中用于记录系统和应用程序日志消息的关键服务,它帮助管理员监控和分析系统活动,从而快速识别并解决问题。一、Syslog基础知识Syslog的核心功能包括:集中收集和管理日志消息;支持多种日志输出格式和目标位置(例如文件或网络);提供实时日志查看和过滤功能。二、安装和配置Syslog(使用Rsyslog)Debian系统默认使用Rsyslog。您可以通过以下命令安装:sudoaptupdatesud

Debian Hadoop数据本地化如何提高 Debian Hadoop数据本地化如何提高 Apr 13, 2025 am 10:51 AM

在Debian上提高Hadoop数据本地化可以通过以下几种方法实现:均衡硬件资源:确保HDFS集群中各个DataNode节点的硬件资源(如CPU、内存、磁盘容量等)相近,避免出现明显的性能瓶颈。优化数据写入策略:合理配置HDFS的数据写入策略,如根据节点的负载情况和可用资源动态选择DataNode节点进行存储,以实现数据的均衡分布。使用Balancer工具:利用HD

See all articles