Skip to content

Commit

Permalink
bugfix: when setting up new cluster, there should be a way do replica…
Browse files Browse the repository at this point in the history
…te existing domain to new cluster (cadence-workflow#619)
  • Loading branch information
wxing1292 authored Mar 21, 2018
1 parent ae6b94c commit 93a5728
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 31 deletions.
5 changes: 3 additions & 2 deletions common/persistence/cassandraMetadataPersistence.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ const (
`VALUES(?, {name: ?}) IF NOT EXISTS`

templateCreateDomainByNameQuery = `INSERT INTO domains_by_name (` +
`name, domain, config, replication_config, is_global_domain, failover_version) ` +
`VALUES(?, ` + templateDomainType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `, ?, ?) IF NOT EXISTS`
`name, domain, config, replication_config, is_global_domain, config_version, failover_version) ` +
`VALUES(?, ` + templateDomainType + `, ` + templateDomainConfigType + `, ` + templateDomainReplicationConfigType + `, ?, ?, ?) IF NOT EXISTS`

templateGetDomainQuery = `SELECT domain.name ` +
`FROM domains ` +
Expand Down Expand Up @@ -149,6 +149,7 @@ func (m *cassandraMetadataPersistence) CreateDomain(request *CreateDomainRequest
request.ReplicationConfig.ActiveClusterName,
serializeClusterConfigs(request.ReplicationConfig.Clusters),
request.IsGlobalDomain,
request.ConfigVersion,
request.FailoverVersion,
)

Expand Down
19 changes: 15 additions & 4 deletions common/persistence/cassandraMetadataPersistence_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() {
retention := int32(10)
emitMetric := true
isGlobalDomain := false
configVersion := int64(0)
failoverVersion := int64(0)

resp0, err0 := m.CreateDomain(
Expand All @@ -89,6 +90,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() {
},
&DomainReplicationConfig{},
isGlobalDomain,
configVersion,
failoverVersion,
)

Expand All @@ -111,7 +113,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() {
m.Equal(testCurrentClusterName, resp1.ReplicationConfig.ActiveClusterName)
m.Equal(1, len(resp1.ReplicationConfig.Clusters))
m.Equal(isGlobalDomain, resp1.IsGlobalDomain)
m.Equal(int64(0), resp1.ConfigVersion)
m.Equal(configVersion, resp1.ConfigVersion)
m.Equal(failoverVersion, resp1.FailoverVersion)
m.True(resp1.ReplicationConfig.Clusters[0].ClusterName == testCurrentClusterName)
m.Equal(int64(0), resp1.DBVersion)
Expand All @@ -130,6 +132,7 @@ func (m *metadataPersistenceSuite) TestCreateDomain() {
},
&DomainReplicationConfig{},
isGlobalDomain,
configVersion,
failoverVersion,
)
m.NotNil(err2)
Expand All @@ -148,6 +151,7 @@ func (m *metadataPersistenceSuite) TestGetDomain() {

clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(11)
failoverVersion := int64(59)
isGlobalDomain := true
clusters := []*ClusterReplicationConfig{
Expand Down Expand Up @@ -181,6 +185,7 @@ func (m *metadataPersistenceSuite) TestGetDomain() {
Clusters: clusters,
},
isGlobalDomain,
configVersion,
failoverVersion,
)
m.Nil(err1)
Expand All @@ -203,7 +208,7 @@ func (m *metadataPersistenceSuite) TestGetDomain() {
m.Equal(clusters[index], resp2.ReplicationConfig.Clusters[index])
}
m.Equal(isGlobalDomain, resp2.IsGlobalDomain)
m.Equal(int64(0), resp2.ConfigVersion)
m.Equal(configVersion, resp2.ConfigVersion)
m.Equal(failoverVersion, resp2.FailoverVersion)
m.Equal(int64(0), resp2.DBVersion)

Expand All @@ -223,7 +228,7 @@ func (m *metadataPersistenceSuite) TestGetDomain() {
m.Equal(clusters[index], resp3.ReplicationConfig.Clusters[index])
}
m.Equal(isGlobalDomain, resp2.IsGlobalDomain)
m.Equal(int64(0), resp2.ConfigVersion)
m.Equal(configVersion, resp2.ConfigVersion)
m.Equal(failoverVersion, resp3.FailoverVersion)
m.Equal(int64(0), resp3.DBVersion)

Expand All @@ -244,6 +249,7 @@ func (m *metadataPersistenceSuite) TestUpdateDomain() {

clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(10)
failoverVersion := int64(59)
isGlobalDomain := true
clusters := []*ClusterReplicationConfig{
Expand Down Expand Up @@ -272,6 +278,7 @@ func (m *metadataPersistenceSuite) TestUpdateDomain() {
Clusters: clusters,
},
isGlobalDomain,
configVersion,
failoverVersion,
)
m.Nil(err1)
Expand Down Expand Up @@ -371,6 +378,7 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() {

clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(10)
failoverVersion := int64(59)
isGlobalDomain := true
clusters := []*ClusterReplicationConfig{
Expand Down Expand Up @@ -399,6 +407,7 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() {
Clusters: clusters,
},
isGlobalDomain,
configVersion,
failoverVersion,
)
m.Nil(err1)
Expand Down Expand Up @@ -438,6 +447,7 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() {
Clusters: clusters,
},
isGlobalDomain,
configVersion,
failoverVersion,
)
m.Nil(err6)
Expand All @@ -458,12 +468,13 @@ func (m *metadataPersistenceSuite) TestDeleteDomain() {
}

func (m *metadataPersistenceSuite) CreateDomain(info *DomainInfo, config *DomainConfig,
replicationConfig *DomainReplicationConfig, isGlobaldomain bool, failoverVersion int64) (*CreateDomainResponse, error) {
replicationConfig *DomainReplicationConfig, isGlobaldomain bool, configVersion int64, failoverVersion int64) (*CreateDomainResponse, error) {
return m.MetadataManager.CreateDomain(&CreateDomainRequest{
Info: info,
Config: config,
ReplicationConfig: replicationConfig,
IsGlobalDomain: isGlobaldomain,
ConfigVersion: configVersion,
FailoverVersion: failoverVersion,
})
}
Expand Down
1 change: 1 addition & 0 deletions common/persistence/dataInterfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ type (
Config *DomainConfig
ReplicationConfig *DomainReplicationConfig
IsGlobalDomain bool
ConfigVersion int64
FailoverVersion int64
}

Expand Down
31 changes: 6 additions & 25 deletions service/worker/domainReplicationTaskHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/uber-common/bark"
"github.com/uber/cadence/.gen/go/replicator"
"github.com/uber/cadence/.gen/go/shared"
"github.com/uber/cadence/common"
"github.com/uber/cadence/common/persistence"
)

Expand Down Expand Up @@ -109,6 +108,7 @@ func (domainReplicator *domainReplicatorImpl) handleDomainCreationReplicationTas
Clusters: domainReplicator.convertClusterReplicationConfigFromThrift(task.ReplicationConfig.Clusters),
},
IsGlobalDomain: true, // local domain will not be replicated
ConfigVersion: task.GetConfigVersion(),
FailoverVersion: task.GetFailoverVersion(),
}

Expand All @@ -131,6 +131,11 @@ func (domainReplicator *domainReplicatorImpl) handleDomainUpdateReplicationTask(
Name: task.Info.GetName(),
})
if err != nil {
if _, ok := err.(*shared.EntityNotExistsError); ok {
// this can happen if the create domain replication task is to processed.
// e.g. new cluster being lanuched
return domainReplicator.handleDomainCreationReplicationTask(task)
}
return err
}

Expand Down Expand Up @@ -206,16 +211,6 @@ func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigFro
return output
}

