diff --git a/.gen/go/shared/shared.go b/.gen/go/shared/shared.go index f4b16c23975..27ee57bc567 100644 --- a/.gen/go/shared/shared.go +++ b/.gen/go/shared/shared.go @@ -1,17 +1,17 @@ // The MIT License (MIT) -// +// // Copyright (c) 2017 Uber Technologies, Inc. -// +// // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal // in the Software without restriction, including without limitation the rights // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell // copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: -// +// // The above copyright notice and this permission notice shall be included in all // copies or substantial portions of the Software. -// +// // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE diff --git a/service/history/nDCEventsReapplier.go b/service/history/nDCEventsReapplier.go new file mode 100644 index 00000000000..adc91d2603d --- /dev/null +++ b/service/history/nDCEventsReapplier.go @@ -0,0 +1,92 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination nDCEventsReapplier_mock.go + +package history + +import ( + ctx "context" + + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/metrics" +) + +type ( + nDCEventsReapplier interface { + reapplyEvents(ctx ctx.Context, msBuilder mutableState, historyEvents []*shared.HistoryEvent) error + } + + nDCEventsReapplierImpl struct { + metricsClient metrics.Client + logger log.Logger + } +) + +func newNDCEventsReapplier( + metricsClient metrics.Client, + logger log.Logger, +) nDCEventsReapplier { + + return &nDCEventsReapplierImpl{ + metricsClient: metricsClient, + logger: logger, + } +} + +func (r *nDCEventsReapplierImpl) reapplyEvents( + ctx ctx.Context, + msBuilder mutableState, + historyEvents []*shared.HistoryEvent, +) error { + reapplyEvents := []*shared.HistoryEvent{} + // TODO: need to implement Reapply policy + for _, event := range historyEvents { + switch event.GetEventType() { + case shared.EventTypeWorkflowExecutionSignaled: + reapplyEvents = append(reapplyEvents, event) + } + } + + if len(reapplyEvents) == 0 { + return nil + } + + if !msBuilder.IsWorkflowExecutionRunning() { + // TODO when https://github.com/uber/cadence/issues/2420 is finished + // reset to workflow finish event + // ignore this case for now + return nil + } + + // TODO: need to have signal deduplicate logic + for _, event := range reapplyEvents { + signal := event.GetWorkflowExecutionSignaledEventAttributes() + if _, err := msBuilder.AddWorkflowExecutionSignaled( + signal.GetSignalName(), + signal.GetInput(), + signal.GetIdentity(), + ); err != nil { + return err + } + } + return nil +} diff --git a/service/history/nDCEventsReapplier_test.go b/service/history/nDCEventsReapplier_test.go new file mode 100644 index 00000000000..696bb43b730 --- /dev/null +++ b/service/history/nDCEventsReapplier_test.go @@ -0,0 +1,85 @@ +// Copyright (c) 2019 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package history + +import ( + "context" + "testing" + + "github.com/pborman/uuid" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "github.com/uber-go/tally" + "github.com/uber/cadence/.gen/go/shared" + "github.com/uber/cadence/common" + "github.com/uber/cadence/common/log/loggerimpl" + "github.com/uber/cadence/common/metrics" + "github.com/uber/cadence/common/persistence" +) + +type ( + nDCEventReapplicationSuite struct { + suite.Suite + nDCReapplication nDCEventsReapplier + } +) + +func TestNDCEventReapplicationSuite(t *testing.T) { + s := new(nDCEventReapplicationSuite) + suite.Run(t, s) +} + +func (s *nDCEventReapplicationSuite) SetupTest() { + logger := loggerimpl.NewDevelopmentForTest(s.Suite) + metricsClient := metrics.NewClient(tally.NoopScope, metrics.History) + s.nDCReapplication = newNDCEventsReapplier( + metricsClient, + logger, + ) +} + +func (s *nDCEventReapplicationSuite) TestReapplyEvents() { + execution := &persistence.WorkflowExecutionInfo{ + DomainID: uuid.New(), + } + event := &shared.HistoryEvent{ + EventId: common.Int64Ptr(1), + EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionSignaled), + WorkflowExecutionSignaledEventAttributes: &shared.WorkflowExecutionSignaledEventAttributes{ + Identity: common.StringPtr("test"), + SignalName: common.StringPtr("signal"), + Input: []byte{}, + }, + } + msBuilderCurrent := &mockMutableState{} + msBuilderCurrent.On("GetLastWriteVersion").Return(int64(1), nil) + msBuilderCurrent.On("UpdateReplicationStateVersion", int64(1), true).Return() + msBuilderCurrent.On("GetExecutionInfo").Return(execution) + msBuilderCurrent.On("AddWorkflowExecutionSignaled", mock.Anything, mock.Anything, mock.Anything).Return(event, nil).Once() + msBuilderCurrent.On("IsWorkflowExecutionRunning").Return(true) + + events := []*shared.HistoryEvent{ + {EventType: common.EventTypePtr(shared.EventTypeWorkflowExecutionStarted)}, + event, + } + err := s.nDCReapplication.reapplyEvents(context.Background(), msBuilderCurrent, events) + s.NoError(err) +}