Skip to content

Commit

Permalink
fix partition state replay object info bug (matrixorigin#13181)
Browse files Browse the repository at this point in the history
the object zone map is zero when the table ranges.

the reason is partition_state.dataObjectsByCreateTS is unset when replaying the log tails into the state.

Approved by: @reusee
  • Loading branch information
gouhongshen authored Dec 3, 2023
1 parent 6f2a181 commit 133d2da
Showing 1 changed file with 89 additions and 71 deletions.
160 changes: 89 additions & 71 deletions pkg/vm/engine/disttae/logtailreplay/partition_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,52 +376,65 @@ func (p *PartitionState) HandleLogtailEntry(
}

func (p *PartitionState) HandleObjectDelete(bat *api.Batch) {
statsCol := vector.MustBytesCol(mustVectorFromProto(bat.Vecs[2]))
statsVec := mustVectorFromProto(bat.Vecs[2])
stateCol := vector.MustFixedCol[bool](mustVectorFromProto(bat.Vecs[3]))
createTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[6]))
deleteTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[7]))
commitTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[10]))

for idx := 0; idx < len(statsCol); idx++ {
for idx := 0; idx < len(stateCol); idx++ {
var objEntry ObjectEntry

objEntry.ObjectStats = objectio.ObjectStats(statsCol[idx])
objEntry.ObjectStats = objectio.ObjectStats(statsVec.GetBytesAt(idx))

if objEntry.ObjectStats.IsZero() {
continue
}

objEntry.EntryState = stateCol[idx]
objEntry.CreateTime = createTSCol[idx]
objEntry.DeleteTime = deleteTSCol[idx]
objEntry.CommitTS = commitTSCol[idx]

p.dataObjects.Set(objEntry)
p.objectDeleteHelper(objEntry, deleteTSCol[idx])
}
}

func (p *PartitionState) HandleObjectInsert(bat *api.Batch) {
statsCol := vector.MustBytesCol(mustVectorFromProto(bat.Vecs[2]))
statsVec := mustVectorFromProto(bat.Vecs[2])
stateCol := vector.MustFixedCol[bool](mustVectorFromProto(bat.Vecs[3]))
createTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[6]))
deleteTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[7]))
commitTSCol := vector.MustFixedCol[types.TS](mustVectorFromProto(bat.Vecs[10]))

for idx := 0; idx < len(statsCol); idx++ {
for idx := 0; idx < len(stateCol); idx++ {
var objEntry ObjectEntry

objEntry.ObjectStats = objectio.ObjectStats(statsCol[idx])
objEntry.ObjectStats = objectio.ObjectStats(statsVec.GetBytesAt(idx))
if objEntry.ObjectStats.IsZero() {
continue
}

if old, exist := p.dataObjects.Get(objEntry); exist {
objEntry.HasDeltaLoc = old.HasDeltaLoc
} else {
e := ObjectIndexByTSEntry{
Time: createTSCol[idx],
ShortObjName: *objEntry.ObjectShortName(),
IsDelete: false,

IsAppendable: objEntry.EntryState,
}
p.objectIndexByTS.Set(e)
}

objEntry.EntryState = stateCol[idx]
objEntry.CreateTime = createTSCol[idx]
objEntry.DeleteTime = deleteTSCol[idx]
objEntry.CommitTS = commitTSCol[idx]

if old, ok := p.dataObjects.Get(objEntry); ok {
// if overwritten delete
if !old.DeleteTime.IsEmpty() && deleteTSCol[idx].IsEmpty() {
logutil.Errorf("overwritten data objects delete time to 0-0:\n old: %s\n new: %s",
old.String(), objEntry.String())
}
}

p.dataObjects.Set(objEntry)
p.dataObjectsByCreateTS.Set(ObjectIndexByCreateTSEntry(objEntry))
}
}

Expand Down Expand Up @@ -763,6 +776,66 @@ func (p *PartitionState) HandleMetadataInsert(
})
}

