Skip to content

Commit

Permalink
mutableState checksums part#1 (cadence-workflow#2941)
Browse files Browse the repository at this point in the history
  • Loading branch information
venkat1109 authored Jan 9, 2020
1 parent b694784 commit d695915
Show file tree
Hide file tree
Showing 17 changed files with 1,999 additions and 8 deletions.
1,138 changes: 1,138 additions & 0 deletions .gen/go/checksum/checksum.go

Large diffs are not rendered by default.

138 changes: 136 additions & 2 deletions .gen/go/shared/shared.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ THRIFTRW_SRCS = \
idls/thrift/shared.thrift \
idls/thrift/admin.thrift \
idls/thrift/sqlblobs.thrift \
idls/thrift/checksum.thrift \

PROGS = cadence
TEST_TIMEOUT = 20m
Expand Down
78 changes: 78 additions & 0 deletions common/checksum/crc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package checksum

import (
"bytes"
"encoding/binary"
"fmt"
"hash/crc32"

"github.com/uber/cadence/common/codec"
)

// GenerateCRC32 computes an IEEE crc32 checksum over the
// thriftRW serialized bytes of the given thrift object.
// The returned Checksum carries the crc as 4 big-endian
// bytes, is stamped with the supplied payloadVersion and
// has flavor FlavorIEEECRC32OverThriftBinary. When the
// payload cannot be encoded, the encoder error is returned
// unchanged together with a zero-value Checksum.
func GenerateCRC32(
	payload codec.ThriftObject,
	payloadVersion int,
) (Checksum, error) {

	enc := codec.NewThriftRWEncoder()
	serialized, err := enc.Encode(payload)
	if err != nil {
		return Checksum{}, err
	}

	// the 32-bit crc is stored in big-endian byte order
	value := make([]byte, 4)
	binary.BigEndian.PutUint32(value, crc32.ChecksumIEEE(serialized))

	result := Checksum{
		Flavor:  FlavorIEEECRC32OverThriftBinary,
		Version: payloadVersion,
		Value:   value,
	}
	return result, nil
}

// Verify recomputes the checksum of the given thrift object
// and compares it against the specified expected checksum.
// It returns nil when both values are byte-for-byte equal and
// ErrMismatch when they differ. A checksum whose flavor is not
// FlavorIEEECRC32OverThriftBinary is rejected with an
// "unknown checksum flavor" error, and any error raised while
// regenerating the checksum is returned as-is.
func Verify(
	payload codec.ThriftObject,
	checksum Checksum,
) error {

	// only crc32-over-thrift checksums can be recomputed here
	flavor := checksum.Flavor
	if !(flavor.IsValid() && flavor == FlavorIEEECRC32OverThriftBinary) {
		return fmt.Errorf("unknown checksum flavor %v", flavor)
	}

	// regenerate using the version recorded in the expected checksum
	actual, err := GenerateCRC32(payload, checksum.Version)
	if err != nil {
		return err
	}

	if bytes.Equal(actual.Value, checksum.Value) {
		return nil
	}
	return ErrMismatch
}
79 changes: 79 additions & 0 deletions common/checksum/crc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package checksum

import (
"sync"
"sync/atomic"
"testing"
"time"

"github.com/pborman/uuid"
"github.com/stretchr/testify/assert"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
)

