Skip to content

Commit

Permalink
Move history service logic into separate packages (part 1) (cadence-w…
Browse files Browse the repository at this point in the history
…orkflow#3180)

* Move history config and events cache to separate package
  • Loading branch information
yycptt authored Apr 10, 2020
1 parent 651a26b commit e62d76e
Show file tree
Hide file tree
Showing 58 changed files with 854 additions and 683 deletions.
8 changes: 4 additions & 4 deletions common/membership/interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 51 additions & 0 deletions common/task/priority.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
// Copyright (c) 2020 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package task

const (
numBitsPerLevel = 3
)

const (
// HighPriorityClass is the priority class for high priority tasks
HighPriorityClass = iota << numBitsPerLevel
// DefaultPriorityClass is the priority class for default priority tasks
DefaultPriorityClass
// LowPriorityClass is the priority class for low priority tasks
LowPriorityClass
)

const (
// HighPrioritySubclass is the priority subclass for high priority tasks
HighPrioritySubclass = iota
// DefaultPrioritySubclass is the priority subclass for high priority tasks
DefaultPrioritySubclass
// LowPrioritySubclass is the priority subclass for high priority tasks
LowPrioritySubclass
)

// GetTaskPriority returns priority given a task's priority class and subclass
func GetTaskPriority(
class int,
subClass int,
) int {
return class | subClass
}
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,6 @@ go.uber.org/atomic v1.5.0 h1:OI5t8sDa1Or+q8AeE+yKeB/SDYioSHAgcVljj9JIETY=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.5.1 h1:rsqfU5vBkVknbhUGbAUwQKR2H4ItV8tjJ+6kJX4cxHM=
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/cadence v0.9.1-0.20200128004345-b282629d5ba9 h1:pmI3CA6StKZ56BfVQBdqjRbOScrgKY9zAQHw2QiYplg=
go.uber.org/cadence v0.9.1-0.20200128004345-b282629d5ba9/go.mod h1:VQIAZxvehjwln/xaRv1ghzcxo7TjCHEetZQkc8NlrvE=
go.uber.org/cadence v0.11.3-0.20200408211929-2e8992a5ef2d h1:65lxEqVKw7HDFojwOrc7CaIhaQQKQ2UJonogoI4nCKg=
go.uber.org/cadence v0.11.3-0.20200408211929-2e8992a5ef2d/go.mod h1:VQIAZxvehjwln/xaRv1ghzcxo7TjCHEetZQkc8NlrvE=
go.uber.org/dig v1.7.0 h1:E5/L92iQTNJTjfgJF2KgU+/JpMaiuvK2DHLBj0+kSZk=
Expand Down
2 changes: 1 addition & 1 deletion idls
Submodule idls updated from 05e891 to b07c62
2 changes: 1 addition & 1 deletion service/frontend/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination interface_mock.go
//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination interface_mock.go -self_package github.com/uber/cadence/service/frontend

package frontend

Expand Down
395 changes: 395 additions & 0 deletions service/history/config/config.go

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions service/history/conflictResolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/service/history/events"
)

