Skip to content

Commit

Permalink
Add fixer impl (cadence-workflow#3291)
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjdawson2016 authored May 29, 2020
1 parent c854872 commit 950591c
Show file tree
Hide file tree
Showing 8 changed files with 809 additions and 32 deletions.
12 changes: 6 additions & 6 deletions service/worker/scanner/executions/common/blobstoreIterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,22 @@ type (
func NewBlobstoreIterator(
client blobstore.Client,
keys Keys,
) ExecutionIterator {
) ScanOutputIterator {
return &blobstoreIterator{
itr: pagination.NewIterator(keys.MinPage, getBlobstoreFetchPageFn(client, keys)),
}
}

// Next returns the next Execution
func (i *blobstoreIterator) Next() (*Execution, error) {
// Next returns the next ScanOutputEntity
func (i *blobstoreIterator) Next() (*ScanOutputEntity, error) {
exec, err := i.itr.Next()
if exec != nil {
return exec.(*Execution), err
return exec.(*ScanOutputEntity), err
}
return nil, err
}

// HasNext returns true if there is a next Execution false otherwise
// HasNext returns true if there is a next ScanOutputEntity false otherwise
func (i *blobstoreIterator) HasNext() bool {
return i.itr.HasNext()
}
Expand Down Expand Up @@ -90,7 +90,7 @@ func getBlobstoreFetchPageFn(
if err := ValidateExecution(&soe.Execution); err != nil {
return pagination.Page{}, err
}
executions = append(executions, &soe.Execution)
executions = append(executions, &soe)
}
var nextPageToken interface{} = index + 1
if nextPageToken.(int) > keys.MaxPage {
Expand Down
8 changes: 4 additions & 4 deletions service/worker/scanner/executions/common/blobstoreWriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,14 @@ type (
blobstoreWriter struct {
writer pagination.Writer
uuid string
extension string
extension Extension
}
)

// NewBlobstoreWriter constructs a new blobstore writer
func NewBlobstoreWriter(
uuid string,
extension string,
extension Extension,
client blobstore.Client,
flushThreshold int,
) ExecutionWriter {
Expand Down Expand Up @@ -84,7 +84,7 @@ func (bw *blobstoreWriter) FlushedKeys() *Keys {

func getBlobstoreWriteFn(
uuid string,
extension string,
extension Extension,
client blobstore.Client,
) pagination.WriteFn {
return func(page pagination.Page) (pagination.PageToken, error) {
Expand Down Expand Up @@ -123,6 +123,6 @@ func getBlobstoreShouldFlushFn(
}
}

func pageNumberToKey(uuid string, extension string, pageNum int) string {
func pageNumberToKey(uuid string, extension Extension, pageNum int) string {
return fmt.Sprintf("%v_%v.%v", uuid, pageNum, extension)
}
12 changes: 11 additions & 1 deletion service/worker/scanner/executions/common/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,17 @@ type (
// or converting store entry to Execution will result in an error after which iterator cannot be used.
Next() (*Execution, error)
// HasNext indicates if the iterator has a next element. If HasNext is true
// it is guaranteed that Next will return a nil error and a non-nil ExecutionIteratorResult.
// it is guaranteed that Next will return a nil error and a non-nil Execution.
HasNext() bool
}

// ScanOutputIterator gets ScanOutputEntities from underlying store
ScanOutputIterator interface {
// Next returns the next ScanOutputEntity found. Any error reading from underlying store
// or converting store entry to ScanOutputEntity will result in an error after which iterator cannot be used.
Next() (*ScanOutputEntity, error)
// HasNext indicates if the iterator has a next element. If HasNext is true it is
// guaranteed that Next will return a nil error and non-nil ScanOutputEntity.
HasNext() bool
}

Expand Down
52 changes: 52 additions & 0 deletions service/worker/scanner/executions/common/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 12 additions & 12 deletions service/worker/scanner/executions/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,37 +52,37 @@ const (
// CheckResultTypeFailed indicates a failure occurred while attempting to run check
CheckResultTypeFailed CheckResultType = "failed"
// CheckResultTypeCorrupted indicates check successfully ran and detected a corruption
CheckResultTypeCorrupted = "corrupted"
CheckResultTypeCorrupted CheckResultType = "corrupted"
// CheckResultTypeHealthy indicates check successfully ran and detected no corruption
CheckResultTypeHealthy = "healthy"
CheckResultTypeHealthy CheckResultType = "healthy"

// FixResultTypeSkipped indicates that fix skipped execution
FixResultTypeSkipped FixResultType = "skipped"
// FixResultTypeFixed indicates that fix successfully fixed an execution
FixResultTypeFixed = "fixed"
FixResultTypeFixed FixResultType = "fixed"
// FixResultTypeFailed indicates that fix attempted to fix an execution but failed to do so
FixResultTypeFailed = "failed"
FixResultTypeFailed FixResultType = "failed"

// HistoryExistsInvariantType asserts that history must exist if concrete execution exists
HistoryExistsInvariantType InvariantType = "history_exists"
// ValidFirstEventInvariantType asserts that the first event in a history must be of a specific form
ValidFirstEventInvariantType = "valid_first_event"
ValidFirstEventInvariantType InvariantType = "valid_first_event"
// OpenCurrentExecutionInvariantType asserts that an open concrete execution must have a valid current execution
OpenCurrentExecutionInvariantType = "open_current_execution"
OpenCurrentExecutionInvariantType InvariantType = "open_current_execution"

// InvariantCollectionMutableState is the collection of invariants relating to mutable state
InvariantCollectionMutableState InvariantCollection = iota
InvariantCollectionMutableState InvariantCollection = 0
// InvariantCollectionHistory is the collection of invariants relating to history
InvariantCollectionHistory
InvariantCollectionHistory InvariantCollection = 1

// SkippedExtension is the extension for files which contain skips
SkippedExtension Extension = "skipped"
// FailedExtension is the extension for files which contain failures
FailedExtension = "failed"
FailedExtension Extension = "failed"
// FixedExtension is the extension for files which contain fixes
FixedExtension = "fixed"
FixedExtension Extension = "fixed"
// CorruptedExtension is the extension for files which contain corruptions
CorruptedExtension = "corrupted"
CorruptedExtension Extension = "corrupted"
)

// The following are types related to Invariant.
Expand Down Expand Up @@ -212,7 +212,7 @@ type (
UUID string
MinPage int
MaxPage int
Extension string
Extension Extension
}
)

Expand Down
18 changes: 9 additions & 9 deletions service/worker/scanner/executions/common/writerIterator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *WriterIteratorSuite) TestWriterIterator() {
pr := NewPersistenceRetryer(getMockExecutionManager(10, 10), nil)
pItr := NewPersistenceIterator(pr, executionPageSize, testShardID)
uuid := "uuid"
extension := "test"
extension := Extension("test")
outputDir, err := ioutil.TempDir("", "TestWriterIterator")
s.NoError(err)
defer os.RemoveAll(outputDir)
Expand All @@ -72,32 +72,32 @@ func (s *WriterIteratorSuite) TestWriterIterator() {
blobstore, err := filestore.NewFilestoreClient(cfg)
s.NoError(err)
blobstoreWriter := NewBlobstoreWriter(uuid, extension, blobstore, 10)
var executions []*Execution
var outputs []*ScanOutputEntity
for pItr.HasNext() {
exec, err := pItr.Next()
s.NoError(err)
executions = append(executions, exec)
err = blobstoreWriter.Add(&ScanOutputEntity{
soe := &ScanOutputEntity{
Execution: *exec,
})
s.NoError(err)
}
outputs = append(outputs, soe)
s.NoError(blobstoreWriter.Add(soe))
}
s.NoError(blobstoreWriter.Flush())
s.Len(executions, 100)
s.Len(outputs, 100)
s.False(pItr.HasNext())
_, err = pItr.Next()
s.Equal(pagination.ErrIteratorFinished, err)
flushedKeys := blobstoreWriter.FlushedKeys()
s.Equal(uuid, flushedKeys.UUID)
s.Equal(0, flushedKeys.MinPage)
s.Equal(9, flushedKeys.MaxPage)
s.Equal("test", flushedKeys.Extension)
s.Equal(Extension("test"), flushedKeys.Extension)
blobstoreItr := NewBlobstoreIterator(blobstore, *flushedKeys)
i := 0
for blobstoreItr.HasNext() {
exec, err := blobstoreItr.Next()
s.NoError(err)
s.Equal(*executions[i], *exec)
s.Equal(*outputs[i], *exec)
i++
}
}
Expand Down
Loading

0 comments on commit 950591c

Please sign in to comment.