Skip to content

Commit

Permalink
Publish replication task to Kafka after reading from replicator queue (
Browse files Browse the repository at this point in the history
…cadence-workflow#632)

Created QueueProcessorBase which has common logic used by both transfer
queue processor and replication queue processor.

Created replicationQueueProcessor which processes replication task from
replicator queue and publishes it to Kafka.

Bootstrap logic for replicationQueueProcessor to host it within
historyEngine.  Also bootstrap logic to pass in kafka publisher to
history engine.

History engine changes to update replication state on
StartWorkflowExecution and UpdateWorkflowExecution by updating the
failover version from domain and writing it to mutable state.
  • Loading branch information
samarabbas authored Mar 26, 2018
1 parent 115a38a commit 314d222
Show file tree
Hide file tree
Showing 22 changed files with 1,013 additions and 467 deletions.
4 changes: 2 additions & 2 deletions .gen/go/replicator/idl.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 38 additions & 8 deletions .gen/go/replicator/types.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 45 additions & 35 deletions common/logging/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,41 +151,6 @@ func LogDuplicateTransferTaskEvent(lg bark.Logger, taskType int, taskID int64, s
taskID, taskType, scheduleID)
}

// LogTransferQueueProcesorStartingEvent writes an Info-level entry, tagged with the
// TransferQueueProcessorStarting event ID, announcing that the transfer queue
// processor is starting.
func LogTransferQueueProcesorStartingEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorStarting}
	logger.WithFields(fields).Info("Transfer queue processor starting.")
}

// LogTransferQueueProcesorStartedEvent writes an Info-level entry, tagged with the
// TransferQueueProcessorStarted event ID, announcing that the transfer queue
// processor has started.
func LogTransferQueueProcesorStartedEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorStarted}
	logger.WithFields(fields).Info("Transfer queue processor started.")
}

// LogTransferQueueProcesorShuttingDownEvent writes an Info-level entry, tagged with
// the TransferQueueProcessorShuttingDown event ID, announcing that the transfer
// queue processor is shutting down.
func LogTransferQueueProcesorShuttingDownEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorShuttingDown}
	logger.WithFields(fields).Info("Transfer queue processor shutting down.")
}

// LogTransferQueueProcesorShutdownEvent writes an Info-level entry, tagged with the
// TransferQueueProcessorShutdown event ID, announcing that the transfer queue
// processor has finished shutting down.
func LogTransferQueueProcesorShutdownEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorShutdown}
	logger.WithFields(fields).Info("Transfer queue processor shutdown.")
}

// LogTransferQueueProcesorShutdownTimedoutEvent writes a Warn-level entry, tagged
// with the TransferQueueProcessorShutdownTimedout event ID, reporting that the
// transfer queue processor timed out while shutting down.
func LogTransferQueueProcesorShutdownTimedoutEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorShutdownTimedout}
	logger.WithFields(fields).Warn("Transfer queue processor timedout on shutdown.")
}

// LogShardRangeUpdatedEvent is used to log rangeID update for a shard
func LogShardRangeUpdatedEvent(logger bark.Logger, shardID int, rangeID, startSequence, endSequence int64) {
logger.WithFields(bark.Fields{
Expand Down Expand Up @@ -416,3 +381,48 @@ func LogReplicationTaskProcessorShutdownTimedoutEvent(logger bark.Logger) {
TagWorkflowEventID: ReplicationTaskProcessorShutdownTimedout,
}).Warn("Replication task processor timedout on shutdown.")
}

// LogQueueProcesorStartingEvent writes an Info-level entry announcing that a queue
// processor is starting. The entry is tagged with the
// TransferQueueProcessorStarting event ID.
func LogQueueProcesorStartingEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorStarting}
	logger.WithFields(fields).Info("Queue processor starting.")
}

// LogQueueProcesorStartedEvent writes an Info-level entry announcing that a queue
// processor has started. The entry is tagged with the
// TransferQueueProcessorStarted event ID.
func LogQueueProcesorStartedEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorStarted}
	logger.WithFields(fields).Info("Queue processor started.")
}

