Skip to content

Commit

Permalink
Merge "Complete chaincode execution on stream termination"
Browse files Browse the repository at this point in the history
  • Loading branch information
mastersingh24 authored and Gerrit Code Review committed Nov 9, 2019
2 parents 5665cf5 + fbb4c89 commit ffa3335
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 7 deletions.
25 changes: 20 additions & 5 deletions core/chaincode/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ type Handler struct {
UUIDGenerator UUIDGenerator
// AppConfig is used to retrieve the application config for a channel
AppConfig ApplicationConfigRetriever
// Metrics holds chaincode handler metrics
Metrics *HandlerMetrics

// state holds the current handler state. It will be created, established, or
// ready.
Expand All @@ -135,8 +137,10 @@ type Handler struct {
chatStream ccintf.ChaincodeStream
// errChan is used to communicate errors from the async send to the receive loop
errChan chan error
// Metrics holds chaincode handler metrics
Metrics *HandlerMetrics
// mutex is used to serialze the stream closed chan.
mutex sync.Mutex
// streamDoneChan is closed when the chaincode stream terminates.
streamDoneChan chan struct{}
}

// handleMessage is called by ProcessStream to dispatch messages.
Expand Down Expand Up @@ -342,9 +346,20 @@ func (h *Handler) deregister() {
h.Registry.Deregister(h.chaincodeID)
}

func (h *Handler) streamDone() <-chan struct{} {
h.mutex.Lock()
defer h.mutex.Unlock()
return h.streamDoneChan
}

func (h *Handler) ProcessStream(stream ccintf.ChaincodeStream) error {
defer h.deregister()

h.mutex.Lock()
h.streamDoneChan = make(chan struct{})
h.mutex.Unlock()
defer close(h.streamDoneChan)

h.chatStream = stream
h.errChan = make(chan error, 1)

Expand Down Expand Up @@ -1198,9 +1213,9 @@ func (h *Handler) Execute(txParams *ccprovider.TransactionParams, namespace stri
// are typically treated as error
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
h.Metrics.ExecuteTimeouts.With(
"chaincode", h.chaincodeID,
).Add(1)
h.Metrics.ExecuteTimeouts.With("chaincode", h.chaincodeID).Add(1)
case <-h.streamDone():
err = errors.New("chaincode stream terminated")
}

return ccresp, err
Expand Down
8 changes: 8 additions & 0 deletions core/chaincode/handler_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,11 @@ func SetHandlerChaincodeID(h *Handler, chaincodeID string) {
func SetHandlerChatStream(h *Handler, chatStream ccintf.ChaincodeStream) {
h.chatStream = chatStream
}

func StreamDone(h *Handler) <-chan struct{} {
return h.streamDone()
}

func SetStreamDoneChan(h *Handler, ch chan struct{}) {
h.streamDoneChan = ch
}
36 changes: 34 additions & 2 deletions core/chaincode/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2539,6 +2539,23 @@ var _ = Describe("Handler", func() {
})
})

Context("when the chaincode stream terminates", func() {
It("returns an error", func() {
streamDoneChan := make(chan struct{})
chaincode.SetStreamDoneChan(handler, streamDoneChan)

errCh := make(chan error, 1)
go func() {
_, err := handler.Execute(txParams, "chaincode-name", incomingMessage, time.Hour)
errCh <- err
}()
Consistently(errCh).ShouldNot(Receive())

close(streamDoneChan)
Eventually(errCh).Should(Receive(MatchError("chaincode stream terminated")))
})
})

Context("when execute times out", func() {
It("returns an error", func() {
errCh := make(chan error, 1)
Expand Down Expand Up @@ -2716,6 +2733,22 @@ var _ = Describe("Handler", func() {
Eventually(fakeChatStream.RecvCallCount).Should(Equal(100))
})

It("manages the stream done channel", func() {
releaseChan := make(chan struct{})
fakeChatStream.RecvStub = func() (*pb.ChaincodeMessage, error) {
<-releaseChan
return nil, errors.New("cc-went-away")
}
go handler.ProcessStream(fakeChatStream)
Eventually(fakeChatStream.RecvCallCount).Should(Equal(1))

streamDoneChan := chaincode.StreamDone(handler)
Consistently(streamDoneChan).ShouldNot(Receive())

close(releaseChan)
Eventually(streamDoneChan).Should(BeClosed())
})

Context("when receive fails with an io.EOF", func() {
BeforeEach(func() {
fakeChatStream.RecvReturns(nil, io.EOF)
Expand Down Expand Up @@ -2817,8 +2850,7 @@ var _ = Describe("Handler", func() {
Context("when an async error is sent", func() {
var (
incomingMessage *pb.ChaincodeMessage

recvChan chan *pb.ChaincodeMessage
recvChan chan *pb.ChaincodeMessage
)

BeforeEach(func() {
Expand Down

0 comments on commit ffa3335

Please sign in to comment.