// TestCRC32OverThrift generates and verifies a checksum for the
// same thrift object from several goroutines at once and asserts
// that every generate/verify round trip succeeded.
func TestCRC32OverThrift(t *testing.T) {
	// note: do not use a struct with map since
	// iteration order is not guaranteed in Go and
	// so, each call to thrift encode will result in
	// different set of serialized bytes
	obj := &shared.WorkflowExecutionInfo{
		Execution: &shared.WorkflowExecution{
			WorkflowId: common.StringPtr(uuid.New()),
			RunId:      common.StringPtr(uuid.New()),
		},
		StartTime:     common.Int64Ptr(time.Now().UnixNano()),
		HistoryLength: common.Int64Ptr(550),
	}

	parallelism := 10
	loopCount := 100
	successCount := int64(0)

	// startC is closed to release all workers at the same time
	startC := make(chan struct{})
	doneWG := sync.WaitGroup{}
	doneWG.Add(parallelism)

	for i := 0; i < parallelism; i++ {
		go func() {
			defer doneWG.Done()
			<-startC
			for count := 0; count < loopCount; count++ {
				// a worker stops at its first failure; the shortfall
				// in successCount then fails the final assertion
				csum, err := GenerateCRC32(obj, 1)
				if err != nil {
					return
				}
				if err := Verify(obj, csum); err != nil {
					return
				}
				atomic.AddInt64(&successCount, 1)
			}
		}()
	}

	close(startC)
	success := common.AwaitWaitGroup(&doneWG, time.Second)
	assert.True(t, success, "timed out waiting for goroutines to finish")
	// the counter is read atomically because workers may still be
	// incrementing it when the wait above has timed out
	assert.Equal(t, int64(parallelism*loopCount), atomic.LoadInt64(&successCount))
}
56 changes: 56 additions & 0 deletions common/checksum/defs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright (c) 2019 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package checksum

import "errors"

// Checksum represents a checksum value along
// with associated metadata
type Checksum struct {
	// Version represents version of the payload from
	// which this checksum was derived
	Version int
	// Flavor identifies the type of checksum held in Value
	Flavor Flavor
	// Value is the checksum value
	Value []byte
}

// Flavor is an enum type that represents the type of checksum
type Flavor int

const (
	// FlavorUnknown represents an unknown/uninitialized checksum flavor
	FlavorUnknown = Flavor(iota)
	// FlavorIEEECRC32OverThriftBinary represents crc32 checksum generated over thriftRW serialized payload
	FlavorIEEECRC32OverThriftBinary
	// maxFlavors is the exclusive upper bound of the flavor
	// enum; it must stay the last entry of this group
	maxFlavors
)

var (
	// ErrMismatch is the sentinel error reported when a checksum
	// derived from a payload is not equal to the expected checksum
	ErrMismatch = errors.New("checksum mismatch error")
)

