Skip to content

Commit

Permalink
feat(gateway): reenabling gateway, adding flag to enable (celestiaorg…
Browse files Browse the repository at this point in the history
…#1199)

It adds the gateway back onto the node (closes celestiaorg#1182) with it's own
config, disabled by default, which is activated by the flag `--gateway`
(closes celestiaorg#1183).
  • Loading branch information
Ryan authored Oct 28, 2022
1 parent fb17d24 commit c537ec4
Show file tree
Hide file tree
Showing 13 changed files with 254 additions and 44 deletions.
42 changes: 23 additions & 19 deletions api/gateway/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,66 +2,70 @@ package gateway

import (
"context"
"fmt"
"net"
"net/http"
"sync/atomic"
"time"

"github.com/gorilla/mux"
)

// Server represents an RPC server on the Node.
// TODO @renaynay: eventually, gateway server should be able to be toggled on and off.
// Server represents a gateway server on the Node.
type Server struct {
cfg Config

srv *http.Server
srvMux *mux.Router // http request multiplexer
listener net.Listener
started atomic.Bool
}

// NewServer returns a new RPC Server.
func NewServer(cfg Config) *Server {
// NewServer returns a new gateway Server.
func NewServer(address string, port string) *Server {
srvMux := mux.NewRouter()
srvMux.Use(setContentType)

server := &Server{
cfg: cfg,
srvMux: srvMux,
}
server.srv = &http.Server{
Addr: address + ":" + port,
Handler: server,
// the amount of time allowed to read request headers. set to the default 2 seconds
ReadHeaderTimeout: 2 * time.Second,
}
return server
}

// Start starts the RPC Server, listening on the given address.
// Start starts the gateway Server, listening on the given address.
func (s *Server) Start(context.Context) error {
listenAddr := fmt.Sprintf("%s:%s", s.cfg.Address, s.cfg.Port)
listener, err := net.Listen("tcp", listenAddr)
couldStart := s.started.CompareAndSwap(false, true)
if !couldStart {
log.Warn("cannot start server: already started")
return nil
}

listener, err := net.Listen("tcp", s.srv.Addr)
if err != nil {
return err
}
s.listener = listener
log.Infow("RPC server started", "listening on", listener.Addr().String())
log.Infow("server started", "listening on", listener.Addr().String())
//nolint:errcheck
go s.srv.Serve(listener)
return nil
}

// Stop stops the RPC Server.
func (s *Server) Stop(context.Context) error {
// if server already stopped, return
if s.listener == nil {
// Stop stops the gateway Server.
func (s *Server) Stop(ctx context.Context) error {
couldStop := s.started.CompareAndSwap(true, false)
if !couldStop {
log.Warn("cannot stop server: already stopped")
return nil
}
if err := s.listener.Close(); err != nil {
err := s.srv.Shutdown(ctx)
if err != nil {
return err
}
s.listener = nil
log.Info("RPC server stopped")
log.Info("server stopped")
return nil
}

Expand Down
8 changes: 3 additions & 5 deletions api/gateway/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,8 @@ import (
)

func TestServer(t *testing.T) {
server := NewServer(Config{
Address: "0.0.0.0",
Port: "0",
})
address, port := "localhost", "0"
server := NewServer(address, port)

ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
Expand All @@ -27,7 +25,7 @@ func TestServer(t *testing.T) {
ping := new(ping)
server.RegisterHandlerFunc("/ping", ping.ServeHTTP, http.MethodGet)

url := fmt.Sprintf("http://%s/ping", server.listener.Addr().String())
url := fmt.Sprintf("http://%s/ping", server.ListenAddr())

resp, err := http.Get(url)
require.NoError(t, err)
Expand Down
21 changes: 16 additions & 5 deletions api/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package rpc
import (
"context"
"net/http"
"sync/atomic"
"time"

"github.com/filecoin-project/go-jsonrpc"
Expand All @@ -12,8 +13,9 @@ import (
var log = logging.Logger("rpc")

type Server struct {
http *http.Server
rpc *jsonrpc.RPCServer
http *http.Server
rpc *jsonrpc.RPCServer
started atomic.Bool
}

func NewServer(address string, port string) *Server {
Expand All @@ -37,20 +39,29 @@ func (s *Server) RegisterService(namespace string, service interface{}) {

// Start starts the RPC Server.
func (s *Server) Start(context.Context) error {
couldStart := s.started.CompareAndSwap(false, true)
if !couldStart {
log.Warn("cannot start server: already started")
return nil
}
//nolint:errcheck
go s.http.ListenAndServe()
log.Infow("RPC server started", "listening on", s.http.Addr)
log.Infow("server started", "listening on", s.http.Addr)
return nil
}

// Stop stops the RPC Server.
func (s *Server) Stop(ctx context.Context) error {
// if server already stopped, return
couldStop := s.started.CompareAndSwap(true, false)
if !couldStop {
log.Warn("cannot stop server: already stopped")
return nil
}
err := s.http.Shutdown(ctx)
if err != nil {
return err
}
log.Info("RPC server stopped")
log.Info("server stopped")
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions cmd/celestia/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (

cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/gateway"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
"github.com/celestiaorg/celestia-node/nodebuilder/rpc"
Expand All @@ -22,6 +23,7 @@ func init() {
core.Flags(),
cmdnode.MiscFlags(),
rpc.Flags(),
gateway.Flags(),
state.Flags(),
),
cmdnode.Start(
Expand All @@ -30,6 +32,7 @@ func init() {
core.Flags(),
cmdnode.MiscFlags(),
rpc.Flags(),
gateway.Flags(),
state.Flags(),
),
)
Expand Down Expand Up @@ -76,6 +79,7 @@ var bridgeCmd = &cobra.Command{
}

rpc.ParseFlags(cmd, &cfg.RPC)
gateway.ParseFlags(cmd, &cfg.Gateway)
state.ParseFlags(cmd, &cfg.State)

// set config
Expand Down
4 changes: 4 additions & 0 deletions cmd/celestia/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/gateway"
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
Expand All @@ -27,6 +28,7 @@ func init() {
// over an RPC connection with a celestia-core node.
core.Flags(),
rpc.Flags(),
gateway.Flags(),
state.Flags(),
),
cmdnode.Start(
Expand All @@ -38,6 +40,7 @@ func init() {
// over an RPC connection with a celestia-core node.
core.Flags(),
rpc.Flags(),
gateway.Flags(),
state.Flags(),
),
)
Expand Down Expand Up @@ -89,6 +92,7 @@ var fullCmd = &cobra.Command{
}

rpc.ParseFlags(cmd, &cfg.RPC)
gateway.ParseFlags(cmd, &cfg.Gateway)
state.ParseFlags(cmd, &cfg.State)

// set config
Expand Down
5 changes: 5 additions & 0 deletions cmd/celestia/light.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

cmdnode "github.com/celestiaorg/celestia-node/cmd"
"github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/gateway"
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
Expand All @@ -27,6 +28,7 @@ func init() {
// over an RPC connection with a celestia-core node.
core.Flags(),
rpc.Flags(),
gateway.Flags(),
state.Flags(),
),
cmdnode.Start(
Expand All @@ -38,6 +40,7 @@ func init() {
// over an RPC connection with a celestia-core node.
core.Flags(),
rpc.Flags(),
gateway.Flags(),
state.Flags(),
),
)
Expand Down Expand Up @@ -88,7 +91,9 @@ var lightCmd = &cobra.Command{
if err != nil {
return err
}

rpc.ParseFlags(cmd, &cfg.RPC)
gateway.ParseFlags(cmd, &cfg.Gateway)
state.ParseFlags(cmd, &cfg.State)

// set config
Expand Down
31 changes: 17 additions & 14 deletions nodebuilder/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/celestiaorg/celestia-node/nodebuilder/core"
"github.com/celestiaorg/celestia-node/nodebuilder/das"
"github.com/celestiaorg/celestia-node/nodebuilder/gateway"
"github.com/celestiaorg/celestia-node/nodebuilder/header"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/p2p"
Expand All @@ -22,13 +23,14 @@ type ConfigLoader func() (*Config, error)
// Config is main configuration structure for a Node.
// It combines configuration units for all Node subsystems.
type Config struct {
Core core.Config
State state.Config
P2P p2p.Config
RPC rpc.Config
Share share.Config
Header header.Config
DASer das.Config
Core core.Config
State state.Config
P2P p2p.Config
RPC rpc.Config
Gateway gateway.Config
Share share.Config
Header header.Config
DASer das.Config
}

// DefaultConfig provides a default Config for a given Node Type 'tp'.
Expand All @@ -37,13 +39,14 @@ func DefaultConfig(tp node.Type) *Config {
switch tp {
case node.Bridge, node.Light, node.Full:
return &Config{
Core: core.DefaultConfig(),
State: state.DefaultConfig(),
P2P: p2p.DefaultConfig(),
RPC: rpc.DefaultConfig(),
Share: share.DefaultConfig(),
Header: header.DefaultConfig(),
DASer: das.DefaultConfig(),
Core: core.DefaultConfig(),
State: state.DefaultConfig(),
P2P: p2p.DefaultConfig(),
RPC: rpc.DefaultConfig(),
Gateway: gateway.DefaultConfig(),
Share: share.DefaultConfig(),
Header: header.DefaultConfig(),
DASer: das.DefaultConfig(),
}
default:
panic("node: invalid node type")
Expand Down
33 changes: 33 additions & 0 deletions nodebuilder/gateway/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package gateway

import (
"fmt"
"net"
"strconv"
)

type Config struct {
Address string
Port string
Enabled bool
}

func DefaultConfig() Config {
return Config{
Address: "0.0.0.0",
// do NOT expose the same port as celestia-core by default so that both can run on the same machine
Port: "26659",
Enabled: false,
}
}

func (cfg *Config) Validate() error {
if ip := net.ParseIP(cfg.Address); ip == nil {
return fmt.Errorf("gateway: invalid listen address format: %s", cfg.Address)
}
_, err := strconv.Atoi(cfg.Port)
if err != nil {
return fmt.Errorf("gateway: invalid port: %s", err.Error())
}
return nil
}
Loading

0 comments on commit c537ec4

Please sign in to comment.