type (
Expand All @@ -53,7 +54,7 @@ type (
mockTxProcessor *MocktransferQueueProcessor
mockReplicationProcessor *MockReplicatorQueueProcessor
mockTimerProcessor *MocktimerQueueProcessor
mockEventsCache *MockeventsCache
mockEventsCache *events.MockCache
mockDomainCache *cache.MockDomainCache
mockClusterMetadata *cluster.MockMetadata

Expand Down Expand Up @@ -105,7 +106,7 @@ func (s *conflictResolverSuite) SetupTest() {
s.mockClusterMetadata = s.mockShard.resource.ClusterMetadata
s.mockEventsCache = s.mockShard.mockEventsCache
s.mockClusterMetadata.EXPECT().GetCurrentClusterName().Return(cluster.TestCurrentClusterName).AnyTimes()
s.mockEventsCache.EXPECT().putEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()
s.mockEventsCache.EXPECT().PutEvent(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes()

s.logger = s.mockShard.GetLogger()

Expand Down
3 changes: 2 additions & 1 deletion service/history/decisionChecker.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/history/config"
)

type (
Expand Down Expand Up @@ -68,7 +69,7 @@ const (

func newDecisionAttrValidator(
domainCache cache.DomainCache,
config *Config,
config *config.Config,
logger log.Logger,
) *decisionAttrValidator {
return &decisionAttrValidator{
Expand Down
3 changes: 2 additions & 1 deletion service/history/decisionChecker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service/dynamicconfig"
"github.com/uber/cadence/service/history/config"
)

type (
Expand Down Expand Up @@ -70,7 +71,7 @@ func (s *decisionAttrValidatorSuite) SetupTest() {

s.controller = gomock.NewController(s.T())
s.mockDomainCache = cache.NewMockDomainCache(s.controller)
config := &Config{
config := &config.Config{
MaxIDLengthLimit: dynamicconfig.GetIntPropertyFn(1000),
ValidSearchAttributes: dynamicconfig.GetMapPropertyFn(definition.GetDefaultIndexedKeys()),
SearchAttributesNumberOfKeysLimit: dynamicconfig.GetIntPropertyFilteredByDomain(100),
Expand Down
6 changes: 3 additions & 3 deletions service/history/decisionHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,19 @@ import (
"fmt"
"time"

"github.com/uber/cadence/common/client"

"go.uber.org/yarpc"

h "github.com/uber/cadence/.gen/go/history"
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/client"
"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/history/config"
)

type (
Expand All @@ -55,7 +55,7 @@ type (

decisionHandlerImpl struct {
currentClusterName string
config *Config
config *config.Config
shard ShardContext
timeSource clock.TimeSource
historyEngine *historyEngineImpl
Expand Down
5 changes: 3 additions & 2 deletions service/history/decisionTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/service/history/config"
)

type (
Expand Down Expand Up @@ -59,7 +60,7 @@ type (
logger log.Logger
domainCache cache.DomainCache
metricsClient metrics.Client
config *Config
config *config.Config
}
)

Expand All @@ -73,7 +74,7 @@ func newDecisionTaskHandler(
logger log.Logger,
domainCache cache.DomainCache,
metricsClient metrics.Client,
config *Config,
config *config.Config,
) *decisionTaskHandlerImpl {

return &decisionTaskHandlerImpl{
Expand Down
90 changes: 56 additions & 34 deletions service/history/eventsCache.go → service/history/events/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,54 +18,55 @@
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

//go:generate mockgen -copyright_file ../../LICENSE -package $GOPACKAGE -source $GOFILE -destination eventsCache_mock.go
//go:generate mockgen -copyright_file ../../../LICENSE -package $GOPACKAGE -source $GOFILE -destination cache_mock.go

package history
package events

import (
"time"

"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cache"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/metrics"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/history/config"
)

type (
eventsCache interface {
getEvent(
// Cache caches workflow history event
Cache interface {
GetEvent(
domainID string,
workflowID string,
runID string,
firstEventID int64,
eventID int64,
branchToken []byte,
) (*shared.HistoryEvent, error)
putEvent(
PutEvent(
domainID string,
workflowID string,
runID string,
eventID int64,
event *shared.HistoryEvent,
)
deleteEvent(
DeleteEvent(
domainID string,
workflowID string,
runID string,
eventID int64,
)
}

eventsCacheImpl struct {
cacheImpl struct {
cache.Cache
eventsV2Mgr persistence.HistoryManager
disabled bool
logger log.Logger
metricsClient metrics.Client
shardID *int
historyManager persistence.HistoryManager
disabled bool
logger log.Logger
metricsClient metrics.Client
shardID *int
}

eventKey struct {
Expand All @@ -80,28 +81,49 @@ var (
errEventNotFoundInBatch = &shared.InternalServiceError{Message: "History event not found within expected batch"}
)

var _ eventsCache = (*eventsCacheImpl)(nil)

func newEventsCache(shardCtx ShardContext) eventsCache {
config := shardCtx.GetConfig()
shardID := common.IntPtr(shardCtx.GetShardID())
return newEventsCacheWithOptions(config.EventsCacheInitialSize(), config.EventsCacheMaxSize(), config.EventsCacheTTL(),
shardCtx.GetHistoryManager(), false, shardCtx.GetLogger(), shardCtx.GetMetricsClient(), shardID)
var _ Cache = (*cacheImpl)(nil)

// NewCache creates a new events cache
func NewCache(
shardID int,
historyManager persistence.HistoryManager,
config *config.Config,
logger log.Logger,
metricsClient metrics.Client,
) Cache {
return newCacheWithOption(
config.EventsCacheInitialSize(),
config.EventsCacheMaxSize(),
config.EventsCacheTTL(),
historyManager,
false,
logger,
metricsClient,
&shardID,
)
}

func newEventsCacheWithOptions(initialSize, maxSize int, ttl time.Duration,
eventsV2Mgr persistence.HistoryManager, disabled bool, logger log.Logger, metrics metrics.Client, shardID *int) *eventsCacheImpl {
func newCacheWithOption(
initialSize int,
maxSize int,
ttl time.Duration,
historyManager persistence.HistoryManager,
disabled bool,
logger log.Logger,
metrics metrics.Client,
shardID *int,
) *cacheImpl {
opts := &cache.Options{}
opts.InitialCapacity = initialSize
opts.TTL = ttl

return &eventsCacheImpl{
Cache: cache.New(maxSize, opts),
eventsV2Mgr: eventsV2Mgr,
disabled: disabled,
logger: logger.WithTags(tag.ComponentEventsCache),
metricsClient: metrics,
shardID: shardID,
return &cacheImpl{
Cache: cache.New(maxSize, opts),
historyManager: historyManager,
disabled: disabled,
logger: logger.WithTags(tag.ComponentEventsCache),
metricsClient: metrics,
shardID: shardID,
}
}

Expand All @@ -114,7 +136,7 @@ func newEventKey(domainID, workflowID, runID string, eventID int64) eventKey {
}
}

func (e *eventsCacheImpl) getEvent(domainID, workflowID, runID string, firstEventID, eventID int64,
func (e *cacheImpl) GetEvent(domainID, workflowID, runID string, firstEventID, eventID int64,
branchToken []byte) (*shared.HistoryEvent, error) {
e.metricsClient.IncCounter(metrics.EventsCacheGetEventScope, metrics.CacheRequests)
sw := e.metricsClient.StartTimer(metrics.EventsCacheGetEventScope, metrics.CacheLatency)
Expand Down Expand Up @@ -146,7 +168,7 @@ func (e *eventsCacheImpl) getEvent(domainID, workflowID, runID string, firstEven
return event, nil
}

func (e *eventsCacheImpl) putEvent(domainID, workflowID, runID string, eventID int64, event *shared.HistoryEvent) {
func (e *cacheImpl) PutEvent(domainID, workflowID, runID string, eventID int64, event *shared.HistoryEvent) {
e.metricsClient.IncCounter(metrics.EventsCachePutEventScope, metrics.CacheRequests)
sw := e.metricsClient.StartTimer(metrics.EventsCachePutEventScope, metrics.CacheLatency)
defer sw.Stop()
Expand All @@ -155,7 +177,7 @@ func (e *eventsCacheImpl) putEvent(domainID, workflowID, runID string, eventID i
e.Put(key, event)
}

func (e *eventsCacheImpl) deleteEvent(domainID, workflowID, runID string, eventID int64) {
func (e *cacheImpl) DeleteEvent(domainID, workflowID, runID string, eventID int64) {
e.metricsClient.IncCounter(metrics.EventsCacheDeleteEventScope, metrics.CacheRequests)
sw := e.metricsClient.StartTimer(metrics.EventsCacheDeleteEventScope, metrics.CacheLatency)
defer sw.Stop()
Expand All @@ -164,15 +186,15 @@ func (e *eventsCacheImpl) deleteEvent(domainID, workflowID, runID string, eventI
e.Delete(key)
}

func (e *eventsCacheImpl) getHistoryEventFromStore(domainID, workflowID, runID string, firstEventID, eventID int64,
func (e *cacheImpl) getHistoryEventFromStore(domainID, workflowID, runID string, firstEventID, eventID int64,
branchToken []byte) (*shared.HistoryEvent, error) {
e.metricsClient.IncCounter(metrics.EventsCacheGetFromStoreScope, metrics.CacheRequests)
sw := e.metricsClient.StartTimer(metrics.EventsCacheGetFromStoreScope, metrics.CacheLatency)
defer sw.Stop()

var historyEvents []*shared.HistoryEvent

response, err := e.eventsV2Mgr.ReadHistoryBranch(&persistence.ReadHistoryBranchRequest{
response, err := e.historyManager.ReadHistoryBranch(&persistence.ReadHistoryBranchRequest{
BranchToken: branchToken,
MinEventID: firstEventID,
MaxEventID: eventID + 1,
Expand Down
Loading

0 comments on commit e62d76e

Please sign in to comment.