// IsValid reports whether f is a known checksum flavor, i.e.
// it lies strictly between FlavorUnknown and maxFlavors
func (f Flavor) IsValid() bool {
	if f <= FlavorUnknown {
		return false
	}
	return f < maxFlavors
}
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -1647,6 +1647,8 @@ const (
DecisionStartToCloseTimeoutOverrideCount
ReplicationTaskCleanupCount
ReplicationTaskCleanupFailure
MutableStateChecksumMismatch
MutableStateChecksumInvalidated

NumHistoryMetrics
)
Expand Down Expand Up @@ -1978,6 +1980,8 @@ var MetricDefs = map[ServiceIdx]map[int]metricDefinition{
DecisionStartToCloseTimeoutOverrideCount: {metricName: "decision_start_to_close_timeout_overrides", metricType: Counter},
ReplicationTaskCleanupCount: {metricName: "replication_task_cleanup_count", metricType: Counter},
ReplicationTaskCleanupFailure: {metricName: "replication_task_cleanup_failed", metricType: Counter},
MutableStateChecksumMismatch: {metricName: "mutable_state_checksum_mismatch", metricType: Counter},
MutableStateChecksumInvalidated: {metricName: "mutable_state_checksum_invalidated", metricType: Counter},
},
Matching: {
PollSuccessCounter: {metricName: "poll_success"},
Expand Down
5 changes: 5 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"strings"
"time"

"github.com/uber/cadence/common/checksum"

"github.com/pborman/uuid"

"github.com/uber/cadence/.gen/go/replicator"
Expand Down Expand Up @@ -631,6 +633,7 @@ type (
ReplicationState *ReplicationState
BufferedEvents []*workflow.HistoryEvent
VersionHistories *VersionHistories
Checksum checksum.Checksum
}

// ActivityInfo details.
Expand Down Expand Up @@ -881,6 +884,7 @@ type (
TimerTasks []Task

Condition int64
Checksum checksum.Checksum
}

// WorkflowSnapshot is used as generic workflow execution state snapshot
Expand All @@ -902,6 +906,7 @@ type (
TimerTasks []Task

Condition int64
Checksum checksum.Checksum
}

// DeleteWorkflowExecutionRequest is used to delete a workflow execution
Expand Down
6 changes: 4 additions & 2 deletions common/resource/resourceTest.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,16 +161,18 @@ func NewTest(
membershipMonitor.EXPECT().GetResolver(common.HistoryServiceName).Return(historyServiceResolver, nil).AnyTimes()
membershipMonitor.EXPECT().GetResolver(common.WorkerServiceName).Return(workerServiceResolver, nil).AnyTimes()

scope := tally.NewTestScope("test", nil)

return &Test{
MetricsScope: tally.NoopScope,
MetricsScope: scope,
ClusterMetadata: cluster.NewMockMetadata(controller),

// other common resources

DomainCache: cache.NewMockDomainCache(controller),
TimeSource: clock.NewRealTimeSource(),
PayloadSerializer: persistence.NewPayloadSerializer(),
MetricsClient: metrics.NewClient(tally.NoopScope, serviceMetricsIndex),
MetricsClient: metrics.NewClient(scope, serviceMetricsIndex),
ArchivalMetadata: &archiver.MockArchivalMetadata{},
ArchiverProvider: &provider.MockArchiverProvider{},

Expand Down
9 changes: 9 additions & 0 deletions common/service/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ var keys = map[Key]string{
EnableConsistentQuery: "history.EnableConsistentQuery",
EnableConsistentQueryByDomain: "history.EnableConsistentQueryByDomain",
MaxBufferedQueryCount: "history.MaxBufferedQueryCount",
MutableStateChecksumGenProbability: "history.mutableStateChecksumGenProbability",
MutableStateChecksumVerifyProbability: "history.mutableStateChecksumVerifyProbability",
MutableStateChecksumInvalidateBefore: "history.mutableStateChecksumInvalidateBefore",

WorkerPersistenceMaxQPS: "worker.persistenceMaxQPS",
WorkerReplicatorMetaTaskConcurrency: "worker.replicatorMetaTaskConcurrency",
Expand Down Expand Up @@ -623,6 +626,12 @@ const (
EnableConsistentQueryByDomain
// MaxBufferedQueryCount indicates the maximum number of queries which can be buffered at a given time for a single workflow
MaxBufferedQueryCount
// MutableStateChecksumGenProbability is the probability [0-100] that checksum will be generated for mutable state
MutableStateChecksumGenProbability
// MutableStateChecksumVerifyProbability is the probability [0-100] that checksum will be verified for mutable state
MutableStateChecksumVerifyProbability
// MutableStateChecksumInvalidateBefore is the epoch timestamp before which all checksums are to be discarded
MutableStateChecksumInvalidateBefore

// lastKeyForTest must be the last one in this const group for testing purpose
lastKeyForTest
Expand Down
9 changes: 9 additions & 0 deletions common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"math/rand"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -360,6 +361,14 @@ func MaxInt(a, b int) int {
return b
}

// SortInt64Slice sorts the given int64 slice in place,
// in ascending order. The underlying sort is not stable.
func SortInt64Slice(slice []int64) {
	ascending := func(a, b int) bool {
		return slice[a] < slice[b]
	}
	sort.Slice(slice, ascending)
}

// ValidateRetryPolicy validates a retry policy
func ValidateRetryPolicy(policy *workflow.RetryPolicy) error {
if policy == nil {
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from 56ca0b to a62e74
Loading

0 comments on commit d695915

Please sign in to comment.