func (domainReplicator *domainReplicatorImpl) convertClusterReplicationConfigToThrift(
input []*persistence.ClusterReplicationConfig) []*shared.ClusterReplicationConfiguration {
output := []*shared.ClusterReplicationConfiguration{}
for _, cluster := range input {
clusterName := common.StringPtr(cluster.ClusterName)
output = append(output, &shared.ClusterReplicationConfiguration{ClusterName: clusterName})
}
return output
}

func (domainReplicator *domainReplicatorImpl) convertDomainStatusFromThrift(input *shared.DomainStatus) (int, error) {
if input == nil {
return 0, ErrInvalidDomainStatus
Expand All @@ -230,17 +225,3 @@ func (domainReplicator *domainReplicatorImpl) convertDomainStatusFromThrift(inpu
return 0, ErrInvalidDomainStatus
}
}

func (domainReplicator *domainReplicatorImpl) convertDomainStatusToThrift(input int) (*shared.DomainStatus, error) {
switch input {
case persistence.DomainStatusRegistered:
output := shared.DomainStatusRegistered
return &output, nil
case persistence.DomainStatusDeprecated:
output := shared.DomainStatusDeprecated
return &output, nil
default:
return nil, ErrInvalidDomainStatus
}

}
63 changes: 63 additions & 0 deletions service/worker/domainReplicationTaskHandler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,69 @@ func (s *domainReplicatorSuite) TestHandleReceivingTask_RegisterDomainTask() {
s.Equal(int64(0), resp.DBVersion)
}

func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_DomainNotExist() {
operation := replicator.DomainOperationUpdate
id := uuid.New()
name := "some random domain test name"
status := shared.DomainStatusRegistered
description := "some random test description"
ownerEmail := "some random test owner"
retention := int32(10)
emitMetric := true
clusterActive := "some random active cluster name"
clusterStandby := "some random standby cluster name"
configVersion := int64(12)
failoverVersion := int64(59)
clusters := []*shared.ClusterReplicationConfiguration{
&shared.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterActive),
},
&shared.ClusterReplicationConfiguration{
ClusterName: common.StringPtr(clusterStandby),
},
}

