From d200f72d2d4f0277c18a44d2a84e81ad0de428ed Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Thu, 16 Mar 2023 21:35:52 +0800
Subject: [PATCH 1/3] test: should not leak goroutines after test finished

The original flaky test showed up in the CI pipeline[1], but gotestsum ran
into a golang issue[2], so the error message in the summary is not clear,
like:

```
{"Time":"2023-03-02T09:19:38.754394861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1433 +0x7e4\n"}
{"Time":"2023-03-02T09:19:38.754414561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/runtime/panic.go:476 +0x32\n"}
{"Time":"2023-03-02T09:19:38.754430561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1493 +0x47\n"}
{"Time":"2023-03-02T09:19:38.754482561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:883 +0xc4\n"}
{"Time":"2023-03-02T09:19:38.754497661Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:876 +0xa4\n"}
{"Time":"2023-03-02T09:19:38.754512161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:927 +0x6a\n"}
{"Time":"2023-03-02T09:19:38.754567661Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap/zaptest.testingWriter.Write()\n"}
{"Time":"2023-03-02T09:19:38.754571261Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/zaptest/logger.go:130 +0x12c\n"}
{"Time":"2023-03-02T09:19:38.754582861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap/zaptest.(*testingWriter).Write()\n"}
{"Time":"2023-03-02T09:19:38.754597761Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap/zapcore.(*ioCore).Write()\n"}
{"Time":"2023-03-02T09:19:38.754600961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/core.go:99 +0x199\n"}
{"Time":"2023-03-02T09:19:38.754612761Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap/zapcore.(*CheckedEntry).Write()\n"}
{"Time":"2023-03-02T09:19:38.754618561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/entry.go:255 +0x2ce\n"}
{"Time":"2023-03-02T09:19:38.754630161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.uber.org/zap.(*Logger).Info()\n"}
{"Time":"2023-03-02T09:19:38.754633261Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/go/pkg/mod/go.uber.org/zap@v1.24.0/logger.go:220 +0x6a\n"}
{"Time":"2023-03-02T09:19:38.754644861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.(*treeIndex).Compact()\n"}
{"Time":"2023-03-02T09:19:38.754648461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/index.go:194 +0x144\n"}
{"Time":"2023-03-02T09:19:38.754664961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.(*store).scheduleCompaction()\n"}
{"Time":"2023-03-02T09:19:38.754670161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/kvstore_compaction.go:29 +0xbb\n"}
{"Time":"2023-03-02T09:19:38.754681861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.(*store).compact.func1()\n"}
{"Time":"2023-03-02T09:19:38.754690561Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/kvstore.go:235 +0x9e\n"}
{"Time":"2023-03-02T09:19:38.754720061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.job.Do()\n"}
{"Time":"2023-03-02T09:19:38.754724161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:41 +0x70\n"}
{"Time":"2023-03-02T09:19:38.754736161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.(*job).Do()\n"}
{"Time":"2023-03-02T09:19:38.754750961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.(*fifo).executeJob()\n"}
{"Time":"2023-03-02T09:19:38.754754161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:206 +0x101\n"}
{"Time":"2023-03-02T09:19:38.754765861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.(*fifo).run()\n"}
{"Time":"2023-03-02T09:19:38.754769061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:187 +0x1a5\n"}
{"Time":"2023-03-02T09:19:38.754780461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler.func1()\n"}
{"Time":"2023-03-02T09:19:38.754783661Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:101 +0x39\n"}
{"Time":"2023-03-02T09:19:38.754824061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1493 +0x75d\n"}
FAIL: (code:1): % (cd server && 'env' 'ETCD_VERIFY=all' 'go' 'test' '-v' '-json' '-short' '-timeout=3m' '--race=true' '--cpu=4' './...' '-p=2')
{"Time":"2023-03-02T09:19:38.754838961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1846 +0x99\n"}
{"Time":"2023-03-02T09:19:38.754854961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1446 +0x216\n"}
{"Time":"2023-03-02T09:19:38.754893461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1844 +0x7ec\n"}
{"Time":"2023-03-02T09:19:38.754908961Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1726 +0xa84\n"}
{"Time":"2023-03-02T09:19:38.754957861Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler()\n"}
{"Time":"2023-03-02T09:19:38.754961061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/pkg/schedule/schedule.go:101 +0x3b6\n"}
{"Time":"2023-03-02T09:19:38.754976161Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.NewStore()\n"}
{"Time":"2023-03-02T09:19:38.754979361Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/kvstore.go:111 +0x331\n"}
{"Time":"2023-03-02T09:19:38.754991061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" go.etcd.io/etcd/server/v3/storage/mvcc.TestHashByRevValue()\n"}
{"Time":"2023-03-02T09:19:38.754994261Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /home/runner/work/etcd/etcd/server/storage/mvcc/hash_test.go:36 +0xa4\n"}
{"Time":"2023-03-02T09:19:38.755010061Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1446 +0x216\n"}
{"Time":"2023-03-02T09:19:38.755024461Z","Action":"output","Package":"go.etcd.io/etcd/server/v3/storage/mvcc","Test":"TestHashByRevValue","Output":" /opt/hostedtoolcache/go/1.19.6/x64/src/testing/testing.go:1493 +0x47\n"}
=== Failed
=== FAIL: storage/mvcc (0.00s)
=== CONT testing.go:1319: race detected during execution of test
FAIL
FAIL go.etcd.io/etcd/server/v3/storage/mvcc 9.852s
```

After using the following command to reproduce it, we can get an error like:

```bash
go test -v -p=2 --cpu=4 -count=1000 -failfast --race=true -short -timeout=30m ./

--- PASS: TestHashByRevValueLastRevision (0.12s)
==================
WARNING: DATA RACE
Read at 0x00c002024043 by goroutine 65745:
  testing.(*common).logDepth()
      /usr/lib/go-1.19/src/testing/testing.go:883 +0xc4
  testing.(*common).log()
      /usr/lib/go-1.19/src/testing/testing.go:876 +0xa4
  testing.(*common).Logf()
      /usr/lib/go-1.19/src/testing/testing.go:927 +0x6a
  testing.(*T).Logf()
      <autogenerated>:1 +0x75
  go.uber.org/zap/zaptest.testingWriter.Write()
      /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/zaptest/logger.go:130 +0x12c
  go.uber.org/zap/zaptest.(*testingWriter).Write()
      <autogenerated>:1 +0x7e
  go.uber.org/zap/zapcore.(*ioCore).Write()
      /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/core.go:99 +0x199
  go.uber.org/zap/zapcore.(*CheckedEntry).Write()
      /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/zapcore/entry.go:255 +0x2ce
  go.uber.org/zap.(*Logger).Info()
      /home/fuwei/go/pkg/mod/go.uber.org/zap@v1.24.0/logger.go:220 +0x6a
  go.etcd.io/etcd/server/v3/storage/mvcc.(*treeIndex).Compact()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/index.go:194 +0x144
  go.etcd.io/etcd/server/v3/storage/mvcc.(*store).scheduleCompaction()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/kvstore_compaction.go:29 +0xbb
  go.etcd.io/etcd/server/v3/storage/mvcc.(*store).compact.func1()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/kvstore.go:235 +0x9e
  go.etcd.io/etcd/pkg/v3/schedule.job.Do()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:41 +0x70
  go.etcd.io/etcd/pkg/v3/schedule.(*job).Do()
      <autogenerated>:1 +0x29
  go.etcd.io/etcd/pkg/v3/schedule.(*fifo).executeJob()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:206 +0x101
  go.etcd.io/etcd/pkg/v3/schedule.(*fifo).run()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:187 +0x1a5
  go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler.func1()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:101 +0x39

Previous write at 0x00c002024043 by goroutine 65743:
  testing.tRunner.func1()
      /usr/lib/go-1.19/src/testing/testing.go:1433 +0x7e4
  runtime.deferreturn()
      /usr/lib/go-1.19/src/runtime/panic.go:476 +0x32
  testing.(*T).Run.func1()
      /usr/lib/go-1.19/src/testing/testing.go:1493 +0x47

Goroutine 65745 (running) created at:
  go.etcd.io/etcd/pkg/v3/schedule.NewFIFOScheduler()
      /home/fuwei/go/src/go.etcd.io/etcd/pkg/schedule/schedule.go:101 +0x3b6
  go.etcd.io/etcd/server/v3/storage/mvcc.NewStore()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/kvstore.go:111 +0x331
  go.etcd.io/etcd/server/v3/storage/mvcc.TestHashByRevValueLastRevision()
      /home/fuwei/go/src/go.etcd.io/etcd/server/storage/mvcc/hash_test.go:76 +0xa4
  testing.tRunner()
      /usr/lib/go-1.19/src/testing/testing.go:1446 +0x216
  testing.(*T).Run.func1()
      /usr/lib/go-1.19/src/testing/testing.go:1493 +0x47

Goroutine 65743 (running) created at:
  testing.(*T).Run()
      /usr/lib/go-1.19/src/testing/testing.go:1493 +0x75d
  testing.runTests.func1()
      /usr/lib/go-1.19/src/testing/testing.go:1846 +0x99
  testing.tRunner()
      /usr/lib/go-1.19/src/testing/testing.go:1446 +0x216
  testing.runTests()
      /usr/lib/go-1.19/src/testing/testing.go:1844 +0x7ec
  testing.(*M).Run()
      /usr/lib/go-1.19/src/testing/testing.go:1726 +0xa84
  main.main()
      _testmain.go:265 +0x2e9
==================
```

The compaction is scheduled and runs asynchronously, so it might call
`t.Logf` (through the `zaptest` logger) after go-test has already marked
the case as done. There is a related comment in go-test:

```go
// https://github.com/golang/go/blob/c69ff3a7d0c8bd2878662034c1cbce8613fa6f13/src/testing/testing.go#LL1580C3-L1582C16
// Do not lock t.done to allow race detector to detect race in case
// the user does not appropriately synchronize a goroutine.
t.done = true
```

We need to ensure that all goroutines are closed before the test case
finishes.
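To illustrate the required pattern, here is a minimal, hypothetical sketch
(not code from this patch; the test name, the `stopc`/`donec` channels, and
the worker loop are invented for illustration) of a test that stops a
background goroutine logging through a `zaptest` logger before the test
returns:

```go
package mvcc_test // hypothetical package, for illustration only

import (
	"testing"
	"time"

	"go.uber.org/zap/zaptest"
)

func TestNoGoroutineLeak(t *testing.T) {
	lg := zaptest.NewLogger(t) // writes through t.Logf under the hood

	stopc := make(chan struct{})
	donec := make(chan struct{})
	go func() { // background worker, analogous to the FIFO scheduler
		defer close(donec)
		for {
			select {
			case <-stopc:
				return
			case <-time.After(10 * time.Millisecond):
				// If this fires after the test has returned, it races
				// with the testing framework writing t.done.
				lg.Info("compacting")
			}
		}
	}()

	// Stop the worker and wait for it before the test is marked done;
	// t.Cleanup runs before the framework sets t.done (a defer works too).
	t.Cleanup(func() {
		close(stopc)
		<-donec
	})
}
```

The changes below apply the same idea via `defer cleanup(...)`,
`defer s.Close()` and `defer w.Close()`, so every goroutine spawned by the
store is stopped before the test case finishes.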
REF:
[1]: https://github.com/etcd-io/etcd/actions/runs/4312405975/jobs/7522924734
[2]: https://github.com/gotestyourself/gotestsum/issues/310

Signed-off-by: Wei Fu
---
 server/storage/mvcc/hash_test.go            | 13 +++-
 .../storage/mvcc/kvstore_compaction_test.go |  9 ++-
 server/storage/mvcc/kvstore_test.go         | 15 ++--
 server/storage/mvcc/watchable_store_test.go | 70 +++++++------------
 4 files changed, 53 insertions(+), 54 deletions(-)

diff --git a/server/storage/mvcc/hash_test.go b/server/storage/mvcc/hash_test.go index 2c7a35f9a60..f0d516276be 100644 --- a/server/storage/mvcc/hash_test.go +++ b/server/storage/mvcc/hash_test.go @@ -32,8 +32,9 @@ import ( // output which would have catastrophic consequences. Expected output is just // hardcoded, so please regenerate it every time you change input parameters. func TestHashByRevValue(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) var totalRevisions int64 = 1210 assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) @@ -72,8 +73,9 @@ func TestHashByRevValue(t *testing.T) { } func TestHashByRevValueLastRevision(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) var totalRevisions int64 = 1210 assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) @@ -131,8 +133,9 @@ func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash { // TestCompactionHash tests compaction hash // TODO: Change this to fuzz test func TestCompactionHash(t *testing.T) { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) testutil.TestCompactionHash(context.Background(), t, hashTestCase{s}, s.cfg.CompactionBatchLimit) } @@ -176,6 +179,8 @@ func (tc hashTestCase) Compact(ctx context.Context, rev int64) error { func TestHasherStore(t *testing.T) { lg := zaptest.NewLogger(t) s := newHashStorage(lg, newFakeStore(lg)) + defer s.store.Close() + var hashes []KeyValueHash for i := 0; i < hashStorageMaxSize; i++ { hash := KeyValueHash{Hash: uint32(i), Revision: int64(i) + 10, CompactRevision: int64(i) + 100} @@ -203,6 +208,8 @@ func TestHasherStoreFull(t *testing.T) { lg := zaptest.NewLogger(t) s := newHashStorage(lg, newFakeStore(lg)) + defer s.store.Close() + var minRevision int64 = 100 var maxRevision = minRevision + hashStorageMaxSize for i := 0; i < hashStorageMaxSize; i++ { diff --git a/server/storage/mvcc/kvstore_compaction_test.go b/server/storage/mvcc/kvstore_compaction_test.go index 2f8fac83c77..b9b7822739b 100644 --- a/server/storage/mvcc/kvstore_compaction_test.go +++ b/server/storage/mvcc/kvstore_compaction_test.go @@ -110,7 +110,10 @@ func TestScheduleCompaction(t *testing.T) { func TestCompactAllAndRestore(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer os.Remove(tmpPath) + defer func() { + b.Close() + os.Remove(tmpPath) + }() s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease) @@ -143,4 +146,8 @@ func TestCompactAllAndRestore(t *testing.T) { if err != nil { t.Errorf("unexpect range error
%v", err) } + err = s1.Close() + if err != nil { + t.Fatal(err) + } } diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index c755827ce68..eb7c6acbcb3 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -22,7 +22,6 @@ import ( "fmt" "math" mrand "math/rand" - "os" "reflect" "sort" "strconv" @@ -324,6 +323,7 @@ func TestStoreDeleteRange(t *testing.T) { if s.currentRev != tt.wrev.main { t.Errorf("#%d: rev = %+v, want %+v", i, s.currentRev, tt.wrev) } + s.Close() } } @@ -370,6 +370,7 @@ func TestStoreRestore(t *testing.T) { s := newFakeStore(lg) b := s.b.(*fakeBackend) fi := s.kvindex.(*fakeIndex) + defer s.Close() putkey := newTestKeyBytes(lg, revision{3, 0}, false) putkv := mvccpb.KeyValue{ @@ -435,6 +436,7 @@ func TestRestoreDelete(t *testing.T) { b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer b.Close() keys := make(map[string]struct{}) for i := 0; i < 20; i++ { @@ -480,7 +482,7 @@ func TestRestoreDelete(t *testing.T) { func TestRestoreContinueUnfinishedCompaction(t *testing.T) { tests := []string{"recreate", "restore"} for _, test := range tests { - b, _ := betesting.NewDefaultTmpBackend(t) + b, tmpPath := betesting.NewDefaultTmpBackend(t) s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -527,9 +529,10 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { time.Sleep(100 * time.Millisecond) continue } + // FIXME(fuweid): it doesn't test restore one? return } - + cleanup(s, b, tmpPath) t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) } } @@ -705,7 +708,7 @@ func TestTxnPut(t *testing.T) { func TestConcurrentReadNotBlockingWrite(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) // write something to read later s.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -774,9 +777,7 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { ) b, tmpPath := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer b.Close() - defer s.Close() - defer os.Remove(tmpPath) + defer cleanup(s, b, tmpPath) var wg sync.WaitGroup wg.Add(numOfWrites) diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index a36c3ee1430..b7d6868f84b 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -17,7 +17,6 @@ package mvcc import ( "bytes" "fmt" - "os" "reflect" "sync" "testing" @@ -34,20 +33,16 @@ import ( func TestWatch(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() - w.Watch(0, testKey, nil, 0) + defer w.Close() + w.Watch(0, testKey, nil, 0) if !s.synced.contains(string(testKey)) { // the key must have had an entry in synced t.Errorf("existence = false, want true") @@ -57,18 +52,16 @@ func TestWatch(t *testing.T) { func TestNewWatcherCancel(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := 
newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() - wt, _ := w.Watch(0, testKey, nil, 0) + defer w.Close() + wt, _ := w.Watch(0, testKey, nil, 0) if err := w.Cancel(wt); err != nil { t.Error(err) } @@ -94,12 +87,10 @@ func TestCancelUnsynced(t *testing.T) { // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. synced: newWatcherGroup(), + stopc: make(chan struct{}), } - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) // Put a key so that we can spawn watchers on that key. // (testKey in this test). This increases the rev to 1, @@ -110,6 +101,7 @@ func TestCancelUnsynced(t *testing.T) { s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() + defer w.Close() // arbitrary number for watchers watcherN := 100 @@ -146,18 +138,17 @@ func TestSyncWatchers(t *testing.T) { store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}), unsynced: newWatcherGroup(), synced: newWatcherGroup(), + stopc: make(chan struct{}), } - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey := []byte("foo") testValue := []byte("bar") s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() + defer w.Close() // arbitrary number for watchers watcherN := 100 @@ -227,11 +218,8 @@ func TestSyncWatchers(t *testing.T) { func TestWatchCompacted(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + defer cleanup(s, b, tmpPath) - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() testKey := []byte("foo") testValue := []byte("bar") @@ -246,8 +234,9 @@ func TestWatchCompacted(t *testing.T) { } w := s.NewWatchStream() - wt, _ := w.Watch(0, testKey, nil, compactRev-1) + defer w.Close() + wt, _ := w.Watch(0, testKey, nil, compactRev-1) select { case resp := <-w.Chan(): if resp.WatchID != wt { @@ -264,17 +253,14 @@ func TestWatchCompacted(t *testing.T) { func TestWatchFutureRev(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey := []byte("foo") testValue := []byte("bar") w := s.NewWatchStream() + defer w.Close() + wrev := int64(10) w.Watch(0, testKey, nil, wrev) @@ -317,6 +303,8 @@ func TestWatchRestore(t *testing.T) { defer cleanup(newStore, newBackend, newPath) w := newStore.NewWatchStream() + defer w.Close() + w.Watch(0, testKey, nil, rev-1) time.Sleep(delay) @@ -365,6 +353,8 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) { // create a watcher with a future revision // add to "synced" watcher group (startRev > s.store.currentRev) w1 := s1.NewWatchStream() + defer w1.Close() + w1.Watch(0, testKey, nil, startRev) // make "s2" ends up with a higher last revision @@ -407,8 +397,7 @@ func TestWatchBatchUnsynced(t *testing.T) { oldMaxRevs := watchBatchMaxRevs defer func() { watchBatchMaxRevs = oldMaxRevs - s.store.Close() - os.Remove(tmpPath) + cleanup(s, b, tmpPath) }() batches := 3 watchBatchMaxRevs = 4 @@ -419,6 +408,8 @@ func TestWatchBatchUnsynced(t *testing.T) { } w := s.NewWatchStream() + defer w.Close() + 
w.Watch(0, v, nil, 1) for i := 0; i < batches; i++ { if resp := <-w.Chan(); len(resp.Events) != watchBatchMaxRevs { @@ -539,9 +530,7 @@ func TestWatchVictims(t *testing.T) { s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) + cleanup(s, b, tmpPath) chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync }() @@ -616,12 +605,7 @@ func TestWatchVictims(t *testing.T) { func TestStressWatchCancelClose(t *testing.T) { b, tmpPath := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - defer func() { - b.Close() - s.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b, tmpPath) testKey, testValue := []byte("foo"), []byte("bar") var wg sync.WaitGroup

From 830d9e9eaa883bc5fa410cbdae3c711e76056d61 Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Thu, 16 Mar 2023 22:19:21 +0800
Subject: [PATCH 2/3] test: fix TestRestoreContinueUnfinishedCompaction

The original test case used a `return` statement, which skipped the
`restore` case. This change enables the `restore` test case.

Signed-off-by: Wei Fu
---
 server/storage/mvcc/kvstore_test.go | 99 +++++++++++++++--------------
 1 file changed, 52 insertions(+), 47 deletions(-)

diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index eb7c6acbcb3..02ab63db26e 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -482,58 +482,63 @@ func TestRestoreDelete(t *testing.T) { func TestRestoreContinueUnfinishedCompaction(t *testing.T) { tests := []string{"recreate", "restore"} for _, test := range tests { - b, tmpPath := betesting.NewDefaultTmpBackend(t) - s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - - s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) - s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease) - s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease) - - // write scheduled compaction, but not do compaction - rbytes := newRevBytes() - revToBytes(revision{main: 2}, rbytes) - tx := s0.b.BatchTx() - tx.Lock() - UnsafeSetScheduledCompact(tx, 2) - tx.Unlock() - - s0.Close() - - var s *store - switch test { - case "recreate": - s = NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - case "restore": - s0.Restore(b) - s = s0 - } + test := test - // wait for scheduled compaction to be finished - time.Sleep(100 * time.Millisecond) + t.Run(test, func(t *testing.T) { + b, tmpPath := betesting.NewDefaultTmpBackend(t) + s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted { - t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) - } - // check the key in backend is deleted - revbytes := newRevBytes() - revToBytes(revision{main: 1}, revbytes) - - // The disk compaction is done asynchronously and requires more time on slow disk. - // try 5 times for CI with slow IO.
- for i := 0; i < 5; i++ { - tx := s.b.BatchTx() + s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) + s0.Put([]byte("foo"), []byte("bar1"), lease.NoLease) + s0.Put([]byte("foo"), []byte("bar2"), lease.NoLease) + + // write scheduled compaction, but not do compaction + rbytes := newRevBytes() + revToBytes(revision{main: 2}, rbytes) + tx := s0.b.BatchTx() tx.Lock() - ks, _ := tx.UnsafeRange(schema.Key, revbytes, nil, 0) + UnsafeSetScheduledCompact(tx, 2) tx.Unlock() - if len(ks) != 0 { - time.Sleep(100 * time.Millisecond) - continue + + var s *store + switch test { + case "recreate": + s0.Close() + s = NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) + case "restore": + // TODO(fuweid): store doesn't support to restore + // from a closed status because there is no lock + // for `Close` or action to mark it is closed. + s0.Restore(b) + s = s0 } - // FIXME(fuweid): it doesn't test restore one? - return - } - cleanup(s, b, tmpPath) - t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) + defer cleanup(s, b, tmpPath) + + // wait for scheduled compaction to be finished + time.Sleep(100 * time.Millisecond) + + if _, err := s.Range(context.TODO(), []byte("foo"), nil, RangeOptions{Rev: 1}); err != ErrCompacted { + t.Errorf("range on compacted rev error = %v, want %v", err, ErrCompacted) + } + // check the key in backend is deleted + revbytes := newRevBytes() + revToBytes(revision{main: 1}, revbytes) + + // The disk compaction is done asynchronously and requires more time on slow disk. + // try 5 times for CI with slow IO. + for i := 0; i < 5; i++ { + tx := s.b.BatchTx() + tx.Lock() + ks, _ := tx.UnsafeRange(schema.Key, revbytes, nil, 0) + tx.Unlock() + if len(ks) != 0 { + time.Sleep(100 * time.Millisecond) + continue + } + return + } + t.Errorf("key for rev %+v still exists, want deleted", bytesToRev(revbytes)) + }) } }

From eb09e00541249307ff07b281d45203f81a565aa2 Mon Sep 17 00:00:00 2001
From: Wei Fu
Date: Sun, 19 Mar 2023 21:41:43 +0800
Subject: [PATCH 3/3] chore: refactor cleanup fn in mvcc test

The tmp path is cleaned up by go testing, so `cleanup` doesn't need to
call `os.Remove`.

Signed-off-by: Wei Fu
---
 server/storage/mvcc/hash_test.go              | 12 +--
 server/storage/mvcc/kv_test.go                | 75 +++++++++----------
 server/storage/mvcc/kvstore_bench_test.go     | 20 ++---
 .../storage/mvcc/kvstore_compaction_test.go   | 12 +--
 server/storage/mvcc/kvstore_test.go           | 28 +++----
 .../mvcc/watchable_store_bench_test.go        | 30 ++++----
 server/storage/mvcc/watchable_store_test.go   | 52 ++++++-------
 server/storage/mvcc/watcher_bench_test.go     |  5 +-
 server/storage/mvcc/watcher_test.go           | 32 ++++----
 9 files changed, 128 insertions(+), 138 deletions(-)

diff --git a/server/storage/mvcc/hash_test.go b/server/storage/mvcc/hash_test.go index f0d516276be..d906d41f17a 100644 --- a/server/storage/mvcc/hash_test.go +++ b/server/storage/mvcc/hash_test.go @@ -32,9 +32,9 @@ import ( // output which would have catastrophic consequences. Expected output is just // hardcoded, so please regenerate it every time you change input parameters.
func TestHashByRevValue(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) var totalRevisions int64 = 1210 assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) @@ -73,9 +73,9 @@ func TestHashByRevValue(t *testing.T) { } func TestHashByRevValueLastRevision(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) var totalRevisions int64 = 1210 assert.Less(t, int64(s.cfg.CompactionBatchLimit), totalRevisions) @@ -133,9 +133,9 @@ func testHashByRev(t *testing.T, s *store, rev int64) KeyValueHash { // TestCompactionHash tests compaction hash // TODO: Change this to fuzz test func TestCompactionHash(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) testutil.TestCompactionHash(context.Background(), t, hashTestCase{s}, s.cfg.CompactionBatchLimit) } diff --git a/server/storage/mvcc/kv_test.go b/server/storage/mvcc/kv_test.go index bc2081b02ec..ef5461035f4 100644 --- a/server/storage/mvcc/kv_test.go +++ b/server/storage/mvcc/kv_test.go @@ -79,9 +79,9 @@ func TestKVRange(t *testing.T) { testKVRange(t, normalRangeFunc) } func TestKVTxnRange(t *testing.T) { testKVRange(t, txnRangeFunc) } func testKVRange(t *testing.T, f rangeFunc) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) kvs := put3TestKVs(s) @@ -145,9 +145,9 @@ func TestKVRangeRev(t *testing.T) { testKVRangeRev(t, normalRangeFunc) } func TestKVTxnRangeRev(t *testing.T) { testKVRangeRev(t, txnRangeFunc) } func testKVRangeRev(t *testing.T, f rangeFunc) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) kvs := put3TestKVs(s) @@ -181,9 +181,9 @@ func TestKVRangeBadRev(t *testing.T) { testKVRangeBadRev(t, normalRangeFunc) func TestKVTxnRangeBadRev(t *testing.T) { testKVRangeBadRev(t, txnRangeFunc) } func testKVRangeBadRev(t *testing.T, f rangeFunc) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) put3TestKVs(s) if _, err := s.Compact(traceutil.TODO(), 4); err != nil { @@ -214,9 +214,9 @@ func TestKVRangeLimit(t *testing.T) { testKVRangeLimit(t, normalRangeFunc) } func TestKVTxnRangeLimit(t *testing.T) { testKVRangeLimit(t, txnRangeFunc) } func testKVRangeLimit(t *testing.T, f rangeFunc) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) kvs := put3TestKVs(s) @@ -260,9 +260,9 @@ func TestKVPutMultipleTimes(t *testing.T) { testKVPutMultipleTimes(t, normalP func TestKVTxnPutMultipleTimes(t *testing.T) { 
testKVPutMultipleTimes(t, txnPutFunc) } func testKVPutMultipleTimes(t *testing.T, f putFunc) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) for i := 0; i < 10; i++ { base := int64(i + 1) @@ -322,7 +322,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) { } for i, tt := range tests { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) s.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -334,7 +334,7 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) { t.Errorf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, tt.wN, tt.wrev) } - cleanup(s, b, tmpPath) + cleanup(s, b) } } @@ -342,9 +342,9 @@ func TestKVDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, n func TestKVTxnDeleteMultipleTimes(t *testing.T) { testKVDeleteMultipleTimes(t, txnDeleteRangeFunc) } func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) s.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -365,9 +365,9 @@ func TestKVPutWithSameLease(t *testing.T) { testKVPutWithSameLease(t, normalP func TestKVTxnPutWithSameLease(t *testing.T) { testKVPutWithSameLease(t, txnPutFunc) } func testKVPutWithSameLease(t *testing.T, f putFunc) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) leaseID := int64(1) // put foo @@ -398,9 +398,9 @@ func testKVPutWithSameLease(t *testing.T, f putFunc) { // TestKVOperationInSequence tests that range, put, delete on single key in // sequence repeatedly works correctly. func TestKVOperationInSequence(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) for i := 0; i < 10; i++ { base := int64(i*2 + 1) @@ -445,7 +445,7 @@ func TestKVOperationInSequence(t *testing.T) { } func TestKVTxnBlockWriteOperations(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) tests := []func(){ @@ -475,13 +475,13 @@ func TestKVTxnBlockWriteOperations(t *testing.T) { } // only close backend when we know all the tx are finished - cleanup(s, b, tmpPath) + cleanup(s, b) } func TestKVTxnNonBlockRange(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) txn := s.Write(traceutil.TODO()) defer txn.End() @@ -501,9 +501,9 @@ func TestKVTxnNonBlockRange(t *testing.T) { // TestKVTxnOperationInSequence tests that txn range, put, delete on single key // in sequence repeatedly works correctly. 
func TestKVTxnOperationInSequence(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) for i := 0; i < 10; i++ { txn := s.Write(traceutil.TODO()) @@ -551,9 +551,9 @@ func TestKVTxnOperationInSequence(t *testing.T) { } func TestKVCompactReserveLastValue(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) s.Put([]byte("foo"), []byte("bar0"), 1) s.Put([]byte("foo"), []byte("bar1"), 2) @@ -605,9 +605,9 @@ func TestKVCompactReserveLastValue(t *testing.T) { } func TestKVCompactBad(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) s.Put([]byte("foo"), []byte("bar0"), lease.NoLease) s.Put([]byte("foo"), []byte("bar1"), lease.NoLease) @@ -638,7 +638,7 @@ func TestKVHash(t *testing.T) { for i := 0; i < len(hashes); i++ { var err error - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) kv := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) kv.Put([]byte("foo0"), []byte("bar0"), lease.NoLease) kv.Put([]byte("foo1"), []byte("bar0"), lease.NoLease) @@ -646,7 +646,7 @@ func TestKVHash(t *testing.T) { if err != nil { t.Fatalf("failed to get hash: %v", err) } - cleanup(kv, b, tmpPath) + cleanup(kv, b) } for i := 1; i < len(hashes); i++ { @@ -676,7 +676,7 @@ func TestKVRestore(t *testing.T) { }, } for i, tt := range tests { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) tt(s) var kvss [][]mvccpb.KeyValue @@ -702,7 +702,7 @@ func TestKVRestore(t *testing.T) { r, _ := ns.Range(context.TODO(), []byte("a"), []byte("z"), RangeOptions{Rev: k}) nkvss = append(nkvss, r.KVs) } - cleanup(ns, b, tmpPath) + cleanup(ns, b) if !reflect.DeepEqual(nkvss, kvss) { t.Errorf("#%d: kvs history = %+v, want %+v", i, nkvss, kvss) @@ -720,9 +720,9 @@ func readGaugeInt(g prometheus.Gauge) int { } func TestKVSnapshot(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) wkvs := put3TestKVs(s) @@ -756,9 +756,9 @@ func TestKVSnapshot(t *testing.T) { } func TestWatchableKVWatch(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) w := s.NewWatchStream() defer w.Close() @@ -860,10 +860,9 @@ func TestWatchableKVWatch(t *testing.T) { } } -func cleanup(s KV, b backend.Backend, path string) { +func cleanup(s KV, b backend.Backend) { s.Close() b.Close() - os.Remove(path) } func put3TestKVs(s KV) []mvccpb.KeyValue { diff --git a/server/storage/mvcc/kvstore_bench_test.go b/server/storage/mvcc/kvstore_bench_test.go index 8b9a1456a24..eeb574a1c07 100644 --- a/server/storage/mvcc/kvstore_bench_test.go +++ 
b/server/storage/mvcc/kvstore_bench_test.go @@ -29,9 +29,9 @@ import ( ) func BenchmarkStorePut(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, be, tmpPath) + defer cleanup(s, be) // arbitrary number of bytes bytesN := 64 @@ -48,9 +48,9 @@ func BenchmarkStoreRangeKey1(b *testing.B) { benchmarkStoreRange(b, 1) } func BenchmarkStoreRangeKey100(b *testing.B) { benchmarkStoreRange(b, 100) } func benchmarkStoreRange(b *testing.B, n int) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, be, tmpPath) + defer cleanup(s, be) // 64 byte key/val keys, val := createBytesSlice(64, n), createBytesSlice(64, 1) @@ -97,9 +97,9 @@ func BenchmarkConsistentIndex(b *testing.B) { // BenchmarkStorePutUpdate is same as above, but instead updates single key func BenchmarkStorePutUpdate(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, be, tmpPath) + defer cleanup(s, be) // arbitrary number of bytes keys := createBytesSlice(64, 1) @@ -115,9 +115,9 @@ func BenchmarkStorePutUpdate(b *testing.B) { // with transaction begin and end, where transaction involves // some synchronization operations, such as mutex locking. func BenchmarkStoreTxnPut(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, be, tmpPath) + defer cleanup(s, be) // arbitrary number of bytes bytesN := 64 @@ -135,10 +135,10 @@ func BenchmarkStoreTxnPut(b *testing.B) { // benchmarkStoreRestore benchmarks the restore operation func benchmarkStoreRestore(revsPerKey int, b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) // use closure to capture 's' to pick up the reassignment - defer func() { cleanup(s, be, tmpPath) }() + defer func() { cleanup(s, be) }() // arbitrary number of bytes bytesN := 64 diff --git a/server/storage/mvcc/kvstore_compaction_test.go b/server/storage/mvcc/kvstore_compaction_test.go index b9b7822739b..dd8837637ae 100644 --- a/server/storage/mvcc/kvstore_compaction_test.go +++ b/server/storage/mvcc/kvstore_compaction_test.go @@ -16,7 +16,6 @@ package mvcc import ( "context" - "os" "reflect" "testing" "time" @@ -68,7 +67,7 @@ func TestScheduleCompaction(t *testing.T) { }, } for i, tt := range tests { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) fi := newFakeIndex() fi.indexCompactRespc <- tt.keep @@ -103,17 +102,14 @@ func TestScheduleCompaction(t *testing.T) { } tx.Unlock() - cleanup(s, b, tmpPath) + cleanup(s, b) } } func TestCompactAllAndRestore(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer func() { - b.Close() - os.Remove(tmpPath) - }() + defer b.Close() s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) s0.Put([]byte("foo"), 
[]byte("bar1"), lease.NoLease) diff --git a/server/storage/mvcc/kvstore_test.go b/server/storage/mvcc/kvstore_test.go index 02ab63db26e..a3bdaac771b 100644 --- a/server/storage/mvcc/kvstore_test.go +++ b/server/storage/mvcc/kvstore_test.go @@ -485,7 +485,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { test := test t.Run(test, func(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s0 := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) s0.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -512,7 +512,7 @@ func TestRestoreContinueUnfinishedCompaction(t *testing.T) { s0.Restore(b) s = s0 } - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) // wait for scheduled compaction to be finished time.Sleep(100 * time.Millisecond) @@ -549,9 +549,9 @@ type hashKVResult struct { // TestHashKVWhenCompacting ensures that HashKV returns correct hash when compacting. func TestHashKVWhenCompacting(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) rev := 10000 for i := 2; i <= rev; i++ { @@ -629,9 +629,9 @@ func TestHashKVWhenCompacting(t *testing.T) { // TestHashKVWithCompactedAndFutureRevisions ensures that HashKV returns a correct hash when called // with a past revision (lower than compacted), a future revision, and the exact compacted revision func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) rev := 10000 compactRev := rev / 2 @@ -662,9 +662,9 @@ func TestHashKVWithCompactedAndFutureRevisions(t *testing.T) { // TestHashKVZeroRevision ensures that "HashByRev(0)" computes // correct hash value with latest revision. 
func TestHashKVZeroRevision(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) rev := 10000 for i := 2; i <= rev; i++ { @@ -695,9 +695,9 @@ func TestTxnPut(t *testing.T) { keys := createBytesSlice(bytesN, sliceN) vals := createBytesSlice(bytesN, sliceN) - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) for i := 0; i < sliceN; i++ { txn := s.Write(traceutil.TODO()) @@ -711,9 +711,9 @@ func TestTxnPut(t *testing.T) { // TestConcurrentReadNotBlockingWrite ensures Read does not blocking Write after its creation func TestConcurrentReadNotBlockingWrite(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) // write something to read later s.Put([]byte("foo"), []byte("bar"), lease.NoLease) @@ -780,9 +780,9 @@ func TestConcurrentReadTxAndWrite(t *testing.T) { committedKVs kvs // committedKVs records the key-value pairs written by the finished Write Txns mu sync.Mutex // mu protects committedKVs ) - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) var wg sync.WaitGroup wg.Add(numOfWrites) diff --git a/server/storage/mvcc/watchable_store_bench_test.go b/server/storage/mvcc/watchable_store_bench_test.go index 9329dce7635..ba402a3e13e 100644 --- a/server/storage/mvcc/watchable_store_bench_test.go +++ b/server/storage/mvcc/watchable_store_bench_test.go @@ -16,7 +16,6 @@ package mvcc import ( "math/rand" - "os" "testing" "go.uber.org/zap/zaptest" @@ -27,9 +26,9 @@ import ( ) func BenchmarkWatchableStorePut(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, be, tmpPath) + defer cleanup(s, be) // arbitrary number of bytes bytesN := 64 @@ -47,9 +46,9 @@ func BenchmarkWatchableStorePut(b *testing.B) { // with transaction begin and end, where transaction involves // some synchronization operations, such as mutex locking. func BenchmarkWatchableStoreTxnPut(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := New(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, be, tmpPath) + defer cleanup(s, be) // arbitrary number of bytes bytesN := 64 @@ -78,9 +77,9 @@ func BenchmarkWatchableStoreWatchPutUnsync(b *testing.B) { } func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, be, tmpPath) + defer cleanup(s, be) k := []byte("testkey") v := []byte("testval") @@ -122,7 +121,7 @@ func benchmarkWatchableStoreWatchPut(b *testing.B, synced bool) { // TODO: k is an arbitrary constant. We need to figure out what factor // we should put to simulate the real-world use cases. 
func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := NewStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) // manually create watchableStore instead of newWatchableStore @@ -136,12 +135,10 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { // to make the test not crash from assigning to nil map. // 'synced' doesn't get populated in this test. synced: newWatcherGroup(), + stopc: make(chan struct{}), } - defer func() { - ws.store.Close() - os.Remove(tmpPath) - }() + defer cleanup(ws, be) // Put a key so that we can spawn watchers on that key // (testKey in this test). This increases the rev to 1, @@ -152,6 +149,7 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { s.Put(testKey, testValue, lease.NoLease) w := ws.NewWatchStream() + defer w.Close() const k int = 2 benchSampleN := b.N @@ -179,13 +177,10 @@ func BenchmarkWatchableStoreUnsyncedCancel(b *testing.B) { } func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) s := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, be) // Put a key so that we can spawn watchers on that key testKey := []byte("foo") @@ -193,6 +188,7 @@ func BenchmarkWatchableStoreSyncedCancel(b *testing.B) { s.Put(testKey, testValue, lease.NoLease) w := s.NewWatchStream() + defer w.Close() // put 1 million watchers on the same key const watcherN = 1000000 diff --git a/server/storage/mvcc/watchable_store_test.go b/server/storage/mvcc/watchable_store_test.go index b7d6868f84b..a98106bcae8 100644 --- a/server/storage/mvcc/watchable_store_test.go +++ b/server/storage/mvcc/watchable_store_test.go @@ -31,9 +31,9 @@ import ( ) func TestWatch(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) testKey := []byte("foo") testValue := []byte("bar") @@ -50,9 +50,9 @@ func TestWatch(t *testing.T) { } func TestNewWatcherCancel(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) testKey := []byte("foo") testValue := []byte("bar") @@ -74,7 +74,7 @@ func TestNewWatcherCancel(t *testing.T) { // TestCancelUnsynced tests if running CancelFunc removes watchers from unsynced. func TestCancelUnsynced(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) // manually create watchableStore instead of newWatchableStore // because newWatchableStore automatically calls syncWatchers @@ -90,7 +90,7 @@ func TestCancelUnsynced(t *testing.T) { stopc: make(chan struct{}), } - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) // Put a key so that we can spawn watchers on that key. // (testKey in this test). This increases the rev to 1, @@ -132,7 +132,7 @@ func TestCancelUnsynced(t *testing.T) { // method to see if it correctly sends events to channel of unsynced watchers // and moves these watchers to synced. 
func TestSyncWatchers(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := &watchableStore{ store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}), @@ -141,7 +141,7 @@ func TestSyncWatchers(t *testing.T) { stopc: make(chan struct{}), } - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) testKey := []byte("foo") testValue := []byte("bar") @@ -216,9 +216,9 @@ func TestSyncWatchers(t *testing.T) { // TestWatchCompacted tests a watcher that watches on a compacted revision. func TestWatchCompacted(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) testKey := []byte("foo") testValue := []byte("bar") @@ -251,9 +251,9 @@ func TestWatchCompacted(t *testing.T) { } func TestWatchFutureRev(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) testKey := []byte("foo") testValue := []byte("bar") @@ -290,17 +290,17 @@ func TestWatchFutureRev(t *testing.T) { func TestWatchRestore(t *testing.T) { test := func(delay time.Duration) func(t *testing.T) { return func(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) testKey := []byte("foo") testValue := []byte("bar") rev := s.Put(testKey, testValue, lease.NoLease) - newBackend, newPath := betesting.NewDefaultTmpBackend(t) + newBackend, _ := betesting.NewDefaultTmpBackend(t) newStore := newWatchableStore(zaptest.NewLogger(t), newBackend, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(newStore, newBackend, newPath) + defer cleanup(newStore, newBackend) w := newStore.NewWatchStream() defer w.Close() @@ -338,13 +338,13 @@ func TestWatchRestore(t *testing.T) { // 4. restore operation moves "synced" to "unsynced" watcher group // 5. 
choose the watcher from step 1, without panic func TestWatchRestoreSyncedWatcher(t *testing.T) { - b1, b1Path := betesting.NewDefaultTmpBackend(t) + b1, _ := betesting.NewDefaultTmpBackend(t) s1 := newWatchableStore(zaptest.NewLogger(t), b1, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s1, b1, b1Path) + defer cleanup(s1, b1) - b2, b2Path := betesting.NewDefaultTmpBackend(t) + b2, _ := betesting.NewDefaultTmpBackend(t) s2 := newWatchableStore(zaptest.NewLogger(t), b2, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s2, b2, b2Path) + defer cleanup(s2, b2) testKey, testValue := []byte("foo"), []byte("bar") rev := s1.Put(testKey, testValue, lease.NoLease) @@ -391,13 +391,13 @@ func TestWatchRestoreSyncedWatcher(t *testing.T) { // TestWatchBatchUnsynced tests batching on unsynced watchers func TestWatchBatchUnsynced(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) oldMaxRevs := watchBatchMaxRevs defer func() { watchBatchMaxRevs = oldMaxRevs - cleanup(s, b, tmpPath) + cleanup(s, b) }() batches := 3 watchBatchMaxRevs = 4 @@ -526,11 +526,11 @@ func TestNewMapwatcherToEventMap(t *testing.T) { func TestWatchVictims(t *testing.T) { oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) defer func() { - cleanup(s, b, tmpPath) + cleanup(s, b) chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync }() @@ -603,9 +603,9 @@ func TestWatchVictims(t *testing.T) { // TestStressWatchCancelClose tests closing a watch stream while // canceling its watches. func TestStressWatchCancelClose(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) testKey, testValue := []byte("foo"), []byte("bar") var wg sync.WaitGroup diff --git a/server/storage/mvcc/watcher_bench_test.go b/server/storage/mvcc/watcher_bench_test.go index 264369d75eb..52a55d0632b 100644 --- a/server/storage/mvcc/watcher_bench_test.go +++ b/server/storage/mvcc/watcher_bench_test.go @@ -25,12 +25,13 @@ import ( ) func BenchmarkKVWatcherMemoryUsage(b *testing.B) { - be, tmpPath := betesting.NewDefaultTmpBackend(b) + be, _ := betesting.NewDefaultTmpBackend(b) watchable := newWatchableStore(zaptest.NewLogger(b), be, &lease.FakeLessor{}, StoreConfig{}) - defer cleanup(watchable, be, tmpPath) + defer cleanup(watchable, be) w := watchable.NewWatchStream() + defer w.Close() b.ReportAllocs() b.StartTimer() diff --git a/server/storage/mvcc/watcher_test.go b/server/storage/mvcc/watcher_test.go index cbe39402224..b86e31a5542 100644 --- a/server/storage/mvcc/watcher_test.go +++ b/server/storage/mvcc/watcher_test.go @@ -32,9 +32,9 @@ import ( // TestWatcherWatchID tests that each watcher provides unique watchID, // and the watched event attaches the correct watchID. 
func TestWatcherWatchID(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) w := s.NewWatchStream() defer w.Close() @@ -82,9 +82,9 @@ func TestWatcherWatchID(t *testing.T) { } func TestWatcherRequestsCustomID(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) w := s.NewWatchStream() defer w.Close() @@ -119,9 +119,9 @@ func TestWatcherRequestsCustomID(t *testing.T) { // TestWatcherWatchPrefix tests if Watch operation correctly watches // and returns events with matching prefixes. func TestWatcherWatchPrefix(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) w := s.NewWatchStream() defer w.Close() @@ -193,9 +193,9 @@ func TestWatcherWatchPrefix(t *testing.T) { // TestWatcherWatchWrongRange ensures that watcher with wrong 'end' range // does not create watcher, which panics when canceling in range tree. func TestWatcherWatchWrongRange(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) w := s.NewWatchStream() defer w.Close() @@ -253,9 +253,9 @@ func TestWatchDeleteRange(t *testing.T) { // TestWatchStreamCancelWatcherByID ensures cancel calls the cancel func of the watcher // with given id inside watchStream. func TestWatchStreamCancelWatcherByID(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) w := s.NewWatchStream() defer w.Close() @@ -290,7 +290,7 @@ func TestWatchStreamCancelWatcherByID(t *testing.T) { // TestWatcherRequestProgress ensures synced watcher can correctly // report its correct progress. func TestWatcherRequestProgress(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) // manually create watchableStore instead of newWatchableStore // because newWatchableStore automatically calls syncWatchers @@ -300,12 +300,10 @@ func TestWatcherRequestProgress(t *testing.T) { store: NewStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{}), unsynced: newWatcherGroup(), synced: newWatcherGroup(), + stopc: make(chan struct{}), } - defer func() { - s.store.Close() - os.Remove(tmpPath) - }() + defer cleanup(s, b) testKey := []byte("foo") notTestKey := []byte("bad") @@ -345,9 +343,9 @@ func TestWatcherRequestProgress(t *testing.T) { } func TestWatcherWatchWithFilter(t *testing.T) { - b, tmpPath := betesting.NewDefaultTmpBackend(t) + b, _ := betesting.NewDefaultTmpBackend(t) s := WatchableKV(newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})) - defer cleanup(s, b, tmpPath) + defer cleanup(s, b) w := s.NewWatchStream() defer w.Close()