Skip to content

Commit

Permalink
fix: Make coordinator Register not blocked on ProcessActiveStandby (m…
Browse files Browse the repository at this point in the history
…ilvus-io#32069)

See also milvus-io#32066

This PR makes the coordinator's `Register` return successfully without
blocking and lets `ProcessActiveStandBy` run asynchronously, so that roles
can still receive a stop signal and notify the servers.

---------

Signed-off-by: Congqi Xia <[email protected]>
  • Loading branch information
congqixia authored Apr 10, 2024
1 parent a697e80 commit 25a1c9e
Show file tree
Hide file tree
Showing 7 changed files with 113 additions and 47 deletions.
39 changes: 24 additions & 15 deletions internal/datacoord/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,25 +262,34 @@ func (s *Server) Register() error {
// first register indexCoord
s.icSession.Register()
s.session.Register()
if s.enableActiveStandBy {
err := s.session.ProcessActiveStandBy(s.activateFunc)
if err != nil {
return err
}
afterRegister := func() {
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Inc()
log.Info("DataCoord Register Finished")

err = s.icSession.ForceActiveStandby(nil)
if err != nil {
return nil
}
s.session.LivenessCheck(s.ctx, func() {
logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.GetServerID()))
os.Exit(1)
})
}
if s.enableActiveStandBy {
go func() {
err := s.session.ProcessActiveStandBy(s.activateFunc)
if err != nil {
log.Error("failed to activate standby datacoord server", zap.Error(err))
return
}

metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.DataCoordRole).Inc()
log.Info("DataCoord Register Finished")
err = s.icSession.ForceActiveStandby(nil)
if err != nil {
log.Error("failed to force activate standby indexcoord server", zap.Error(err))
return
}
afterRegister()
}()
} else {
afterRegister()
}

s.session.LivenessCheck(s.ctx, func() {
logutil.Logger(s.ctx).Error("disconnected from etcd and exited", zap.Int64("serverID", s.session.GetServerID()))
os.Exit(1)
})
return nil
}

