-
Notifications
You must be signed in to change notification settings - Fork 367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor block adapter walker #8575
Conversation
…-factory-walker-8570 # Conflicts: # pkg/catalog/catalog.go # pkg/ingest/store/factory.go
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice clean up 🧹
I'm assuming our integration tests already test the import flows for all adapters, but I'm asking just to be on the safe side, how was this tested?
return nil, err | ||
} | ||
|
||
err := VerifyAbsPath(uri.Path, l.path, l.allowedExternalPrefixes) | ||
uriPath := strings.TrimSuffix(opts.StorageURI.Path, string(filepath.Separator)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm probably missing something
Why is this suddenly required?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method was never called when creating a local walker so this validation never ran.
The Abs check is not really important, what is more important are the other checks for is in namespace and in allowed prefixes.
return NewAzureDataLakeWalker(client, false) | ||
return NewAzureDataLakeWalker(client, opts.SkipOutOfOrder) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this changed? bug you found along the way?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No - we just never used the GetWalker in that capacity. Now that we use it we need to propagate the value accordingly
pkg/catalog/catalog.go
Outdated
} | ||
walker, err := block.NewWalkerWrapperFromAdapter(c.BlockAdapter, repository.StorageID.String(), source.Path, block.WalkerOptions{StorageURI: uri}) | ||
if err != nil { | ||
return err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return err | |
return fmt.Errorf("creating object-store walker on path %s: %w", source.Path, err) |
pkg/catalog/catalog.go
Outdated
} | ||
walker, err := block.NewWalkerWrapperFromAdapter(c.BlockAdapter, repository.StorageID.String(), params.SourceURI, block.WalkerOptions{StorageURI: uri, SkipOutOfOrder: true}) | ||
if err != nil { | ||
return nil, nil, err |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return nil, nil, err | |
return nil, nil, fmt.Errorf("creating object-store walker: %w", err) |
@@ -2504,10 +2499,13 @@ func (c *Catalog) WriteRange(ctx context.Context, repositoryID string, params Wr | |||
return nil, nil, err | |||
} | |||
|
|||
// TODO (niro): Need to handle this at some point (use adapter GetWalker) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🤩
pkg/catalog/catalog.go
Outdated
@@ -412,13 +405,13 @@ func New(ctx context.Context, cfg Config) (*Catalog, error) { | |||
workPool := pond.New(sharedWorkers, sharedWorkers*pendingTasksPerWorker, pond.Context(ctx)) | |||
|
|||
return &Catalog{ | |||
Config: cfg.Config, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this is this needed? it's not used anywhere
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another PR - removed
func NewWalkerWrapper(walker Walker, uri *url.URL) *WalkerWrapper { | ||
return &WalkerWrapper{ | ||
walker: walker, | ||
uri: uri, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this needed? I only see calls from tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Only for tests
pkg/block/adapter.go
Outdated
func NewWalkerWrapperFromAdapter(adapter Adapter, storageID, path string, opts WalkerOptions) (*WalkerWrapper, error) { | ||
walker, err := adapter.GetWalker(storageID, opts) | ||
if err != nil { | ||
return nil, fmt.Errorf("creating object-store walker on path %s: %w", path, err) | ||
} | ||
return &WalkerWrapper{ | ||
walker: walker, | ||
uri: opts.StorageURI, | ||
}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like to many ways to initiate the WalkerWrapper, IMO leaving only NewWalkerWrapper
is nicer, NewWalkerWrapperFromAdapter
not worth the 1 line of code it reduces. It's a matter of taste though
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
@@ -867,6 +867,7 @@ func TestLakectlFsStat(t *testing.T) { | |||
|
|||
func TestLakectlImport(t *testing.T) { | |||
// TODO(barak): generalize test to work all supported object stores | |||
const IngestTestBucketPath = "s3://esti-system-testing-data/ingest-test-data/" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: move up or use inline
The existing esti tests cover the changes for all blockstore types |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!
Approving, but please remove unused config
pkg/block/adapter.go
Outdated
func NewWalkerWrapperFromAdapter(adapter Adapter, storageID, path string, opts WalkerOptions) (*WalkerWrapper, error) { | ||
walker, err := adapter.GetWalker(storageID, opts) | ||
if err != nil { | ||
return nil, fmt.Errorf("creating object-store walker on path %s: %w", path, err) | ||
} | ||
return &WalkerWrapper{ | ||
walker: walker, | ||
uri: opts.StorageURI, | ||
}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be removed
pkg/catalog/catalog.go
Outdated
SettingsManagerOption settings.ManagerOption | ||
PathProvider *upload.PathPartitionProvider | ||
} | ||
|
||
type Catalog struct { | ||
Config config.Config |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's still here
Co-authored-by: guy-har <[email protected]>
Closes #8570
Change Description
Background
Remove usage of ingest store factory and catalog walker interface.
Catalog will now use underlying block adapter implementation of walker