Skip to content

Commit

Permalink
Merge branch 'master' into acl
Browse files Browse the repository at this point in the history
  • Loading branch information
mervinkid authored May 20, 2020
2 parents d251bcb + 7427cb1 commit abe19e0
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 4 deletions.
3 changes: 3 additions & 0 deletions client/fs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ func (f *File) Attr(ctx context.Context, a *fuse.Attr) error {
if gen >= info.Generation {
a.Size = uint64(fileSize)
}
if proto.IsSymlink(info.Mode) {
a.Size = uint64(len(info.Target))
}

log.LogDebugf("TRACE Attr: inode(%v) attr(%v)", info, a)
return nil
Expand Down
36 changes: 33 additions & 3 deletions sdk/data/stream/stream_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,12 @@ func NewStreamConn(dp *wrapper.DataPartition, follower bool) *StreamConn {
}

epoch := atomic.AddUint64(&dp.Epoch, 1)
choice := len(dp.Hosts)
hosts := sortByStatus(dp, false)
choice := len(hosts)
currAddr := dp.LeaderAddr
if choice > 0 {
index := int(epoch) % choice
currAddr = dp.Hosts[index]
currAddr = hosts[index]
}

return &StreamConn{
Expand Down Expand Up @@ -106,7 +107,9 @@ func (sc *StreamConn) sendToPartition(req *Packet, getReply GetReplyFunc) (err e
log.LogWarnf("sendToPartition: get connection to curr addr failed, addr(%v) reqPacket(%v) err(%v)", sc.currAddr, req, err)
}

for _, addr := range sc.dp.Hosts {
hosts := sortByStatus(sc.dp, true)

for _, addr := range hosts {
log.LogWarnf("sendToPartition: try addr(%v) reqPacket(%v)", addr, req)
conn, err = StreamConnPool.GetConnect(addr)
if err != nil {
Expand Down Expand Up @@ -155,3 +158,30 @@ func (sc *StreamConn) sendToConn(conn *net.TCPConn, req *Packet, getReply GetRep
log.LogDebugf("sendToConn exit: send to addr(%v) reqPacket(%v) err(%v)", sc.currAddr, req, err)
return
}

// sortByStatus will return hosts list sort by host status for DataPartition.
// If param selectAll is true, hosts with status(true) is in front and hosts with status(false) is in behind.
// If param selectAll is false, only return hosts with status(true).
func sortByStatus(dp *wrapper.DataPartition, selectAll bool) (hosts []string) {
var failedHosts []string
hostsStatus := dp.ClientWrapper.HostsStatus
for _, addr := range dp.Hosts {
status, ok := hostsStatus[addr]
if ok {
if status {
hosts = append(hosts, addr)
} else {
failedHosts = append(failedHosts, addr)
}
} else {
failedHosts = append(failedHosts, addr)
log.LogWarnf("sortByStatus: can not find host[%v] in HostsStatus, dp[%d]", addr, dp.PartitionID)
}
}

if selectAll {
hosts = append(hosts, failedHosts...)
}

return
}
1 change: 1 addition & 0 deletions sdk/data/wrapper/data_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type DataPartition struct {
proto.DataPartitionResponse
RandomWrite bool
PartitionType string
ClientWrapper *Wrapper
Metrics *DataPartitionMetrics
}

Expand Down
31 changes: 30 additions & 1 deletion sdk/data/wrapper/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ type Wrapper struct {
mc *masterSDK.MasterClient
stopOnce sync.Once
stopC chan struct{}

HostsStatus map[string]bool
}

// NewDataPartitionWrapper returns a new data partition wrapper.
Expand All @@ -60,6 +62,7 @@ func NewDataPartitionWrapper(volName string, masters []string) (w *Wrapper, err
w.volName = volName
w.rwPartition = make([]*DataPartition, 0)
w.partitions = make(map[uint64]*DataPartition)
w.HostsStatus = make(map[string]bool)
if err = w.updateClusterInfo(); err != nil {
err = errors.Trace(err, "NewDataPartitionWrapper:")
return
Expand All @@ -72,6 +75,9 @@ func NewDataPartitionWrapper(volName string, masters []string) (w *Wrapper, err
err = errors.Trace(err, "NewDataPartitionWrapper:")
return
}
if err = w.updateDataNodeStatus(); err != nil {
log.LogErrorf("NewDataPartitionWrapper: init DataNodeStatus failed, [%v]", err)
}
go w.update()
return
}
Expand Down Expand Up @@ -120,6 +126,7 @@ func (w *Wrapper) update() {
select {
case <-ticker.C:
w.updateDataPartition(false)
w.updateDataNodeStatus()
case <-w.stopC:
return
}
Expand All @@ -136,7 +143,10 @@ func (w *Wrapper) updateDataPartition(isInit bool) (err error) {
log.LogInfof("updateDataPartition: get data partitions: volume(%v) partitions(%v)", w.volName, len(dpv.DataPartitions))

var convert = func(response *proto.DataPartitionResponse) *DataPartition {
return &DataPartition{DataPartitionResponse: *response}
return &DataPartition{
DataPartitionResponse: *response,
ClientWrapper: w,
}
}

rwPartitionGroups := make([]*DataPartition, 0)
Expand Down Expand Up @@ -251,3 +261,22 @@ func (w *Wrapper) GetDataPartition(partitionID uint64) (*DataPartition, error) {
func (w *Wrapper) WarningMsg() string {
return fmt.Sprintf("%s_client_warning", w.clusterName)
}

func (w *Wrapper) updateDataNodeStatus() (err error) {
var cv *proto.ClusterView
cv, err = w.mc.AdminAPI().GetCluster()
if err != nil {
log.LogErrorf("updateDataNodeStatus: get cluster fail: err(%v)", err)
return
}

newHostsStatus := make(map[string]bool)
for _, node := range cv.DataNodes {
newHostsStatus[node.Addr] = node.Status
}
log.LogInfof("updateDataNodeStatus: update %d hosts status", len(newHostsStatus))

w.HostsStatus = newHostsStatus

return
}

0 comments on commit abe19e0

Please sign in to comment.