Skip to content

Commit

Permalink
br/storage: disable keep alive for GCS on default to speed up request (
Browse files Browse the repository at this point in the history
  • Loading branch information
D3Hunter authored Dec 12, 2023
1 parent 16e3bd3 commit 8c8333a
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 71 deletions.
3 changes: 2 additions & 1 deletion br/pkg/lightning/backend/external/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ go_test(
],
embed = [":external"],
flaky = True,
shard_count = 48,
shard_count = 50,
deps = [
"//br/pkg/lightning/backend/kv",
"//br/pkg/lightning/common",
Expand All @@ -80,6 +80,7 @@ go_test(
"@com_github_aws_aws_sdk_go//aws/session",
"@com_github_aws_aws_sdk_go//service/s3",
"@com_github_cockroachdb_pebble//:pebble",
"@com_github_docker_go_units//:go-units",
"@com_github_jfcg_sorty_v2//:sorty",
"@com_github_johannesboyne_gofakes3//:gofakes3",
"@com_github_johannesboyne_gofakes3//backend/s3mem",
Expand Down
157 changes: 96 additions & 61 deletions br/pkg/lightning/backend/external/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@ import (
"testing"
"time"

"github.com/docker/go-units"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"
)
Expand All @@ -43,7 +45,7 @@ func openTestingStorage(t *testing.T) storage.ExternalStorage {
t.Skip("testingStorageURI is not set")
}
s, err := storage.NewFromURL(context.Background(), *testingStorageURI, nil)
intest.AssertNoError(err)
require.NoError(t, err)
return s
}

Expand Down Expand Up @@ -473,7 +475,7 @@ type readTestSuite struct {
afterReaderClose func()
}

func readFileSequential(s *readTestSuite) {
func readFileSequential(t *testing.T, s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
intest.AssertNoError(err)
Expand All @@ -482,14 +484,21 @@ func readFileSequential(s *readTestSuite) {
if s.beforeCreateReader != nil {
s.beforeCreateReader()
}
var totalFileSize atomic.Int64
startTime := time.Now()
for i, file := range files {
reader, err := s.store.Open(ctx, file, nil)
intest.AssertNoError(err)
_, err = reader.Read(buf)
for err == nil {
_, err = reader.Read(buf)
var size int
for {
n, err := reader.Read(buf)
size += n
if err != nil {
break
}
}
intest.Assert(err == io.EOF)
totalFileSize.Add(int64(size))
if i == len(files)-1 {
if s.beforeReaderClose != nil {
s.beforeReaderClose()
Expand All @@ -501,9 +510,15 @@ func readFileSequential(s *readTestSuite) {
if s.afterReaderClose != nil {
s.afterReaderClose()
}
t.Logf(
"sequential read speed for %s bytes(%d files): %s/s",
units.BytesSize(float64(totalFileSize.Load())),
len(files),
units.BytesSize(float64(totalFileSize.Load())/time.Since(startTime).Seconds()),
)
}

func readFileConcurrently(s *readTestSuite) {
func readFileConcurrently(t *testing.T, s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
intest.AssertNoError(err)
Expand All @@ -516,16 +531,24 @@ func readFileConcurrently(s *readTestSuite) {
if s.beforeCreateReader != nil {
s.beforeCreateReader()
}
for _, file := range files {
var totalFileSize atomic.Int64
startTime := time.Now()
for i := range files {
file := files[i]
eg.Go(func() error {
buf := make([]byte, s.memoryLimit/conc)
reader, err := s.store.Open(ctx, file, nil)
intest.AssertNoError(err)
_, err = reader.Read(buf)
for err == nil {
_, err = reader.Read(buf)
var size int
for {
n, err := reader.Read(buf)
size += n
if err != nil {
break
}
}
intest.Assert(err == io.EOF)
totalFileSize.Add(int64(size))
once.Do(func() {
if s.beforeReaderClose != nil {
s.beforeReaderClose()
Expand All @@ -541,6 +564,13 @@ func readFileConcurrently(s *readTestSuite) {
if s.afterReaderClose != nil {
s.afterReaderClose()
}
totalDur := time.Since(startTime)
t.Logf(
"concurrent read speed for %s bytes(%d files): %s/s, total-dur=%s",
units.BytesSize(float64(totalFileSize.Load())),
len(files),
units.BytesSize(float64(totalFileSize.Load())/totalDur.Seconds()), totalDur,
)
}

func createEvenlyDistributedFiles(
Expand Down Expand Up @@ -581,7 +611,7 @@ func createEvenlyDistributedFiles(
return store, kvCnt
}

func readMergeIter(s *readTestSuite) {
func readMergeIter(t *testing.T, s *readTestSuite) {
ctx := context.Background()
files, _, err := GetAllFileNames(ctx, s.store, "/"+s.subDir)
intest.AssertNoError(err)
Expand All @@ -590,6 +620,8 @@ func readMergeIter(s *readTestSuite) {
s.beforeCreateReader()
}

startTime := time.Now()
var totalSize int
readBufSize := s.memoryLimit / len(files)
zeroOffsets := make([]uint64, len(files))
iter, err := NewMergeKVIter(ctx, files, zeroOffsets, s.store, readBufSize, s.mergeIterHotspot, 0)
Expand All @@ -603,13 +635,20 @@ func readMergeIter(s *readTestSuite) {
s.beforeReaderClose()
}
}
totalSize += len(iter.Key()) + len(iter.Value()) + lengthBytes*2
}
intest.Assert(kvCnt == s.totalKVCnt)
err = iter.Close()
intest.AssertNoError(err)
if s.afterReaderClose != nil {
s.afterReaderClose()
}
t.Logf(
"merge iter read (hotspot=%t) speed for %s bytes: %s/s",
s.mergeIterHotspot,
units.BytesSize(float64(totalSize)),
units.BytesSize(float64(totalSize)/time.Since(startTime).Seconds()),
)
}

func TestCompareReaderEvenlyDistributedContent(t *testing.T) {
Expand Down Expand Up @@ -656,21 +695,21 @@ func TestCompareReaderEvenlyDistributedContent(t *testing.T) {
subDir: subDir,
}

readFileSequential(suite)
readFileSequential(t, suite)
t.Logf(
"sequential read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readFileConcurrently(suite)
readFileConcurrently(t, suite)
t.Logf(
"concurrent read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readMergeIter(suite)
readMergeIter(t, suite)
t.Logf(
"merge iter read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
Expand All @@ -679,11 +718,10 @@ func TestCompareReaderEvenlyDistributedContent(t *testing.T) {
}

func createAscendingFiles(
t *testing.T,
store storage.ExternalStorage,
fileSize, fileCount int,
subDir string,
) (storage.ExternalStorage, int) {
store := openTestingStorage(t)
) int {
ctx := context.Background()

cleanOldFiles(ctx, store, "/"+subDir)
Expand Down Expand Up @@ -712,29 +750,54 @@ func createAscendingFiles(
err := writer.Close(ctx)
intest.AssertNoError(err)
}
return store, kvCnt
return kvCnt
}

func TestCompareReaderAscendingContent(t *testing.T) {
fileSize := 50 * 1024 * 1024
fileCnt := 24
subDir := "ascending"
store, kvCnt := createAscendingFiles(t, fileSize, fileCnt, subDir)
memoryLimit := 64 * 1024 * 1024
var (
objectPrefix = flag.String("object-prefix", "ascending", "object prefix")
fileSize = flag.Int("file-size", 50*units.MiB, "file size")
fileCount = flag.Int("file-count", 24, "file count")
concurrency = flag.Int("concurrency", 100, "concurrency")
memoryLimit = flag.Int("memory-limit", 64*units.MiB, "memory limit")
skipCreate = flag.Bool("skip-create", false, "skip create files")
)

func TestReadFileConcurrently(t *testing.T) {
testCompareReaderAscendingContent(t, readFileConcurrently)
}

func TestReadFileSequential(t *testing.T) {
testCompareReaderAscendingContent(t, readFileSequential)
}

func TestReadMergeIterCheckHotspot(t *testing.T) {
testCompareReaderAscendingContent(t, func(t *testing.T, suite *readTestSuite) {
suite.mergeIterHotspot = true
readMergeIter(t, suite)
})
}

func TestReadMergeIterWithoutCheckHotspot(t *testing.T) {
testCompareReaderAscendingContent(t, readMergeIter)
}

func testCompareReaderAscendingContent(t *testing.T, fn func(t *testing.T, suite *readTestSuite)) {
store := openTestingStorage(t)
kvCnt := 0
if !*skipCreate {
kvCnt = createAscendingFiles(store, *fileSize, *fileCount, *objectPrefix)
}
fileIdx := 0
var (
now time.Time
elapsed time.Duration
file *os.File
err error
file *os.File
err error
)
beforeTest := func() {
fileIdx++
file, err = os.Create(fmt.Sprintf("cpu-profile-%d.prof", fileIdx))
intest.AssertNoError(err)
err = pprof.StartCPUProfile(file)
intest.AssertNoError(err)
now = time.Now()
}
beforeClose := func() {
file, err = os.Create(fmt.Sprintf("heap-profile-%d.prof", fileIdx))
Expand All @@ -744,49 +807,21 @@ func TestCompareReaderAscendingContent(t *testing.T) {
intest.AssertNoError(err)
}
afterClose := func() {
elapsed = time.Since(now)
pprof.StopCPUProfile()
}

suite := &readTestSuite{
store: store,
totalKVCnt: kvCnt,
concurrency: 100,
memoryLimit: memoryLimit,
concurrency: *concurrency,
memoryLimit: *memoryLimit,
beforeCreateReader: beforeTest,
beforeReaderClose: beforeClose,
afterReaderClose: afterClose,
subDir: subDir,
subDir: *objectPrefix,
}

readFileSequential(suite)
t.Logf(
"sequential read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readFileConcurrently(suite)
t.Logf(
"concurrent read speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

readMergeIter(suite)
t.Logf(
"merge iter read (hotspot=false) speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)

suite.mergeIterHotspot = true
readMergeIter(suite)
t.Logf(
"merge iter read (hotspot=true) speed for %d bytes: %.2f MB/s",
fileSize*fileCnt,
float64(fileSize*fileCnt)/elapsed.Seconds()/1024/1024,
)
fn(t, suite)
}

const largeAscendingDataPath = "large_ascending_data"
Expand Down
1 change: 1 addition & 0 deletions br/pkg/storage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ go_library(
"@com_google_cloud_go_storage//:storage",
"@org_golang_google_api//iterator",
"@org_golang_google_api//option",
"@org_golang_google_api//transport/http",
"@org_golang_x_oauth2//google",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_zap//:zap",
Expand Down
25 changes: 20 additions & 5 deletions br/pkg/storage/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"bytes"
"context"
"io"
"net/http"
"os"
"path"
"strings"
Expand All @@ -20,6 +21,7 @@ import (
"golang.org/x/oauth2/google"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
htransport "google.golang.org/api/transport/http"
)

const (
Expand Down Expand Up @@ -319,12 +321,25 @@ func NewGCSStorage(ctx context.Context, gcs *backuppb.GCS, opts *ExternalStorage
if gcs.Endpoint != "" {
clientOps = append(clientOps, option.WithEndpoint(gcs.Endpoint))
}
// the HTTPClient should have credentials; currently the HTTPClient only has the http.Transport.
// So we remove the HTTPClient in the storage.New().
// Issue: https://github.com/pingcap/tidb/issues/47022
if opts.HTTPClient != nil {
clientOps = append(clientOps, option.WithHTTPClient(opts.HTTPClient))

httpClient := opts.HTTPClient
if httpClient == nil {
// http2 will reuse the connection to read multiple files, which is
// very slow — about the same speed as reading a single file.
// So we disable keep-alive here to use multiple connections to read files.
// Opening a new connection takes about 20~50ms, which is acceptable.
transport, _ := CloneDefaultHttpTransport()
transport.DisableKeepAlives = true
httpClient = &http.Client{Transport: transport}
// see https://github.com/pingcap/tidb/issues/47022#issuecomment-1722913455
var err error
httpClient.Transport, err = htransport.NewTransport(ctx, httpClient.Transport, clientOps...)
if err != nil {
return nil, errors.Trace(err)
}
}
clientOps = append(clientOps, option.WithHTTPClient(httpClient))

client, err := storage.NewClient(ctx, clientOps...)
if err != nil {
return nil, errors.Trace(err)
Expand Down
Loading

0 comments on commit 8c8333a

Please sign in to comment.