Expand Down
43 changes: 40 additions & 3 deletions internal/datacoord/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3217,13 +3217,28 @@ func newTestServer(t *testing.T, receiveCh chan any, opts ...Option) *Server {

err = svr.Init()
assert.NoError(t, err)

signal := make(chan struct{})
if Params.DataCoordCfg.EnableActiveStandby.GetAsBool() {
assert.Equal(t, commonpb.StateCode_StandBy, svr.stateCode.Load().(commonpb.StateCode))
activateFunc := svr.activateFunc
svr.activateFunc = func() error {
defer func() {
close(signal)
}()
var err error
if activateFunc != nil {
err = activateFunc()
}
return err
}
} else {
assert.Equal(t, commonpb.StateCode_Initializing, svr.stateCode.Load().(commonpb.StateCode))
close(signal)
}
err = svr.Register()
assert.NoError(t, err)
<-signal
err = svr.Start()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, svr.stateCode.Load().(commonpb.StateCode))
Expand Down Expand Up @@ -3523,14 +3538,35 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server {

err = svr.Init()
assert.NoError(t, err)
err = svr.Start()
assert.NoError(t, err)

signal := make(chan struct{})
if Params.DataCoordCfg.EnableActiveStandby.GetAsBool() {
assert.Equal(t, commonpb.StateCode_StandBy, svr.stateCode.Load().(commonpb.StateCode))
activateFunc := svr.activateFunc
svr.activateFunc = func() error {
defer func() {
close(signal)
}()
var err error
if activateFunc != nil {
err = activateFunc()
}
return err
}
} else {
assert.Equal(t, commonpb.StateCode_Initializing, svr.stateCode.Load().(commonpb.StateCode))
close(signal)
}
err = svr.Register()
assert.NoError(t, err)
<-signal

err = svr.Start()
assert.NoError(t, err)

resp, err := svr.GetComponentStates(context.Background(), nil)
assert.NoError(t, err)
assert.Equal(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
assert.True(t, merr.Ok(resp.GetStatus()))
assert.Equal(t, commonpb.StateCode_Healthy, resp.GetState().GetStateCode())

// stop channel watch state watcher in tests
Expand All @@ -3546,6 +3582,7 @@ func testDataCoordBase(t *testing.T, opts ...Option) *Server {

func TestDataCoord_DisableActiveStandby(t *testing.T) {
paramtable.Get().Save(Params.DataCoordCfg.EnableActiveStandby.Key, "false")
defer paramtable.Get().Reset(Params.DataCoordCfg.EnableActiveStandby.Key)
svr := testDataCoordBase(t)
defer closeTestServer(t, svr)
}
Expand Down
25 changes: 16 additions & 9 deletions internal/querycoordv2/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,17 +141,24 @@ func NewQueryCoord(ctx context.Context) (*Server, error) {

func (s *Server) Register() error {
s.session.Register()
afterRegister := func() {
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Inc()
s.session.LivenessCheck(s.ctx, func() {
log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID()))
os.Exit(1)
})
}
if s.enableActiveStandBy {
if err := s.session.ProcessActiveStandBy(s.activateFunc); err != nil {
log.Error("failed to activate standby server", zap.Error(err))
return err
}
go func() {
if err := s.session.ProcessActiveStandBy(s.activateFunc); err != nil {
log.Error("failed to activate standby server", zap.Error(err))
return
}
afterRegister()
}()
} else {
afterRegister()
}
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.QueryCoordRole).Inc()
s.session.LivenessCheck(s.ctx, func() {
log.Error("QueryCoord disconnected from etcd, process will exit", zap.Int64("serverID", s.session.GetServerID()))
os.Exit(1)
})
return nil
}

Expand Down
12 changes: 5 additions & 7 deletions internal/querycoordv2/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ func (suite *ServerSuite) TestDisableActiveStandby() {

func (suite *ServerSuite) TestEnableActiveStandby() {
paramtable.Get().Save(Params.QueryCoordCfg.EnableActiveStandby.Key, "true")
defer paramtable.Get().Reset(Params.QueryCoordCfg.EnableActiveStandby.Key)

err := suite.server.Stop()
suite.NoError(err)
Expand Down Expand Up @@ -346,14 +347,11 @@ func (suite *ServerSuite) TestEnableActiveStandby() {
suite.Equal(commonpb.StateCode_StandBy, states1.GetState().GetStateCode())
err = suite.server.Register()
suite.NoError(err)
err = suite.server.Start()
suite.NoError(err)

states2, err := suite.server.GetComponentStates(context.Background(), nil)
suite.NoError(err)
suite.Equal(commonpb.StateCode_Healthy, states2.GetState().GetStateCode())

paramtable.Get().Save(Params.QueryCoordCfg.EnableActiveStandby.Key, "false")
suite.Eventually(func() bool {
state, err := suite.server.GetComponentStates(context.Background(), nil)
return err == nil && state.GetState().GetStateCode() == commonpb.StateCode_Healthy
}, time.Second*5, time.Millisecond*200)
}

func (suite *ServerSuite) TestStop() {
Expand Down
26 changes: 17 additions & 9 deletions internal/rootcoord/root_coord.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,17 +268,25 @@ func (c *Core) SetQueryCoordClient(s types.QueryCoordClient) error {
// Register register rootcoord at etcd
func (c *Core) Register() error {
c.session.Register()
afterRegister := func() {
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Inc()
log.Info("RootCoord Register Finished")
c.session.LivenessCheck(c.ctx, func() {
log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
os.Exit(1)
})
}
if c.enableActiveStandBy {
if err := c.session.ProcessActiveStandBy(c.activateFunc); err != nil {
return err
}
go func() {
if err := c.session.ProcessActiveStandBy(c.activateFunc); err != nil {
log.Warn("failed to activate standby rootcoord server", zap.Error(err))
return
}
afterRegister()
}()
} else {
afterRegister()
}
metrics.NumNodes.WithLabelValues(fmt.Sprint(paramtable.GetNodeID()), typeutil.RootCoordRole).Inc()
log.Info("RootCoord Register Finished")
c.session.LivenessCheck(c.ctx, func() {
log.Error("Root Coord disconnected from etcd, process will exit", zap.Int64("Server Id", c.session.ServerID))
os.Exit(1)
})

return nil
}
Expand Down
13 changes: 10 additions & 3 deletions internal/rootcoord/root_coord_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1264,9 +1264,13 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) {
// Need to reset global etcd to follow new path
kvfactory.CloseEtcdClient()
paramtable.Get().Save(Params.RootCoordCfg.EnableActiveStandby.Key, "true")
defer paramtable.Get().Reset(Params.RootCoordCfg.EnableActiveStandby.Key)
paramtable.Get().Save(Params.CommonCfg.RootCoordTimeTick.Key, fmt.Sprintf("rootcoord-time-tick-%d", randVal))
defer paramtable.Get().Reset(Params.CommonCfg.RootCoordTimeTick.Key)
paramtable.Get().Save(Params.CommonCfg.RootCoordStatistics.Key, fmt.Sprintf("rootcoord-statistics-%d", randVal))
defer paramtable.Get().Reset(Params.CommonCfg.RootCoordStatistics.Key)
paramtable.Get().Save(Params.CommonCfg.RootCoordDml.Key, fmt.Sprintf("rootcoord-dml-test-%d", randVal))
defer paramtable.Get().Reset(Params.CommonCfg.RootCoordDml.Key)

ctx := context.Background()
coreFactory := dependency.NewDefaultFactory(true)
Expand All @@ -1288,12 +1292,15 @@ func TestRootcoord_EnableActiveStandby(t *testing.T) {
err = core.Init()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_StandBy, core.GetStateCode())
err = core.Start()
assert.NoError(t, err)
core.session.TriggerKill = false
err = core.Register()
assert.NoError(t, err)
assert.Equal(t, commonpb.StateCode_Healthy, core.GetStateCode())
err = core.Start()
assert.NoError(t, err)

assert.Eventually(t, func() bool {
return core.GetStateCode() == commonpb.StateCode_Healthy
}, time.Second*5, time.Millisecond*200)
resp, err := core.DescribeCollection(ctx, &milvuspb.DescribeCollectionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_DescribeCollection,
Expand Down
2 changes: 1 addition & 1 deletion internal/util/sessionutil/session_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -1151,7 +1151,7 @@ func (s *Session) ForceActiveStandby(activateFunc func() error) error {
0)).
Then(clientv3.OpPut(s.activeKey, string(sessionJSON), clientv3.WithLease(*s.LeaseID))).Commit()

if !resp.Succeeded {
if err != nil || !resp.Succeeded {
msg := fmt.Sprintf("failed to force register ACTIVE %s", s.ServerName)
log.Error(msg, zap.Error(err), zap.Any("resp", resp))
return errors.New(msg)
Expand Down

0 comments on commit 25a1c9e

Please sign in to comment.