Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC v1.12.0: Register all services before call server.Serve. #562

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
gRPC v1.12.0: Register all services before call server.Serve.
Reference dcrwallet for this implementation.
  • Loading branch information
nhandl3 authored and Nhan D Le committed Dec 6, 2018
commit acb9bf290c15d6a30917e2e7b6084eab18db11d7
8 changes: 7 additions & 1 deletion btcwallet.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package main

import (
"github.com/btcsuite/btcwallet/rpc/rpcserver"
"io/ioutil"
"net"
"net/http"
Expand Down Expand Up @@ -87,7 +88,12 @@ func walletMain() error {
}

loader.RunAfterLoad(func(w *wallet.Wallet) {
startWalletRPCServices(w, rpcs, legacyRPCServer)
if rpcs != nil {
rpcserver.StartWalletService(rpcs, w)
}
if legacyRPCServer != nil {
legacyRPCServer.RegisterWallet(w)
}
})

if !cfg.NoInitialLoad {
Expand Down
71 changes: 61 additions & 10 deletions rpc/rpcserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ package rpcserver
import (
"bytes"
"errors"
"google.golang.org/grpc/status"
"sync"
"sync/atomic"
"time"

"golang.org/x/net/context"
Expand Down Expand Up @@ -104,22 +106,60 @@ type versionServer struct {

// walletServer provides wallet services for RPC clients.
type walletServer struct {
ready uint32 // atomic
wallet *wallet.Wallet
}

// loaderServer provides RPC clients with the ability to load and close wallets,
// as well as establishing a RPC connection to a btcd consensus server.
type loaderServer struct {
ready uint32 // atomic
loader *wallet.Loader
activeNet *netparams.Params
rpcClient *chain.RPCClient
mu sync.Mutex
}

// StartVersionService creates an implementation of the VersionService and
// registers it with the gRPC server.
func StartVersionService(server *grpc.Server) {
pb.RegisterVersionServiceServer(server, &versionServer{})
// Singleton implementations of each service. Not all services are immediately
// usable.
var (
versionService versionServer
walletService walletServer
loaderService loaderServer
)

// RegisterServices registers implementations of each gRPC service and registers
// it with the server. Not all service are ready to be used after registration.
func RegisterServices(server *grpc.Server) {
pb.RegisterVersionServiceServer(server, &versionService)
pb.RegisterWalletServiceServer(server, &walletService)
pb.RegisterWalletLoaderServiceServer(server, &loaderService)
}

var serviceMap = map[string]interface{}{
"walletrpc.VersionService": &versionService,
"walletrpc.WalletService": &walletService,
"walletrpc.WalletLoaderService": &loaderService,
}

// ServiceReady returns nil when the service is ready and a gRPC error when not.
func ServiceReady(service string) error {
s, ok := serviceMap[service]
if !ok {
return status.Errorf(codes.Unimplemented, "service %s not found", service)
}
type readyChecker interface {
checkReady() bool
}
ready := true
r, ok := s.(readyChecker)
if ok {
ready = r.checkReady()
}
if !ready {
return status.Errorf(codes.FailedPrecondition, "service %v is not ready", service)
}
return nil
}

func (*versionServer) Version(ctx context.Context, req *pb.VersionRequest) (*pb.VersionResponse, error) {
Expand All @@ -134,8 +174,14 @@ func (*versionServer) Version(ctx context.Context, req *pb.VersionRequest) (*pb.
// StartWalletService creates an implementation of the WalletService and
// registers it with the gRPC server.
func StartWalletService(server *grpc.Server, wallet *wallet.Wallet) {
service := &walletServer{wallet}
pb.RegisterWalletServiceServer(server, service)
walletService.wallet = wallet
if atomic.SwapUint32(&walletService.ready, 1) != 0 {
panic("service already started")
}
}

func (s *walletServer) checkReady() bool {
return atomic.LoadUint32(&s.ready) != 0
}

func (s *walletServer) Ping(ctx context.Context, req *pb.PingRequest) (*pb.PingResponse, error) {
Expand Down Expand Up @@ -701,11 +747,16 @@ func (s *walletServer) AccountNotifications(req *pb.AccountNotificationsRequest,

// StartWalletLoaderService creates an implementation of the WalletLoaderService
// and registers it with the gRPC server.
func StartWalletLoaderService(server *grpc.Server, loader *wallet.Loader,
activeNet *netparams.Params) {
func StartWalletLoaderService(server *grpc.Server, loader *wallet.Loader, activeNet *netparams.Params) {
loaderService.loader = loader
loaderService.activeNet = activeNet
if atomic.SwapUint32(&loaderService.ready, 1) != 0 {
panic("service already started")
}
}

service := &loaderServer{loader: loader, activeNet: activeNet}
pb.RegisterWalletLoaderServiceServer(server, service)
func (s *loaderServer) checkReady() bool {
return atomic.LoadUint32(&s.ready) != 0
}

func (s *loaderServer) CreateWallet(ctx context.Context, req *pb.CreateWalletRequest) (
Expand Down
68 changes: 53 additions & 15 deletions rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package main

import (
"context"
"crypto/tls"
"errors"
"fmt"
"google.golang.org/grpc/peer"
"io/ioutil"
"net"
"os"
Expand Down Expand Up @@ -135,8 +137,12 @@ func startRPCServers(walletLoader *wallet.Loader) (*grpc.Server, *legacyrpc.Serv
return nil, nil, err
}
creds := credentials.NewServerTLSFromCert(&keyPair)
server = grpc.NewServer(grpc.Creds(creds))
rpcserver.StartVersionService(server)
server = grpc.NewServer(
grpc.Creds(creds),
grpc.StreamInterceptor(interceptStreaming),
grpc.UnaryInterceptor(interceptUnary),
)
rpcserver.RegisterServices(server)
rpcserver.StartWalletLoaderService(server, walletLoader, activeNet)
for _, lis := range listeners {
lis := lis
Expand Down Expand Up @@ -176,6 +182,51 @@ func startRPCServers(walletLoader *wallet.Loader) (*grpc.Server, *legacyrpc.Serv
return server, legacyServer, nil
}

// serviceName returns the package.service segment from the full gRPC method
// name `/package.service/method`.
func serviceName(method string) string {
// Slice off first /
method = method[1:]
// Keep everything before the next /
return method[:strings.IndexRune(method, '/')]
}

func interceptStreaming(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
p, ok := peer.FromContext(ss.Context())
if ok {
grpcLog.Infof("Streaming method %s invoked by %s", info.FullMethod,
p.Addr.String())
}
err := rpcserver.ServiceReady(serviceName(info.FullMethod))
if err != nil {
return err
}
err = handler(srv, ss)
if err != nil && ok {
grpcLog.Errorf("Streaming method %s invoked by %s errored: %v",
info.FullMethod, p.Addr.String(), err)
}
return err
}

func interceptUnary(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
p, ok := peer.FromContext(ctx)
if ok {
grpcLog.Infof("Unary method %s invoked by %s", info.FullMethod,
p.Addr.String())
}
err = rpcserver.ServiceReady(serviceName(info.FullMethod))
if err != nil {
return nil, err
}
resp, err = handler(ctx, req)
if err != nil && ok {
grpcLog.Errorf("Unary method %s invoked by %s errored: %v",
info.FullMethod, p.Addr.String(), err)
}
return resp, err
}

type listenFunc func(net string, laddr string) (net.Listener, error)

// makeListeners splits the normalized listen addresses into IPv4 and IPv6
Expand Down Expand Up @@ -239,16 +290,3 @@ func makeListeners(normalizedListenAddrs []string, listen listenFunc) []net.List
}
return listeners
}

// startWalletRPCServices associates each of the (optionally-nil) RPC servers
// with a wallet to enable remote wallet access. For the GRPC server, this
// registers the WalletService service, and for the legacy JSON-RPC server it
// enables methods that require a loaded wallet.
func startWalletRPCServices(wallet *wallet.Wallet, server *grpc.Server, legacyServer *legacyrpc.Server) {
if server != nil {
rpcserver.StartWalletService(server, wallet)
}
if legacyServer != nil {
legacyServer.RegisterWallet(wallet)
}
}