Skip to content

Commit

Permalink
lightning: turn off zstd decoding concurrency (pingcap#53589)
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 authored Jun 3, 2024
1 parent 6183610 commit c6e6c15
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 6 deletions.
4 changes: 3 additions & 1 deletion lightning/pkg/importer/chunk_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func openParser(
tblInfo *model.TableInfo,
) (mydump.Parser, error) {
blockBufSize := int64(cfg.Mydumper.ReadBlockSize)
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, storage.DecompressConfig{})
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, storage.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
if err != nil {
return nil, err
}
Expand Down
8 changes: 6 additions & 2 deletions lightning/pkg/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,9 @@ func (p *PreImportInfoGetterImpl) ReadFirstNRowsByTableName(ctx context.Context,
// ReadFirstNRowsByFileMeta reads the first N rows of a data file.
// It implements the PreImportInfoGetter interface.
func (p *PreImportInfoGetterImpl) ReadFirstNRowsByFileMeta(ctx context.Context, dataFileMeta mydump.SourceFileMeta, n int) ([]string, [][]types.Datum, error) {
reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage, storage.DecompressConfig{})
reader, err := mydump.OpenReader(ctx, &dataFileMeta, p.srcStorage, storage.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
if err != nil {
return nil, nil, errors.Trace(err)
}
Expand Down Expand Up @@ -610,7 +612,9 @@ func (p *PreImportInfoGetterImpl) sampleDataFromTable(
return resultIndexRatio, isRowOrdered, nil
}
sampleFile := tableMeta.DataFiles[0].FileMeta
reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage, storage.DecompressConfig{})
reader, err := mydump.OpenReader(ctx, &sampleFile, p.srcStorage, storage.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
if err != nil {
return 0.0, false, errors.Trace(err)
}
Expand Down
4 changes: 3 additions & 1 deletion lightning/pkg/importer/table_import.go
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,9 @@ func getChunkCompressedSizeForParquet(
chunk *checkpoints.ChunkCheckpoint,
store storage.ExternalStorage,
) (int64, error) {
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, storage.DecompressConfig{})
reader, err := mydump.OpenReader(ctx, &chunk.FileMeta, store, storage.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
if err != nil {
return 0, errors.Trace(err)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/executor/load_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ func (e *LoadDataWorker) LoadLocal(ctx context.Context, r io.ReadCloser) error {
readers := []importer.LoadDataReaderInfo{{
Opener: func(_ context.Context) (io.ReadSeekCloser, error) {
addedSeekReader := NewSimpleSeekerOnReadCloser(r)
return storage.InterceptDecompressReader(addedSeekReader, compressTp2, storage.DecompressConfig{})
return storage.InterceptDecompressReader(addedSeekReader, compressTp2, storage.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
}}}
return e.load(ctx, readers)
}
Expand Down
4 changes: 3 additions & 1 deletion pkg/lightning/mydump/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,9 @@ func ExportStatement(ctx context.Context, store storage.ExternalStorage,
if err != nil {
return nil, errors.Trace(err)
}
store = storage.WithCompression(store, compressType, storage.DecompressConfig{})
store = storage.WithCompression(store, compressType, storage.DecompressConfig{
ZStdDecodeConcurrency: 1,
})
}
fd, err := store.Open(ctx, sqlFile.FileMeta.Path, nil)
if err != nil {
Expand Down

0 comments on commit c6e6c15

Please sign in to comment.