Skip to content

Commit

Permalink
Add handover namespaces to GetReplicationStatus response (temporalio#…
Browse files Browse the repository at this point in the history
…2248)

* Add handover namespaces to GetReplicationStatus response
  • Loading branch information
yiminc authored Dec 2, 2021
1 parent 815bfd7 commit 873f280
Show file tree
Hide file tree
Showing 6 changed files with 709 additions and 278 deletions.
941 changes: 678 additions & 263 deletions api/historyservice/v1/request_response.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,12 @@ message ShardReplicationStatus {
// Local time on this shard
google.protobuf.Timestamp shard_local_time = 3 [(gogoproto.stdtime) = true];
map<string, ShardReplicationStatusPerCluster> remote_clusters = 4;
map<string, HandoverNamespaceInfo> handover_namespaces = 5;
}

message HandoverNamespaceInfo {
// max replication task id when namespace transition to Handover state
int64 handover_replication_task_id = 1;
}

message ShardReplicationStatusPerCluster {
Expand Down
3 changes: 2 additions & 1 deletion service/history/historyEngine.go
Original file line number Diff line number Diff line change
Expand Up @@ -3288,11 +3288,12 @@ func (e *historyEngineImpl) GetReplicationStatus(
resp.MaxReplicationTaskId = *e.replicatorProcessor.maxTaskID
}

remoteClusters, err := e.shard.GetRemoteClusterAckInfo(request.RemoteClusters)
remoteClusters, handoverNamespaces, err := e.shard.GetReplicationStatus(request.RemoteClusters)
if err != nil {
return nil, err
}
resp.RemoteClusters = remoteClusters
resp.HandoverNamespaces = handoverNamespaces
return resp, nil
}

Expand Down
2 changes: 1 addition & 1 deletion service/history/shard/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ type (
GetLastUpdatedTime() time.Time
GetTimerMaxReadLevel(cluster string) time.Time

GetRemoteClusterAckInfo(cluster []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, error)
GetReplicationStatus(cluster []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, map[string]*historyservice.HandoverNamespaceInfo, error)

GetTransferAckLevel() int64
UpdateTransferAckLevel(ackLevel int64) error
Expand Down
18 changes: 13 additions & 5 deletions service/history/shard/context_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -1488,30 +1488,38 @@ func (s *ContextImpl) loadShardMetadata(ownershipChanged *bool) error {
return nil
}

func (s *ContextImpl) GetRemoteClusterAckInfo(cluster []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, error) {
resp := make(map[string]*historyservice.ShardReplicationStatusPerCluster)
func (s *ContextImpl) GetReplicationStatus(cluster []string) (map[string]*historyservice.ShardReplicationStatusPerCluster, map[string]*historyservice.HandoverNamespaceInfo, error) {
remoteClusters := make(map[string]*historyservice.ShardReplicationStatusPerCluster)
handoverNamespaces := make(map[string]*historyservice.HandoverNamespaceInfo)
s.rLock()
defer s.rUnlock()

if len(cluster) == 0 {
// remote acked info for all known remote clusters
for k, v := range s.remoteClusterInfos {
resp[k] = &historyservice.ShardReplicationStatusPerCluster{
remoteClusters[k] = &historyservice.ShardReplicationStatusPerCluster{
AckedTaskId: v.AckedReplicationTaskID,
AckedTaskVisibilityTime: timestamp.TimePtr(v.AckedReplicationTimestamp),
}
}
} else {
for _, k := range cluster {
if v, ok := s.remoteClusterInfos[k]; ok {
resp[k] = &historyservice.ShardReplicationStatusPerCluster{
remoteClusters[k] = &historyservice.ShardReplicationStatusPerCluster{
AckedTaskId: v.AckedReplicationTaskID,
AckedTaskVisibilityTime: timestamp.TimePtr(v.AckedReplicationTimestamp),
}
}
}
}

return resp, nil
for k, v := range s.handoverNamespaces {
handoverNamespaces[k] = &historyservice.HandoverNamespaceInfo{
HandoverReplicationTaskId: v.MaxReplicationTaskID,
}
}

return remoteClusters, handoverNamespaces, nil
}

func (s *ContextImpl) getRemoteClusterInfoLocked(clusterName string) *remoteClusterInfo {
Expand Down
17 changes: 9 additions & 8 deletions service/history/shard/context_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 873f280

Please sign in to comment.