Skip to content

Commit

Permalink
feature: object storage interface
Browse files Browse the repository at this point in the history
Add ObjectNode provides S3-compatibile APIs.
Fusion Storage interface expose two interface (POSIX and S3-compatible) for file operation.

Signed-off-by: Mofei Zhang <[email protected]>
  • Loading branch information
mervinkid committed Dec 17, 2019
1 parent ddfb8ac commit d609fed
Show file tree
Hide file tree
Showing 301 changed files with 28,398 additions and 2,992 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
.gitignore
build/bin
build/rocksdb
build/snappy
Expand Down
50 changes: 50 additions & 0 deletions .gitlab-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
before_script:
- go version
- echo $CI_PROJECT_DIR
- export GOPATH=/export/workspace/go
- echo $GOPATH
- echo ${CI_COMMIT_REF_NAME}
- rm -fr $GOPATH/src/github.com/chubaofs/chubaofs
- ln -s $CI_PROJECT_DIR $GOPATH/src/github.com/chubaofs/
- echo $CI_PROJECT_DIR
- echo $GOPATH/src/github.com/chubaofs/
- cd $GOPATH/src/github.com/chubaofs/chubaofs

stages:
- build
- deploy
- restart
- ltptest

build_main:
stage: build
tags:
- cfs
script:
- sh /export/App/cfsci/scripts/build.sh

deploy:
stage: deploy
tags:
- cfs
script:
- sh /export/App/cfsci/scripts/deploy.sh

restart:
stage: restart
tags:
- cfs
script:
- sh /export/App/cfsci/scripts/stop-ltp.sh
- sh /export/App/cfsci/scripts/stop-client.sh
- sh /export/App/cfsci/scripts/stop-server.sh
- sh /export/App/cfsci/scripts/start-server.sh
- sh /export/App/cfsci/scripts/start-client.sh

ltptest:
stage: ltptest
tags:
- cfs
script:
- sh /export/App/cfsci/scripts/start-ltp.sh

9 changes: 4 additions & 5 deletions MAINTAINERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,12 @@ The ChubaoFS maintainers are:
* Shuoran Liu <[email protected]> @sjzlsr: Client and SDK
* Hongyin Zhu <[email protected]> @zhuhyc: Master
* Jianxing Zhao <[email protected]> @znlstar: MetaNode
* Mofei Zhang <[email protected]> @mervinkid: MetaNode and S3 Gateway
* Mofei Zhang <[email protected]> @mervinkid: MetaNode and ObjectNode
* Tianpeng Li <[email protected]> @Skypigltp: DataNode and Raft
* Yubo Li <[email protected]> @yuboLee: S3 Gateway and Console
* Yubo Li <[email protected]> @yuboLee: ObjectNode and Console
* Wei Ding <[email protected]> @wding109: Research and Open Source Strategy
* Zhengyi Zhu <[email protected]> @zhuzhengyi: Monitoring and S3 Gateway
* Zhengyi Zhu <[email protected]> @zhuzhengyi: Monitoring and ObjectNode
* Liying Zhang <[email protected]> @Vivian7755: Product Management & Advocate
* Junyuan Zeng <[email protected]> @jzeng4: Authorization Node
* Xihao Xu<[email protected]> @xxscott: CSI Driver
* Wenjia Wu<[email protected]> @wenjia322: Authorization Node
* Chengyu Liu<[email protected]> @@chengyu-l: Helm and CSI Driver
* Wenjia Wu<[email protected]> @wenjia322: Authorization Node
56 changes: 44 additions & 12 deletions authnode/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func genAuthRaftNodeOpResp(req *proto.APIAccessReq, ts int64, key []byte, msg st
}

if message, err = cryptoutil.EncodeMessage(jresp, key); err != nil {
err = fmt.Errorf("encdoe message for response failed %s", err.Error())
err = fmt.Errorf("encode message for response failed %s", err.Error())
return
}

Expand Down Expand Up @@ -302,17 +302,27 @@ func (m *Server) handleGetKey(keyInfo *keystore.KeyInfo) (res *keystore.KeyInfo,
}

