Skip to content

Commit

Permalink
[FABG-954] Cancel stream context when event client closes (hyperledge…
Browse files Browse the repository at this point in the history
…r#55)

This update ensures that the GRPC stream context is cancelled when the Deliver client connection is closed.

Signed-off-by: Bob Stasyszyn <[email protected]>
  • Loading branch information
bstasyszyn authored Feb 22, 2020
1 parent ae164c9 commit ff3bdd7
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 12 deletions.
9 changes: 7 additions & 2 deletions pkg/fab/comm/streamconnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ import (
)

// StreamProvider creates a GRPC stream
type StreamProvider func(conn *grpc.ClientConn) (grpc.ClientStream, error)
type StreamProvider func(conn *grpc.ClientConn) (grpc.ClientStream, func(), error)

// StreamConnection manages the GRPC connection and client stream
type StreamConnection struct {
*GRPCConnection
chConfig fab.ChannelCfg
stream grpc.ClientStream
cancel func()
lock sync.Mutex
}

Expand All @@ -38,7 +39,7 @@ func NewStreamConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamP
return nil, err
}

stream, err := streamProvider(conn.conn)
stream, cancel, err := streamProvider(conn.conn)
if err != nil {
conn.commManager.ReleaseConn(conn.conn)
return nil, errors.Wrapf(err, "could not create stream to %s", url)
Expand Down Expand Up @@ -70,6 +71,7 @@ func NewStreamConnection(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamP
GRPCConnection: conn,
chConfig: chConfig,
stream: stream,
cancel: cancel,
}, nil
}

Expand All @@ -88,6 +90,9 @@ func (c *StreamConnection) Close() {
}

logger.Debug("Closing stream....")

c.cancel()

if err := c.stream.CloseSend(); err != nil {
logger.Warnf("error closing GRPC stream: %s", err)
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/fab/comm/streamconnection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ import (
"google.golang.org/grpc"
)

var testStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
return pb.NewDeliverClient(grpcconn).Deliver(context.Background())
var testStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, func(), error) {
ctx, cancel := context.WithCancel(context.Background())
stream, err := pb.NewDeliverClient(grpcconn).Deliver(ctx)
return stream, cancel, err
}

var invalidStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
return nil, errors.New("simulated error creating stream")
var invalidStream = func(grpcconn *grpc.ClientConn) (grpc.ClientStream, func(), error) {
return nil, nil, errors.New("simulated error creating stream")
}

func TestStreamConnection(t *testing.T) {
Expand Down
16 changes: 10 additions & 6 deletions pkg/fab/events/deliverclient/connection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,21 @@ type DeliverConnection struct {
}

// StreamProvider creates a deliver stream
type StreamProvider func(pb.DeliverClient) (deliverStream, error)
type StreamProvider func(pb.DeliverClient) (stream deliverStream, cancel func(), err error)

var (
// Deliver creates a Deliver stream
Deliver = func(client pb.DeliverClient) (deliverStream, error) {
return client.Deliver(context.Background())
Deliver = func(client pb.DeliverClient) (deliverStream, func(), error) {
ctx, cancel := context.WithCancel(context.Background())
stream, err := client.Deliver(ctx)
return stream, cancel, err
}

// DeliverFiltered creates a DeliverFiltered stream
DeliverFiltered = func(client pb.DeliverClient) (deliverStream, error) {
return client.DeliverFiltered(context.Background())
DeliverFiltered = func(client pb.DeliverClient) (deliverStream, func(), error) {
ctx, cancel := context.WithCancel(context.Background())
stream, err := client.DeliverFiltered(ctx)
return stream, cancel, err
}
)

Expand All @@ -61,7 +65,7 @@ func New(ctx fabcontext.Client, chConfig fab.ChannelCfg, streamProvider StreamPr
logger.Debugf("Connecting to %s...", url)
connect, err := comm.NewStreamConnection(
ctx, chConfig,
func(grpcconn *grpc.ClientConn) (grpc.ClientStream, error) {
func(grpcconn *grpc.ClientConn) (grpc.ClientStream, func(), error) {
return streamProvider(pb.NewDeliverClient(grpcconn))
},
url, opts...,
Expand Down

0 comments on commit ff3bdd7

Please sign in to comment.