Skip to content

Commit

Permalink
xrootd/server: move fshander from cmd\xrd-srv
Browse files Browse the repository at this point in the history
  • Loading branch information
EgorMatirov authored and sbinet committed Aug 7, 2018
1 parent 4a999e6 commit d1e8655
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 39 deletions.
2 changes: 1 addition & 1 deletion xrootd/cmd/xrd-srv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func main() {
log.Fatalf("could not listen on %q: %v", *addr, err)
}

srv := server.New(newHandler(baseDir), func(err error) {
srv := server.New(server.NewFSHandler(baseDir), func(err error) {
log.Printf("an error occured: %v", err)
})

Expand Down
38 changes: 19 additions & 19 deletions xrootd/cmd/xrd-srv/handler.go → xrootd/server/fshandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main // import "go-hep.org/x/hep/xrootd/cmd/xrd-srv"
package server // import "go-hep.org/x/hep/xrootd/server"

import (
"fmt"
Expand All @@ -13,7 +13,6 @@ import (
"path"
"sync"

"go-hep.org/x/hep/xrootd/server"
"go-hep.org/x/hep/xrootd/xrdfs"
"go-hep.org/x/hep/xrootd/xrdproto"
"go-hep.org/x/hep/xrootd/xrdproto/dirlist"
Expand All @@ -27,9 +26,9 @@ import (
"go-hep.org/x/hep/xrootd/xrdproto/xrdclose"
)

// handler implements server.Handler API by making request to the backing filesystem at basePath.
type handler struct {
server.Handler
// fshandler implements server.Handler API by making request to the backing filesystem at basePath.
type fshandler struct {
Handler
basePath string

// map + RWMutex works a bit faster and with significant lower memory usage under Linux
Expand All @@ -43,16 +42,17 @@ type session struct {
handles map[xrdfs.FileHandle]*os.File
}

func newHandler(basePath string) server.Handler {
return &handler{
Handler: server.Default(),
// NewFSHandler creates a Handler that passes requests to the backing filesystem at basePath.
func NewFSHandler(basePath string) Handler {
return &fshandler{
Handler: Default(),
basePath: basePath,
sessions: make(map[[16]byte]*session),
}
}

// Dirlist implements server.Handler.Dirlist.
func (h *handler) Dirlist(sessionID [16]byte, request *dirlist.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
func (h *fshandler) Dirlist(sessionID [16]byte, request *dirlist.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
files, err := ioutil.ReadDir(path.Join(h.basePath, request.Path))
if err != nil {
return xrdproto.ServerError{
Expand All @@ -76,7 +76,7 @@ func (h *handler) Dirlist(sessionID [16]byte, request *dirlist.Request) (xrdprot
}

// Open implements server.Handler.Open.
func (h *handler) Open(sessionID [16]byte, request *open.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
func (h *fshandler) Open(sessionID [16]byte, request *open.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
var flag int
if request.Options&xrdfs.OpenOptionsOpenRead != 0 {
flag |= os.O_RDONLY
Expand Down Expand Up @@ -169,7 +169,7 @@ func (h *handler) Open(sessionID [16]byte, request *open.Request) (xrdproto.Mars
}

// Close implements server.Handler.Close.
func (h *handler) Close(sessionID [16]byte, request *xrdclose.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
func (h *fshandler) Close(sessionID [16]byte, request *xrdclose.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
h.mu.RLock()
sess, ok := h.sessions[sessionID]
h.mu.RUnlock()
Expand Down Expand Up @@ -201,7 +201,7 @@ func (h *handler) Close(sessionID [16]byte, request *xrdclose.Request) (xrdproto
}

// Read implements server.Handler.Read.
func (h *handler) Read(sessionID [16]byte, request *read.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
func (h *fshandler) Read(sessionID [16]byte, request *read.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
file := h.getFile(sessionID, request.Handle)
if file == nil {
return xrdproto.ServerError{
Expand All @@ -223,7 +223,7 @@ func (h *handler) Read(sessionID [16]byte, request *read.Request) (xrdproto.Mars
}

// Write implements server.Handler.Write.
func (h *handler) Write(sessionID [16]byte, request *write.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
func (h *fshandler) Write(sessionID [16]byte, request *write.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
file := h.getFile(sessionID, request.Handle)
if file == nil {
return xrdproto.ServerError{
Expand All @@ -243,7 +243,7 @@ func (h *handler) Write(sessionID [16]byte, request *write.Request) (xrdproto.Ma
return nil, xrdproto.Ok
}

func (h *handler) getFile(sessionID [16]byte, handle xrdfs.FileHandle) *os.File {
func (h *fshandler) getFile(sessionID [16]byte, handle xrdfs.FileHandle) *os.File {
h.mu.RLock()
sess, ok := h.sessions[sessionID]
h.mu.RUnlock()
Expand All @@ -260,7 +260,7 @@ func (h *handler) getFile(sessionID [16]byte, handle xrdfs.FileHandle) *os.File
}

// Stat implements server.Handler.Stat.
func (h *handler) Stat(sessionID [16]byte, request *stat.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
func (h *fshandler) Stat(sessionID [16]byte, request *stat.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
if request.Options&stat.OptionsVFS != 0 {
// TODO: handle virtual stat info.
return xrdproto.ServerError{
Expand Down Expand Up @@ -295,7 +295,7 @@ func (h *handler) Stat(sessionID [16]byte, request *stat.Request) (xrdproto.Mars
}

// Truncate implements server.Handler.Truncate.
func (h *handler) Truncate(sessionID [16]byte, request *truncate.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
func (h *fshandler) Truncate(sessionID [16]byte, request *truncate.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
var err error
if len(request.Path) == 0 {
file := h.getFile(sessionID, request.Handle)
Expand All @@ -321,7 +321,7 @@ func (h *handler) Truncate(sessionID [16]byte, request *truncate.Request) (xrdpr
}

// Sync implements server.Handler.Sync.
func (h *handler) Sync(sessionID [16]byte, request *xrdsync.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
func (h *fshandler) Sync(sessionID [16]byte, request *xrdsync.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
file := h.getFile(sessionID, request.Handle)
if file == nil {
return xrdproto.ServerError{
Expand All @@ -341,7 +341,7 @@ func (h *handler) Sync(sessionID [16]byte, request *xrdsync.Request) (xrdproto.M
}

// Rename implements server.Handler.Rename.
func (h *handler) Rename(sessionID [16]byte, request *mv.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
func (h *fshandler) Rename(sessionID [16]byte, request *mv.Request) (xrdproto.Marshaler, xrdproto.ResponseStatus) {
if err := os.Rename(path.Join(h.basePath, request.OldPath), path.Join(h.basePath, request.NewPath)); err != nil {
return xrdproto.ServerError{
Code: xrdproto.IOError,
Expand All @@ -353,7 +353,7 @@ func (h *handler) Rename(sessionID [16]byte, request *mv.Request) (xrdproto.Mars
}

// CloseSession implements server.Handler.CloseSession.
func (h *handler) CloseSession(sessionID [16]byte) error {
func (h *fshandler) CloseSession(sessionID [16]byte) error {
h.mu.Lock()
sess, ok := h.sessions[sessionID]
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main // import "go-hep.org/x/hep/xrootd/cmd/xrd-srv"
package server_test // import "go-hep.org/x/hep/xrootd/server"

import (
"context"
Expand Down Expand Up @@ -51,7 +51,7 @@ func createServer(errorHandler func(err error)) (srv *server.Server, addr, baseD
return nil, "", "", errors.Errorf("xrd-srv: could not listen on %q: %v", addr, err)
}

srv = server.New(newHandler(baseDir), func(err error) {
srv = server.New(server.NewFSHandler(baseDir), func(err error) {
errorHandler(errors.Wrap(err, "xrd-srv: an error occured"))
})

Expand Down
35 changes: 18 additions & 17 deletions xrootd/server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package server // import "go-hep.org/x/hep/xrootd/server"
package server_test // import "go-hep.org/x/hep/xrootd/server"

import (
"context"
Expand All @@ -11,6 +11,7 @@ import (
"testing"

"go-hep.org/x/hep/xrootd/internal/xrdenc"
"go-hep.org/x/hep/xrootd/server"
"go-hep.org/x/hep/xrootd/xrdproto"
"go-hep.org/x/hep/xrootd/xrdproto/dirlist"
"go-hep.org/x/hep/xrootd/xrdproto/handshake"
Expand Down Expand Up @@ -63,10 +64,10 @@ func TestServe_Handshake(t *testing.T) {
listener := &pipeListener{conns: connsCh, close: make(chan struct{})}
defer listener.Close()

server := New(Default(), func(err error) {
srv := server.New(server.Default(), func(err error) {
t.Error(err)
})
defer server.Shutdown(context.Background())
defer srv.Shutdown(context.Background())

go func() {
req := handshake.NewRequest()
Expand Down Expand Up @@ -97,10 +98,10 @@ func TestServe_Handshake(t *testing.T) {
t.Errorf("wrong handshake response:\ngot = %v\nwant = %v", handshakeResp, want)
}

server.Shutdown(context.Background())
srv.Shutdown(context.Background())
}()

if err := server.Serve(listener); err != nil && err != ErrServerClosed {
if err := srv.Serve(listener); err != nil && err != server.ErrServerClosed {
t.Fatalf("unexpected error: %v", err)
}
}
Expand All @@ -115,10 +116,10 @@ func TestServe_Login(t *testing.T) {
listener := &pipeListener{conns: connsCh, close: make(chan struct{})}
defer listener.Close()

server := New(Default(), func(err error) {
srv := server.New(server.Default(), func(err error) {
t.Error(err)
})
defer server.Shutdown(context.Background())
defer srv.Shutdown(context.Background())

go func() {
handshakeReq := handshake.NewRequest()
Expand Down Expand Up @@ -158,10 +159,10 @@ func TestServe_Login(t *testing.T) {

// TODO: validate loginResp.

server.Shutdown(context.Background())
srv.Shutdown(context.Background())
}()

if err := server.Serve(listener); err != nil && err != ErrServerClosed {
if err := srv.Serve(listener); err != nil && err != server.ErrServerClosed {
t.Fatalf("unexpected error: %v", err)
}
}
Expand All @@ -176,10 +177,10 @@ func TestServe_Protocol(t *testing.T) {
listener := &pipeListener{conns: connsCh, close: make(chan struct{})}
defer listener.Close()

server := New(Default(), func(err error) {
srv := server.New(server.Default(), func(err error) {
t.Error(err)
})
defer server.Shutdown(context.Background())
defer srv.Shutdown(context.Background())

go func() {
handshakeReq := handshake.NewRequest()
Expand Down Expand Up @@ -223,10 +224,10 @@ func TestServe_Protocol(t *testing.T) {
t.Errorf("wrong response:\ngot = %v\nwant = %v", protocolResp, want)
}

server.Shutdown(context.Background())
srv.Shutdown(context.Background())
}()

if err := server.Serve(listener); err != nil && err != ErrServerClosed {
if err := srv.Serve(listener); err != nil && err != server.ErrServerClosed {
t.Fatalf("unexpected error: %v", err)
}
}
Expand All @@ -241,10 +242,10 @@ func TestServe_Dirlist(t *testing.T) {
listener := &pipeListener{conns: connsCh, close: make(chan struct{})}
defer listener.Close()

server := New(Default(), func(err error) {
srv := server.New(server.Default(), func(err error) {
t.Error(err)
})
defer server.Shutdown(context.Background())
defer srv.Shutdown(context.Background())

go func() {
handshakeReq := handshake.NewRequest()
Expand Down Expand Up @@ -288,10 +289,10 @@ func TestServe_Dirlist(t *testing.T) {
t.Errorf("wrong response:\ngot = %v\nwant = %v", errorResp, want)
}

server.Shutdown(context.Background())
srv.Shutdown(context.Background())
}()

if err := server.Serve(listener); err != nil && err != ErrServerClosed {
if err := srv.Serve(listener); err != nil && err != server.ErrServerClosed {
t.Fatalf("unexpected error: %v", err)
}
}

0 comments on commit d1e8655

Please sign in to comment.