Skip to content

Commit

Permalink
fix: refactor getReplicationDiff for safe use (minio#16051)
Browse files Browse the repository at this point in the history
  • Loading branch information
klauspost authored Nov 15, 2022
1 parent 3bb82ef commit 8a07000
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions cmd/bucket-replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -2604,31 +2604,38 @@ func saveResyncStatus(ctx context.Context, bucket string, brs BucketReplicationR
return saveConfig(ctx, objectAPI, configFile, buf)
}

// getReplicationDiff returns unreplicated objects in a channel
func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (diffCh chan madmin.DiffInfo, err error) {
objInfoCh := make(chan ObjectInfo)
if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil {
logger.LogIf(ctx, err)
return diffCh, err
}
// getReplicationDiff returns un-replicated objects in a channel.
// If a non-nil channel is returned it must be consumed fully or
// the provided context must be canceled.
func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string, opts madmin.ReplDiffOpts) (chan madmin.DiffInfo, error) {
cfg, err := getReplicationConfig(ctx, bucket)
if err != nil {
logger.LogIf(ctx, err)
return diffCh, err
return nil, err
}
tgts, err := globalBucketTargetSys.ListBucketTargets(ctx, bucket)
if err != nil {
logger.LogIf(ctx, err)
return diffCh, err
return nil, err
}

objInfoCh := make(chan ObjectInfo, 10)
if err := objAPI.Walk(ctx, bucket, opts.Prefix, objInfoCh, ObjectOptions{}); err != nil {
logger.LogIf(ctx, err)
return nil, err
}
rcfg := replicationConfig{
Config: cfg,
remotes: tgts,
}
diffCh = make(chan madmin.DiffInfo, 4000)
diffCh := make(chan madmin.DiffInfo, 4000)
go func() {
defer close(diffCh)
for obj := range objInfoCh {
if contextCanceled(ctx) {
// Just consume input...
continue
}
// Ignore object prefixes which are excluded
// from versioning via the MinIO bucket versioning extension.
if globalBucketVersioningSys.PrefixSuspended(bucket, obj.Name) {
Expand Down Expand Up @@ -2682,7 +2689,7 @@ func getReplicationDiff(ctx context.Context, objAPI ObjectLayer, bucket string,
Targets: tgtsMap,
}:
case <-ctx.Done():
return
continue
}
}
}
Expand Down

0 comments on commit 8a07000

Please sign in to comment.