Skip to content

Commit

Permalink
Implement of NDC events reapply logic (cadence-workflow#2560)
Browse files Browse the repository at this point in the history
  • Loading branch information
yux0 authored and wxing1292 committed Sep 18, 2019
1 parent 1087510 commit 525a83d
Show file tree
Hide file tree
Showing 3 changed files with 181 additions and 4 deletions.
8 changes: 4 additions & 4 deletions .gen/go/shared/shared.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

92 changes: 92 additions & 0 deletions service/history/nDCEventsReapplier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCEventsReapplier_mock.go

package history

import (
ctx "context"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/metrics"
)

type (
nDCEventsReapplier interface {
reapplyEvents(ctx ctx.Context, msBuilder mutableState, historyEvents []*shared.HistoryEvent) error
}

nDCEventsReapplierImpl struct {
metricsClient metrics.Client
logger log.Logger
}
)

func newNDCEventsReapplier(
metricsClient metrics.Client,
logger log.Logger,
) nDCEventsReapplier {

return &nDCEventsReapplierImpl{
metricsClient: metricsClient,
logger: logger,
}
}

func (r *nDCEventsReapplierImpl) reapplyEvents(
ctx ctx.Context,
msBuilder mutableState,
historyEvents []*shared.HistoryEvent,
) error {
reapplyEvents := []*shared.HistoryEvent{}
// TODO: need to implement Reapply policy
for _, event := range historyEvents {
switch event.GetEventType() {
case shared.EventTypeWorkflowExecutionSignaled:
reapplyEvents = append(reapplyEvents, event)
}
}

if len(reapplyEvents) == 0 {
return nil
}

if !msBuilder.IsWorkflowExecutionRunning() {
// TODO when https://github.com/uber/cadence/issues/2420 is finished
// reset to workflow finish event
// ignore this case for now
return nil
}

// TODO: need to have signal deduplicate logic
for _, event := range reapplyEvents {
signal := event.GetWorkflowExecutionSignaledEventAttributes()
if _, err := msBuilder.AddWorkflowExecutionSignaled(
signal.GetSignalName(),
signal.GetInput(),
signal.GetIdentity(),
); err != nil {
return err
}
}
return nil
}
85 changes: 85 additions & 0 deletions service/history/nDCEventsReapplier_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package history

import (
"context"
"testing"

"github.com/pborman/uuid"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/uber-go/tally"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/log/loggerimpl"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
)

type (
nDCEventReapplicationSuite struct {
suite.Suite
nDCReapplication nDCEventsReapplier
}
)

func TestNDCEventReapplicationSuite(t *testing.T) {
s := new(nDCEventReapplicationSuite)
suite.Run(t, s)
}

func (s *nDCEventReapplicationSuite) SetupTest() {
logger := loggerimpl.NewDevelopmentForTest(s.Suite)
metricsClient := metrics.NewClient(tally.NoopScope, metrics.History)
s.nDCReapplication = newNDCEventsReapplier(
metricsClient,
logger,
)
}

func (s *nDCEventReapplicationSuite) TestReapplyEvents() {
execution := &persistence.WorkflowExecutionInfo{
DomainID: uuid.New(),
}
event := &shared.HistoryEvent{
EventId: common.Int64Ptr(1),
EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionSignaled),
WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{
Identity: common.StringPtr("test"),
SignalName: common.StringPtr("signal"),
Input: []byte{},
},
}
msBuilderCurrent := &mockMutableState{}
msBuilderCurrent.On("GetLastWriteVersion").Return(int64(1), nil)
msBuilderCurrent.On("UpdateReplicationStateVersion", int64(1), true).Return()
msBuilderCurrent.On("GetExecutionInfo").Return(execution)
msBuilderCurrent.On("AddWorkflowExecutionSignaled", mock.Anything, mock.Anything, mock.Anything).Return(event, nil).Once()
msBuilderCurrent.On("IsWorkflowExecutionRunning").Return(true)

events := []*shared.HistoryEvent{
{EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionStarted)},
event,
}
err := s.nDCReapplication.reapplyEvents(context.Background(), msBuilderCurrent, events)
s.NoError(err)
}

0 comments on commit 525a83d

Please sign in to comment.