objectLayer: Check for format.json in a wrapped disk. (minio#3311)
This is needed to validate that `format.json` indeed exists
when a fresh node is brought online.

The wrapped implementation also connects to the remote node
by attempting a re-login. Subsequently, after a successful
connect, `format.json` is validated as well.

Fixes minio#3207
harshavardhana authored Nov 23, 2016
1 parent 7a5bbf7 commit 6efee20
Showing 26 changed files with 877 additions and 194 deletions.
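
The `retryStorage` wrapper named in the title is among the 26 changed files but its implementation is not reproduced below; the hunks only show callers switching from `*posix` to `*retryStorage`. A minimal sketch of the behavior the commit message describes (reconnect, then validate `format.json`), with a trimmed-down interface and helper names that are assumptions, not the actual code:

import "errors"

// StorageAPI here is a trimmed stand-in for minio's storage interface;
// only what this sketch needs is included.
type StorageAPI interface {
	Init() error
	Close() error
	ReadAll(volume string, path string) ([]byte, error)
}

var errUnformattedDisk = errors.New("unformatted disk found")

// retryStorage wraps a remote disk: Init re-attempts the connection
// (a re-login for an RPC-backed disk) and then verifies that
// format.json exists before the disk is considered usable.
type retryStorage struct {
	remoteStorage StorageAPI
}

func (f retryStorage) Init() error {
	// Attempt a fresh connection; for AuthRPCClient this is a re-login.
	if err := f.remoteStorage.Init(); err != nil {
		return err
	}
	// After a successful connect, validate that format.json exists.
	if _, err := f.remoteStorage.ReadAll(".minio.sys", "format.json"); err != nil {
		return errUnformattedDisk
	}
	return nil
}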
4 changes: 2 additions & 2 deletions Makefile
@@ -92,8 +92,8 @@ ineffassign:
 
 cyclo:
 	@echo "Running $@:"
-	@GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/gocyclo -over 65 cmd
-	@GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/gocyclo -over 65 pkg
+	@GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/gocyclo -over 100 cmd
+	@GO15VENDOREXPERIMENT=1 ${GOPATH}/bin/gocyclo -over 100 pkg
 
 build: getdeps verifiers $(UI_ASSETS)
 
27 changes: 6 additions & 21 deletions cmd/auth-rpc-client.go
@@ -97,13 +97,12 @@ type authConfig struct {
 
 // AuthRPCClient is a wrapper type for RPCClient which provides JWT based authentication across reconnects.
 type AuthRPCClient struct {
-	mu             sync.Mutex
-	config         *authConfig
-	rpc            *RPCClient // reconnect'able rpc client built on top of net/rpc Client
-	isLoggedIn     bool       // Indicates if the auth client has been logged in and token is valid.
-	serverToken    string     // Disk rpc JWT based token.
-	serverVersion  string     // Server version exchanged by the RPC.
-	serverIOErrCnt int        // Keeps track of total errors occurred for each RPC call.
+	mu            sync.Mutex
+	config        *authConfig
+	rpc           *RPCClient // reconnect'able rpc client built on top of net/rpc Client
+	isLoggedIn    bool       // Indicates if the auth client has been logged in and token is valid.
+	serverToken   string     // Disk rpc JWT based token.
+	serverVersion string     // Server version exchanged by the RPC.
 }
 
 // newAuthClient - returns a jwt based authenticated (go) rpc client, which does automatic reconnect.
@@ -133,20 +132,6 @@ func (authClient *AuthRPCClient) Login() (err error) {
 	// As soon as the function returns unlock,
 	defer authClient.mu.Unlock()
 
-	// Take remote disk offline if the total server errors
-	// are more than maximum allowable IO error limit.
-	if authClient.serverIOErrCnt > maxAllowedIOError {
-		return errFaultyRemoteDisk
-	}
-
-	// In defer sequence this is called first, so error
-	// increment happens well with in the lock.
-	defer func() {
-		if err != nil {
-			authClient.serverIOErrCnt++
-		}
-	}()
-
 	// Return if already logged in.
 	if authClient.isLoggedIn {
 		return nil
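
Note what the second hunk removes: Login() no longer does caller-side fault accounting against maxAllowedIOError. With the new wrapped-disk design, repeated IO failures are expected to be handled at the storage layer instead. For reference, a sketch of the removed pattern as a standalone helper; the two error names come from the deleted hunk, everything else here is assumed:

import "errors"

var errFaultyRemoteDisk = errors.New("remote disk is faulty")

const maxAllowedIOError = 5 // assumed limit; the real constant lives elsewhere in cmd/

// faultTrackingDisk is a hypothetical wrapper holding the counter that
// used to live in AuthRPCClient.
type faultTrackingDisk struct {
	serverIOErrCnt int // total IO errors seen so far
}

// track records a failure and takes the disk offline once the error
// count crosses the allowed limit, mirroring the deleted Login() logic.
func (d *faultTrackingDisk) track(err error) error {
	if err == nil {
		return nil
	}
	d.serverIOErrCnt++
	if d.serverIOErrCnt > maxAllowedIOError {
		return errFaultyRemoteDisk
	}
	return err
}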
2 changes: 1 addition & 1 deletion cmd/event-notifier_test.go
@@ -54,7 +54,7 @@ func TestInitEventNotifierFaultyDisks(t *testing.T) {
 	}
 
 	fs := obj.(fsObjects)
-	fsstorage := fs.storage.(*posix)
+	fsstorage := fs.storage.(*retryStorage)
 
 	listenARN := "arn:minio:sns:us-east-1:1:listen"
 	queueARN := "arn:minio:sqs:us-east-1:1:redis"
20 changes: 10 additions & 10 deletions cmd/format-config-v1_test.go
@@ -615,7 +615,7 @@ func TestInitFormatXLErrors(t *testing.T) {
 
 	// All disks API return disk not found
 	for i := 0; i < 16; i++ {
-		d := xl.storageDisks[i].(*posix)
+		d := xl.storageDisks[i].(*retryStorage)
 		testStorageDisks[i] = &naughtyDisk{disk: d, defaultErr: errDiskNotFound}
 	}
 	if err := initFormatXL(testStorageDisks); err != errDiskNotFound {
@@ -624,7 +624,7 @@
 
 	// All disks returns disk not found in the fourth call
 	for i := 0; i < 15; i++ {
-		d := xl.storageDisks[i].(*posix)
+		d := xl.storageDisks[i].(*retryStorage)
 		testStorageDisks[i] = &naughtyDisk{disk: d, defaultErr: errDiskNotFound, errors: map[int]error{0: nil, 1: nil, 2: nil}}
 	}
 	if err := initFormatXL(testStorageDisks); err != errDiskNotFound {
@@ -720,9 +720,9 @@ func TestLoadFormatXLErrs(t *testing.T) {
 	xl.storageDisks[11] = nil
 
 	// disk 12 returns faulty disk
-	posixDisk, ok := xl.storageDisks[12].(*posix)
+	posixDisk, ok := xl.storageDisks[12].(*retryStorage)
 	if !ok {
-		t.Fatal("storage disk is not *posix type")
+		t.Fatal("storage disk is not *retryStorage type")
 	}
 	xl.storageDisks[10] = newNaughtyDisk(posixDisk, nil, errFaultyDisk)
 	if _, err = loadFormatXL(xl.storageDisks, 8); err != errFaultyDisk {
@@ -749,9 +749,9 @@
 
 	// disks 0..10 returns disk not found
 	for i := 0; i <= 10; i++ {
-		posixDisk, ok := xl.storageDisks[i].(*posix)
+		posixDisk, ok := xl.storageDisks[i].(*retryStorage)
 		if !ok {
-			t.Fatal("storage disk is not *posix type")
+			t.Fatal("storage disk is not *retryStorage type")
 		}
 		xl.storageDisks[i] = newNaughtyDisk(posixDisk, nil, errDiskNotFound)
 	}
@@ -881,9 +881,9 @@ func TestHealFormatXLCorruptedDisksErrs(t *testing.T) {
 		t.Fatal(err)
 	}
 	xl = obj.(*xlObjects)
-	posixDisk, ok := xl.storageDisks[0].(*posix)
+	posixDisk, ok := xl.storageDisks[0].(*retryStorage)
 	if !ok {
-		t.Fatal("storage disk is not *posix type")
+		t.Fatal("storage disk is not *retryStorage type")
 	}
 	xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errFaultyDisk)
 	if err = healFormatXLCorruptedDisks(xl.storageDisks); err != errFaultyDisk {
@@ -1036,9 +1036,9 @@ func TestHealFormatXLFreshDisksErrs(t *testing.T) {
 		t.Fatal(err)
 	}
 	xl = obj.(*xlObjects)
-	posixDisk, ok := xl.storageDisks[0].(*posix)
+	posixDisk, ok := xl.storageDisks[0].(*retryStorage)
 	if !ok {
-		t.Fatal("storage disk is not *posix type")
+		t.Fatal("storage disk is not *retryStorage type")
 	}
 	xl.storageDisks[0] = newNaughtyDisk(posixDisk, nil, errFaultyDisk)
 	if err = healFormatXLFreshDisks(xl.storageDisks); err != errFaultyDisk {
4 changes: 2 additions & 2 deletions cmd/fs-v1-metadata_test.go
@@ -73,7 +73,7 @@ func TestReadFSMetadata(t *testing.T) {
 	}
 
 	// Test with corrupted disk
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	naughty := newNaughtyDisk(fsStorage, nil, errFaultyDisk)
 	fs.storage = naughty
 	if _, err := readFSMetadata(fs.storage, ".minio.sys", fsPath); errorCause(err) != errFaultyDisk {
@@ -111,7 +111,7 @@ func TestWriteFSMetadata(t *testing.T) {
 	}
 
 	// Reading metadata with a corrupted disk
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	for i := 1; i <= 2; i++ {
 		naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk, i + 1: errFaultyDisk}, nil)
 		fs.storage = naughty
6 changes: 3 additions & 3 deletions cmd/fs-v1-multipart-common_test.go
@@ -48,7 +48,7 @@ func TestFSIsBucketExist(t *testing.T) {
 	}
 
 	// Using a faulty disk
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	naughty := newNaughtyDisk(fsStorage, nil, errFaultyDisk)
 	fs.storage = naughty
 	if found := fs.isBucketExist(bucketName); found {
@@ -92,7 +92,7 @@ func TestFSIsUploadExists(t *testing.T) {
 	}
 
 	// isUploadIdExists with a faulty disk should return false
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	naughty := newNaughtyDisk(fsStorage, nil, errFaultyDisk)
 	fs.storage = naughty
 	if exists := fs.isUploadIDExists(bucketName, objectName, uploadID); exists {
@@ -127,7 +127,7 @@ func TestFSWriteUploadJSON(t *testing.T) {
 	}
 
 	// isUploadIdExists with a faulty disk should return false
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	for i := 1; i <= 3; i++ {
 		naughty := newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
 		fs.storage = naughty
8 changes: 4 additions & 4 deletions cmd/fs-v1-multipart_test.go
@@ -40,7 +40,7 @@ func TestNewMultipartUploadFaultyDisk(t *testing.T) {
 	}
 
 	// Test with faulty disk
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	for i := 1; i <= 5; i++ {
 		// Faulty disk generates errFaultyDisk at 'i' storage api call number
 		fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
@@ -82,7 +82,7 @@ func TestPutObjectPartFaultyDisk(t *testing.T) {
 	sha256sum := ""
 
 	// Test with faulty disk
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	for i := 1; i <= 7; i++ {
 		// Faulty disk generates errFaultyDisk at 'i' storage api call number
 		fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
@@ -138,7 +138,7 @@ func TestCompleteMultipartUploadFaultyDisk(t *testing.T) {
 
 	parts := []completePart{{PartNumber: 1, ETag: md5Hex}}
 
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	for i := 1; i <= 3; i++ {
 		// Faulty disk generates errFaultyDisk at 'i' storage api call number
 		fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
@@ -186,7 +186,7 @@ func TestListMultipartUploadsFaultyDisk(t *testing.T) {
 		t.Fatal("Unexpected error ", err)
 	}
 
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	for i := 1; i <= 4; i++ {
 		// Faulty disk generates errFaultyDisk at 'i' storage api call number
 		fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
20 changes: 10 additions & 10 deletions cmd/fs-v1_test.go
@@ -61,11 +61,11 @@ func TestNewFS(t *testing.T) {
 	}
 
 	// Initializes all disks with XL
-	err = waitForFormatDisks(true, endpoints, xlStorageDisks)
+	formattedDisks, err := waitForFormatDisks(true, endpoints, xlStorageDisks)
 	if err != nil {
 		t.Fatalf("Unable to format XL %s", err)
 	}
-	_, err = newXLObjects(xlStorageDisks)
+	_, err = newXLObjects(formattedDisks)
 	if err != nil {
 		t.Fatalf("Unable to initialize XL object, %s", err)
 	}
@@ -79,15 +79,15 @@
 	}
 
 	for _, testCase := range testCases {
-		if err = waitForFormatDisks(true, endpoints, []StorageAPI{testCase.disk}); err != testCase.expectedErr {
+		if _, err = waitForFormatDisks(true, endpoints, []StorageAPI{testCase.disk}); err != testCase.expectedErr {
 			t.Errorf("expected: %s, got :%s", testCase.expectedErr, err)
 		}
 	}
 	_, err = newFSObjects(nil)
 	if err != errInvalidArgument {
 		t.Errorf("Expecting error invalid argument, got %s", err)
 	}
-	_, err = newFSObjects(xlStorageDisks[0])
+	_, err = newFSObjects(&retryStorage{xlStorageDisks[0]})
 	if err != nil {
 		errMsg := "Unable to recognize backend format, Disk is not in FS format."
 		if err.Error() == errMsg {
@@ -131,7 +131,7 @@ func TestFSShutdown(t *testing.T) {
 	/* for i := 1; i <= 5; i++ {
 		fs, disk := prepareTest()
 		fs.DeleteObject(bucketName, objectName)
-		fsStorage := fs.storage.(*posix)
+		fsStorage := fs.storage.(*retryStorage)
 		fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
 		if err := fs.Shutdown(); errorCause(err) != errFaultyDisk {
 			t.Fatal(i, ", Got unexpected fs shutdown error: ", err)
@@ -161,7 +161,7 @@ func TestFSLoadFormatFS(t *testing.T) {
 		t.Fatal("Should return an error here")
 	}
 	// Loading format file from faulty disk
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	fs.storage = newNaughtyDisk(fsStorage, nil, errFaultyDisk)
 	_, err = loadFormatFS(fs.storage)
 	if err != errFaultyDisk {
@@ -197,7 +197,7 @@ func TestFSGetBucketInfo(t *testing.T) {
 	}
 
 	// Loading format file from faulty disk
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	fs.storage = newNaughtyDisk(fsStorage, nil, errFaultyDisk)
 	_, err = fs.GetBucketInfo(bucketName)
 	if errorCause(err) != errFaultyDisk {
@@ -239,7 +239,7 @@ func TestFSDeleteObject(t *testing.T) {
 	}
 
 	// Loading format file from faulty disk
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	fs.storage = newNaughtyDisk(fsStorage, nil, errFaultyDisk)
 	if err := fs.DeleteObject(bucketName, objectName); errorCause(err) != errFaultyDisk {
 		t.Fatal("Unexpected error: ", err)
@@ -278,7 +278,7 @@ func TestFSDeleteBucket(t *testing.T) {
 	obj.MakeBucket(bucketName)
 
 	// Loading format file from faulty disk
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	for i := 1; i <= 2; i++ {
 		fs.storage = newNaughtyDisk(fsStorage, map[int]error{i: errFaultyDisk}, nil)
 		if err := fs.DeleteBucket(bucketName); errorCause(err) != errFaultyDisk {
@@ -317,7 +317,7 @@ func TestFSListBuckets(t *testing.T) {
 	}
 
 	// Test ListBuckets with faulty disks
-	fsStorage := fs.storage.(*posix)
+	fsStorage := fs.storage.(*retryStorage)
 	for i := 1; i <= 2; i++ {
 		fs.storage = newNaughtyDisk(fsStorage, nil, errFaultyDisk)
 		if _, err := fs.ListBuckets(); errorCause(err) != errFaultyDisk {
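
The TestNewFS hunks above track an API change: waitForFormatDisks now returns the formatted (and, per this commit, wrapped) disks for the caller to hand to newXLObjects, instead of the caller reusing its own slice. An inferred shape of the new signature, reconstructed only from these call sites; the endpoints type and the body are assumptions:

func waitForFormatDisks(firstDisk bool, endpoints []*url.URL, storageDisks []StorageAPI) (formattedDisks []StorageAPI, err error) {
	// Wait for enough disks to come online, load or create format.json,
	// then return the disks (wrapped) for the object layer to consume.
	return formattedDisks, err
}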
2 changes: 1 addition & 1 deletion cmd/logger-file-hook.go
@@ -68,7 +68,7 @@ func (l *localFile) Fire(entry *logrus.Entry) error {
 	if err != nil {
 		return fmt.Errorf("Unable to read entry, %v", err)
 	}
-	l.File.Write([]byte(line + "\n"))
+	l.File.Write([]byte(line))
 	l.File.Sync()
 	return nil
 }
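
Context for the one-character change above: logrus formatters terminate each serialized entry with a newline, so entry.String() already ends in '\n' and appending another one produced blank lines between log entries. A quick way to confirm, assuming the logrus API of that era (vendored as github.com/Sirupsen/logrus):

package main

import (
	"fmt"

	"github.com/Sirupsen/logrus"
)

func main() {
	entry := logrus.NewEntry(logrus.New())
	entry.Message = "hello"
	line, err := entry.String() // formatted by the logger's formatter
	if err == nil {
		fmt.Printf("%q\n", line) // the quoted output already ends in \n
	}
}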
18 changes: 16 additions & 2 deletions cmd/naughty-disk_test.go
@@ -28,7 +28,7 @@ import (
 // Programmed errors are stored in errors field.
 type naughtyDisk struct {
 	// The real disk
-	disk *posix
+	disk *retryStorage
 	// Programmed errors: API call number => error to return
 	errors map[int]error
 	// The error to return when no error value is programmed
@@ -39,14 +39,28 @@ type naughtyDisk struct {
 	mu sync.Mutex
 }
 
-func newNaughtyDisk(d *posix, errs map[int]error, defaultErr error) *naughtyDisk {
+func newNaughtyDisk(d *retryStorage, errs map[int]error, defaultErr error) *naughtyDisk {
 	return &naughtyDisk{disk: d, errors: errs, defaultErr: defaultErr}
 }
 
 func (d *naughtyDisk) String() string {
 	return d.disk.String()
 }
 
+func (d *naughtyDisk) Init() (err error) {
+	if err = d.calcError(); err != nil {
+		return err
+	}
+	return d.disk.Init()
+}
+
+func (d *naughtyDisk) Close() (err error) {
+	if err = d.calcError(); err != nil {
+		return err
+	}
+	return d.disk.Close()
+}
+
 func (d *naughtyDisk) calcError() (err error) {
 	d.mu.Lock()
 	defer d.mu.Unlock()
28 changes: 2 additions & 26 deletions cmd/net-rpc-client.go
@@ -155,32 +155,8 @@ func (rpcClient *RPCClient) Call(serviceMethod string, args interface{}, reply interface{}) error {
 		}
 	}
 
-	// If the RPC fails due to a network-related error, then we reset
-	// rpc.Client for a subsequent reconnect.
-	err := rpcLocalStack.Call(serviceMethod, args, reply)
-	if err != nil {
-		// Any errors other than rpc.ErrShutdown just return quickly.
-		if err != rpc.ErrShutdown {
-			return err
-		} // else rpc.ErrShutdown returned by rpc.Call
-
-		// Reset the underlying rpc connection before
-		// moving to reconnect.
-		rpcClient.clearRPCClient()
-
-		// Close the underlying connection before reconnect.
-		rpcLocalStack.Close()
-
-		// Try once more to re-connect.
-		rpcLocalStack, err = rpcClient.dialRPCClient()
-		if err != nil {
-			return err
-		}
-
-		// Attempt the rpc.Call once again, upon any error now just give up.
-		err = rpcLocalStack.Call(serviceMethod, args, reply)
-	}
-	return err
+	// If the RPC fails due to a network-related error
+	return rpcLocalStack.Call(serviceMethod, args, reply)
 }
 
 // Close closes the underlying socket file descriptor.
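
The deleted block made Call transparently re-dial once on rpc.ErrShutdown. After this commit Call fails fast, and the reconnect (the re-login from the commit message) belongs to the wrapped disk. For reference, the removed pattern isolated as a self-contained helper; the dial callback stands in for rpcClient.dialRPCClient():

import "net/rpc"

// callWithOneRetry retries a call exactly once after re-dialing when
// the underlying connection was shut down; any other error returns
// immediately. A sketch mirroring the deleted inline logic.
func callWithOneRetry(c *rpc.Client, dial func() (*rpc.Client, error),
	serviceMethod string, args, reply interface{}) error {
	err := c.Call(serviceMethod, args, reply)
	if err != rpc.ErrShutdown {
		return err
	}
	// Close the dead connection before reconnecting.
	c.Close()
	if c, err = dial(); err != nil {
		return err
	}
	return c.Call(serviceMethod, args, reply)
}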
10 changes: 10 additions & 0 deletions cmd/posix.go
@@ -193,6 +193,16 @@ func (s *posix) String() string {
 	return s.diskPath
 }
 
+// Init - this is a dummy call.
+func (s *posix) Init() error {
+	return nil
+}
+
+// Close - this is a dummy call.
+func (s *posix) Close() error {
+	return nil
+}
+
 // DiskInfo provides current information about disk space usage,
 // total free inodes and underlying filesystem.
 func (s *posix) DiskInfo() (info disk.Info, err error) {
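
posix gains no-op Init and Close because every disk implementation (posix above, naughtyDisk earlier, and the new retryStorage wrapper) must keep satisfying the same interface, which implies StorageAPI grew two methods. An inferred shape, not shown in this diff:

// Inferred from the Init/Close methods added across this commit;
// the pre-existing methods are elided since they are unchanged.
type StorageAPI interface {
	Init() error
	Close() error
	// ... existing volume and file operations (DiskInfo, ReadAll, etc.)
}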