func (p *PartitionState) objectDeleteHelper(pivot ObjectEntry, deleteTime types.TS) {
objEntry, ok := p.dataObjects.Get(pivot)
//TODO non-appendable block' delete maybe arrive before its insert?
if !ok {
panic(fmt.Sprintf("invalid block id. %v", pivot.String()))
}

if objEntry.DeleteTime.IsEmpty() {
// apply first delete
objEntry.DeleteTime = deleteTime
p.dataObjects.Set(objEntry)
p.dataObjectsByCreateTS.Set(ObjectIndexByCreateTSEntry(objEntry))

{
e := ObjectIndexByTSEntry{
Time: objEntry.DeleteTime,
ShortObjName: *objEntry.ObjectShortName(),
IsDelete: true,

IsAppendable: objEntry.EntryState,
}
p.objectIndexByTS.Set(e)
}
} else {
// update deletetime, if incoming delete ts is less
if objEntry.DeleteTime.Greater(deleteTime) {
old := ObjectIndexByTSEntry{
Time: objEntry.DeleteTime,
ShortObjName: *objEntry.ObjectShortName(),
IsDelete: true,

IsAppendable: objEntry.EntryState,
}
p.objectIndexByTS.Delete(old)
objEntry.DeleteTime = deleteTime
p.dataObjects.Set(objEntry)
p.dataObjectsByCreateTS.Set(ObjectIndexByCreateTSEntry(objEntry))

new := ObjectIndexByTSEntry{
Time: objEntry.DeleteTime,
ShortObjName: *objEntry.ObjectShortName(),
IsDelete: true,

IsAppendable: objEntry.EntryState,
}
p.objectIndexByTS.Set(new)
} else if objEntry.DeleteTime.Equal(deleteTime) {
//FIXME:: should we do something here?
e := ObjectIndexByTSEntry{
Time: objEntry.DeleteTime,
ShortObjName: *objEntry.ObjectShortName(),
IsDelete: true,

IsAppendable: objEntry.EntryState,
}
p.objectIndexByTS.Set(e)
}
}
}

func (p *PartitionState) HandleMetadataDelete(ctx context.Context, input *api.Batch) {
ctx, task := trace.NewTask(ctx, "PartitionState.HandleMetadataDelete")
defer task.End()
Expand All @@ -780,63 +853,8 @@ func (p *PartitionState) HandleMetadataDelete(ctx context.Context, input *api.Ba
moprobe.WithRegion(ctx, moprobe.PartitionStateHandleMetaDelete, func() {
pivot := ObjectEntry{}
objectio.SetObjectStatsShortName(&pivot.ObjectStats, objectio.ShortName(&blockID))
objEntry, ok := p.dataObjects.Get(pivot)
//TODO non-appendable block' delete maybe arrive before its insert?
if !ok {
panic(fmt.Sprintf("invalid block id. %x", rowID))
}

if objEntry.DeleteTime.IsEmpty() {
// apply first delete
objEntry.DeleteTime = deleteTimeVector[i]
p.dataObjects.Set(objEntry)
p.dataObjectsByCreateTS.Set(ObjectIndexByCreateTSEntry(objEntry))

{
e := ObjectIndexByTSEntry{
Time: objEntry.DeleteTime,
ShortObjName: *objEntry.ObjectShortName(),
IsDelete: true,

IsAppendable: objEntry.EntryState,
}
p.objectIndexByTS.Set(e)
}
} else {
// update deletetime, if incoming delete ts is less
if objEntry.DeleteTime.Greater(deleteTimeVector[i]) {
old := ObjectIndexByTSEntry{
Time: objEntry.DeleteTime,
ShortObjName: *objEntry.ObjectShortName(),
IsDelete: true,

IsAppendable: objEntry.EntryState,
}
p.objectIndexByTS.Delete(old)
objEntry.DeleteTime = deleteTimeVector[i]
p.dataObjects.Set(objEntry)
p.dataObjectsByCreateTS.Set(ObjectIndexByCreateTSEntry(objEntry))

new := ObjectIndexByTSEntry{
Time: objEntry.DeleteTime,
ShortObjName: *objEntry.ObjectShortName(),
IsDelete: true,

IsAppendable: objEntry.EntryState,
}
p.objectIndexByTS.Set(new)
} else if objEntry.DeleteTime.Equal(deleteTimeVector[i]) {
//FIXME:: should we do something here?
e := ObjectIndexByTSEntry{
Time: objEntry.DeleteTime,
ShortObjName: *objEntry.ObjectShortName(),
IsDelete: true,

IsAppendable: objEntry.EntryState,
}
p.objectIndexByTS.Set(e)
}
}
p.objectDeleteHelper(pivot, deleteTimeVector[i])
})
}

Expand Down

0 comments on commit 133d2da

Please sign in to comment.