Skip to content

Commit

Permalink
Merge pull request ava-labs#295 from tyler-smith/TS_ipc_master
Browse files Browse the repository at this point in the history
IPC changes
  • Loading branch information
StephenButtolph authored Aug 11, 2020
2 parents b621751 + c6ce11d commit ac8a903
Show file tree
Hide file tree
Showing 10 changed files with 327 additions and 107 deletions.
33 changes: 0 additions & 33 deletions api/ipcs/chainipc.go

This file was deleted.

91 changes: 27 additions & 64 deletions api/ipcs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,46 +7,42 @@ import (
"fmt"
"net/http"

"go.nanomsg.org/mangos/v3/protocol/pub"

_ "go.nanomsg.org/mangos/v3/transport/ipc" // registers the IPC transport

"github.com/gorilla/rpc/v2"

"github.com/ava-labs/gecko/api"
"github.com/ava-labs/gecko/chains"
"github.com/ava-labs/gecko/ipcs"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/triggers"
"github.com/ava-labs/gecko/utils/json"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/wrappers"
)

const baseURL = "ipc:///tmp/"

// IPCs maintains the IPCs
type IPCs struct {
log logging.Logger
chainManager chains.Manager
// IPCServer maintains the IPCs
type IPCServer struct {
httpServer *api.Server
events *triggers.EventDispatcher
chains map[[32]byte]*ChainIPC
chainManager chains.Manager
log logging.Logger
ipcs *ipcs.ChainIPCs
}

// NewService returns a new IPCs API service
func NewService(log logging.Logger, chainManager chains.Manager, events *triggers.EventDispatcher, httpServer *api.Server) *common.HTTPHandler {
func NewService(log logging.Logger, chainManager chains.Manager, httpServer *api.Server, ipcs *ipcs.ChainIPCs) (*common.HTTPHandler, error) {
ipcServer := &IPCServer{
log: log,
chainManager: chainManager,
httpServer: httpServer,

ipcs: ipcs,
}

newServer := rpc.NewServer()
codec := json.NewCodec()
newServer.RegisterCodec(codec, "application/json")
newServer.RegisterCodec(codec, "application/json;charset=UTF-8")
newServer.RegisterService(&IPCs{
log: log,
chainManager: chainManager,
httpServer: httpServer,
events: events,
chains: map[[32]byte]*ChainIPC{},
}, "ipcs")
return &common.HTTPHandler{Handler: newServer}
newServer.RegisterService(ipcServer, "ipcs")
return &common.HTTPHandler{Handler: newServer}, nil
}

// PublishBlockchainArgs are the arguments for calling PublishBlockchain
Expand All @@ -56,52 +52,28 @@ type PublishBlockchainArgs struct {

// PublishBlockchainReply are the results from calling PublishBlockchain
type PublishBlockchainReply struct {
URL string `json:"url"`
ConsensusURL string `json:"consensusURL"`
DecisionsURL string `json:"decisionsURL"`
}

// PublishBlockchain publishes the finalized accepted transactions from the blockchainID over the IPC
func (ipc *IPCs) PublishBlockchain(r *http.Request, args *PublishBlockchainArgs, reply *PublishBlockchainReply) error {
func (ipc *IPCServer) PublishBlockchain(r *http.Request, args *PublishBlockchainArgs, reply *PublishBlockchainReply) error {
ipc.log.Info("IPCs: PublishBlockchain called with BlockchainID: %s", args.BlockchainID)
chainID, err := ipc.chainManager.Lookup(args.BlockchainID)
if err != nil {
ipc.log.Error("unknown blockchainID: %s", err)
return err
}

chainIDKey := chainID.Key()
chainIDStr := chainID.String()
url := baseURL + chainIDStr + ".ipc"

reply.URL = url

if _, ok := ipc.chains[chainIDKey]; ok {
ipc.log.Info("returning existing blockchainID %s", chainIDStr)
return nil
}

sock, err := pub.NewSocket()
ipcs, err := ipc.ipcs.Publish(chainID)
if err != nil {
ipc.log.Error("can't get new pub socket: %s", err)
ipc.log.Error("couldn't publish blockchainID: %s", err)
return err
}

if err = sock.Listen(url); err != nil {
ipc.log.Error("can't listen on pub socket: %s", err)
sock.Close()
return err
}
reply.ConsensusURL = ipcs.ConsensusURL()
reply.DecisionsURL = ipcs.DecisionsURL()

chainIPC := &ChainIPC{
log: ipc.log,
socket: sock,
}
if err := ipc.events.RegisterChain(chainID, "ipc", chainIPC); err != nil {
ipc.log.Error("couldn't register event: %s", err)
sock.Close()
return err
}

ipc.chains[chainIDKey] = chainIPC
return nil
}

Expand All @@ -116,28 +88,19 @@ type UnpublishBlockchainReply struct {
}

// UnpublishBlockchain closes publishing of a blockchainID
func (ipc *IPCs) UnpublishBlockchain(r *http.Request, args *UnpublishBlockchainArgs, reply *UnpublishBlockchainReply) error {
func (ipc *IPCServer) UnpublishBlockchain(r *http.Request, args *UnpublishBlockchainArgs, reply *UnpublishBlockchainReply) error {
ipc.log.Info("IPCs: UnpublishBlockchain called with BlockchainID: %s", args.BlockchainID)
chainID, err := ipc.chainManager.Lookup(args.BlockchainID)
if err != nil {
ipc.log.Error("unknown blockchainID %s: %s", args.BlockchainID, err)
return err
}

chainIDKey := chainID.Key()

chain, ok := ipc.chains[chainIDKey]
ok, err := ipc.ipcs.Unpublish(chainID)
if !ok {
return fmt.Errorf("blockchainID not publishing: %s", chainID)
}

errs := wrappers.Errs{}
errs.Add(
chain.Stop(),
ipc.events.DeregisterChain(chainID, "ipc"),
)
delete(ipc.chains, chainIDKey)

reply.Success = true
return errs.Err
return err
}
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,4 @@ require (
google.golang.org/grpc v1.29.1
gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200316214253-d7b0ff38cac9 // indirect
gopkg.in/urfave/cli.v1 v1.20.0 // indirect
nanomsg.org/go/mangos/v2 v2.0.8
)
91 changes: 91 additions & 0 deletions ipcs/chainipc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package ipcs

import (
"fmt"
"path"

"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/triggers"
"github.com/ava-labs/gecko/utils/logging"
)

const (
// DefaultBaseURL can be used as a reasonable default value for the base URL
DefaultBaseURL = "/tmp"

ipcIdentifierPrefix = "ipc"
ipcConsensusIdentifier = "consensus"
ipcDecisionsIdentifier = "decisions"
)

type context struct {
log logging.Logger
networkID uint32
path string
}

// ChainIPCs maintains IPCs for a set of chains
type ChainIPCs struct {
context
chains map[[32]byte]*EventSockets
consensusEvents *triggers.EventDispatcher
decisionEvents *triggers.EventDispatcher
}

// NewChainIPCs creates a new *ChainIPCs that writes consensus and decision
// events to IPC sockets
func NewChainIPCs(log logging.Logger, path string, networkID uint32, consensusEvents *triggers.EventDispatcher, decisionEvents *triggers.EventDispatcher, defaultChainIDs []ids.ID) (*ChainIPCs, error) {
cipcs := &ChainIPCs{
context: context{
log: log,
networkID: networkID,
path: path,
},
chains: make(map[[32]byte]*EventSockets),
consensusEvents: consensusEvents,
decisionEvents: decisionEvents,
}
for _, chainID := range defaultChainIDs {
if _, err := cipcs.Publish(chainID); err != nil {
return nil, err
}
}
return cipcs, nil
}

// Publish creates a set of eventSockets for the given chainID
func (cipcs *ChainIPCs) Publish(chainID ids.ID) (*EventSockets, error) {
chainIDKey := chainID.Key()

if es, ok := cipcs.chains[chainIDKey]; ok {
cipcs.log.Info("returning existing blockchainID %s", chainID.String())
return es, nil
}

es, err := newEventSockets(cipcs.context, chainID, cipcs.consensusEvents, cipcs.decisionEvents)
if err != nil {
cipcs.log.Error("can't create ipcs: %s", err)
return nil, err
}

cipcs.chains[chainIDKey] = es
cipcs.log.Info("created IPC sockets for blockchain %s at %s and %s", chainID.String(), es.ConsensusURL(), es.DecisionsURL())
return es, nil
}

// Unpublish stops the eventSocket for the given chain if it exists. It returns
// whether or not the socket existed and errors when trying to close it
func (cipcs *ChainIPCs) Unpublish(chainID ids.ID) (bool, error) {
chainIPCs, ok := cipcs.chains[chainID.Key()]
if !ok {
return false, nil
}
return true, chainIPCs.stop()
}

func ipcURL(ctx context, chainID ids.ID, eventType string) string {
return path.Join(ctx.path, fmt.Sprintf("%d-%s-%s", ctx.networkID, chainID.String(), eventType))
}
Loading

0 comments on commit ac8a903

Please sign in to comment.