Skip to content

Commit

Permalink
修复instant相关BUG
Browse files Browse the repository at this point in the history
  • Loading branch information
ddcw committed Jul 8, 2024
1 parent c99a711 commit 1d182cb
Show file tree
Hide file tree
Showing 4 changed files with 320 additions and 98 deletions.
4 changes: 3 additions & 1 deletion ibd2sql/ibd2sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def _init_sql_prefix(self):
#self.SQL_PREFIX = f"{ 'REPLACE' if self.REPLACE else 'INSERT'} INTO {self.tablename}{'(`'+'`,`'.join([ self.table.column[x]['name'] for x in self.table.column ]) + '`)' if self.COMPLETE_SQL else ''} VALUES "
SQL_PREFIX = f"{'REPLACE' if self.REPLACE else 'INSERT'} INTO {self.tablename}("
for x in self.table.column:
if self.table.column[x]['is_virtual'] or not self.COMPLETE_SQL:
if self.table.column[x]['is_virtual'] or not self.COMPLETE_SQL or self.table.column[x]['version_dropped'] > 0:
continue
else:
SQL_PREFIX += f"`{self.table.column[x]['name']}`,"
Expand Down Expand Up @@ -294,6 +294,8 @@ def _tosql(self,row):
"""
sql = '('
for colno in self.table.column:
if self.table.column[colno]['version_dropped'] > 0:
continue
data = row[colno]
if data is None:
sql = f"{sql}NULL, "
Expand Down
324 changes: 236 additions & 88 deletions ibd2sql/innodb_page_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ def __init__(self,bdata):
fb = struct.unpack('>B',bdata[:1])[0]
#print(fb&(REC_INFO_DELETED_FLAG*2),fb&(REC_INFO_DELETED_FLAG*4))
self.instant = True if fb&128 or fb&64 else False # fix issue 12
self.instant_flag = True if fb&128 else False
self.row_version_flag = True if fb&64 else False
self.deleted = True if fb&REC_INFO_DELETED_FLAG else False #是否被删除
self.min_rec = True if fb&REC_INFO_MIN_REC_FLAG else False #if and only if the record is the first user record on a non-leaf
self.owned = fb&REC_N_OWNED_MASK # 大于0表示这个rec是这组的第一个, 就是地址被记录在page_directory里面
Expand Down Expand Up @@ -303,21 +305,20 @@ def _read_all_row(self):
self.debug(f"\tREC HEAP_NO : {rheader.heap_no}")
self.debug(f"\tREC TYPE : {rheader.record_type}")
self.debug(f"\tREC NEXT : {rheader.next_record}")
self.debug(f"\tINSTANT FLAG : {rheader.instant_flag}")
self.debug(f"\tROW VERSION FLAG : {rheader.row_version_flag}")
self.debug(f"\t20 bytes ON BOTH SIDES OF RECORD, {self.bdata[self._offset-20:self._offset]}, {self.bdata[self._offset:self._offset+20]}")


#INSTANT
self.debug("GET COUNT COLUMN FOR THIS ROW")
if self.table.instant and rheader.instant:
self.debug(f"\tREAD INSTANT SIZE, _OFFSET:{self._offset} OFFSET:{self.offset} START.")
col_count = self._read_innodb_varsize()
self.debug(f"\tREAD INSTANT SIZE, _OFFSET:{self._offset} OFFSET:{self.offset} FINISH")
# ROW VERSION
if rheader.row_version_flag:
ROW_VERSION = self._read_innodb_varsize()
self.debug(f"\tROW_VERSION: {ROW_VERSION}")
else:
self.debug(f"\tREAD COLUM COUNT")
col_count = len(self.table.column)
self.debug(f"\tREAD COLUM COUNT FINISH")
self.debug(f"\tTHIS ROW HAS {col_count} FILEDS")
ROW_VERSION = -1

if self.table.mysqld_version_id == 80028 and rheader.instant: # 不知道为啥要多读1字节
_ = self._read_innodb_varsize()

#NULL
null_bitmask = 0
Expand All @@ -338,6 +339,32 @@ def _read_all_row(self):
null_bitmask_count = self.table.null_bitmask_count_instant
else:
self.debug("\tNO NULLABLE FIELD.")
#if ROW_VERSION >0: # >=8.0.29
# null_bitmask_count = self.table.null_bitmask_count + self.table.null_bitmask_count_instant


null_bitmask_count = 0
for colno in self.table.column:
col = self.table.column[colno]
#print(col['name'],ROW_VERSION,col['version_dropped'],col['version_added'])
if col['is_nullable']:
if col['version_dropped'] > col['version_added'] :#and col['version_added'] >= ROW_VERSION:
pass
else:
null_bitmask_count += 1

null_bitmask_count = 0
for _phno,colno in self.table.column_ph:
col = self.table.column[colno]
if rheader.row_version_flag:
if (ROW_VERSION >= col['version_added'] and (col['version_dropped'] == 0 or col['version_dropped'] > ROW_VERSION)) or (col['version_dropped'] > ROW_VERSION and ROW_VERSION >= col['version_added']):
null_bitmask_count += 1 if col['is_nullable'] else 0
else:
null_bitmask_count += 1 if col['is_nullable'] else 0


#print(null_bitmask_count)
#null_bitmask_count = self.table.null_bitmask_count
null_bitmask_len = int((null_bitmask_count+7)/8)
null_bitmask = self._readreverse_uint(null_bitmask_len)
self.debug(f"\tNULLABLE FILED COUNT: {self.table.null_bitmask_count} NULLABLE FIELD COUNT(FOR INSTANT):{self.table.null_bitmask_count_instant}")
Expand Down Expand Up @@ -378,90 +405,211 @@ def _read_all_row(self):


#读剩余字段
self.debug(f"READ THE REST OF FIELD. (column count: {col_count})")
_nc = -1 #可为空的字段的计数
for colno in self.table.column:
col = self.table.column[colno]
self.debug("\tREAD FIELD COLNO:",colno,"NAME:",col['name'],"TYPE:",col['type'],'CT:',col['ct'])
#if colno in _data or (rheader.instant and col['instant']):
if colno in _data:
self.debug("\tIS KEY, WILL CONTINUE")
continue

col_count -= 1
if not rheader.instant and col['instant']:
self.debug("\t IS INSTANT AND DEFAULT VALUE")
_data[colno] = col['default'] if not col['instant_null'] else None
_nc = -1
self.debug(f"ROW VERSION : {ROW_VERSION}")
self.debug(f"INSTANT FLAG : {rheader.instant_flag}")
self.debug(f"ROW VERSION FLAG : {rheader.row_version_flag}")
_nullable_count = -1
for _phno,colno in self.table.column_ph:
if colno in _data: # KEY
continue

if col['instant']:
self.debug("\t IS INSTANT, READ BUT FINALLY")
continue


if col['is_nullable']:
_nc += 1
self.debug(f"\tCOL {colno} {col['name']} MAYBE NULL. USE NULL_BITMASK")
if null_bitmask&(1<<_nc):
_data[colno] = None #NULL
self.debug(f"\t\tCOL {colno} {col['name']} IS NULL, WILL CONTINE")
continue
else:
self.debug(f"\t\tCOL {colno} {col['name']} IS NOT NULL.")
_data[colno],_expage[colno] = self._read_field(col)
else:
self.debug(f"\tCOL {colno} {col['name']} REQUIRE NOT NULL. READ DATA")
_data[colno],_expage[colno] = self._read_field(col)

#匹配条件
self.debug(f'FILTER TRX and ROLLPTR')
if _row['trx'] and ( _row['trx'] <= self.mintrx or _row['trx'] >= self.maxtrx ):
self.debug(f"!!! SKIP ROW NO {rn} . {_row['trx']} not in ({self.mintrx},{self.maxtrx})")
continue
if _row['rollptr'] and (_row['rollptr'] <= self.minrollptr or _row['rollptr'] >= self.maxrollptr):
self.debug(f"!!! SKIP ROW NO {rn} . {_row['rollptr']} not in ({self.minrollptr},{self.maxrollptr})")
continue
self.debug(f"TRX:{_row['rollptr']} and ROLLPTR:{_row['rollptr']} is PASS")

#读剩下的字段(FOR INSTANT)
for colno in self.table.column:
col = self.table.column[colno]
if colno in _data or (not col['instant']):
continue
self.debug(f'READ THE REST OF FILED (INSTANT) (column count:{col_count})')
col_count -= 1

if self.table.mysqld_version_id <= 80028 and (col_count + _icc < 1 and not self.haveindex): #记录的字段取完了, 剩余的就是默认值
self.debug(f"\t NO MORE RECORD FILED, COL({colno})({col['name']}) WILL USE DEFAULT VALUE.{col['default']}")
#_data[colno],_expage[colno] = None if col['instant_null'] else col['default'],None
_data[colno],_expage[colno] = col['instant_value'],None
self.debug(col)
continue

if not rheader.instant:
_data[colno],_expage[colno] = col['default'],None
self.debug(f"\tINSTANT:{rheader.instant}",col['instant_value'])
continue
self.debug(f"\tNAME: {col['name']} VERSION_ADDED:{col['version_added']} VERSION_DROPED:{col['version_dropped']} COL_INSTANT:{col['instant']} ROW VERSION:{ROW_VERSION}")
if rheader.row_version_flag: # >=8.0.29 的online ddl
if (ROW_VERSION >= col['version_added'] and (col['version_dropped'] == 0 or col['version_dropped'] > ROW_VERSION)) or (col['version_dropped'] > ROW_VERSION and ROW_VERSION >= col['version_added']):
if col['is_nullable']:
_nullable_count += 1
if col['is_nullable'] and null_bitmask&(1<<_nullable_count):
_data[colno],_expage[colno] = None,None
self.debug(f"######## DDCW FLAG 1 ########")
else:
_data[colno],_expage[colno] = self._read_field(col)
self.debug(f"######## DDCW FLAG 2 ########")
elif ROW_VERSION < col['version_added']:
_data[colno] = col['default'] if not col['instant_null'] else None
_expage[colno] = None
self.debug(f"######## DDCW FLAG 3 ########")
#elif col['version_dropped'] <= ROW_VERSION:
else:
_data[colno],_expage[colno] = None,None
self.debug(f"######## DDCW FLAG 4 ########")
elif ( not rheader.instant_flag and col['instant']): # 8.0.12-28 的online ddl
self.debug(f'mysql 8.0.12-28 版本的add column (instant) 读默认值 (实际版本:{self.table.mysqld_version_id}) ')
_data[colno] = col['default'] if not col['instant_null'] else None
_expage[colno] = None
self.debug(f"######## DDCW FLAG 5 ########")
elif not rheader.instant and col['version_dropped'] > 0: # 删除的字段就不读了(无instant情况下)
_data[colno],_expage[colno] = None,None
self.debug(f"######## DDCW FLAG 6 ########")
#elif col['instant'] and rheader.instant_flag:
else:
self.debug(f"\tINSTANT:{rheader.instant}",col['instant_value'])
#break

if col['is_nullable']:
_nc += 1
self.debug(f"\tINSTANT COL {colno} {col['name']} MAYBE NULL.")
if null_bitmask&(1<<_nc):
self.debug(f"\tINSTANT COL {colno} {col['name']} IS NULL. WILL CONTINE")
#self.debug(f"NULLABLE {col['name']}: is_nullable:{col['is_nullable']} null_bitmask:{null_bitmask} _nullable_count:{_nullable_count}")
if col['is_nullable']:
_nullable_count += 1
if col['is_nullable'] and null_bitmask&(1<<_nullable_count):
_data[colno],_expage[colno] = None,None
#_data[colno],_expage[colno] = col['default'],None
continue
self.debug(f"######## DDCW FLAG 7 ########")
else:
self.debug(f"\tINSTANT COL {colno} {col['name']} IS NOT NULL. READ DATA")
#_data[colno],_expage[colno] = col['default'],None
_data[colno],_expage[colno] = self._read_field(col)
else:
self.debug(f"\tINSTANT COL {colno} {col['name']} REQUIRE NOT NULL. READ DATA")
_data[colno],_expage[colno] = self._read_field(col)

self.debug(f"######## DDCW FLAG 8 ########")

# for _phno,colno in self.table.column_ph:
# if colno in _data:
# continue
# col = self.table.column[colno]
# self.debug(f"\tNAME: {col['name']} VERSION_ADDED:{col['version_added']} VERSION_DROPED:{col['version_dropped']} INSTANT:{col['instant']} ROW VERSION:{ROW_VERSION}")
# if rheader.instant: # 存在online ddl
# if ROW_VERSION > 0: # >= 8.0.29 存在drop
# if col['version_added'] > ROW_VERSION and col['version_dropped'] == 0: # 新增的字段, 还没有删除, 就取默认值
# #_data[colno],_expage[colno] = None,None
# _data[colno] = col['default'] if not col['instant_null'] else None
# _expage[colno] = None
# continue
# elif col['version_dropped'] > ROW_VERSION and col['version_added'] == 0: # 数据被标记为删除了
# pass
# elif col['version_dropped'] > ROW_VERSION and col['version_added'] > ROW_VERSION:
# _data[colno],_expage[colno] = None,None
# continue
# elif col['version_dropped'] > ROW_VERSION and col['version_added'] < ROW_VERSION:
# pass
# elif col['version_dropped'] > ROW_VERSION and col['version_added'] > ROW_VERSION :
# _data[colno],_expage[colno] = None,None
# continue
# elif col['version_dropped'] < ROW_VERSION and ROW_VERSION > col['version_added']:
# _data[colno],_expage[colno] = None,None
# #continue
# else:
# pass
#
# else: # 8.0.13 --> 8.0.28 只有add
# if col['instant']: # 有新增字段
# continue
# elif col['instant']:
# _data[colno],_expage[colno] = None,None
# continue
#
# #if ROW_VERSION > 0 and ROW_VERSION <= col['version_dropped']:
# # _data[colno],_expage[colno] = None,None
# # continue
# #elif ROW_VERSION > 0 and ROW_VERSION < col['version_added']:
# # _data[colno] = col['default'] if not col['instant_null'] else None
# # _expage[colno] = None
# # continue
# #elif ROW_VERSION < 0 and col['version_added'] > 0:
# # _data[colno] = col['default'] if not col['instant_null'] else None
# # _expage[colno] = None
# # continue
#
# if col['is_nullable']:
# _nc += 1
# if col['is_nullable'] and null_bitmask&(1<<_nc):
# _data[colno],_expage[colno] = None,None
# else:
# _data[colno],_expage[colno] = self._read_field(col)
#
# # instant 默认值
# for _phno,colno in self.table.column_ph:
# if colno in _data:
# continue
# col = self.table.column[colno]
# self.debug(f"INSTANT COL: {col['name']}")
# if ROW_VERSION < 0 :
# if rheader.instant_flag:
# _data[colno],_expage[colno] = _data[colno],_expage[colno] = self._read_field(col)
# else:
# _data[colno],_expage[colno] = col['default'] if not col['instant_null'] else None
# _expage[colno] = None
# elif ROW_VERSION < col['version_added']:
# _data[colno] = col['default'] if not col['instant_null'] else None
# _expage[colno] = None
# else:
# _data[colno],_expage[colno] = _data[colno],_expage[colno] = self._read_field(col)
#

# self.debug(f"READ THE REST OF FIELD. (column count: {col_count})")
# _nc = -1 #可为空的字段的计数
# for colno in self.table.column:
# col = self.table.column[colno]
# self.debug("\tREAD FIELD COLNO:",colno,"NAME:",col['name'],"TYPE:",col['type'],'CT:',col['ct'])
# #if colno in _data or (rheader.instant and col['instant']):
# if colno in _data:
# self.debug("\tIS KEY, WILL CONTINUE")
# continue
#
# col_count -= 1
# if not rheader.instant and col['instant']:
# self.debug("\t IS INSTANT AND DEFAULT VALUE")
# _data[colno] = col['default'] if not col['instant_null'] else None
# continue
#
# if col['instant']:
# self.debug("\t IS INSTANT, READ BUT FINALLY")
# continue
#
#
# if col['is_nullable']:
# _nc += 1
# self.debug(f"\tCOL {colno} {col['name']} MAYBE NULL. USE NULL_BITMASK")
# if null_bitmask&(1<<_nc):
# _data[colno] = None #NULL
# self.debug(f"\t\tCOL {colno} {col['name']} IS NULL, WILL CONTINE")
# continue
# else:
# self.debug(f"\t\tCOL {colno} {col['name']} IS NOT NULL.")
# _data[colno],_expage[colno] = self._read_field(col)
# else:
# self.debug(f"\tCOL {colno} {col['name']} REQUIRE NOT NULL. READ DATA")
# _data[colno],_expage[colno] = self._read_field(col)
#
# #匹配条件
# self.debug(f'FILTER TRX and ROLLPTR')
# if _row['trx'] and ( _row['trx'] <= self.mintrx or _row['trx'] >= self.maxtrx ):
# self.debug(f"!!! SKIP ROW NO {rn} . {_row['trx']} not in ({self.mintrx},{self.maxtrx})")
# continue
# if _row['rollptr'] and (_row['rollptr'] <= self.minrollptr or _row['rollptr'] >= self.maxrollptr):
# self.debug(f"!!! SKIP ROW NO {rn} . {_row['rollptr']} not in ({self.minrollptr},{self.maxrollptr})")
# continue
# self.debug(f"TRX:{_row['rollptr']} and ROLLPTR:{_row['rollptr']} is PASS")
#
# #读剩下的字段(FOR INSTANT)
# for colno in self.table.column:
# col = self.table.column[colno]
# if colno in _data or (not col['instant']):
# continue
# self.debug(f'READ THE REST OF FILED (INSTANT) (column count:{col_count})')
# col_count -= 1
#
# if self.table.mysqld_version_id <= 80028 and (col_count + _icc < 1 and not self.haveindex): #记录的字段取完了, 剩余的就是默认值
# self.debug(f"\t NO MORE RECORD FILED, COL({colno})({col['name']}) WILL USE DEFAULT VALUE.{col['default']}")
# #_data[colno],_expage[colno] = None if col['instant_null'] else col['default'],None
# _data[colno],_expage[colno] = col['instant_value'],None
# self.debug(col)
# continue
#
# if not rheader.instant:
# _data[colno],_expage[colno] = col['default'],None
# self.debug(f"\tINSTANT:{rheader.instant}",col['instant_value'])
# continue
# else:
# self.debug(f"\tINSTANT:{rheader.instant}",col['instant_value'])
# #break
#
# if col['is_nullable']:
# _nc += 1
# self.debug(f"\tINSTANT COL {colno} {col['name']} MAYBE NULL.")
# if null_bitmask&(1<<_nc):
# self.debug(f"\tINSTANT COL {colno} {col['name']} IS NULL. WILL CONTINE")
# _data[colno],_expage[colno] = None,None
# #_data[colno],_expage[colno] = col['default'],None
# continue
# else:
# self.debug(f"\tINSTANT COL {colno} {col['name']} IS NOT NULL. READ DATA")
# #_data[colno],_expage[colno] = col['default'],None
# _data[colno],_expage[colno] = self._read_field(col)
# else:
# self.debug(f"\tINSTANT COL {colno} {col['name']} REQUIRE NOT NULL. READ DATA")
# _data[colno],_expage[colno] = self._read_field(col)
#



Expand Down
Loading

0 comments on commit 1d182cb

Please sign in to comment.