Skip to content

Commit

Permalink
Fix: when a disk error occurs, raft cannot start on the new data server. First: …
Browse files Browse the repository at this point in the history
…the StartRaftAfterRepair func gets the leader's maxExtentID. Second: in a loop, get the leader's partitionSize for extents 0 to maxExtentID and compare it with the local partitionSize

Signed-off-by: awzhgw <[email protected]>
  • Loading branch information
awzhgw committed Aug 16, 2019
1 parent d4a94ae commit 74b9f1b
Show file tree
Hide file tree
Showing 17 changed files with 144 additions and 54 deletions.
75 changes: 65 additions & 10 deletions datanode/partition_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,13 @@ func (dp *DataPartition) StartRaftAfterRepair() {
var (
partitionSize uint64 = 0
err error
maxExtentID uint64 = 0
)
timer := time.NewTimer(0)
for {
select {
case <-timer.C:
err = nil
if dp.isLeader { // primary does not need to wait repair
if err := dp.StartRaft(); err != nil {
log.LogErrorf("partitionID[%v] leader start raft err[%v].", dp.partitionID, err)
Expand All @@ -193,18 +195,25 @@ func (dp *DataPartition) StartRaftAfterRepair() {
timer.Reset(5 * time.Second)
continue
}
if maxExtentID == 0 {
maxExtentID, err = dp.getLeaderMaxExtentID()
}

if err != nil {
log.LogErrorf("partitionID[%v] get MaxExtentID err[%v]", dp.partitionID, err)
timer.Reset(5 * time.Second)
continue
}

// get the partition size from the primary and compare it with the local one
if partitionSize == 0 {
partitionSize, err = dp.getPartitionSize()
if err != nil {
log.LogErrorf("partitionID[%v] get leader size err[%v]", dp.partitionID, err)
timer.Reset(5 * time.Second)
continue
}
partitionSize, err = dp.getLeaderPartitionSize(maxExtentID)
if err != nil {
log.LogErrorf("partitionID[%v] get leader size err[%v]", dp.partitionID, err)
timer.Reset(5 * time.Second)
continue
}

localSize := dp.extentStore.StoreSize()
localSize := dp.extentStore.StoreSizeExtentID(maxExtentID)
if partitionSize > localSize {
log.LogErrorf("partitionID[%v] leader size[%v] local size[%v]", dp.partitionID, partitionSize, localSize)
timer.Reset(5 * time.Second)
Expand Down Expand Up @@ -442,6 +451,16 @@ func NewPacketToGetPartitionSize(partitionID uint64) (p *repl.Packet) {
return
}

// NewPacketToGetMaxExtentID builds the request packet used to ask a replica
// for the maximum extent ID of the given data partition. The packet carries
// the OpGetMaxExtentID opcode, the protocol magic and a freshly generated
// request ID.
func NewPacketToGetMaxExtentID(partitionID uint64) (p *repl.Packet) {
	p = &repl.Packet{}
	p.Magic = proto.ProtoMagic
	p.Opcode = proto.OpGetMaxExtentID
	p.PartitionID = partitionID
	p.ReqID = proto.GenerateRequestID()
	return p
}

func (dp *DataPartition) findMinAppliedID(allAppliedIDs []uint64) (minAppliedID uint64, index int) {
index = 0
minAppliedID = allAppliedIDs[0]
Expand All @@ -465,13 +484,13 @@ func (dp *DataPartition) findMaxAppliedID(allAppliedIDs []uint64) (maxAppliedID
}

// Get the partition size from the leader.
func (dp *DataPartition) getPartitionSize() (size uint64, err error) {
func (dp *DataPartition) getLeaderPartitionSize(maxExtentID uint64) (size uint64, err error) {
var (
conn *net.TCPConn
)

p := NewPacketToGetPartitionSize(dp.partitionID)

p.ExtentID = maxExtentID
target := dp.replicas[0]
conn, err = gConnPool.GetConnect(target) //get remote connect
if err != nil {
Expand Down Expand Up @@ -500,6 +519,42 @@ func (dp *DataPartition) getPartitionSize() (size uint64, err error) {
return
}

// getLeaderMaxExtentID asks the replica at dp.replicas[0] (the one this code
// treats as the leader) for its maximum extent ID.
//
// It sends an OpGetMaxExtentID packet over a connection taken from gConnPool,
// reads the reply and decodes the reply body as a big-endian uint64.
//
// A non-nil err is returned when the connection cannot be obtained, the
// request cannot be written, the reply cannot be read, the reply's result code
// is not proto.OpOk, or the reply body is too short to contain the ID. In all
// of those cases maxExtentID is 0.
func (dp *DataPartition) getLeaderMaxExtentID() (maxExtentID uint64, err error) {
	// Number of bytes binary.BigEndian.Uint64 needs to decode one value.
	const extentIDSize = 8

	var (
		conn *net.TCPConn
	)

	p := NewPacketToGetMaxExtentID(dp.partitionID)

	target := dp.replicas[0]
	conn, err = gConnPool.GetConnect(target) //get remote connect
	if err != nil {
		err = errors.Trace(err, " partition=%v get host[%v] connect", dp.partitionID, target)
		return
	}
	// NOTE(review): the `true` argument presumably makes the pool close the
	// connection instead of reusing it — confirm against the pool implementation.
	defer gConnPool.PutConnect(conn, true)
	err = p.WriteToConn(conn) // write command to the remote host
	if err != nil {
		err = errors.Trace(err, "partition=%v write to host[%v]", dp.partitionID, target)
		return
	}
	// NOTE(review): 60 looks like a read deadline in seconds — confirm.
	err = p.ReadFromConn(conn, 60)
	if err != nil {
		err = errors.Trace(err, "partition=%v read from host[%v]", dp.partitionID, target)
		return
	}

	if p.ResultCode != proto.OpOk {
		// err is nil here; errors.Trace is relied on to build a fresh error.
		err = errors.Trace(err, "partition=%v result code not ok [%v] from host[%v]", dp.partitionID, p.ResultCode, target)
		return
	}
	// binary.BigEndian.Uint64 panics on a slice shorter than 8 bytes, so a
	// truncated or empty reply body must be reported as an error instead of
	// crashing the caller's retry loop.
	if len(p.Data) < extentIDSize {
		err = errors.Trace(err, "partition=%v max extent id reply too short [%v bytes] from host[%v]", dp.partitionID, len(p.Data), target)
		return
	}
	maxExtentID = binary.BigEndian.Uint64(p.Data)
	log.LogDebugf("partition=%v maxExtentID=%v", dp.partitionID, maxExtentID)

	return
}

func (dp *DataPartition) broadcastMinAppliedID(minAppliedID uint64) (err error) {
for i := 0; i < len(dp.replicas); i++ {
p := NewPacketToBroadcastMinAppliedID(dp.partitionID, minAppliedID)
Expand Down
17 changes: 15 additions & 2 deletions datanode/wrap_operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ func (s *DataNode) OperatePacket(p *repl.Packet, c *net.TCPConn) (err error) {
s.handlePacketToDecommissionDataPartition(p)
case proto.OpGetPartitionSize:
s.handlePacketToGetPartitionSize(p)
case proto.OpGetMaxExtentID:
s.handlePacketToGetMaxExtentID(p)
case proto.OpReadTinyDeleteRecord:
s.handlePacketToReadTinyDeleteRecordFile(p, c)
case proto.OpBroadcastMinAppliedID:
Expand Down Expand Up @@ -163,7 +165,7 @@ func (s *DataNode) handlePacketToCreateDataPartition(p *repl.Packet) {
err = fmt.Errorf("from master Task[%v] cannot unmash CreateDataPartitionRequest struct", task.ToString())
return
}
p.PartitionID=request.PartitionId
p.PartitionID = request.PartitionId
if dp, err = s.space.CreatePartition(request); err != nil {
err = fmt.Errorf("from master Task[%v] cannot create Partition err(%v)", task.ToString(), err)
return
Expand Down Expand Up @@ -720,7 +722,7 @@ func (s *DataNode) handlePacketToGetAppliedID(p *repl.Packet) {

func (s *DataNode) handlePacketToGetPartitionSize(p *repl.Packet) {
partition := p.Object.(*DataPartition)
usedSize := partition.extentStore.StoreSize()
usedSize := partition.extentStore.StoreSizeExtentID(p.ExtentID)

buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(usedSize))
Expand All @@ -729,6 +731,17 @@ func (s *DataNode) handlePacketToGetPartitionSize(p *repl.Packet) {
return
}

// handlePacketToGetMaxExtentID serves an OpGetMaxExtentID request. It reads the
// maximum extent ID from the extent store of the data partition attached to
// the packet and replies OK with that ID encoded as 8 big-endian bytes.
func (s *DataNode) handlePacketToGetMaxExtentID(p *repl.Packet) {
	dp := p.Object.(*DataPartition)

	body := make([]byte, 8)
	binary.BigEndian.PutUint64(body, dp.extentStore.GetMaxExtentID())
	p.PacketOkWithBody(body)
}

func (s *DataNode) handlePacketToDecommissionDataPartition(p *repl.Packet) {
var (
err error
Expand Down
4 changes: 2 additions & 2 deletions master/admin_task_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ type AdminTaskManager struct {
targetAddr string
TaskMap map[string]*proto.AdminTask
sync.RWMutex
exitCh chan struct{}
connPool *util.ConnectPool
exitCh chan struct{}
connPool *util.ConnectPool
}

func newAdminTaskManager(targetAddr, clusterID string) (sender *AdminTaskManager) {
Expand Down
2 changes: 1 addition & 1 deletion master/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ func (c *Cluster) decommissionDataPartition(offlineAddr string, dp *DataPartitio
c.Name, dp.PartitionID, offlineAddr, newAddr, dp.Hosts)
return
errHandler:
msg = fmt.Sprintf(errMsg + " clusterID[%v] partitionID:%v on Node:%v "+
msg = fmt.Sprintf(errMsg+" clusterID[%v] partitionID:%v on Node:%v "+
"Then Fix It on newHost:%v Err:%v , PersistenceHosts:%v ",
c.Name, dp.PartitionID, offlineAddr, newAddr, err, dp.Hosts)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion master/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ package master

import (
"fmt"
"github.com/chubaofs/chubaofs/proto"
"testing"
"time"
"github.com/chubaofs/chubaofs/proto"
)

func buildPanicCluster() *Cluster {
Expand Down
2 changes: 1 addition & 1 deletion master/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ const (
volCapacityKey = "capacity"
volOwnerKey = "owner"
volAuthKey = "authKey"
replicaNumKey = "replicaNum"
replicaNumKey = "replicaNum"
)

const (
Expand Down
16 changes: 8 additions & 8 deletions master/data_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ import (

// DataPartition represents the structure of storing the file contents.
type DataPartition struct {
PartitionID uint64
LastLoadedTime int64
ReplicaNum uint8
Status int8
isRecover bool
Replicas []*DataReplica
Hosts []string // host addresses
Peers []proto.Peer
PartitionID uint64
LastLoadedTime int64
ReplicaNum uint8
Status int8
isRecover bool
Replicas []*DataReplica
Hosts []string // host addresses
Peers []proto.Peer
sync.RWMutex
total uint64
used uint64
Expand Down
6 changes: 3 additions & 3 deletions metanode/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,12 @@ func (m *metadataManager) loadPartitions() (err error) {
}

func (m *metadataManager) attachPartition(id uint64, partition MetaPartition) (err error) {
fmt.Println(fmt.Sprintf("start load metaPartition %v",id))
fmt.Println(fmt.Sprintf("start load metaPartition %v", id))
if err = partition.Start(); err != nil {
fmt.Println(fmt.Sprintf("fininsh load metaPartition %v error %v",id,err))
fmt.Println(fmt.Sprintf("fininsh load metaPartition %v error %v", id, err))
return
}
fmt.Println(fmt.Sprintf("fininsh load metaPartition %v",id))
fmt.Println(fmt.Sprintf("fininsh load metaPartition %v", id))
m.mu.Lock()
defer m.mu.Unlock()
m.partitions[id] = partition
Expand Down
10 changes: 5 additions & 5 deletions metanode/manager_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import (
raftProto "github.com/tiglabs/raft/proto"
)

const(
MaxUsedMemFactor=1.1
const (
MaxUsedMemFactor = 1.1
)

func (m *metadataManager) opMasterHeartbeat(conn net.Conn, p *Packet,
Expand Down Expand Up @@ -75,8 +75,8 @@ func (m *metadataManager) opMasterHeartbeat(conn net.Conn, p *Packet,
if mConf.Cursor >= mConf.End {
mpr.Status = proto.ReadOnly
}
if resp.Used>uint64(float64(resp.Total)*MaxUsedMemFactor){
mpr.Status=proto.ReadOnly
if resp.Used > uint64(float64(resp.Total)*MaxUsedMemFactor) {
mpr.Status = proto.ReadOnly
}
resp.MetaPartitionReports = append(resp.MetaPartitionReports, mpr)
return true
Expand All @@ -86,7 +86,7 @@ end:
adminTask.Request = nil
adminTask.Response = resp
m.respondToMaster(adminTask)
data,_:=json.Marshal(resp)
data, _ := json.Marshal(resp)
log.LogInfof("%s [opMasterHeartbeat] req:%v; respAdminTask: %v, "+
"resp: %v", remoteAddr, req, adminTask, string(data))
return
Expand Down
4 changes: 2 additions & 2 deletions metanode/metanode.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,12 +190,12 @@ func (m *MetaNode) parseConfig(cfg *config.Config) (err error) {
m.raftReplicatePort = cfg.GetString(cfgRaftReplicaPort)
configTotalMem, _ = strconv.ParseUint(cfg.GetString(cfgTotalMem), 10, 64)

if configTotalMem==0{
if configTotalMem == 0 {
return fmt.Errorf("bad totalMem config,Recommended to be configured as 80% of physical machine memory")
}

total, _, err := util.GetMemInfo()
if err == nil && configTotalMem >total-util.GB {
if err == nil && configTotalMem > total-util.GB {
return fmt.Errorf("bad totalMem config,Recommended to be configured as 80% of physical machine memory")
}

Expand Down
3 changes: 3 additions & 0 deletions proto/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ const (
OpSyncWrite uint8 = 0x13
OpReadTinyDeleteRecord uint8 = 0x14
OpTinyExtentRepairRead uint8 = 0x15
OpGetMaxExtentID uint8 = 0x16

// Operations: Client -> MetaNode.
OpMetaCreateInode uint8 = 0x20
Expand Down Expand Up @@ -289,6 +290,8 @@ func (p *Packet) GetOpMsg() (m string) {
m = "OpPing"
case OpTinyExtentRepairRead:
m = "OpTinyExtentRepairRead"
case OpGetMaxExtentID:
m = "OpGetMaxExtentID"
case OpBroadcastMinAppliedID:
m = "OpBroadcastMinAppliedID"
}
Expand Down
25 changes: 22 additions & 3 deletions storage/extent_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,12 +606,14 @@ func (s *ExtentStore) GetBrokenTinyExtent() (extentID uint64, err error) {
}
}

// StoreSize returns the size of the extent store
func (s *ExtentStore) StoreSize() (totalSize uint64) {
// StoreSizeExtentID returns the size of the extent store
func (s *ExtentStore) StoreSizeExtentID(maxExtentID uint64) (totalSize uint64) {
extentInfos := make([]*ExtentInfo, 0)
s.eiMutex.RLock()
for _, extentInfo := range s.extentInfoMap {
extentInfos = append(extentInfos, extentInfo)
if extentInfo.FileID <= maxExtentID {
extentInfos = append(extentInfos, extentInfo)
}
}
s.eiMutex.RUnlock()
for _, extentInfo := range extentInfos {
Expand All @@ -621,6 +623,23 @@ func (s *ExtentStore) StoreSize() (totalSize uint64) {
return totalSize
}

// GetMaxExtentID returns the largest FileID among the extents currently
// recorded in the store's extentInfoMap, or 0 when the map is empty.
func (s *ExtentStore) GetMaxExtentID() (maxExtentID uint64) {
	// Compute the maximum directly while holding the read lock: this avoids
	// allocating a throwaway snapshot slice and keeps every FileID read
	// inside the locked region.
	s.eiMutex.RLock()
	defer s.eiMutex.RUnlock()
	for _, extentInfo := range s.extentInfoMap {
		if extentInfo.FileID > maxExtentID {
			maxExtentID = extentInfo.FileID
		}
	}

	return maxExtentID
}

func MarshalTinyExtent(extentID uint64, offset, size int64) (data []byte) {
data = make([]byte, DeleteTinyRecordSize)
binary.BigEndian.PutUint64(data[0:8], extentID)
Expand Down
2 changes: 1 addition & 1 deletion vendor/github.com/jacobsa/fuse/conversions.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/github.com/jacobsa/fuse/fusetesting/parallel.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions vendor/github.com/jacobsa/fuse/samples/flushfs/flush_fs.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 74b9f1b

Please sign in to comment.