Skip to content

Commit

Permalink
build: cubefs using local module of blobstore
Browse files Browse the repository at this point in the history
close: cubefs#1527

Signed-off-by: slasher <[email protected]>
  • Loading branch information
sejust committed May 23, 2023
1 parent 54d3f6f commit 5361602
Show file tree
Hide file tree
Showing 9 changed files with 27 additions and 126 deletions.
2 changes: 2 additions & 0 deletions blobstore/cli/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ var App = grumble.New(&grumble.Config{
})

func init() {
log.SetOutputLevel(log.Lerror)

App.OnInit(func(a *grumble.App, fm grumble.FlagMap) error {
if path := flags.Config(fm); path != "" {
config.LoadConfig(path)
Expand Down
4 changes: 1 addition & 3 deletions client/fs/super.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ import (
"sync/atomic"
"time"

"github.com/hashicorp/consul/api"

"github.com/cubefs/cubefs/blobstore/api/access"
"github.com/cubefs/cubefs/blockcache/bcache"
"github.com/cubefs/cubefs/client/common"
Expand Down Expand Up @@ -241,7 +239,7 @@ func NewSuper(opt *proto.MountOptions) (s *Super, err error) {
if proto.IsCold(opt.VolType) {
s.ebsc, err = blobstore.NewEbsClient(access.Config{
ConnMode: access.NoLimitConnMode,
Consul: api.Config{
Consul: access.ConsulConfig{
Address: opt.EbsEndpoint,
},
MaxSizePutOnce: MaxSizePutOnce,
Expand Down
11 changes: 4 additions & 7 deletions libsdk/libsdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,24 +90,21 @@ import (
"time"
"unsafe"

"github.com/cubefs/cubefs/util/auditlog"
"github.com/cubefs/cubefs/util/buf"

"github.com/bits-and-blooms/bitset"
"github.com/cubefs/cubefs/blobstore/api/access"
"github.com/cubefs/cubefs/blobstore/common/trace"

"github.com/bits-and-blooms/bitset"
"github.com/cubefs/cubefs/blockcache/bcache"
"github.com/cubefs/cubefs/client/fs"
"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/sdk/data/blobstore"
"github.com/cubefs/cubefs/sdk/data/stream"
masterSDK "github.com/cubefs/cubefs/sdk/master"
"github.com/cubefs/cubefs/sdk/meta"
"github.com/cubefs/cubefs/util/auditlog"
"github.com/cubefs/cubefs/util/buf"
"github.com/cubefs/cubefs/util/errors"
"github.com/cubefs/cubefs/util/log"
"github.com/cubefs/cubefs/util/stat"
"github.com/hashicorp/consul/api"
)

const (
Expand Down Expand Up @@ -1185,7 +1182,7 @@ func (c *client) start() (err error) {
if c.ebsEndpoint != "" {
if ebsc, err = blobstore.NewEbsClient(access.Config{
ConnMode: access.NoLimitConnMode,
Consul: api.Config{
Consul: access.ConsulConfig{
Address: c.ebsEndpoint,
},
MaxSizePutOnce: MaxSizePutOnce,
Expand Down
6 changes: 2 additions & 4 deletions metanode/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,13 @@ import (

"github.com/cubefs/cubefs/blobstore/api/access"
"github.com/cubefs/cubefs/cmd/common"
raftproto "github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/raftstore"

raftproto "github.com/cubefs/cubefs/depends/tiglabs/raft/proto"
"github.com/cubefs/cubefs/sdk/data/blobstore"
"github.com/cubefs/cubefs/util"
"github.com/cubefs/cubefs/util/errors"
"github.com/cubefs/cubefs/util/log"
"github.com/hashicorp/consul/api"
)

var (
Expand Down Expand Up @@ -606,7 +604,7 @@ func (mp *metaPartition) onStart(isCreate bool) (err error) {
ebsClient, err = blobstore.NewEbsClient(
access.Config{
ConnMode: access.NoLimitConnMode,
Consul: api.Config{
Consul: access.ConsulConfig{
Address: clusterInfo.EbsAddr,
},
MaxSizePutOnce: int64(volumeInfo.ObjBlockSize),
Expand Down
10 changes: 4 additions & 6 deletions objectnode/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,12 @@ import (

"github.com/cubefs/cubefs/blobstore/api/access"
"github.com/cubefs/cubefs/blockcache/bcache"
"github.com/cubefs/cubefs/sdk/data/blobstore"
"github.com/cubefs/cubefs/util/config"
"github.com/cubefs/cubefs/util/exporter"
"github.com/hashicorp/consul/api"

"github.com/cubefs/cubefs/cmd/common"
"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/sdk/data/blobstore"
"github.com/cubefs/cubefs/sdk/master"
"github.com/cubefs/cubefs/util/config"
"github.com/cubefs/cubefs/util/exporter"
"github.com/cubefs/cubefs/util/log"

"github.com/gorilla/mux"
Expand Down Expand Up @@ -277,7 +275,7 @@ func handleStart(s common.Server, cfg *config.Config) (err error) {
log.LogInfof("handleStart: get cluster information: region(%v)", o.region)
ebsClient, err = blobstore.NewEbsClient(access.Config{
ConnMode: access.NoLimitConnMode,
Consul: api.Config{
Consul: access.ConsulConfig{
Address: ci.EbsAddr,
},
//ServicePath: ci.ServicePath,
Expand Down
6 changes: 2 additions & 4 deletions preload/sdk/preloadsdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,16 @@ import (
"sync/atomic"
"time"

"github.com/cubefs/cubefs/util/buf"

"github.com/cubefs/cubefs/blobstore/api/access"
"github.com/cubefs/cubefs/blobstore/common/trace"
"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/sdk/data/blobstore"
"github.com/cubefs/cubefs/sdk/data/stream"
masterSDK "github.com/cubefs/cubefs/sdk/master"
"github.com/cubefs/cubefs/sdk/meta"
"github.com/cubefs/cubefs/util/buf"
"github.com/cubefs/cubefs/util/log"
"github.com/cubefs/cubefs/util/stat"
"github.com/hashicorp/consul/api"
)

type LimitParameters struct {
Expand Down Expand Up @@ -206,7 +204,7 @@ func (c *PreLoadClient) newEBSClient(masters []string, logDir string) (err error

if ebsc, err = blobstore.NewEbsClient(access.Config{
ConnMode: access.NoLimitConnMode,
Consul: api.Config{
Consul: access.ConsulConfig{
Address: ebsEndpoint,
},
MaxSizePutOnce: int64(c.ebsBlockSize),
Expand Down
13 changes: 5 additions & 8 deletions sdk/data/blobstore/blobstore_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,14 @@ import (
"io"
"time"

"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/util"
"github.com/cubefs/cubefs/util/stat"

"github.com/cubefs/cubefs/blobstore/api/access"
"github.com/cubefs/cubefs/blobstore/common/codemode"
ebsproto "github.com/cubefs/cubefs/blobstore/common/proto"
"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/util"
"github.com/cubefs/cubefs/util/exporter"
"github.com/cubefs/cubefs/util/log"
"github.com/cubefs/cubefs/util/stat"
"github.com/google/uuid"
)

Expand Down Expand Up @@ -124,7 +123,6 @@ func (ebs *BlobStoreClient) Write(ctx context.Context, volName string, data []by
requestId := uuid.New().String()
log.LogDebugf("TRACE Ebs Write Enter,requestId(%v) len(%v)", requestId, size)
start := time.Now()
hashAlg := access.HashAlgMD5
ctx = access.WithRequestID(ctx, requestId)
metric := exporter.NewTPCnt(createOPMetric(data, "ebswrite"))
defer func() {
Expand All @@ -133,9 +131,8 @@ func (ebs *BlobStoreClient) Write(ctx context.Context, volName string, data []by

for i := 0; i < MaxRetryTimes; i++ {
location, _, err = ebs.client.Put(ctx, &access.PutArgs{
Size: int64(size),
Hashes: hashAlg,
Body: bytes.NewReader(data),
Size: int64(size),
Body: bytes.NewReader(data),
})
if err == nil {
break
Expand Down
91 changes: 3 additions & 88 deletions sdk/data/blobstore/blobstore_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@ import (
"net/http"
"net/http/httptest"
"strconv"
"strings"
"testing"
"time"

"github.com/cubefs/cubefs/blobstore/api/access"
"github.com/cubefs/cubefs/blobstore/common/crc32block"
"github.com/cubefs/cubefs/blobstore/common/proto"
"github.com/cubefs/cubefs/blobstore/util/bytespool"
cproto "github.com/cubefs/cubefs/proto"
"github.com/hashicorp/consul/api"
"github.com/stretchr/testify/require"
)

Expand All @@ -41,15 +37,7 @@ const (
)

var (
client access.API
services []*api.ServiceEntry
hostsApply []*api.ServiceEntry
dataCache []byte
blobCache = make([]byte, blobSize)
tokenAlloc = []byte("token")
tokenPutat = []byte("token")

partRandBroken = false
dataCache []byte
)

type MockEbsService struct {
Expand All @@ -60,19 +48,8 @@ func NewMockEbsService() *MockEbsService {
dataCache = make([]byte, 1<<25)
mockServer := httptest.NewServer(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/v1/health/service/access" {
w.WriteHeader(http.StatusOK)
b, _ := json.Marshal(hostsApply)
w.Write(b)

} else if req.URL.Path == "/put" {
if req.URL.Path == "/put" {
putSize := req.URL.Query().Get("size")
// just for testing timeout
if strings.HasPrefix(putSize, "-") {
time.Sleep(30 * time.Second)
w.WriteHeader(http.StatusForbidden)
return
}

dataSize, _ := strconv.Atoi(putSize)
size := req.Header.Get("Content-Length")
Expand Down Expand Up @@ -133,34 +110,12 @@ func NewMockEbsService() *MockEbsService {
}
}

if len(args.Locations) > 0 && len(args.Locations)%2 == 0 {
locs := args.Locations[:]
b, _ := json.Marshal(access.DeleteResp{FailedLocations: locs})
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
w.WriteHeader(http.StatusIMUsed)
w.Write(b)
return
}

b, _ := json.Marshal(access.DeleteResp{})
w.Header().Set("Content-Type", "application/json")
w.Header().Set("Content-Length", strconv.Itoa(len(b)))
w.WriteHeader(http.StatusOK)
w.Write(b)

} else if req.URL.Path == "/sign" {
args := access.SignArgs{}
requestBody(req, &args)
if err := signCrc(&args.Location, args.Locations); err != nil {
w.WriteHeader(http.StatusForbidden)
return
}

b, _ := json.Marshal(access.SignResp{Location: args.Location})
w.WriteHeader(http.StatusOK)
w.Write(b)

} else {
w.WriteHeader(http.StatusOK)
}
Expand Down Expand Up @@ -213,54 +168,14 @@ func verifyCrc(loc *access.Location) bool {
}
return loc.Crc == crc
}
func signCrc(loc *access.Location, locs []access.Location) error {
first := locs[0]
bids := make(map[proto.BlobID]struct{}, 64)

if loc.ClusterID != first.ClusterID ||
loc.CodeMode != first.CodeMode ||
loc.BlobSize != first.BlobSize {
return fmt.Errorf("not equal in constant field")
}

for _, l := range locs {
if !verifyCrc(&l) {
return fmt.Errorf("not equal in crc %d", l.Crc)
}

// assert
if l.ClusterID != first.ClusterID ||
l.CodeMode != first.CodeMode ||
l.BlobSize != first.BlobSize {
return fmt.Errorf("not equal in constant field")
}

for _, blob := range l.Blobs {
for c := 0; c < int(blob.Count); c++ {
bids[blob.MinBid+proto.BlobID(c)] = struct{}{}
}
}
}

for _, blob := range loc.Blobs {
for c := 0; c < int(blob.Count); c++ {
bid := blob.MinBid + proto.BlobID(c)
if _, ok := bids[bid]; !ok {
return fmt.Errorf("not equal in blob_id(%d)", bid)
}
}
}

return fillCrc(loc)
}

func TestEbsClient_Write_Read(t *testing.T) {
cfg := access.Config{}
mockServer := NewMockEbsService()
cfg.Consul.Address = mockServer.service.URL[7:] // strip http://
cfg.PriorityAddrs = []string{mockServer.service.URL}
cfg.ConnMode = access.QuickConnMode
cfg.MaxSizePutOnce = 1 << 20
defer mockServer.service.Close()

blobStoreClient, err := NewEbsClient(cfg)
if err != nil {
Expand Down
10 changes: 4 additions & 6 deletions sdk/data/blobstore/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@ import (
"syscall"
"testing"

"github.com/cubefs/cubefs/sdk/data/manager"
"github.com/cubefs/cubefs/util/buf"

"github.com/brahma-adshonor/gohook"
"github.com/stretchr/testify/assert"

"github.com/brahma-adshonor/gohook"
"github.com/cubefs/cubefs/blobstore/api/access"
"github.com/cubefs/cubefs/proto"
"github.com/cubefs/cubefs/sdk/data/manager"
"github.com/cubefs/cubefs/sdk/data/stream"
"github.com/cubefs/cubefs/sdk/meta"
"github.com/hashicorp/consul/api"
"github.com/cubefs/cubefs/util/buf"
)

var (
Expand All @@ -44,7 +42,7 @@ func init() {
mockServer := NewMockEbsService()
cfg := access.Config{
ConnMode: access.QuickConnMode,
Consul: api.Config{
Consul: access.ConsulConfig{
Address: mockServer.service.URL[7:],
},
PriorityAddrs: []string{mockServer.service.URL},
Expand Down

0 comments on commit 5361602

Please sign in to comment.