Skip to content

Commit

Permalink
fix: avoid panic when load segment with pkoracle and idforacle alread…
Browse files Browse the repository at this point in the history
…y exist (milvus-io#36959)

relate: milvus-io#36949

Signed-off-by: aoiasd <[email protected]>
  • Loading branch information
aoiasd authored Oct 18, 2024
1 parent 50da48a commit fbe177d
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 13 deletions.
12 changes: 6 additions & 6 deletions internal/querynodev2/delegator/delegator.go
Original file line number Diff line number Diff line change
Expand Up @@ -913,10 +913,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni

excludedSegments := NewExcludedSegments(paramtable.Get().QueryNodeCfg.CleanExcludeSegInterval.GetAsDuration(time.Second))

var idfOracle IDFOracle
if len(collection.Schema().GetFunctions()) > 0 {
idfOracle = NewIDFOracle(collection.Schema().GetFunctions())
}
sd := &shardDelegator{
collectionID: collectionID,
replicaID: replicaID,
Expand All @@ -926,7 +922,7 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
segmentManager: manager.Segment,
workerManager: workerManager,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
distribution: NewDistribution(idfOracle),
distribution: NewDistribution(),
deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,
Expand All @@ -935,7 +931,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
factory: factory,
queryHook: queryHook,
chunkManager: chunkManager,
idfOracle: idfOracle,
partitionStats: make(map[UniqueID]*storage.PartitionStatsSnapshot),
excludedSegments: excludedSegments,
functionRunners: make(map[int64]function.FunctionRunner),
Expand All @@ -955,6 +950,11 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
}
}

if len(sd.isBM25Field) > 0 {
sd.idfOracle = NewIDFOracle(collection.Schema().GetFunctions())
sd.distribution.SetIDFOracle(sd.idfOracle)
}

m := sync.Mutex{}
sd.tsCond = sync.NewCond(&m)
if sd.lifetime.Add(lifetime.NotStopped) == nil {
Expand Down
7 changes: 5 additions & 2 deletions internal/querynodev2/delegator/delegator_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
})

var bm25Stats *typeutil.ConcurrentMap[int64, map[int64]*storage.BM25Stats]
if len(sd.isBM25Field) > 0 {
if sd.idfOracle != nil {
bm25Stats, err = sd.loader.LoadBM25Stats(ctx, req.GetCollectionID(), infos...)
if err != nil {
log.Warn("failed to load bm25 stats for segment", zap.Error(err))
Expand Down Expand Up @@ -690,8 +690,11 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
sd.pkOracle.Register(candidate, targetNodeID)
}

if sd.idfOracle != nil {
if sd.idfOracle != nil && bm25Stats != nil {
bm25Stats.Range(func(segmentID int64, stats map[int64]*storage.BM25Stats) bool {
log.Info("register sealed segment bm25 stats into idforacle",
zap.Int64("segmentID", segmentID),
)
sd.idfOracle.Register(segmentID, stats, segments.SegmentTypeSealed)
return false
})
Expand Down
9 changes: 7 additions & 2 deletions internal/querynodev2/delegator/distribution.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ type SegmentEntry struct {
}

// NewDistribution creates a new distribution instance with all field initialized.
func NewDistribution(idfOracle IDFOracle) *distribution {
func NewDistribution() *distribution {
dist := &distribution{
serviceable: atomic.NewBool(false),
growingSegments: make(map[UniqueID]SegmentEntry),
Expand All @@ -100,13 +100,18 @@ func NewDistribution(idfOracle IDFOracle) *distribution {
current: atomic.NewPointer[snapshot](nil),
offlines: typeutil.NewSet[int64](),
targetVersion: atomic.NewInt64(initialTargetVersion),
idfOracle: idfOracle,
}

dist.genSnapshot()
return dist
}

func (d *distribution) SetIDFOracle(idfOracle IDFOracle) {
d.mut.Lock()
defer d.mut.Unlock()
d.idfOracle = idfOracle
}

func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []SnapshotItem, growing []SegmentEntry, version int64, err error) {
d.mut.RLock()
defer d.mut.RUnlock()
Expand Down
4 changes: 1 addition & 3 deletions internal/querynodev2/delegator/distribution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import (
"time"

"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
)

type DistributionSuite struct {
Expand All @@ -31,7 +29,7 @@ type DistributionSuite struct {
}

func (s *DistributionSuite) SetupTest() {
s.dist = NewDistribution(NewIDFOracle([]*schemapb.FunctionSchema{}))
s.dist = NewDistribution()
s.Equal(initialTargetVersion, s.dist.getTargetVersion())
}

Expand Down

0 comments on commit fbe177d

Please sign in to comment.