// LogQueueProcesorShuttingDownEvent writes an Info-level entry announcing that a
// queue processor is shutting down. The entry is tagged with the
// TransferQueueProcessorShuttingDown event ID.
func LogQueueProcesorShuttingDownEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorShuttingDown}
	logger.WithFields(fields).Info("Queue processor shutting down.")
}

// LogQueueProcesorShutdownEvent writes an Info-level entry announcing that a queue
// processor has completed its shutdown. The entry is tagged with the
// TransferQueueProcessorShutdown event ID.
func LogQueueProcesorShutdownEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorShutdown}
	logger.WithFields(fields).Info("Queue processor shutdown.")
}

// LogQueueProcesorShutdownTimedoutEvent writes a Warn-level entry reporting that a
// queue processor timed out while shutting down. The entry is tagged with the
// TransferQueueProcessorShutdownTimedout event ID.
func LogQueueProcesorShutdownTimedoutEvent(logger bark.Logger) {
	fields := bark.Fields{TagWorkflowEventID: TransferQueueProcessorShutdownTimedout}
	logger.WithFields(fields).Warn("Queue processor timedout on shutdown.")
}

// LogTaskProcessingFailedEvent writes an Error-level entry for a task that a
// processor failed to handle. The task ID, task type and error are attached as
// structured fields and also rendered into the message text; the entry is tagged
// with the TransferTaskProcessingFailed event ID.
func LogTaskProcessingFailedEvent(logger bark.Logger, taskID int64, taskType int, err error) {
	fields := bark.Fields{
		TagWorkflowEventID: TransferTaskProcessingFailed,
		TagTaskID:          taskID,
		TagTaskType:        taskType,
		TagWorkflowErr:     err,
	}
	entry := logger.WithFields(fields)
	entry.Errorf("Processor failed to process task: %v, type: %v. Error: %v", taskID, taskType, err)
}
1 change: 1 addition & 0 deletions common/logging/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ const (
TagValueHistoryCacheComponent = "history-cache"
TagValueTransferQueueComponent = "transfer-queue-processor"
TagValueTimerQueueComponent = "timer-queue-processor"
TagValueReplicatorQueueComponent = "replicator-queue-processor"
TagValueShardController = "shard-controller"
TagValueMatchingEngineComponent = "matching-engine"
TagValueReplicatorComponent = "replicator"
Expand Down
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,10 @@ const (
TimerTaskDeleteHistoryEvent
// HistoryEventNotificationScope is the scope used by shard history event notification
HistoryEventNotificationScope
// ReplicatorQueueProcessorScope is the scope used by all metric emitted by replicator queue processor
ReplicatorQueueProcessorScope
// ReplicatorTaskHistoryScope is the scope used for history task processing by replicator queue processor
ReplicatorTaskHistoryScope

NumHistoryScopes
)
Expand Down
31 changes: 31 additions & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -1090,6 +1090,26 @@ func (a *HistoryReplicationTask) SetTaskID(id int64) {
a.TaskID = id
}

// GetTaskID reports the TaskID stored on this transfer task.
func (t *TransferTaskInfo) GetTaskID() int64 {
	id := t.TaskID
	return id
}

// GetTaskType reports the TaskType stored on this transfer task.
func (t *TransferTaskInfo) GetTaskType() int {
	kind := t.TaskType
	return kind
}

// GetTaskID reports the TaskID stored on this replication task.
func (t *ReplicationTaskInfo) GetTaskID() int64 {
	id := t.TaskID
	return id
}

// GetTaskType reports the TaskType stored on this replication task.
func (t *ReplicationTaskInfo) GetTaskType() int {
	kind := t.TaskType
	return kind
}

// NewHistoryEventBatch returns a new instance of HistoryEventBatch
func NewHistoryEventBatch(version int, events []*workflow.HistoryEvent) *HistoryEventBatch {
return &HistoryEventBatch{
Expand Down Expand Up @@ -1125,3 +1145,14 @@ func (config *ClusterReplicationConfig) serialize() map[string]interface{} {
// deserialize fills config from its map representation by copying the
// "cluster_name" entry into ClusterName. The type assertion is unchecked, so a
// missing key or a non-string value panics.
func (config *ClusterReplicationConfig) deserialize(input map[string]interface{}) {
	name := input["cluster_name"].(string)
	config.ClusterName = name
}

// SetSerializedHistoryDefaults back-fills a serialized history batch read from
// persistence: a zero Version becomes GetDefaultHistoryVersion() and an empty
// EncodingType becomes DefaultEncodingType. Values that are already set are left
// untouched. This exists purely for backwards compatibility.
func SetSerializedHistoryDefaults(history *SerializedHistoryEventBatch) {
	versionMissing := history.Version == 0
	encodingMissing := len(history.EncodingType) == 0

	if versionMissing {
		history.Version = GetDefaultHistoryVersion()
	}
	if encodingMissing {
		history.EncodingType = DefaultEncodingType
	}
}
20 changes: 14 additions & 6 deletions host/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ import (
workflow "github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/service/history"
"github.com/uber/cadence/service/matching"
Expand All @@ -62,11 +64,13 @@ type (
// override suite.Suite.Assertions with require.Assertions; this means that s.NotNil(nil) will stop the test,
// not merely log an error
*require.Assertions
domainName string
foreignDomainName string
host Cadence
engine wsc.Interface
logger bark.Logger
domainName string
foreignDomainName string
mockMessagingClient messaging.Client
mockProducer messaging.Producer
host Cadence
engine wsc.Interface
logger bark.Logger
suite.Suite
persistence.TestBase
}
Expand Down Expand Up @@ -142,7 +146,11 @@ func (s *integrationSuite) setupTest(enableGlobalDomain bool, isMasterCluster bo

s.setupShards()

s.host = NewCadence(s.ClusterMetadata, s.MetadataManager, s.ShardMgr, s.HistoryMgr, s.ExecutionMgrFactory, s.TaskMgr,
// TODO: Use mock messaging client until we support kafka setup onebox to write end-to-end integration test
s.mockProducer = &mocks.KafkaProducer{}
s.mockMessagingClient = mocks.NewMockMessagingClient(s.mockProducer, nil)

s.host = NewCadence(s.ClusterMetadata, s.mockMessagingClient, s.MetadataManager, s.ShardMgr, s.HistoryMgr, s.ExecutionMgrFactory, s.TaskMgr,
s.VisibilityMgr, testNumberOfHistoryShards, testNumberOfHistoryHosts, s.logger)

s.host.Start()
Expand Down
15 changes: 11 additions & 4 deletions host/onebox.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
fecli "github.com/uber/cadence/client/frontend"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/cluster"
"github.com/uber/cadence/common/messaging"
"github.com/uber/cadence/common/mocks"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/service"
Expand Down Expand Up @@ -72,6 +73,7 @@ type (
numberOfHistoryHosts int
logger bark.Logger
clusterMetadata cluster.Metadata
messagingClient messaging.Client
metadataMgr persistence.MetadataManager
shardMgr persistence.ShardManager
historyMgr persistence.HistoryManager
Expand All @@ -90,15 +92,18 @@ type (
)

// NewCadence returns an instance that hosts full cadence in one process
func NewCadence(clusterMetadata cluster.Metadata, metadataMgr persistence.MetadataManager, shardMgr persistence.ShardManager,
historyMgr persistence.HistoryManager, executionMgrFactory persistence.ExecutionManagerFactory,
taskMgr persistence.TaskManager, visibilityMgr persistence.VisibilityManager,
numberOfHistoryShards, numberOfHistoryHosts int, logger bark.Logger) Cadence {
func NewCadence(clusterMetadata cluster.Metadata, messagingClient messaging.Client,
metadataMgr persistence.MetadataManager, shardMgr persistence.ShardManager, historyMgr persistence.HistoryManager,
executionMgrFactory persistence.ExecutionManagerFactory, taskMgr persistence.TaskManager,
visibilityMgr persistence.VisibilityManager, numberOfHistoryShards, numberOfHistoryHosts int,
logger bark.Logger) Cadence {

return &cadenceImpl{
numberOfHistoryShards: numberOfHistoryShards,
numberOfHistoryHosts: numberOfHistoryHosts,
logger: logger,
clusterMetadata: clusterMetadata,
messagingClient: messagingClient,
metadataMgr: metadataMgr,
visibilityMgr: visibilityMgr,
shardMgr: shardMgr,
Expand Down Expand Up @@ -196,6 +201,7 @@ func (c *cadenceImpl) startFrontend(logger bark.Logger, rpHosts []string, startW
params.MetricScope = tally.NewTestScope(common.FrontendServiceName, make(map[string]string))
params.RingpopFactory = newRingpopFactory(common.FrontendServiceName, rpHosts)
params.ClusterMetadata = c.clusterMetadata
params.MessagingClient = c.messagingClient
params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards
params.CassandraConfig.Hosts = "127.0.0.1"
params.DynamicConfig = dynamicconfig.NewNopClient()
Expand Down Expand Up @@ -230,6 +236,7 @@ func (c *cadenceImpl) startHistory(logger bark.Logger, shardMgr persistence.Shar
params.MetricScope = tally.NewTestScope(common.HistoryServiceName, make(map[string]string))
params.RingpopFactory = newRingpopFactory(common.FrontendServiceName, rpHosts)
params.ClusterMetadata = c.clusterMetadata
params.MessagingClient = c.messagingClient
params.CassandraConfig.NumHistoryShards = c.numberOfHistoryShards
service := service.New(params)
historyConfig := history.NewConfig(dynamicconfig.NewNopCollection(), c.numberOfHistoryShards)
Expand Down
1 change: 1 addition & 0 deletions idl/github.com/uber/cadence/replicator.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ struct HistoryTaskAttributes {
40: optional i64 (js.type = "Long") firstEventId
50: optional i64 (js.type = "Long") nextEventId
60: optional i64 (js.type = "Long") version
70: optional shared.History history
}

struct ReplicationTask {
Expand Down
14 changes: 1 addition & 13 deletions service/frontend/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -1801,7 +1801,7 @@ func (wh *WorkflowHandler) getHistory(domainID string, execution gen.WorkflowExe
}

for _, e := range response.Events {
setSerializedHistoryDefaults(&e)
persistence.SetSerializedHistoryDefaults(&e)
s, _ := wh.hSerializerFactory.Get(e.EncodingType)
history, err1 := s.Deserialize(&e)
if err1 != nil {
Expand All @@ -1821,18 +1821,6 @@ func (wh *WorkflowHandler) getHistory(domainID string, execution gen.WorkflowExe
return executionHistory, nextPageToken, nil
}

// setSerializedHistoryDefaults back-fills a serialized history batch read from
// persistence: a zero Version becomes persistence.GetDefaultHistoryVersion() and
// an empty EncodingType becomes persistence.DefaultEncodingType. Values that are
// already set are left untouched. This exists purely for backwards compatibility.
func setSerializedHistoryDefaults(history *persistence.SerializedHistoryEventBatch) {
	versionMissing := history.Version == 0
	encodingMissing := len(history.EncodingType) == 0

	if versionMissing {
		history.Version = persistence.GetDefaultHistoryVersion()
	}
	if encodingMissing {
		history.EncodingType = persistence.DefaultEncodingType
	}
}

func (wh *WorkflowHandler) getLoggerForTask(taskToken []byte) bark.Logger {
logger := wh.Service.GetLogger()
task, err := wh.tokenSerializer.Deserialize(taskToken)
Expand Down
Loading

0 comments on commit 314d222

Please sign in to comment.