func (m *Server) handleAddCaps(keyInfo *keystore.KeyInfo) (res *keystore.KeyInfo, err error) {
if res, err = m.cluster.AddCaps(keyInfo.ID, keyInfo); err != nil {
return
if keyInfo.ID == "" {
var akInfo *keystore.AccessKeyInfo
if akInfo, err = m.cluster.GetAKInfo(keyInfo.AccessKey); err != nil {
return
}
return m.cluster.AddCaps(akInfo.ID, keyInfo)
} else {
return m.cluster.AddCaps(keyInfo.ID, keyInfo)
}
return
}

func (m *Server) handleDeleteCaps(keyInfo *keystore.KeyInfo) (res *keystore.KeyInfo, err error) {
if res, err = m.cluster.DeleteCaps(keyInfo.ID, keyInfo); err != nil {
return
if keyInfo.ID == "" {
var akInfo *keystore.AccessKeyInfo
if akInfo, err = m.cluster.GetAKInfo(keyInfo.AccessKey); err != nil {
return
}
return m.cluster.DeleteCaps(akInfo.ID, keyInfo)
} else {
return m.cluster.DeleteCaps(keyInfo.ID, keyInfo)
}
return
}

func (m *Server) extractClientReqInfo(r *http.Request) (plaintext []byte, err error) {
Expand All @@ -335,6 +345,28 @@ func (m *Server) extractClientReqInfo(r *http.Request) (plaintext []byte, err er
return
}

func (m *Server) osCapsOp(writer http.ResponseWriter, request *http.Request) {
//TODO
/*
case proto.MsgAuthOSAddCapsReq:
fallthrough
case proto.MsgAuthOSDeleteCapsReq:
if err = keyInfo.IsValidAK(); err != nil {
sendErrReply(w, r, &proto.HTTPAuthReply{Code: proto.ErrCodeParamError, Msg: err.Error()})
return
}
if err = keyInfo.IsValidCaps(); err != nil {
sendErrReply(w, r, &proto.HTTPAuthReply{Code: proto.ErrCodeParamError, Msg: err.Error()})
return
}
case proto.MsgAuthOSGetCapsReq:
if err = keyInfo.IsValidAK(); err != nil {
sendErrReply(w, r, &proto.HTTPAuthReply{Code: proto.ErrCodeParamError, Msg: err.Error()})
return
}
*/
}

func (m *Server) genTicket(key []byte, serviceID string, IP string, caps []byte) (ticket cryptoutil.Ticket) {
currentTime := time.Now().Unix()
ticket.Version = cryptoutil.TicketVersion
Expand All @@ -354,14 +386,14 @@ func (m *Server) getSecretKey(id string) (key []byte, err error) {
if keyInfo, err = m.getSecretKeyInfo(id); err != nil {
return
}
return keyInfo.Key, err
return keyInfo.AuthKey, err
}

func (m *Server) getSecretKeyInfo(id string) (keyInfo *keystore.KeyInfo, err error) {
if id == proto.AuthServiceID {
keyInfo = &keystore.KeyInfo{
Key: m.cluster.AuthSecretKey,
Caps: []byte(`{"API": ["*:*:*"]}`),
AuthKey: m.cluster.AuthSecretKey,
Caps: []byte(`{"API": ["*:*:*"]}`),
}
} else {
if keyInfo, err = m.cluster.GetKey(id); err != nil {
Expand Down Expand Up @@ -417,7 +449,7 @@ func (m *Server) genGetTicketAuthResp(req *proto.AuthGetTicketReq, ts int64, r *
if keyInfo, err = m.getSecretKeyInfo(resp.ClientID); err != nil {
return
}
clientKey = keyInfo.Key
clientKey = keyInfo.AuthKey
if message, err = cryptoutil.EncodeMessage(jresp, clientKey); err != nil {
return
}
Expand Down Expand Up @@ -459,7 +491,7 @@ func genAuthAPIAccessResp(req *proto.APIAccessReq, keyInfo *keystore.KeyInfo, ts
}

if message, err = cryptoutil.EncodeMessage(jresp, key); err != nil {
err = fmt.Errorf("encdoe message for response failed %s", err.Error())
err = fmt.Errorf("encode message for response failed %s", err.Error())
return
}

Expand Down
3 changes: 3 additions & 0 deletions authnode/authnode_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (m *Server) handleLeaderChange(leader uint64) {
if err := m.cluster.loadKeystore(); err != nil {
panic(err)
}
if err := m.cluster.loadAKstore(); err != nil {
panic(err)
}
m.metaReady = true
}
}
Expand Down
28 changes: 27 additions & 1 deletion authnode/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package authnode
import (
"encoding/json"
"fmt"
"github.com/chubaofs/chubaofs/util"
"time"

"github.com/chubaofs/chubaofs/proto"
Expand Down Expand Up @@ -57,6 +58,7 @@ func newCluster(name string, leaderInfo *LeaderInfo, fsm *KeystoreFsm, partition
c.fsm = fsm
c.partition = partition
c.fsm.keystore = make(map[string]*keystore.KeyInfo, 0)
c.fsm.accessKeystore = make(map[string]*keystore.AccessKeyInfo, 0)
return
}

Expand Down Expand Up @@ -88,17 +90,28 @@ func (c *Cluster) checkLeaderAddr() {
func (c *Cluster) CreateNewKey(id string, keyInfo *keystore.KeyInfo) (res *keystore.KeyInfo, err error) {
c.fsm.opKeyMutex.Lock()
defer c.fsm.opKeyMutex.Unlock()
accessKeyInfo := &keystore.AccessKeyInfo{
AccessKey: keyInfo.AccessKey,
ID: keyInfo.ID,
}
if _, err = c.fsm.GetKey(id); err == nil {
err = proto.ErrDuplicateKey
goto errHandler
}
keyInfo.Ts = time.Now().Unix()
keyInfo.Key = cryptoutil.GenSecretKey([]byte(c.AuthRootKey), keyInfo.Ts, id)
keyInfo.AuthKey = cryptoutil.GenSecretKey([]byte(c.AuthRootKey), keyInfo.Ts, id)
//TODO check duplicate
keyInfo.AccessKey = util.RandomString(16, util.Numeric|util.LowerLetter|util.UpperLetter)
keyInfo.SecretKey = util.RandomString(32, util.Numeric|util.LowerLetter|util.UpperLetter)
if err = c.syncAddKey(keyInfo); err != nil {
goto errHandler
}
if err = c.syncAddAccessKey(accessKeyInfo); err != nil {
goto errHandler
}
res = keyInfo
c.fsm.PutKey(keyInfo)
c.fsm.PutAKInfo(accessKeyInfo)
return
errHandler:
err = fmt.Errorf("action[CreateNewKey], clusterID[%v] ID:%v, err:%v ", c.Name, keyInfo, err.Error())
Expand Down Expand Up @@ -138,6 +151,19 @@ errHandler:
return
}

// GetKey get a key from the AKstore
func (c *Cluster) GetAKInfo(accessKey string) (akInfo *keystore.AccessKeyInfo, err error) {
if akInfo, err = c.fsm.GetAKInfo(accessKey); err != nil {
err = proto.ErrAccessKeyNotExists
goto errHandler
}
return
errHandler:
err = fmt.Errorf("action[GetAKInfo], clusterID[%v] ID:%v, err:%v ", c.Name, accessKey, err.Error())
log.LogError(errors.Stack(err))
return
}

// AddCaps add caps to the key
func (c *Cluster) AddCaps(id string, keyInfo *keystore.KeyInfo) (res *keystore.KeyInfo, err error) {
var (
Expand Down
3 changes: 3 additions & 0 deletions authnode/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,7 @@ const (
idSeparator = "$" // To seperate ID of server that submits raft changes
keyAcronym = "key"
ksPrefix = keySeparator + keyAcronym + keySeparator

akAcronym = "ak"
akPrefix = keySeparator + akAcronym + keySeparator
)
10 changes: 9 additions & 1 deletion authnode/http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func (m *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
fallthrough
case proto.AdminRemoveRaftNode:
m.raftNodeOp(w, r)
case proto.OSAddCaps:
fallthrough
case proto.OSDeleteCaps:
fallthrough
case proto.OSGetCaps:
m.osCapsOp(w, r)
default:
sendErrReply(w, r, &proto.HTTPAuthReply{Code: proto.ErrCodeParamError, Msg: "Invalid requst URL"})
}
Expand All @@ -107,7 +113,9 @@ func (m *Server) handleFunctions() {
http.Handle(proto.AdminGetCaps, m.handlerWithInterceptor())
http.Handle(proto.AdminAddRaftNode, m.handlerWithInterceptor())
http.Handle(proto.AdminRemoveRaftNode, m.handlerWithInterceptor())

http.Handle(proto.OSAddCaps, m.handlerWithInterceptor())
http.Handle(proto.OSDeleteCaps, m.handlerWithInterceptor())
http.Handle(proto.OSGetCaps, m.handlerWithInterceptor())
return
}

Expand Down
27 changes: 26 additions & 1 deletion authnode/keystore_cache_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,31 @@ func (mf *KeystoreFsm) GetKey(id string) (u *keystore.KeyInfo, err error) {
func (mf *KeystoreFsm) DeleteKey(id string) {
mf.ksMutex.Lock()
defer mf.ksMutex.Unlock()
delete((mf.keystore), id)
delete(mf.keystore, id)
return
}

func (mf *KeystoreFsm) PutAKInfo(akInfo *keystore.AccessKeyInfo) {
mf.aksMutex.Lock()
defer mf.aksMutex.Unlock()
if _, ok := (mf.accessKeystore)[akInfo.AccessKey]; !ok {
(mf.accessKeystore)[akInfo.AccessKey] = akInfo
}
}

func (mf *KeystoreFsm) GetAKInfo(accessKey string) (akInfo *keystore.AccessKeyInfo, err error) {
mf.aksMutex.RLock()
defer mf.aksMutex.RUnlock()
akInfo, ok := (mf.accessKeystore)[accessKey]
if !ok {
err = proto.ErrAccessKeyNotExists
}
return
}

func (mf *KeystoreFsm) DeleteAKInfo(accessKey string) {
mf.aksMutex.Lock()
defer mf.aksMutex.Unlock()
//TODO
return
}
12 changes: 7 additions & 5 deletions authnode/keystore_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,12 @@ type KeystoreFsm struct {
peerChangeHandler raftPeerChangeHandler
snapshotHandler raftApplySnapshotHandler

keystore map[string]*keystore.KeyInfo
ksMutex sync.RWMutex // keystore mutex
opKeyMutex sync.RWMutex // operations on key mutex
id uint64 // current id of server
keystore map[string]*keystore.KeyInfo
accessKeystore map[string]*keystore.AccessKeyInfo
ksMutex sync.RWMutex // keystore mutex
aksMutex sync.RWMutex //accesskeystore mutex
opKeyMutex sync.RWMutex // operations on key mutex
id uint64 // current id of server
}

func newKeystoreFsm(store *raftstore.RocksDBStore, retainsLog uint64, rs *raft.RaftServer) (fsm *KeystoreFsm) {
Expand Down Expand Up @@ -144,7 +146,7 @@ func (mf *KeystoreFsm) Apply(command []byte, index uint64) (resp interface{}, er
// of cache may happen in newly demoted leader node. Therefore, we use the following
// statement: "id" indicates which server has changed keystore cache (typical leader).
if mf.id != leader {
mf.DeleteKey(string(keyInfo.Key))
mf.DeleteKey(keyInfo.ID)
log.LogInfof("action[Apply], Successfully delete key in node[%d]", mf.id)
} else {
log.LogInfof("action[Apply], Already delete key in node[%d]", mf.id)
Expand Down
Loading

0 comments on commit d609fed

Please sign in to comment.