Skip to content

Commit

Permalink
feature: creating vol with a specified zone
Browse files Browse the repository at this point in the history
     when creating vol which is not across zones,user specified a zone,to be created partition with the specified zone,
	 if specified zone is not writable,we choose a zone randomly

Signed-off-by: zhuhyc <[email protected]>
  • Loading branch information
zhuhyc committed Mar 13, 2020
1 parent 558e99f commit b91f267
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 35 deletions.
2 changes: 1 addition & 1 deletion cmd/master/scripts/manage_vol_create.sh
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#!/bin/bash
curl -v "http://127.0.0.1/admin/createVol?name=test&capacity=100&owner=cfs&crossZone=false"
curl -v "http://127.0.0.1/admin/createVol?name=test&capacity=100&owner=cfs&crossZone=false&zoneName=xx"
File renamed without changes.
File renamed without changes.
10 changes: 6 additions & 4 deletions master/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,9 +546,10 @@ func (m *Server) createVol(w http.ResponseWriter, r *http.Request) {
followerRead bool
authenticate bool
crossZone bool
zoneName string
)

if name, owner, mpCount, dpReplicaNum, size, capacity, followerRead, authenticate, crossZone, err = parseRequestToCreateVol(r); err != nil {
if name, owner, zoneName, mpCount, dpReplicaNum, size, capacity, followerRead, authenticate, crossZone, err = parseRequestToCreateVol(r); err != nil {
sendErrReply(w, r, &proto.HTTPReply{Code: proto.ErrCodeParamError, Msg: err.Error()})
return
}
Expand All @@ -557,7 +558,7 @@ func (m *Server) createVol(w http.ResponseWriter, r *http.Request) {
sendErrReply(w, r, &proto.HTTPReply{Code: proto.ErrCodeParamError, Msg: err.Error()})
return
}
if vol, err = m.cluster.createVol(name, owner, mpCount, dpReplicaNum, size, capacity, followerRead, authenticate, crossZone); err != nil {
if vol, err = m.cluster.createVol(name, owner, zoneName, mpCount, dpReplicaNum, size, capacity, followerRead, authenticate, crossZone); err != nil {
sendErrReply(w, r, newErrHTTPReply(err))
return
}
Expand Down Expand Up @@ -589,6 +590,7 @@ func newSimpleView(vol *Vol) *proto.SimpleVolView {
ID: vol.ID,
Name: vol.Name,
Owner: vol.Owner,
ZoneName: vol.zoneName,
DpReplicaNum: vol.dpReplicaNum,
MpReplicaNum: vol.mpReplicaNum,
Status: vol.Status,
Expand Down Expand Up @@ -1088,7 +1090,7 @@ func parseBoolFieldToUpdateVol(r *http.Request, vol *Vol) (followerRead, authent
return
}

func parseRequestToCreateVol(r *http.Request) (name, owner string, mpCount, dpReplicaNum, size, capacity int, followerRead, authenticate, crossZone bool, err error) {
func parseRequestToCreateVol(r *http.Request) (name, owner, zoneName string, mpCount, dpReplicaNum, size, capacity int, followerRead, authenticate, crossZone bool, err error) {
if err = r.ParseForm(); err != nil {
return
}
Expand Down Expand Up @@ -1138,7 +1140,7 @@ func parseRequestToCreateVol(r *http.Request) (name, owner string, mpCount, dpRe
if crossZone, err = extractCrossZone(r); err != nil {
return
}

zoneName = r.FormValue(zoneNameKey)
return
}

Expand Down
2 changes: 1 addition & 1 deletion master/api_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func createDefaultMasterServerForTest() *Server {
testServer.cluster.checkMetaNodeHeartbeat()
time.Sleep(5 * time.Second)
testServer.cluster.scheduleToUpdateStatInfo()
vol, err := testServer.cluster.createVol(commonVolName, "cfs", 3, 3, 3, 100, false, false, false)
vol, err := testServer.cluster.createVol(commonVolName, "cfs", "",3, 3, 3, 100, false, false, false)
if err != nil {
panic(err)
}
Expand Down
63 changes: 49 additions & 14 deletions master/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -513,7 +513,7 @@ func (c *Cluster) createDataPartition(volName string, zoneNum int) (dp *DataPart
vol.createDpMutex.Lock()
defer vol.createDpMutex.Unlock()
errChannel := make(chan error, vol.dpReplicaNum)
if targetHosts, targetPeers, err = c.chooseTargetDataNodes("", nil, nil, int(vol.dpReplicaNum), zoneNum); err != nil {
if targetHosts, targetPeers, err = c.chooseTargetDataNodes("", nil, nil, int(vol.dpReplicaNum), zoneNum, vol.zoneName); err != nil {
goto errHandler
}
if partitionID, err = c.idAlloc.allocateDataPartitionID(); err != nil {
Expand Down Expand Up @@ -621,7 +621,7 @@ func (c *Cluster) decideZoneNum(crossZone bool) (zoneNum int) {
}
return zoneNum
}
func (c *Cluster) chooseTargetDataNodes(excludeZone string, excludeNodeSets []uint64, excludeHosts []string, replicaNum int, zoneNum int) (hosts []string, peers []proto.Peer, err error) {
func (c *Cluster) chooseTargetDataNodes(excludeZone string, excludeNodeSets []uint64, excludeHosts []string, replicaNum int, zoneNum int, specifiedZone string) (hosts []string, peers []proto.Peer, err error) {

var (
masterZone *Zone
Expand All @@ -634,9 +634,22 @@ func (c *Cluster) chooseTargetDataNodes(excludeZone string, excludeNodeSets []ui
if replicaNum <= zoneNum {
zoneNum = replicaNum
}
zones, err = c.t.allocZonesForDataNode(zoneNum, replicaNum, excludeZones)
if err != nil {
return
// when creating vol,user specified a zone,we reset zoneNum to 1,to be created partition with specified zone,
//if specified zone is not writable,we choose a zone randomly
if specifiedZone != "" {
zoneNum = 1
zone, err := c.t.getZone(specifiedZone)
if err != nil {
Warn(c.Name, fmt.Sprintf("cluster[%v],specified zone[%v]is not writable", c.Name, specifiedZone))
} else {
zones = make([]*Zone, 0)
zones = append(zones, zone)
}
}
if zones == nil || specifiedZone == "" {
if zones, err = c.t.allocZonesForDataNode(zoneNum, replicaNum, excludeZones); err != nil {
return
}
}
//if vol enable cross zone,available zone less than 2,can't create partition
if zoneNum >= 2 && len(zones) < 2 {
Expand Down Expand Up @@ -849,7 +862,7 @@ func (c *Cluster) decommissionDataPartition(offlineAddr string, dp *DataPartitio
} else {
excludeZone = zones[0]
}
if targetHosts, _, err = c.chooseTargetDataNodes(excludeZone, excludeNodeSets, dp.Hosts, 1, 1); err != nil {
if targetHosts, _, err = c.chooseTargetDataNodes(excludeZone, excludeNodeSets, dp.Hosts, 1, 1, ""); err != nil {
goto errHandler
}
}
Expand Down Expand Up @@ -1253,7 +1266,7 @@ errHandler:

// Create a new volume.
// By default we create 3 meta partitions and 10 data partitions during initialization.
func (c *Cluster) createVol(name, owner string, mpCount, dpReplicaNum, size, capacity int, followerRead, authenticate, crossZone bool) (vol *Vol, err error) {
func (c *Cluster) createVol(name, owner, zoneName string, mpCount, dpReplicaNum, size, capacity int, followerRead, authenticate, crossZone bool) (vol *Vol, err error) {
var (
dataPartitionSize uint64
readWriteDataPartitions int
Expand All @@ -1267,7 +1280,15 @@ func (c *Cluster) createVol(name, owner string, mpCount, dpReplicaNum, size, cap
if crossZone && c.t.zoneLen() <= 1 {
return nil, fmt.Errorf("cluster has one zone,can't cross zone")
}
if vol, err = c.doCreateVol(name, owner, dataPartitionSize, uint64(capacity), dpReplicaNum, followerRead, authenticate, crossZone); err != nil {
if crossZone && zoneName != "" {
return nil, fmt.Errorf("only the vol which don't across zones,can specified zoneName")
}
if zoneName != "" {
if _, err = c.t.getZone(zoneName); err != nil {
return
}
}
if vol, err = c.doCreateVol(name, owner, zoneName, dataPartitionSize, uint64(capacity), dpReplicaNum, followerRead, authenticate, crossZone); err != nil {
goto errHandler
}
if err = vol.initMetaPartitions(c, mpCount); err != nil {
Expand Down Expand Up @@ -1295,7 +1316,7 @@ errHandler:
return
}

func (c *Cluster) doCreateVol(name, owner string, dpSize, capacity uint64, dpReplicaNum int, followerRead, authenticate, crossZone bool) (vol *Vol, err error) {
func (c *Cluster) doCreateVol(name, owner, zoneName string, dpSize, capacity uint64, dpReplicaNum int, followerRead, authenticate, crossZone bool) (vol *Vol, err error) {
var id uint64
c.createVolMutex.Lock()
defer c.createVolMutex.Unlock()
Expand All @@ -1307,7 +1328,7 @@ func (c *Cluster) doCreateVol(name, owner string, dpSize, capacity uint64, dpRep
if err != nil {
goto errHandler
}
vol = newVol(id, name, owner, dpSize, capacity, uint8(dpReplicaNum), defaultReplicaNum, followerRead, authenticate, crossZone)
vol = newVol(id, name, owner, zoneName, dpSize, capacity, uint8(dpReplicaNum), defaultReplicaNum, followerRead, authenticate, crossZone)
// refresh oss secure
vol.refreshOSSSecure()
if err = c.syncAddVol(vol); err != nil {
Expand Down Expand Up @@ -1356,7 +1377,7 @@ func (c *Cluster) updateInodeIDRange(volName string, start uint64) (err error) {
}

// Choose the target hosts from the available zones and meta nodes.
func (c *Cluster) chooseTargetMetaHosts(excludeZone string, excludeNodeSets []uint64, excludeHosts []string, replicaNum int, crossZone bool) (hosts []string, peers []proto.Peer, err error) {
func (c *Cluster) chooseTargetMetaHosts(excludeZone string, excludeNodeSets []uint64, excludeHosts []string, replicaNum int, crossZone bool, specifiedZone string) (hosts []string, peers []proto.Peer, err error) {
var (
zones []*Zone
masterZone *Zone
Expand All @@ -1369,10 +1390,24 @@ func (c *Cluster) chooseTargetMetaHosts(excludeZone string, excludeNodeSets []ui
if replicaNum < zoneNum {
zoneNum = replicaNum
}
zones, err = c.t.allocZonesForMetaNode(zoneNum, replicaNum, excludeZones)
if err != nil {
return
// when creating vol,user specified a zone,we reset zoneNum to 1,to be created partition with specified zone,
//if specified zone is not writable,we choose a zone randomly
if specifiedZone != "" {
zoneNum = 1
zone, err := c.t.getZone(specifiedZone)
if err != nil {
Warn(c.Name, fmt.Sprintf("cluster[%v],specified zone[%v]is not writable", c.Name, specifiedZone))
} else {
zones = make([]*Zone, 0)
zones = append(zones, zone)
}
}
if zones == nil || specifiedZone == "" {
if zones, err = c.t.allocZonesForMetaNode(zoneNum, replicaNum, excludeZones); err != nil {
return
}
}

if crossZone && len(zones) < 2 {
log.LogWarn(fmt.Sprintf("action[chooseTargetMetaNodes] ,no enough zones [%v] to be selected, expect select [%v] zones", len(zones), zoneNum))
return nil, nil, fmt.Errorf("action[chooseTargetMetaNodes] no enough zones [%v] to be selected, expect select [%v] zones", len(zones), zoneNum)
Expand Down
2 changes: 1 addition & 1 deletion master/cluster_task.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (c *Cluster) decommissionMetaPartition(nodeAddr string, mp *MetaPartition)
excludeZone = zones[0]
}
// choose a meta node in other zone
if _, newPeers, err = c.chooseTargetMetaHosts(excludeZone, excludeNodeSets, oldHosts, 1, false); err != nil {
if _, newPeers, err = c.chooseTargetMetaHosts(excludeZone, excludeNodeSets, oldHosts, 1, false, ""); err != nil {
goto errHandler
}
}
Expand Down
2 changes: 1 addition & 1 deletion master/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func buildPanicVol() *Vol {
if err != nil {
return nil
}
vol := newVol(id, commonVol.Name, commonVol.Owner, commonVol.dataPartitionSize, commonVol.Capacity, defaultReplicaNum, defaultReplicaNum, false, false,false)
vol := newVol(id, commonVol.Name, commonVol.Owner,"", commonVol.dataPartitionSize, commonVol.Capacity, defaultReplicaNum, defaultReplicaNum, false, false,false)
vol.dataPartitions = nil
return vol
}
Expand Down
14 changes: 8 additions & 6 deletions master/metadata_fsm_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ type volValue struct {
FollowerRead bool
Authenticate bool
CrossZone bool
ZoneName string
OSSAccessKey string
OSSSecretKey string
}
Expand All @@ -139,6 +140,7 @@ func newVolValue(vol *Vol) (vv *volValue) {
FollowerRead: vol.FollowerRead,
Authenticate: vol.authenticate,
CrossZone: vol.crossZone,
ZoneName: vol.zoneName,
OSSAccessKey: vol.OSSAccessKey,
OSSSecretKey: vol.OSSSecretKey,
}
Expand Down Expand Up @@ -186,16 +188,16 @@ func newMetaNodeValue(metaNode *MetaNode) *metaNodeValue {
}

type nodeSetValue struct {
ID uint64
Capacity int
ZoneName string
ID uint64
Capacity int
ZoneName string
}

func newNodeSetValue(nset *nodeSet) (nsv *nodeSetValue) {
nsv = &nodeSetValue{
ID: nset.ID,
Capacity: nset.Capacity,
ZoneName: nset.zoneName,
ID: nset.ID,
Capacity: nset.Capacity,
ZoneName: nset.zoneName,
}
return
}
Expand Down
2 changes: 1 addition & 1 deletion master/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,7 @@ func (zone *Zone) getAvailMetaNodeHosts(excludeNodeSets []uint64, excludeHosts [
}
ns, err := zone.allocNodeSetForMetaNode(excludeNodeSets, uint8(replicaNum))
if err != nil {
return
return nil, nil, errors.NewErrorf("zone[%v],err[%v]", zone.name, err)
}
return ns.getAvailMetaNodeHosts(excludeHosts, replicaNum)

Expand Down
4 changes: 2 additions & 2 deletions master/topology_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ func TestAllocZones(t *testing.T) {
cluster.t = topo
cluster.cfg = newClusterConfig()
//don't cross zone
hosts, _, err := cluster.chooseTargetDataNodes("", nil, nil, replicaNum,1)
hosts, _, err := cluster.chooseTargetDataNodes("", nil, nil, replicaNum, 1, "")
if err != nil {
t.Error(err)
return
}
//cross zone
hosts, _, err = cluster.chooseTargetDataNodes("", nil, nil, replicaNum,2)
hosts, _, err = cluster.chooseTargetDataNodes("", nil, nil, replicaNum, 2, "")
if err != nil {
t.Error(err)
return
Expand Down
7 changes: 5 additions & 2 deletions master/vol.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Vol struct {
FollowerRead bool
authenticate bool
crossZone bool
zoneName string
MetaPartitions map[uint64]*MetaPartition
mpsLock sync.RWMutex
dataPartitions *DataPartitionMap
Expand All @@ -51,7 +52,7 @@ type Vol struct {
sync.RWMutex
}

func newVol(id uint64, name, owner string, dpSize, capacity uint64, dpReplicaNum, mpReplicaNum uint8, followerRead, authenticate, crossZone bool) (vol *Vol) {
func newVol(id uint64, name, owner, zoneName string, dpSize, capacity uint64, dpReplicaNum, mpReplicaNum uint8, followerRead, authenticate, crossZone bool) (vol *Vol) {
vol = &Vol{ID: id, Name: name, MetaPartitions: make(map[uint64]*MetaPartition, 0)}
vol.dataPartitions = newDataPartitionMap(name)
if dpReplicaNum <= 1 {
Expand All @@ -75,6 +76,7 @@ func newVol(id uint64, name, owner string, dpSize, capacity uint64, dpReplicaNum
vol.FollowerRead = followerRead
vol.authenticate = authenticate
vol.crossZone = crossZone
vol.zoneName = zoneName
vol.viewCache = make([]byte, 0)
vol.mpsCache = make([]byte, 0)
return
Expand All @@ -85,6 +87,7 @@ func newVolFromVolValue(vv *volValue) (vol *Vol) {
vv.ID,
vv.Name,
vv.Owner,
vv.ZoneName,
vv.DataPartitionSize,
vv.Capacity,
vv.DpReplicaNum,
Expand Down Expand Up @@ -691,7 +694,7 @@ func (vol *Vol) doCreateMetaPartition(c *Cluster, start, end uint64) (mp *MetaPa
wg sync.WaitGroup
)
errChannel := make(chan error, vol.mpReplicaNum)
if hosts, peers, err = c.chooseTargetMetaHosts("", nil, nil, int(vol.mpReplicaNum), vol.crossZone); err != nil {
if hosts, peers, err = c.chooseTargetMetaHosts("", nil, nil, int(vol.mpReplicaNum), vol.crossZone, vol.zoneName); err != nil {
log.LogErrorf("action[doCreateMetaPartition] chooseTargetMetaHosts err[%v]", err)
return nil, errors.NewError(err)
}
Expand Down
4 changes: 2 additions & 2 deletions master/vol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func markDeleteVol(name string, t *testing.T) {

func TestVolReduceReplicaNum(t *testing.T) {
volName := "reduce-replica-num"
vol, err := server.cluster.createVol(volName, volName, 3, 3, util.DefaultDataPartitionSize, 100, false, false,false)
vol, err := server.cluster.createVol(volName, volName, "",3, 3, util.DefaultDataPartitionSize, 100, false, false,false)
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -210,7 +210,7 @@ func TestVolReduceReplicaNum(t *testing.T) {
func TestConcurrentReadWriteDataPartitionMap(t *testing.T) {
name := "TestConcurrentReadWriteDataPartitionMap"
var volID uint64 = 1
vol := newVol(volID, name, name, util.DefaultDataPartitionSize, 100, defaultReplicaNum, defaultReplicaNum, false, false,false)
vol := newVol(volID, name, name,"", util.DefaultDataPartitionSize, 100, defaultReplicaNum, defaultReplicaNum, false, false,false)
//unavaliable mp
mp1 := newMetaPartition(1, 1, defaultMaxMetaPartitionInodeID, 3, name, volID)
vol.addMetaPartition(mp1)
Expand Down
1 change: 1 addition & 0 deletions proto/admin_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,7 @@ type SimpleVolView struct {
ID uint64
Name string
Owner string
ZoneName string
DpReplicaNum uint8
MpReplicaNum uint8
Status uint8
Expand Down

0 comments on commit b91f267

Please sign in to comment.