updateTask := &replicator.DomainTaskAttributes{
DomainOperation: &operation,
ID: common.StringPtr(id),
Info: &shared.DomainInfo{
Name: common.StringPtr(name),
Status: &status,
Description: common.StringPtr(description),
OwnerEmail: common.StringPtr(ownerEmail),
},
Config: &shared.DomainConfiguration{
WorkflowExecutionRetentionPeriodInDays: common.Int32Ptr(retention),
EmitMetric: common.BoolPtr(emitMetric),
},
ReplicationConfig: &shared.DomainReplicationConfiguration{
ActiveClusterName: common.StringPtr(clusterActive),
Clusters: clusters,
},
ConfigVersion: common.Int64Ptr(configVersion),
FailoverVersion: common.Int64Ptr(failoverVersion),
}

err := s.domainReplicator.HandleReceivingTask(updateTask)
s.Nil(err)

resp, err := s.MetadataManager.GetDomain(&persistence.GetDomainRequest{Name: name})
s.Nil(err)
s.NotNil(resp)
s.Equal(id, resp.Info.ID)
s.Equal(name, resp.Info.Name)
s.Equal(persistence.DomainStatusRegistered, resp.Info.Status)
s.Equal(description, resp.Info.Description)
s.Equal(ownerEmail, resp.Info.OwnerEmail)
s.Equal(retention, resp.Config.Retention)
s.Equal(emitMetric, resp.Config.EmitMetric)
s.Equal(clusterActive, resp.ReplicationConfig.ActiveClusterName)
s.Equal(s.domainReplicator.convertClusterReplicationConfigFromThrift(clusters), resp.ReplicationConfig.Clusters)
s.Equal(configVersion, resp.ConfigVersion)
s.Equal(failoverVersion, resp.FailoverVersion)
s.Equal(int64(0), resp.DBVersion)
}

func (s *domainReplicatorSuite) TestHandleReceivingTask_UpdateDomainTask_UpdateConfig_UpdateActiveCluster() {
operation := replicator.DomainOperationCreate
id := uuid.New()
Expand Down

0 comments on commit 93a5728

Please sign in to comment.