Skip to content

Commit

Permalink
fix(metanode): add crc check for loading snapshot
Browse files Browse the repository at this point in the history
1. add unit test for loading snapshot

Signed-off-by: JasonHu520 <[email protected]>
  • Loading branch information
JasonHu520 authored and leonrayang committed May 18, 2023
1 parent bf967eb commit d3de5ab
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 91 deletions.
55 changes: 55 additions & 0 deletions metanode/btree_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2018 The CubeFS Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package metanode

import (
"testing"

"github.com/stretchr/testify/require"
)

// testItem is a minimal BtreeItem implementation used to exercise the
// metanode B-tree in tests; items are ordered by their integer payload.
type testItem struct {
	data int // sort key compared by Less
}

// Less reports whether t sorts before than. A value that is not a
// *testItem never compares less, matching the original behavior.
func (t *testItem) Less(than BtreeItem) bool {
	if other, ok := than.(*testItem); ok {
		return t.data < other.data
	}
	return false
}
// Copy returns an independent shallow copy of the item so tree
// snapshots do not alias the original value.
func (t *testItem) Copy() BtreeItem {
	return &testItem{data: t.data}
}

// TestBtree exercises basic insert, lookup, and delete behavior of the
// metanode B-tree wrapper. It verifies every inserted key is retrievable
// (the original test inserted five keys but only asserted on two),
// that Delete returns the removed item, and that a deleted or absent
// key yields nil.
func TestBtree(t *testing.T) {
	bt := NewBtree()
	keys := []*testItem{
		{data: 2},
		{data: 3},
		{data: 4},
		{data: 5},
		{data: 6},
	}
	for _, k := range keys {
		bt.ReplaceOrInsert(k, true)
	}

	// Every inserted key must be retrievable.
	for _, k := range keys {
		require.Equal(t, k, bt.Get(k))
	}

	// Deleting a present key returns it and removes it from the tree.
	require.Equal(t, keys[1], bt.Delete(keys[1]))
	require.Nil(t, bt.Get(keys[1]))

	// Deleting an absent key yields nil.
	require.Nil(t, bt.Delete(keys[1]))
}
14 changes: 0 additions & 14 deletions metanode/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -363,20 +363,6 @@ func (m *metadataManager) loadPartitions() (err error) {
wg.Add(1)
go func(fileName string) {
var errload error
defer func() {
if r := recover(); r != nil {
log.LogErrorf("loadPartitions partition: %s, "+
"error: %s, failed: %v", fileName, errload, r)
log.LogFlush()
panic(r)
}
if errload != nil {
log.LogErrorf("loadPartitions partition: %s, "+
"error: %s", fileName, errload)
log.LogFlush()
panic(errload)
}
}()
defer wg.Done()
if len(fileName) < 10 {
log.LogWarnf("ignore unknown partition dir: %s", fileName)
Expand Down
8 changes: 6 additions & 2 deletions metanode/metanode.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,12 @@ func (m *MetaNode) checkLocalPartitionMatchWithMaster() (err error) {
if len(lackPartitions) == 0 {
return
}
err = fmt.Errorf("LackPartitions %v on metanode %v,metanode cannot start", lackPartitions, m.localAddr+":"+m.listen)
log.LogErrorf(err.Error())
m.metrics.MetricMetaFailedPartition.SetWithLabels(float64(1), map[string]string{
"partids": fmt.Sprintf("%v", lackPartitions),
"node": m.localAddr + ":" + m.listen,
"nodeid": fmt.Sprintf("%d", m.nodeId),
})
log.LogErrorf("LackPartitions %v on metanode %v, please deal quickly", lackPartitions, m.localAddr+":"+m.listen)
return
}

Expand Down
25 changes: 19 additions & 6 deletions metanode/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,30 @@ import (
//metrics
const (
StatPeriod = time.Minute * time.Duration(1)

MetricMetaFailedPartition = "meta_failed_partition"
MetricMetaPartitionInodeCount = "mpInodeCount"
MetricMetaPartitionDentryCount = "mpDentryCount"
MetricConnectionCount = "connectionCnt"
)

type MetaNodeMetrics struct {
MetricMetaPartitionInodeCount *exporter.Gauge
metricStopCh chan struct{}
MetricConnectionCount *exporter.Gauge
MetricMetaFailedPartition *exporter.Gauge
MetricMetaPartitionInodeCount *exporter.Gauge
MetricMetaPartitionDentryCount *exporter.Gauge

metricStopCh chan struct{}
}

func (m *MetaNode) startStat() {
m.metrics = &MetaNodeMetrics{
metricStopCh: make(chan struct{}, 0),

MetricConnectionCount: exporter.NewGauge(MetricConnectionCount),
MetricMetaFailedPartition: exporter.NewGauge(MetricMetaFailedPartition),
MetricMetaPartitionInodeCount: exporter.NewGauge(MetricMetaPartitionInodeCount),
MetricMetaPartitionDentryCount: exporter.NewGauge(MetricMetaPartitionDentryCount),
}

go m.collectPartitionMetrics()
Expand All @@ -44,9 +58,8 @@ func (m *MetaNode) upatePartitionMetrics(mp *metaPartition) {
"partid": fmt.Sprintf("%d", mp.config.PartitionId),
exporter.Vol: mp.config.VolName,
}

exporter.NewGauge("mpInodeCount").SetWithLabels(float64(mp.GetInodeTreeLen()), labels)
exporter.NewGauge("mpDentryCount").SetWithLabels(float64(mp.GetDentryTreeLen()), labels)
m.metrics.MetricMetaPartitionInodeCount.SetWithLabels(float64(mp.GetInodeTreeLen()), labels)
m.metrics.MetricMetaPartitionDentryCount.SetWithLabels(float64(mp.GetDentryTreeLen()), labels)
}

func (m *MetaNode) collectPartitionMetrics() {
Expand All @@ -65,7 +78,7 @@ func (m *MetaNode) collectPartitionMetrics() {
}
manager.mu.RUnlock()
}
exporter.NewGauge("connectionCnt").Set(float64(m.connectionCnt))
m.metrics.MetricConnectionCount.Set(float64(m.connectionCnt))
}
}
}
Expand Down
121 changes: 64 additions & 57 deletions metanode/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
var (
ErrIllegalHeartbeatAddress = errors.New("illegal heartbeat address")
ErrIllegalReplicateAddress = errors.New("illegal replicate address")
ErrSnapshotCrcMismatch = errors.New("snapshot crc not match")
)

// Errors
Expand Down Expand Up @@ -794,42 +795,70 @@ func (mp *metaPartition) PersistMetadata() (err error) {
return
}

func (mp *metaPartition) LoadSnapshot(snapshotPath string) (err error) {
if err = mp.loadInode(snapshotPath); err != nil {
return
}
if err = mp.loadDentry(snapshotPath); err != nil {
return
}
if err = mp.loadExtend(snapshotPath); err != nil {
return
func (mp *metaPartition) parseCrcFromFile() ([]uint32, error) {
data, err := ioutil.ReadFile(path.Join(path.Join(mp.config.RootDir, snapshotDir), SnapshotSign))
if err != nil {
return nil, err
}
if err = mp.loadMultipart(snapshotPath); err != nil {
return
raw := string(data)
crcStrs := strings.Split(raw, " ")

crcs := make([]uint32, 0, len(crcStrs))
for _, crcStr := range crcStrs {
crc, err := strconv.ParseUint(crcStr, 10, 32)
if err != nil {
return nil, err
}
crcs = append(crcs, uint32(crc))
}

if err = mp.loadTxInfo(snapshotPath); err != nil {
return
return crcs, nil
}

func (mp *metaPartition) LoadSnapshot(snapshotPath string) (err error) {

crcs, err := mp.parseCrcFromFile()
if err != nil {
return err
}

if err = mp.loadTxRbInode(snapshotPath); err != nil {
return
var loadFuncs = []func(rootDir string, crc uint32) error{
mp.loadInode,
mp.loadDentry,
mp.loadExtend,
mp.loadMultipart,
mp.loadTxInfo,
mp.loadTxRbInode,
mp.loadTxRbDentry,
}

if err = mp.loadTxRbDentry(snapshotPath); err != nil {
return
if len(crcs) != len(loadFuncs) {
return ErrSnapshotCrcMismatch
}

if err = mp.loadApplyID(snapshotPath); err != nil {
return
errs := make([]error, len(loadFuncs))
var wg sync.WaitGroup
wg.Add(len(loadFuncs))
for idx, f := range loadFuncs {
loadFunc := f
i := idx
go func() {
defer wg.Done()
errs[i] = loadFunc(snapshotPath, crcs[i])
}()
}

wg.Wait()

for _, err = range errs {
if err != nil {
return
}
}
if err = mp.loadTxID(snapshotPath); err != nil {
return
}

err = mp.loadApplyID(snapshotPath)
return
return mp.loadApplyID(snapshotPath)
}

func (mp *metaPartition) load(isCreate bool) (err error) {
Expand All @@ -849,38 +878,12 @@ func (mp *metaPartition) load(isCreate bool) (err error) {
}

snapshotPath := path.Join(mp.config.RootDir, snapshotDir)
var loadFuncs = []func(rootDir string) error{
mp.loadInode,
mp.loadDentry,
mp.loadExtend,
mp.loadMultipart,
mp.loadTxInfo,
mp.loadTxRbInode,
mp.loadTxRbDentry,
mp.loadApplyID,
mp.loadTxID,
}
if _, err = os.Stat(snapshotPath); err != nil {
log.LogErrorf("load snapshot failed, err: %s", err.Error())
return nil

errs := make([]error, 0)
var mutex sync.Mutex
var wg sync.WaitGroup
wg.Add(len(loadFuncs))
for _, f := range loadFuncs {
loadFunc := f
go func() {
defer wg.Done()
if e := loadFunc(snapshotPath); e != nil {
mutex.Lock()
errs = append(errs, e)
mutex.Unlock()
}
}()
}
wg.Wait()
if len(errs) > 0 {
err = errs[0]
}
return
return mp.LoadSnapshot(snapshotPath)
}

func (mp *metaPartition) store(sm *storeMsg) (err error) {
Expand Down Expand Up @@ -1235,11 +1238,15 @@ func (mp *metaPartition) initTxInfo(txInfo *proto.TransactionInfo) {

func (mp *metaPartition) storeSnapshotFiles() (err error) {
msg := &storeMsg{
applyIndex: mp.applyID,
inodeTree: NewBtree(),
dentryTree: NewBtree(),
extendTree: NewBtree(),
multipartTree: NewBtree(),
applyIndex: mp.applyID,
txId: mp.txProcessor.txManager.txIdAlloc.getTransactionID(),
inodeTree: NewBtree(),
dentryTree: NewBtree(),
extendTree: NewBtree(),
multipartTree: NewBtree(),
txTree: NewBtree(),
txRbInodeTree: NewBtree(),
txRbDentryTree: NewBtree(),
}

return mp.store(msg)
Expand Down
Loading

0 comments on commit d3de5ab

Please sign in to comment.