Skip to content

Commit

Permalink
lightning: support opening empty files and skip directory objects in …
Browse files Browse the repository at this point in the history
  • Loading branch information
dsdashun authored Mar 18, 2022
1 parent 64b057d commit 7309b08
Show file tree
Hide file tree
Showing 5 changed files with 266 additions and 57 deletions.
1 change: 1 addition & 0 deletions br/pkg/lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ func MakeTableRegions(
break
}
if err != nil {
log.L().Error("make source file region error", zap.Error(err), zap.String("file_path", info.FileMeta.Path))
break
}
}
Expand Down
67 changes: 52 additions & 15 deletions br/pkg/storage/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,14 +461,6 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
return errors.Trace(err)
}
for _, r := range res.Contents {
// when walk on specify directory, the result include storage.Prefix,
// which can not be reuse in other API(Open/Read) directly.
// so we use TrimPrefix to filter Prefix for next Open/Read.
path := strings.TrimPrefix(*r.Key, rs.options.Prefix)
if err = fn(path, *r.Size); err != nil {
return errors.Trace(err)
}

// https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjects.html#AmazonS3-ListObjects-response-NextMarker -
//
// `res.NextMarker` is populated only if we specify req.Delimiter.
Expand All @@ -479,6 +471,22 @@ func (rs *S3Storage) WalkDir(ctx context.Context, opt *WalkOption, fn func(strin
// you can use the value of the last Key in the response as the marker
// in the subsequent request to get the next set of object keys."
req.Marker = r.Key

// When walking a specified directory, the result includes storage.Prefix,
// which cannot be reused directly in other APIs (Open/Read).
// So we use TrimPrefix to strip the Prefix before the next Open/Read.
path := strings.TrimPrefix(*r.Key, rs.options.Prefix)
itemSize := *r.Size

// filter out s3's empty directory items
if itemSize <= 0 && strings.HasSuffix(path, "/") {
log.Info("this path is an empty directory and cannot be opened in S3. Skip it", zap.String("path", path))
continue
}
if err = fn(path, itemSize); err != nil {
return errors.Trace(err)
}

}
if !aws.BoolValue(res.IsTruncated) {
break
Expand Down Expand Up @@ -533,12 +541,21 @@ func (rs *S3Storage) open(
Key: aws.String(rs.options.Prefix + path),
}

// always set rangeOffset to fetch file size info
// s3 endOffset is inclusive
// If we just open part of the object, we set `Range` in the request.
// If we meant to open the whole object, not just a part of it,
// we do not pass the range in the request,
// so that even if the object is empty, we can still get the response without errors.
// Then this behavior is similar to opening an empty file in a local file system.
isFullRangeRequest := false
var rangeOffset *string
if endOffset > startOffset {
switch {
case endOffset > startOffset:
// s3 endOffset is inclusive
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-%d", startOffset, endOffset-1))
} else {
case startOffset == 0:
// opening the whole object; no need to fill the `Range` field in the request
isFullRangeRequest = true
default:
rangeOffset = aws.String(fmt.Sprintf("bytes=%d-", startOffset))
}
input.Range = rangeOffset
Expand All @@ -547,9 +564,26 @@ func (rs *S3Storage) open(
return nil, RangeInfo{}, errors.Trace(err)
}

r, err := ParseRangeInfo(result.ContentRange)
if err != nil {
return nil, RangeInfo{}, errors.Trace(err)
var r RangeInfo
// Requests without a `Range` will have no `ContentRange` in the response.
// In this case, we parse the `ContentLength` field instead.
if isFullRangeRequest {
// We must ensure `ContentLength` is populated even for empty objects,
// otherwise we have no way to get the object size.
if result.ContentLength == nil {
return nil, RangeInfo{}, errors.Annotatef(berrors.ErrStorageUnknown, "open file '%s' failed. The S3 object has no content length", path)
}
objectSize := *(result.ContentLength)
r = RangeInfo{
Start: 0,
End: objectSize - 1,
Size: objectSize,
}
} else {
r, err = ParseRangeInfo(result.ContentRange)
if err != nil {
return nil, RangeInfo{}, errors.Trace(err)
}
}

if startOffset != r.Start || (endOffset != 0 && endOffset != r.End+1) {
Expand Down Expand Up @@ -657,6 +691,9 @@ func (r *s3ObjectReader) Seek(offset int64, whence int) (int64, error) {
default:
return 0, errors.Annotatef(berrors.ErrStorageUnknown, "Seek: invalid whence '%d'", whence)
}
if realOffset < 0 {
return 0, errors.Annotatef(berrors.ErrStorageUnknown, "Seek in '%s': invalid offset to seek '%d'.", r.name, realOffset)
}

if realOffset == r.pos {
return realOffset, nil
Expand Down
37 changes: 27 additions & 10 deletions br/pkg/storage/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,10 +620,10 @@ func TestOpenAsBufio(t *testing.T) {
s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
require.Equal(t, "bytes=0-", aws.StringValue(input.Range))
require.Equal(t, (*string)(nil), input.Range)
return &s3.GetObjectOutput{
Body: io.NopCloser(bytes.NewReader([]byte("plain text\ncontent"))),
ContentRange: aws.String("bytes 0-17/18"),
Body: io.NopCloser(bytes.NewReader([]byte("plain text\ncontent"))),
ContentLength: aws.Int64(18),
}, nil
})

Expand Down Expand Up @@ -669,8 +669,8 @@ func TestOpenReadSlowly(t *testing.T) {
s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
Return(&s3.GetObjectOutput{
Body: &alphabetReader{character: 'A'},
ContentRange: aws.String("bytes 0-25/26"),
Body: &alphabetReader{character: 'A'},
ContentLength: aws.Int64(26),
}, nil)

reader, err := s.storage.Open(ctx, "alphabets")
Expand Down Expand Up @@ -725,6 +725,10 @@ func TestOpenSeek(t *testing.T) {
require.Equal(t, 100, n)
require.Equal(t, someRandomBytes[998000:998100], slice)

// jumping to a negative position would cause error.
_, err = reader.Seek(-8000, io.SeekStart)
require.Error(t, err)

// jumping backward should be fine, but would perform a new GetObject request.
offset, err = reader.Seek(-8000, io.SeekCurrent)
require.NoError(t, err)
Expand Down Expand Up @@ -769,11 +773,24 @@ func (s *s3Suite) expectedCalls(ctx context.Context, t *testing.T, data []byte,
thisCall := s.s3.EXPECT().
GetObjectWithContext(ctx, gomock.Any()).
DoAndReturn(func(_ context.Context, input *s3.GetObjectInput, opt ...request.Option) (*s3.GetObjectOutput, error) {
require.Equal(t, fmt.Sprintf("bytes=%d-", thisOffset), aws.StringValue(input.Range))
return &s3.GetObjectOutput{
Body: newReader(data, thisOffset),
ContentRange: aws.String(fmt.Sprintf("bytes %d-%d/%d", thisOffset, len(data)-1, len(data))),
}, nil
if thisOffset > 0 {
require.Equal(t, fmt.Sprintf("bytes=%d-", thisOffset), aws.StringValue(input.Range))
} else {
require.Equal(t, (*string)(nil), input.Range)
}
var response *s3.GetObjectOutput
if thisOffset > 0 {
response = &s3.GetObjectOutput{
Body: newReader(data, thisOffset),
ContentRange: aws.String(fmt.Sprintf("bytes %d-%d/%d", thisOffset, len(data)-1, len(data))),
}
} else {
response = &s3.GetObjectOutput{
Body: newReader(data, thisOffset),
ContentLength: aws.Int64(int64(len(data))),
}
}
return response, nil
})
if lastCall != nil {
thisCall = thisCall.After(lastCall)
Expand Down
20 changes: 20 additions & 0 deletions br/tests/lightning_s3/config_manual_files.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[mydumper]
default-file-rules = true

[[mydumper.files]]
pattern = "data-sql/"
schema = "s3_test"
table = "tbl"
type = "sql"

[[mydumper.files]]
pattern = "data-csv/"
schema = "s3_test"
table = "tbl"
type = "csv"

[[mydumper.files]]
pattern = "data-parquet/"
schema = "s3_test"
table = "tbl"
type = "parquet"
Loading

0 comments on commit 7309b08

Please sign in to comment.