Skip to content

Commit

Permalink
Add warn logs for shard closures (cadence-workflow#3387)
Browse files (browse the repository at this point in the history)
emrahs authored Jul 14, 2020
1 parent 5cf711c commit 5aac215
Showing 1 changed file with 60 additions and 5 deletions.
65 changes: 60 additions & 5 deletions service/history/shard/context.go
Original file line number | Diff line number | Diff line change
@@ -509,6 +509,11 @@ Create_Loop:
continue Create_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: CreateWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Create_Loop
}
@@ -524,6 +529,11 @@ Create_Loop:
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: CreateWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Create_Loop
}
@@ -602,6 +612,11 @@ Update_Loop:
continue Update_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: UpdateWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Update_Loop
}
@@ -617,6 +632,11 @@ Update_Loop:
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: UpdateWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Update_Loop
}
@@ -690,6 +710,11 @@ Reset_Loop:
continue Reset_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: ResetWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Reset_Loop
}
@@ -705,6 +730,11 @@ Reset_Loop:
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: ResetWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Reset_Loop
}
@@ -772,7 +802,7 @@ func (s *contextImpl) ConflictResolveWorkflowExecution(
}
defer s.updateMaxReadLevelLocked(transferMaxReadLevel)

Reset_Loop:
Conflict_Resolve_Loop:
for attempt := 0; attempt < conditionalRetryCount; attempt++ {
currentRangeID := s.getRangeID()
request.RangeID = currentRangeID
@@ -788,11 +818,16 @@ Reset_Loop:
// RangeID might have been renewed by the same host while this update was in flight
// Retry the operation if we still have the shard ownership
if currentRangeID != s.getRangeID() {
continue Reset_Loop
continue Conflict_Resolve_Loop
} else {
// Shard is stolen, trigger shutdown of history engine
s.logger.Warn(
"Closing shard: ConflictResolveWorkflowExecution failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Reset_Loop
break Conflict_Resolve_Loop
}
}
default:
@@ -806,8 +841,13 @@ Reset_Loop:
if err1 != nil {
// At this point we have no choice but to unload the shard, so that it
// gets a new RangeID when it's reloaded.
s.logger.Warn(
"Closing shard: ConflictResolveWorkflowExecution failed due to unknown error.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break Reset_Loop
break Conflict_Resolve_Loop
}
}
}
@@ -939,10 +979,15 @@ func (s *contextImpl) renewRangeLocked(isStealing bool) error {
if err != nil {
// Shard is stolen, trigger history engine shutdown
if _, ok := err.(*persistence.ShardOwnershipLostError); ok {
s.logger.Warn(
"Closing shard: renewRangeLocked failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
} else {
// Failure in updating shard to grab new RangeID
s.logger.Error("Persistent store operation failure",
s.logger.Error("renewRangeLocked failed due to unknown error.",
tag.StoreOperationUpdateShard,
tag.Error(err),
tag.ShardRangeID(updatedShardInfo.RangeID),
@@ -994,6 +1039,11 @@ func (s *contextImpl) updateShardInfoLocked() error {
if err != nil {
// Shard is stolen, trigger history engine shutdown
if _, ok := err.(*persistence.ShardOwnershipLostError); ok {
s.logger.Warn(
"Closing shard: updateShardInfoLocked failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
}
} else {
@@ -1211,6 +1261,11 @@ func (s *contextImpl) ReplicateFailoverMarkers(
break
case *persistence.ShardOwnershipLostError:
// do not retry on ShardOwnershipLostError
s.logger.Warn(
"Closing shard: ReplicateFailoverMarkers failed due to stolen shard.",
tag.ShardID(s.GetShardID()),
tag.Error(err),
)
s.closeShard()
break
default:

0 comments on commit 5aac215

Please sign in to comment.