From 342ade03f63eb5bc81cda1848687bb33a5d85afe Mon Sep 17 00:00:00 2001 From: Harshavardhana Date: Tue, 9 Jun 2020 17:09:19 -0700 Subject: [PATCH] deprecate listDir usage for healing (#9792) listDir was incorrectly used for healing which is slower, instead use Walk() to heal the entire set. --- cmd/background-newdisks-heal-ops.go | 2 +- cmd/global-heal.go | 40 +++++-- cmd/tree-walk_test.go | 48 ++++---- cmd/xl-sets.go | 18 +-- cmd/xl-v1-list-objects.go | 170 +--------------------------- cmd/xl-v1.go | 3 - cmd/xl-zones.go | 18 +-- 7 files changed, 82 insertions(+), 217 deletions(-) diff --git a/cmd/background-newdisks-heal-ops.go b/cmd/background-newdisks-heal-ops.go index e95334d36a301..5bfc86fde7b55 100644 --- a/cmd/background-newdisks-heal-ops.go +++ b/cmd/background-newdisks-heal-ops.go @@ -115,7 +115,7 @@ func monitorLocalDisksAndHeal(ctx context.Context, objAPI ObjectLayer) { // Heal all erasure sets that need for i, erasureSetToHeal := range erasureSetInZoneToHeal { for _, setIndex := range erasureSetToHeal { - err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex]) + err := healErasureSet(ctx, setIndex, z.zones[i].sets[setIndex], z.zones[i].drivesPerSet) if err != nil { logger.LogIf(ctx, err) } diff --git a/cmd/global-heal.go b/cmd/global-heal.go index c8c7ed7c0aefc..7309eb7db3364 100644 --- a/cmd/global-heal.go +++ b/cmd/global-heal.go @@ -79,7 +79,7 @@ func getLocalBackgroundHealStatus() madmin.BgHealState { } // healErasureSet lists and heals all objects in a specific erasure set -func healErasureSet(ctx context.Context, setIndex int, xlObj *xlObjects) error { +func healErasureSet(ctx context.Context, setIndex int, xlObj *xlObjects, drivesPerSet int) error { buckets, err := xlObj.ListBuckets(ctx) if err != nil { return err @@ -108,12 +108,38 @@ func healErasureSet(ctx context.Context, setIndex int, xlObj *xlObjects) error { path: bucket.Name, } - // List all objects in the current bucket and heal them - listDir := listDirFactory(ctx, xlObj.getLoadBalancedDisks()...) - walkResultCh := startTreeWalk(ctx, bucket.Name, "", "", true, listDir, nil) - for walkEntry := range walkResultCh { + var entryChs []FileInfoCh + for _, disk := range xlObj.getLoadBalancedDisks() { + if disk == nil { + // Disk can be offline + continue + } + entryCh, err := disk.Walk(bucket.Name, "", "", true, xlMetaJSONFile, readMetadata, ctx.Done()) + if err != nil { + // Disk walk returned error, ignore it. + continue + } + entryChs = append(entryChs, FileInfoCh{ + Ch: entryCh, + }) + } + + entriesValid := make([]bool, len(entryChs)) + entries := make([]FileInfo, len(entryChs)) + + for { + entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid) + if !ok { + return nil + } + + if quorumCount == drivesPerSet { + // Skip good entries. + continue + } + bgSeq.sourceCh <- healSource{ - path: pathJoin(bucket.Name, walkEntry.entry), + path: pathJoin(bucket.Name, entry.Name), } } } @@ -173,7 +199,7 @@ func execLeaderTasks(ctx context.Context, z *xlZones) { for _, zone := range z.zones { // Heal set by set for i, set := range zone.sets { - if err := healErasureSet(ctx, i, set); err != nil { + if err := healErasureSet(ctx, i, set, zone.drivesPerSet); err != nil { logger.LogIf(ctx, err) continue } diff --git a/cmd/tree-walk_test.go b/cmd/tree-walk_test.go index 78e3f1d91f85b..0ea5e02d1f026 100644 --- a/cmd/tree-walk_test.go +++ b/cmd/tree-walk_test.go @@ -27,6 +27,24 @@ import ( "time" ) +// Returns function "listDir" of the type listDirFunc. +// disks - used for doing disk.ListDir() +func listDirFactory(ctx context.Context, disk StorageAPI) ListDirFunc { + // Returns sorted merged entries from all the disks. + listDir := func(volume, dirPath, dirEntry string) (bool, []string) { + entries, err := disk.ListDir(volume, dirPath, -1, xlMetaJSONFile) + if err != nil { + return false, nil + } + if len(entries) == 0 { + return true, nil + } + sort.Strings(entries) + return false, filterMatchingPrefix(entries, dirEntry) + } + return listDir +} + // Fixed volume name that could be used across tests const volume = "testvolume" @@ -219,8 +237,7 @@ func TestTreeWalkTimeout(t *testing.T) { } } -// Test ListDir - listDir should list entries from the first disk, if the first disk is down, -// it should list from the next disk. +// Test ListDir - listDir is expected to only list one disk. func TestListDir(t *testing.T) { file1 := "file1" file2 := "file2" @@ -248,7 +265,8 @@ func TestListDir(t *testing.T) { } // create listDir function. - listDir := listDirFactory(context.Background(), disk1, disk2) + listDir1 := listDirFactory(context.Background(), disk1) + listDir2 := listDirFactory(context.Background(), disk2) // Create file1 in fsDir1 and file2 in fsDir2. disks := []StorageAPI{disk1, disk2} @@ -260,35 +278,23 @@ func TestListDir(t *testing.T) { } // Should list "file1" from fsDir1. - _, entries := listDir(volume, "", "") - if len(entries) != 2 { - t.Fatal("Expected the number of entries to be 2") + _, entries := listDir1(volume, "", "") + if len(entries) != 1 { + t.Fatal("Expected the number of entries to be 1") } + if entries[0] != file1 { t.Fatal("Expected the entry to be file1") } - if entries[1] != file2 { - t.Fatal("Expected the entry to be file2") - } - - // Remove fsDir1, list should return entries from fsDir2 - err = os.RemoveAll(fsDir1) - if err != nil { - t.Error(err) - } - // Should list "file2" from fsDir2. - _, entries = listDir(volume, "", "") + _, entries = listDir2(volume, "", "") if len(entries) != 1 { t.Fatal("Expected the number of entries to be 1") } + if entries[0] != file2 { t.Fatal("Expected the entry to be file2") } - err = os.RemoveAll(fsDir2) - if err != nil { - t.Error(err) - } } // TestRecursiveWalk - tests if treeWalk returns entries correctly with and diff --git a/cmd/xl-sets.go b/cmd/xl-sets.go index 9f6ef67c5b6a7..a081270575fcc 100644 --- a/cmd/xl-sets.go +++ b/cmd/xl-sets.go @@ -814,9 +814,9 @@ func (f *FileInfoCh) Push(fi FileInfo) { // we found this entry. Additionally also returns a boolean // to indicate if the caller needs to call this function // again to list the next entry. It is callers responsibility -// if the caller wishes to list N entries to call leastEntry +// if the caller wishes to list N entries to call lexicallySortedEntry // N times until this boolean is 'false'. -func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) { +func lexicallySortedEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) (FileInfo, int, bool) { for i := range entryChs { entries[i], entriesValid[i] = entryChs[i].Pop() } @@ -852,7 +852,7 @@ func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) return lentry, 0, isTruncated } - leastEntryCount := 0 + lexicallySortedEntryCount := 0 for i, valid := range entriesValid { if !valid { continue @@ -861,7 +861,7 @@ func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) // Entries are duplicated across disks, // we should simply skip such entries. if lentry.Name == entries[i].Name && lentry.ModTime.Equal(entries[i].ModTime) { - leastEntryCount++ + lexicallySortedEntryCount++ continue } @@ -870,7 +870,7 @@ func leastEntry(entryChs []FileInfoCh, entries []FileInfo, entriesValid []bool) entryChs[i].Push(entries[i]) } - return lentry, leastEntryCount, isTruncated + return lentry, lexicallySortedEntryCount, isTruncated } // mergeEntriesCh - merges FileInfo channel to entries upto maxKeys. @@ -879,7 +879,7 @@ func mergeEntriesCh(entryChs []FileInfoCh, maxKeys int, ndisks int) (entries Fil entriesInfos := make([]FileInfo, len(entryChs)) entriesValid := make([]bool, len(entryChs)) for { - fi, quorumCount, valid := leastEntry(entryChs, entriesInfos, entriesValid) + fi, quorumCount, valid := lexicallySortedEntry(entryChs, entriesInfos, entriesValid) if !valid { // We have reached EOF across all entryChs, break the loop. break @@ -1003,7 +1003,7 @@ func (s *xlSets) listObjectsNonSlash(ctx context.Context, bucket, prefix, marker break } - result, quorumCount, ok := leastEntry(entryChs, entries, entriesValid) + result, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid) if !ok { eof = true break @@ -1690,7 +1690,7 @@ func (s *xlSets) Walk(ctx context.Context, bucket, prefix string, results chan<- defer close(results) for { - entry, quorumCount, ok := leastEntry(entryChs, entries, entriesValid) + entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid) if !ok { return } @@ -1716,7 +1716,7 @@ func (s *xlSets) HealObjects(ctx context.Context, bucket, prefix string, opts ma entriesValid := make([]bool, len(entryChs)) entries := make([]FileInfo, len(entryChs)) for { - entry, quorumCount, ok := leastEntry(entryChs, entries, entriesValid) + entry, quorumCount, ok := lexicallySortedEntry(entryChs, entries, entriesValid) if !ok { break } diff --git a/cmd/xl-v1-list-objects.go b/cmd/xl-v1-list-objects.go index f947fbfe940e3..16e267293c148 100644 --- a/cmd/xl-v1-list-objects.go +++ b/cmd/xl-v1-list-objects.go @@ -18,174 +18,10 @@ package cmd import ( "context" - "sort" ) -// Returns function "listDir" of the type listDirFunc. -// disks - used for doing disk.ListDir() -func listDirFactory(ctx context.Context, disks ...StorageAPI) ListDirFunc { - // Returns sorted merged entries from all the disks. - listDir := func(bucket, prefixDir, prefixEntry string) (emptyDir bool, mergedEntries []string) { - for _, disk := range disks { - if disk == nil { - continue - } - var entries []string - var newEntries []string - var err error - entries, err = disk.ListDir(bucket, prefixDir, -1, xlMetaJSONFile) - if err != nil || len(entries) == 0 { - continue - } - - // Find elements in entries which are not in mergedEntries - for _, entry := range entries { - idx := sort.SearchStrings(mergedEntries, entry) - // if entry is already present in mergedEntries don't add. - if idx < len(mergedEntries) && mergedEntries[idx] == entry { - continue - } - newEntries = append(newEntries, entry) - } - - if len(newEntries) > 0 { - // Merge the entries and sort it. - mergedEntries = append(mergedEntries, newEntries...) - sort.Strings(mergedEntries) - } - } - - if len(mergedEntries) == 0 { - return true, nil - } - - return false, filterMatchingPrefix(mergedEntries, prefixEntry) - } - return listDir -} - -// listObjects - wrapper function implemented over file tree walk. -func (xl xlObjects) listObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { - // Default is recursive, if delimiter is set then list non recursive. - recursive := true - if delimiter == SlashSeparator { - recursive = false - } - - walkResultCh, endWalkCh := xl.listPool.Release(listParams{bucket, recursive, marker, prefix}) - if walkResultCh == nil { - endWalkCh = make(chan struct{}) - listDir := listDirFactory(ctx, xl.getLoadBalancedDisks()...) - walkResultCh = startTreeWalk(ctx, bucket, prefix, marker, recursive, listDir, endWalkCh) - } - - var objInfos []ObjectInfo - var eof bool - var nextMarker string - for i := 0; i < maxKeys; { - - walkResult, ok := <-walkResultCh - if !ok { - // Closed channel. - eof = true - break - } - entry := walkResult.entry - var objInfo ObjectInfo - if HasSuffix(entry, SlashSeparator) { - // Object name needs to be full path. - objInfo.Bucket = bucket - objInfo.Name = entry - objInfo.IsDir = true - } else { - // Set the Mode to a "regular" file. - var err error - objInfo, err = xl.getObjectInfo(ctx, bucket, entry, ObjectOptions{}) - if err != nil { - // Ignore errFileNotFound as the object might have got - // deleted in the interim period of listing and getObjectInfo(), - // ignore quorum error as it might be an entry from an outdated disk. - if IsErrIgnored(err, []error{ - errFileNotFound, - errXLReadQuorum, - }...) { - continue - } - return loi, toObjectErr(err, bucket, prefix) - } - } - nextMarker = objInfo.Name - objInfos = append(objInfos, objInfo) - i++ - if walkResult.end { - eof = true - break - } - } - - params := listParams{bucket, recursive, nextMarker, prefix} - if !eof { - xl.listPool.Set(params, walkResultCh, endWalkCh) - } - - result := ListObjectsInfo{} - for _, objInfo := range objInfos { - if objInfo.IsDir && delimiter == SlashSeparator { - result.Prefixes = append(result.Prefixes, objInfo.Name) - continue - } - result.Objects = append(result.Objects, objInfo) - } - - if !eof { - result.IsTruncated = true - if len(objInfos) > 0 { - result.NextMarker = objInfos[len(objInfos)-1].Name - } - } - - return result, nil -} - -// ListObjects - list all objects at prefix, delimited by '/'. +// ListObjects - list all objects at prefix, delimited by '/', shouldn't be called. func (xl xlObjects) ListObjects(ctx context.Context, bucket, prefix, marker, delimiter string, maxKeys int) (loi ListObjectsInfo, e error) { - if err := checkListObjsArgs(ctx, bucket, prefix, marker, xl); err != nil { - return loi, err - } - - // With max keys of zero we have reached eof, return right here. - if maxKeys == 0 { - return loi, nil - } - - // Marker is set validate pre-condition. - if marker != "" { - // Marker not common with prefix is not implemented.Send an empty response - if !HasPrefix(marker, prefix) { - return ListObjectsInfo{}, e - } - } - - // For delimiter and prefix as '/' we do not list anything at all - // since according to s3 spec we stop at the 'delimiter' along - // with the prefix. On a flat namespace with 'prefix' as '/' - // we don't have any entries, since all the keys are of form 'keyName/...' - if delimiter == SlashSeparator && prefix == SlashSeparator { - return loi, nil - } - - // Over flowing count - reset to maxObjectList. - if maxKeys < 0 || maxKeys > maxObjectList { - maxKeys = maxObjectList - } - - // Initiate a list operation, if successful filter and return quickly. - listObjInfo, err := xl.listObjects(ctx, bucket, prefix, marker, delimiter, maxKeys) - if err == nil { - // We got the entries successfully return. - return listObjInfo, nil - } - - // Return error at the end. - return loi, toObjectErr(err, bucket, prefix) + // Shouldn't be called + return loi, NotImplemented{} } diff --git a/cmd/xl-v1.go b/cmd/xl-v1.go index 886b7e1c36604..c128faa0c74c1 100644 --- a/cmd/xl-v1.go +++ b/cmd/xl-v1.go @@ -65,9 +65,6 @@ type xlObjects struct { // Byte pools used for temporary i/o buffers. bp *bpool.BytePoolCap - // TODO: ListObjects pool management, should be removed in future. - listPool *TreeWalkPool - mrfUploadCh chan partialUpload } diff --git a/cmd/xl-zones.go b/cmd/xl-zones.go index 0575fd6e804da..0be9df8de1719 100644 --- a/cmd/xl-zones.go +++ b/cmd/xl-zones.go @@ -624,7 +624,7 @@ func (z *xlZones) listObjectsNonSlash(ctx context.Context, bucket, prefix, marke break } - result, quorumCount, _, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) + result, quorumCount, _, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) if !ok { eof = true break @@ -852,9 +852,9 @@ func (z *xlZones) listObjects(ctx context.Context, bucket, prefix, marker, delim // we found this entry. Additionally also returns a boolean // to indicate if the caller needs to call this function // again to list the next entry. It is callers responsibility -// if the caller wishes to list N entries to call leastEntry +// if the caller wishes to list N entries to call lexicallySortedEntry // N times until this boolean is 'false'. -func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) { +func lexicallySortedEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneEntriesValid [][]bool) (FileInfo, int, int, bool) { for i, entryChs := range zoneEntryChs { for j := range entryChs { zoneEntries[i][j], zoneEntriesValid[i][j] = entryChs[j].Pop() @@ -902,7 +902,7 @@ func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneE return lentry, 0, zoneIndex, isTruncated } - leastEntryCount := 0 + lexicallySortedEntryCount := 0 for i, entriesValid := range zoneEntriesValid { for j, valid := range entriesValid { if !valid { @@ -912,7 +912,7 @@ func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneE // Entries are duplicated across disks, // we should simply skip such entries. if lentry.Name == zoneEntries[i][j].Name && lentry.ModTime.Equal(zoneEntries[i][j].ModTime) { - leastEntryCount++ + lexicallySortedEntryCount++ continue } @@ -922,7 +922,7 @@ func leastEntryZone(zoneEntryChs [][]FileInfoCh, zoneEntries [][]FileInfo, zoneE } } - return lentry, leastEntryCount, zoneIndex, isTruncated + return lentry, lexicallySortedEntryCount, zoneIndex, isTruncated } // mergeZonesEntriesCh - merges FileInfo channel to entries upto maxKeys. @@ -935,7 +935,7 @@ func mergeZonesEntriesCh(zonesEntryChs [][]FileInfoCh, maxKeys int, ndisks int) zonesEntriesValid = append(zonesEntriesValid, make([]bool, len(entryChs))) } for { - fi, quorumCount, _, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) + fi, quorumCount, _, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) if !ok { // We have reached EOF across all entryChs, break the loop. break @@ -1476,7 +1476,7 @@ func (z *xlZones) Walk(ctx context.Context, bucket, prefix string, results chan< defer close(results) for { - entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs, + entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) if !ok { return @@ -1519,7 +1519,7 @@ func (z *xlZones) HealObjects(ctx context.Context, bucket, prefix string, opts m } for { - entry, quorumCount, zoneIndex, ok := leastEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) + entry, quorumCount, zoneIndex, ok := lexicallySortedEntryZone(zonesEntryChs, zonesEntriesInfos, zonesEntriesValid) if !ok { break }