
Analyze MySQL row record modifications based on binlog

大家讲道理
Release: 2018-05-15 11:17:24

I recently finished writing a MySQL flashback tool and came across a related use case: sometimes you need to work out how much data MySQL modified during a certain period. How many transactions occurred? Which tables changed the most, and by how much? Unlike flashback, there is no need to reverse the row changes; you only need to know which row changes happened. So I wrote up this case as well.

I wrote the script last night. Because my Python skills are limited, I originally did not plan to post this article, but then I thought that some fellow bloggers might offer optimization suggestions.

1 Implementation content

The script reports, for a given period of time, how much data MySQL modified, how many transactions occurred, which tables changed the most, and how large the changes were. It does not modify any row records; it only reports the changes made to row data.

Monitoring gives a rough picture of these figures, but they can also be derived completely from the binlog, provided the binlog format is ROW.

I wrote this step while working on flashback. It is also written in Python and works on similar principles, but it is simpler. Since my Python is not strong, there is probably a lot of room for performance improvement, and I hope readers can help optimize it.

The output of the Python script is divided into 4 parts: transaction duration, number of rows affected per transaction, number of rows per DML type, and the most frequently modified tables.

2 Brief description of the script

Of the modules the script depends on, only pymysql needs to be installed separately.

The script defines a class, queryanalyse, with 5 functions: _get_db, create_tab, rowrecord, binlogdesc and closeconn.

2.1 _get_db

This function parses the input options. There are 7 options in total, all of which must be supplied: host, user, password, port, binlog file path, table name for transactions, and table name for row records. The corresponding flags are as follows:


ALL options need to assign:

-h : host, the database host, which database will store the results after analysis

-u : user, the db user

-p : password, the db user's password

-P : port, the db port

-f : file path, the binlog file

-tr : table name for record, the table name to store the row record

-tt : table name for transaction, the table name to store transactions

For example, run the script as: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow. This function reads the value of each option and stores it.
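
As a rough illustration of how options in the -flag=value form can be split into names and values, here is a minimal sketch. The function name parse_options and the OPTION_NAMES mapping are assumptions made for this example, not the names used in the script, and unlike the script it rejects unknown or missing options outright.

```python
# Hypothetical helper: maps each flag from the usage text to a readable key.
OPTION_NAMES = {
    '-h': 'host',
    '-u': 'user',
    '-p': 'password',
    '-P': 'port',
    '-f': 'fpath',
    '-tr': 'tbrow',
    '-tt': 'tbtran',
}


def parse_options(argv):
    """Parse arguments of the form -flag=value into a dict.

    Raises ValueError for an unknown flag or when a required option is missing.
    """
    options = {}
    for arg in argv:
        # Split on the first '=' only, so values may themselves contain '='.
        flag, sep, value = arg.partition('=')
        if flag not in OPTION_NAMES or not sep:
            raise ValueError('unrecognized option: {}'.format(arg))
        options[OPTION_NAMES[flag]] = value
    missing = [name for name in OPTION_NAMES.values() if name not in options]
    if missing:
        raise ValueError('missing options: {}'.format(', '.join(missing)))
    options['port'] = int(options['port'])
    return options


example = parse_options([
    '-h=127.0.0.1', '-P=3310', '-u=root', '-p=password',
    '-f=/tmp/stock_binlog.log', '-tt=flashback.tbtran', '-tr=flashback.tbrow',
])
print(example['host'], example['port'], example['tbrow'])
```

Splitting on the first '=' only means a password that contains '=' is kept intact.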

2.2 create_tab

Creates two tables to store the analysis results of the binlog file. One stores the start time and end time of each transaction; its name is given by the option -tt. The other stores one record per modified row; its name is given by the option -tr.

Transaction table record content: transaction start time and transaction end time.

Contents of the row record table: database name, table name, DML type, and the number of the transaction (in the transaction table) that the row belongs to.

root@localhost:mysql3310.sock  14:42:29 [flashback]>show create table tbrow \G
*************************** 1. row ***************************
       Table: tbrow
Create Table: CREATE TABLE `tbrow` (
  `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
  `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
  `dbname` varchar(50) NOT NULL,
  `tbname` varchar(50) NOT NULL,
  PRIMARY KEY (`auto_id`),
  KEY `sqltype` (`sqltype`),
  KEY `dbname` (`dbname`),
  KEY `tbname` (`tbname`)
) ENGINE=InnoDB AUTO_INCREMENT=295151 DEFAULT CHARSET=utf8
1 row in set (0.00 sec)

root@localhost:mysql3310.sock  14:42:31 [flashback]>SHOW CREATE TABLE TBTRAN \G
*************************** 1. row ***************************
       Table: TBTRAN
Create Table: CREATE TABLE `tbtran` (
  `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `begin_time` datetime NOT NULL,
  `end_time` datetime NOT NULL,
  PRIMARY KEY (`auto_id`)
) ENGINE=InnoDB AUTO_INCREMENT=6390 DEFAULT CHARSET=utf8
1 row in set (0.00 sec)

2.3 rowrecord

This is the key function; it analyzes the contents of the binlog file. It follows a few rules:

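  1. The end of each transaction is located by searching for 'Xid = '.

    1. The start time of a transaction is the timestamp on the first 'Table_map' line inside the transaction.

    2. The end time of a transaction is the timestamp on the line containing 'Xid = '.

  2. The table each row change belongs to is determined from the 'Table_map' line.

  3. The DML type is determined by whether the row record begins with '### INSERT INTO', '### UPDATE' or '### DELETE FROM'.

  4. Note that a single transaction can contain changes to multiple tables, of multiple DML types, affecting multiple rows.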
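
These rules can be tried out on a few lines of text without a database. The sample below is hand-written to imitate the general shape of decoded row-format binlog text (real mysqlbinlog -v output contains more fields and lines), and the function names are assumptions for this illustration only; it is a sketch of the idea, not the script's implementation.

```python
from collections import Counter
from datetime import datetime

# Hand-written sample imitating decoded row-format binlog text (an assumption,
# not real output): one transaction touching two tables.
SAMPLE = """\
#170423  0:00:03 server id 1  end_log_pos 300  Table_map: `shop`.`orders` mapped to number 108
### INSERT INTO `shop`.`orders`
### UPDATE `shop`.`orders`
#170423  0:00:04 server id 1  end_log_pos 500  Table_map: `shop`.`stock` mapped to number 109
### DELETE FROM `shop`.`stock`
#170423  0:00:05 server id 1  end_log_pos 600  Xid = 42
"""

# Same numbering as the sqltype column: 1 insert, 2 update, 3 delete.
DML_PREFIXES = {'### INSERT INTO': 1, '### UPDATE': 2, '### DELETE FROM': 3}


def event_time(line):
    """Turn the leading '#YYMMDD H:MM:SS' of an event header into a datetime."""
    date_part, time_part = line[1:line.index('server')].split()
    return datetime.strptime(date_part + ' ' + time_part, '%y%m%d %H:%M:%S')


def analyse(lines):
    """Return (transactions, row_counts) for an iterable of binlog text lines."""
    transactions = []          # list of (begin_time, end_time)
    row_counts = Counter()     # (db, table, sqltype) -> number of changed rows
    begin, table = None, None
    for line in lines:
        if 'Table_map:' in line:
            if begin is None:  # first Table_map of the transaction
                begin = event_time(line)
            name = line.split('Table_map:')[1].split()[0].replace('`', '')
            table = tuple(name.split('.'))
        elif 'Xid =' in line:  # end of the transaction
            transactions.append((begin, event_time(line)))
            begin = None
        else:
            for prefix, sqltype in DML_PREFIXES.items():
                if line.startswith(prefix):
                    row_counts[table + (sqltype,)] += 1
    return transactions, row_counts


transactions, row_counts = analyse(SAMPLE.splitlines())
print(transactions)
print(dict(row_counts))
```

The sample yields a single transaction lasting two seconds with three changed rows, one of each DML type.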

2.4 binlogdesc

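Describes the analysis results using 4 simple SQL queries.

  1. Analyze how long the transactions that modified row data took

  2. Analyze how many rows each of those transactions affected

  3. Analyze the distribution of DML types

  4. Analyze the tables with the most DML operations, taking the top ten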
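
To illustrate the first two analyses without a database, here is a small sketch that groups per-transaction values into ranges in plain Python. The script itself does this grouping in SQL; the function name and the sample numbers below are made up for the example.

```python
from bisect import bisect_left


def bucket_counts(values, upper_bounds):
    """Count how many values fall into each range.

    upper_bounds are inclusive upper limits in ascending order; values above
    the last bound are counted in one extra, final bucket.
    """
    counts = [0] * (len(upper_bounds) + 1)
    for value in values:
        counts[bisect_left(upper_bounds, value)] += 1
    return counts


# Made-up transaction durations in seconds (binlog timestamps have
# one-second resolution, so the values are whole numbers).
durations = [0, 0, 1, 3, 7, 12, 45]
labels = ['0-1 s', '2-5 s', '6-10 s', '11-30 s', '> 30 s']
counts = bucket_counts(durations, [1, 5, 10, 30])

for label, count in zip(labels, counts):
    print('{:>8} : {} , {}%'.format(label, count, int(count * 100 / len(durations))))
```

The same helper works for the rows-per-transaction analysis by passing bounds such as [10, 100, 1000, 10000].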

2.5 closeconn

Closes the database connection.

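3 Usage

First, make sure the pymysql module is installed for Python, and copy the Python script into a file named queryanalyse.py.

Next, decode the binlog file to be analyzed with the mysqlbinlog command and save the output. For details on binlog files, see my earlier blog post: 关于binary log那些事——认真码了好长一篇. For how to use mysqlbinlog, see the documentation: https://dev.mysql.com/doc/refman/5.7/en/mysqlbinlog.html .

A common approach is to select the part of the binlog file to analyze by specifying a start time and a stop time.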


mysqlbinlog --start-datetime='2017-04-23 00:00:03' --stop-datetime='2017-04-23 00:30:00' --base64-output=decode-rows -v /data/mysql/logs/mysql-bin.007335 > /tmp/binlog_test.log   
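
If this step needs to be scripted, the same command can be assembled as an argument list in Python. This is only a sketch: the function names are assumptions, and it uses just the mysqlbinlog options that appear in the command above.

```python
import subprocess


def mysqlbinlog_command(binlog_path, start, stop):
    """Build the argument list for the mysqlbinlog call shown above."""
    return [
        'mysqlbinlog',
        '--start-datetime={}'.format(start),
        '--stop-datetime={}'.format(stop),
        '--base64-output=decode-rows',
        '-v',
        binlog_path,
    ]


def decode_binlog(binlog_path, start, stop, out_path):
    """Run mysqlbinlog and write the decoded text to out_path."""
    with open(out_path, 'w') as out_file:
        subprocess.run(mysqlbinlog_command(binlog_path, start, stop),
                       stdout=out_file, check=True)


command = mysqlbinlog_command('/data/mysql/logs/mysql-bin.007335',
                              '2017-04-23 00:00:03', '2017-04-23 00:30:00')
print(' '.join(command))
```

Passing the arguments as a list avoids shell quoting problems with the space inside the datetime values.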

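After decoding, you can copy binlog_test.log to another idle server and run the analysis there; all that is needed is an idle database to store the analysis records.

Suppose binlog_test.log has been copied to a test server whose database can store the analysis results. You can then run the Python script; either change into the script's directory first or specify the path to the script.


python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/binlog_test.log -tt=flashback.tbtran -tr=flashback.tbrow

That's all; just wait for the output.

Performance is the weak point: in a test on a virtual machine, a binlog file of about 500 MB took 2-3 minutes to analyze. There is room for improvement!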
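
One possible direction for the performance problem, offered only as an unmeasured idea: instead of building one INSERT statement string per changed row, collect the row values and write them in batches. The sketch below shows just the batching logic with a hypothetical flush callback; in the real script the callback could, for example, call PyMySQL's cursor.executemany. Whether this actually helps would need to be measured.

```python
class RowBatcher:
    """Collect rows and hand them to a callback in fixed-size batches."""

    def __init__(self, flush, batch_size=1000):
        self.flush = flush            # callable that receives a list of rows
        self.batch_size = batch_size
        self.rows = []

    def add(self, row):
        self.rows.append(row)
        if len(self.rows) >= self.batch_size:
            self.close()

    def close(self):
        """Flush whatever is still buffered."""
        if self.rows:
            self.flush(self.rows)
            self.rows = []


# Stand-in for the database write: here each batch is only recorded in a list.
written = []
batcher = RowBatcher(written.append, batch_size=2)
for tran_num in range(1, 6):
    # (sqltype, tran_num, dbname, tbname), the columns of the row record table
    batcher.add((1, tran_num, 'shop', 'orders'))
batcher.close()
print([len(batch) for batch in written])
```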

4 Python script

import pymysql
from pymysql.constants import CLIENT
from pymysql.cursors import DictCursor
import re
import os
import sys
import datetime
import time
import logging
import importlib
importlib.reload(logging)
logging.basicConfig(level=logging.DEBUG,format='%(asctime)s %(levelname)s %(message)s ')


usage=''' usage: python [script's path] [option]
ALL options need to assign:

-h     : host, the database host,which database will store the results after analysis
-u     : user, the db user
-p     : password, the db user's password
-P     : port, the db port
-f     : file path, the binlog file
-tr    : table name for record , the table name to store the row record
-tt    : table name for transaction, the table name to store transactions
Example: python queryanalyse.py -h=127.0.0.1 -P=3310 -u=root -p=password -f=/tmp/stock_binlog.log -tt=flashback.tbtran -tr=flashback.tbrow

'''

class queryanalyse:
    def __init__(self):
        # Initialize the option values
        self.host=''
        self.user=''
        self.password=''
        self.port=3306
        self.fpath=''
        self.tbrow=''
        self.tbtran=''

        self._get_db()
        logging.info('assign values to parameters is done:host={},user={},password=***,port={},fpath={},tb_for_record={},tb_for_tran={}'.format(self.host,self.user,self.port,self.fpath,self.tbrow,self.tbtran))

        # create_tab() and rowrecord() send several statements in one execute() call,
        # so multi-statement support is requested explicitly
        # (newer PyMySQL releases do not enable it by default).
        self.mysqlconn = pymysql.connect(host=self.host, user=self.user, password=self.password, port=self.port,charset='utf8',client_flag=CLIENT.MULTI_STATEMENTS)
        self.cur = self.mysqlconn.cursor(cursor=DictCursor)
        logging.info('MySQL which used to store binlog event connection is ok')

        self.begin_time=''
        self.end_time=''
        self.db_name=''
        self.tb_name=''

    def _get_db(self):
        # Parse the option values entered by the user. The password is given in plain text here;
        # change it to an interactive prompt yourself if needed.
        # Since the binlog file can be copied to a non-production environment for analysis,
        # the password gets no special handling.
        logging.info('begin to assign values to parameters')
        if len(sys.argv) == 1:
            print(usage)
            sys.exit(1)
        elif sys.argv[1] == '--help':
            print(usage)
            sys.exit()
        elif len(sys.argv) > 2:
            for i in sys.argv[1:]:
                # Split on the first '=' only, so a value may itself contain '='
                _argv = i.split('=', 1)
                if _argv[0] == '-h':
                    self.host = _argv[1]
                elif _argv[0] == '-u':
                    self.user = _argv[1]
                elif _argv[0] == '-P':
                    self.port = int(_argv[1])
                elif _argv[0] == '-f':
                    self.fpath = _argv[1]
                elif _argv[0] == '-tr':
                    self.tbrow = _argv[1]
                elif _argv[0] == '-tt':
                    self.tbtran = _argv[1]
                elif _argv[0] == '-p':
                    self.password = _argv[1]
                else:
                    print(usage)

    def create_tab(self):
        # Create two tables: one stores transactions, the other stores each modified row.
        # Note that one transaction can contain multiple row modifications.
        logging.info('creating table ...')
        create_tb_sql ='''CREATE TABLE IF NOT EXISTS  {} (
                          `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
                          `begin_time` datetime NOT NULL,
                          `end_time` datetime NOT NULL,
                          PRIMARY KEY (`auto_id`)
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
                        CREATE TABLE IF NOT EXISTS  {} (
                          `auto_id` int(10) unsigned NOT NULL AUTO_INCREMENT,
                          `sqltype` int(11) NOT NULL COMMENT '1 is insert,2 is update,3 is delete',
                          `tran_num` int(11) NOT NULL COMMENT 'the transaction number',
                          `dbname` varchar(50) NOT NULL,
                          `tbname` varchar(50) NOT NULL,
                          PRIMARY KEY (`auto_id`),
                          KEY `sqltype` (`sqltype`),
                          KEY `dbname` (`dbname`),
                          KEY `tbname` (`tbname`)
                        ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
                        truncate table {};
                        truncate table {};
                        '''.format(self.tbtran,self.tbrow,self.tbtran,self.tbrow)

        self.cur.execute(create_tb_sql)
        logging.info('created table {} and {}'.format(self.tbrow,self.tbtran))

    def rowrecord(self):
        # Process the binlog text line by line.
        # The end of a transaction is detected by 'Xid ='.
        # Results are stored and committed to the db once per transaction.
        try:
            tran_num=1    # transaction counter
            record_sql='' # insert statements for the row records
            tran_sql=''   # insert statement for the transaction

            self.create_tab()

            with open(self.fpath,'r') as binlog_file:
                logging.info('beginning to analyze the binlog file, this may take a long time !!!')
                logging.info('analyzing...')

                for bline in binlog_file:

                    if bline.find('Table_map:') != -1:
                        l = bline.index('server')
                        n = bline.index('Table_map')
                        # '#170423  0:00:03' becomes '20170423  0:00:03'
                        begin_time = bline[:l].rstrip(' ').replace('#', '20')

                        # Only the first Table_map of a transaction sets the start time
                        if record_sql=='':
                            self.begin_time = begin_time[0:4] + '-' + begin_time[4:6] + '-' + begin_time[6:]

                        self.db_name = bline[n:].split(' ')[1].replace('`', '').split('.')[0]
                        self.tb_name = bline[n:].split(' ')[1].replace('`', '').split('.')[1]

                    elif bline.startswith('### INSERT INTO'):
                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (1,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)

                    elif bline.startswith('### UPDATE'):
                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (2,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)

                    elif bline.startswith('### DELETE FROM'):
                        record_sql=record_sql+"insert into {}(sqltype,tran_num,dbname,tbname) VALUES (3,{},'{}','{}');".format(self.tbrow,tran_num,self.db_name,self.tb_name)

                    elif bline.find('Xid =') != -1:

                        l = bline.index('server')
                        end_time = bline[:l].rstrip(' ').replace('#', '20')
                        self.end_time = end_time[0:4] + '-' + end_time[4:6] + '-' + end_time[6:]
                        tran_sql=record_sql+"insert into {}(begin_time,end_time) VALUES ('{}','{}')".format(self.tbtran,self.begin_time,self.end_time)

                        self.cur.execute(tran_sql)
                        self.mysqlconn.commit()
                        record_sql = ''
                        tran_num += 1

        except Exception:
            # Log the traceback instead of silently returning an error string
            logging.exception('function rowrecord error')
            raise

    def binlogdesc(self):
        sql=''
        t_num=0
        r_num=0
        logging.info('Analysed result printing...\n')
        # Total number of transactions and of modified rows
        sql="select 'tbtran' name,count(*) nums from {}  union all select 'tbrow' name,count(*) nums from {};".format(self.tbtran,self.tbrow)
        self.cur.execute(sql)
        rows=self.cur.fetchall()
        for row in rows:
            if row['name']=='tbtran':
                t_num = row['nums']
            else:
                r_num = row['nums']
        print('This binlog file has {} transactions, {} rows are changed '.format(t_num,r_num))

        # Find the single most time-consuming transaction.
        # Describe the duration of each transaction in 5 time ranges.
        # Ideally this would be measured in milliseconds, but the binlog only records time to the second.
        sql='''select
                      count(case when cost_sec between 0 and 1 then 1 end ) cos_1,
                      count(case when cost_sec between 1.1 and 5 then 1 end ) cos_5,
                      count(case when cost_sec between 5.1 and 10 then 1 end ) cos_10,
                      count(case when cost_sec between 10.1 and 30 then 1 end ) cos_30,
                      count(case when cost_sec >30.1 then 1 end ) cos_more,
                      max(cost_sec) cos_max
                from
                (
                        select
                            auto_id,timestampdiff(second,begin_time,end_time) cost_sec
                        from {}
                ) a;'''.format(self.tbtran)
        self.cur.execute(sql)
        rows=self.cur.fetchall()

        for row in rows:
            print('The most cost time : {} '.format(row['cos_max']))
            print('The distribution map of each transaction costed time: ')
            print('Cost time between    0 and  1 second : {} , {}%'.format(row['cos_1'],int(row['cos_1']*100/t_num)))
            print('Cost time between  1.1 and  5 second : {} , {}%'.format(row['cos_5'], int(row['cos_5'] * 100 / t_num)))
            print('Cost time between  5.1 and 10 second : {} , {}%'.format(row['cos_10'], int(row['cos_10'] * 100 / t_num)))
            print('Cost time between 10.1 and 30 second : {} , {}%'.format(row['cos_30'], int(row['cos_30'] * 100 / t_num)))
            print('Cost time                     > 30.1 : {} , {}%\n'.format(row['cos_more'], int(row['cos_more'] * 100 / t_num)))

        # Find the largest number of rows changed by a single transaction.
        # Describe the rows affected per transaction in 5 ranges.
        # The last range uses > 10000 so that exactly 10001 rows is not left out.
        sql='''select
                    count(case when nums between 0 and 10 then 1 end ) row_1,
                    count(case when nums between 11 and 100 then 1 end ) row_2,
                    count(case when nums between 101 and 1000 then 1 end ) row_3,
                    count(case when nums between 1001 and 10000 then 1 end ) row_4,
                    count(case when nums >10000 then 1 end ) row_5,
                    max(nums) row_max
               from
                  (
                    select
                             count(*) nums
                    from {} group by tran_num
                   ) a;'''.format(self.tbrow)
        self.cur.execute(sql)
        rows=self.cur.fetchall()

        for row in rows:
            print('The most changed rows for a single transaction: {} '.format(row['row_max']))
            print('The distribution map of each transaction changed rows : ')
            print('Changed rows between    1 and    10 rows : {} , {}%'.format(row['row_1'],int(row['row_1']*100/t_num)))
            print('Changed rows between   11 and   100 rows : {} , {}%'.format(row['row_2'], int(row['row_2'] * 100 / t_num)))
            print('Changed rows between  101 and  1000 rows : {} , {}%'.format(row['row_3'], int(row['row_3'] * 100 / t_num)))
            print('Changed rows between 1001 and 10000 rows : {} , {}%'.format(row['row_4'], int(row['row_4'] * 100 / t_num)))
            print('Changed rows                     > 10000 : {} , {}%\n'.format(row['row_5'], int(row['row_5'] * 100 / t_num)))

        # Analyze the DML type of the changed rows:
        # the distribution of delete, insert and update
        sql='select sqltype ,count(*) nums from {} group by sqltype ;'.format(self.tbrow)
        self.cur.execute(sql)
        rows=self.cur.fetchall()

        print('The distribution map of the {} changed rows : '.format(r_num))
        for row in rows:

            if row['sqltype']==1:
                print('INSERT rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
            if row['sqltype']==2:
                print('UPDATE rows :{} , {}% '.format(row['nums'],int(row['nums']*100/r_num)))
            if row['sqltype']==3:
                print('DELETE rows :{} , {}%\n '.format(row['nums'],int(row['nums']*100/r_num)))

        # Describe the tables with the most changed rows.
        # This shows which tables are modified most often; the top 10 table names are listed.
        sql = '''select
                      dbname,tbname ,
                      count(*) ALL_rows,
                      count(*)*100/{} per,
                      count(case when sqltype=1 then 1 end) INSERT_rows,
                      count(case when sqltype=2 then 1 end) UPDATE_rows,
                      count(case when sqltype=3 then 1 end) DELETE_rows
                from {}
                group by dbname,tbname
                order by ALL_rows desc
                limit 10;'''.format(r_num,self.tbrow)
        self.cur.execute(sql)
        rows = self.cur.fetchall()

        print('The distribution map of the {} changed rows : '.format(r_num))
        print('tablename'.ljust(50),
              '|','changed_rows'.center(15),
              '|','percent'.center(10),
              '|','insert_rows'.center(18),
              '|','update_rows'.center(18),
              '|','delete_rows'.center(18)
              )
        print('-' * 145)
        for row in rows:
            print((row['dbname']+'.'+row['tbname']).ljust(50),
                  '|',str(row['ALL_rows']).rjust(15),
                  '|',(str(int(row['per']))+'%').rjust(10),
                  '|',str(row['INSERT_rows']).rjust(10)+' , '+(str(int(row['INSERT_rows']*100/row['ALL_rows']))+'%').ljust(5),
                  '|',str(row['UPDATE_rows']).rjust(10)+' , '+(str(int(row['UPDATE_rows']*100/row['ALL_rows']))+'%').ljust(5),
                  '|',str(row['DELETE_rows']).rjust(10)+' , '+(str(int(row['DELETE_rows']*100/row['ALL_rows']))+'%').ljust(5),
                  )
        print('\n')

        logging.info('Finished to analyse the binlog file !!!')

    def closeconn(self):
        # Close the cursor and the connection itself
        self.cur.close()
        self.mysqlconn.close()
        logging.info('release db connections\n')

def main():
    p = queryanalyse()
    p.rowrecord()
    p.binlogdesc()
    p.closeconn()

if __name__ == "__main__":
    main()
