Skip to content

Commit

Permalink
Close WatchDocument stream on agent shutdown (yorkie-team#210)
Browse files Browse the repository at this point in the history
  • Loading branch information
hackerwins authored Jul 7, 2021
1 parent 76c455b commit 7aafb1d
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 9 deletions.
75 changes: 75 additions & 0 deletions test/integration/agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
// +build integration

/*
* Copyright 2021 The Yorkie Authors. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package integration

import (
"context"
"io"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

"github.com/yorkie-team/yorkie/client"
"github.com/yorkie-team/yorkie/pkg/document"
"github.com/yorkie-team/yorkie/test/helper"
)

func TestAgent(t *testing.T) {
t.Run("closing WatchDocument stream on agent shutdown test", func(t *testing.T) {
ctx := context.Background()
agent := helper.TestYorkie()
assert.NoError(t, agent.Start())

cli, err := client.Dial(agent.RPCAddr())
assert.NoError(t, err)
assert.NoError(t, cli.Activate(ctx))

doc := document.New(helper.Collection, t.Name())
assert.NoError(t, cli.Attach(ctx, doc))

wg := sync.WaitGroup{}
wrch := cli.Watch(ctx, doc)

go func() {
for {
select {
case <-ctx.Done():
assert.Fail(t, "unexpected ctx done")
return
case wr := <-wrch:
if wr.Err == io.EOF || status.Code(wr.Err) == codes.Canceled {
peers := wr.PeersMapByDoc[doc.Key().BSONKey()]
assert.Len(t, peers, 0)
wg.Done()
return
}
}
}
}()

wg.Add(1)
assert.NoError(t, agent.Shutdown(true))

wg.Wait()
})
}

17 changes: 12 additions & 5 deletions yorkie/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package rpc

import (
"context"
"fmt"
"net"

Expand All @@ -42,8 +43,9 @@ type Config struct {

// Server is a normal server that processes the logic requested by the client.
type Server struct {
conf *Config
grpcServer *grpc.Server
conf *Config
grpcServer *grpc.Server
yorkieServiceCancel context.CancelFunc
}

// NewServer creates a new instance of Server.
Expand Down Expand Up @@ -73,15 +75,18 @@ func NewServer(conf *Config, be *backend.Backend) (*Server, error) {
opts = append(opts, grpc.Creds(creds))
}

yorkieServiceCtx, yorkieServiceCancel := context.WithCancel(context.Background())

grpcServer := grpc.NewServer(opts...)
healthpb.RegisterHealthServer(grpcServer, health.NewServer())
api.RegisterYorkieServer(grpcServer, newYorkieServer(be))
api.RegisterYorkieServer(grpcServer, newYorkieServer(yorkieServiceCtx, be))
api.RegisterClusterServer(grpcServer, newClusterServer(be))
grpcprometheus.Register(grpcServer)

return &Server{
conf: conf,
grpcServer: grpcServer,
conf: conf,
grpcServer: grpcServer,
yorkieServiceCancel: yorkieServiceCancel,
}, nil
}

Expand All @@ -92,6 +97,8 @@ func (s *Server) Start() error {

// Shutdown shuts down this server.
func (s *Server) Shutdown(graceful bool) {
s.yorkieServiceCancel()

if graceful {
s.grpcServer.GracefulStop()
} else {
Expand Down
14 changes: 10 additions & 4 deletions yorkie/rpc/yorkie_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,16 @@ import (
)

type yorkieServer struct {
backend *backend.Backend
backend *backend.Backend
serviceCtx context.Context
}

// newYorkieServer creates a new instance of yorkieServer
func newYorkieServer(be *backend.Backend) *yorkieServer {
return &yorkieServer{backend: be}
func newYorkieServer(serviceCtx context.Context, be *backend.Backend) *yorkieServer {
return &yorkieServer{
backend: be,
serviceCtx: serviceCtx,
}
}

// ActivateClient activates the given client.
Expand Down Expand Up @@ -320,7 +324,6 @@ func (s *yorkieServer) WatchDocuments(
return err
}

// TODO(hackerwins): unwatchDocs when shutting down the agent.
if err := stream.Send(&api.WatchDocumentsResponse{
Body: &api.WatchDocumentsResponse_Initialization_{
Initialization: &api.WatchDocumentsResponse_Initialization{
Expand All @@ -335,6 +338,9 @@ func (s *yorkieServer) WatchDocuments(

for {
select {
case <-s.serviceCtx.Done():
s.unwatchDocs(docKeys, subscription)
return nil
case <-stream.Context().Done():
s.unwatchDocs(docKeys, subscription)
return nil
Expand Down

0 comments on commit 7aafb1d

Please sign in to comment.