Skip to content

Commit

Permalink
merged
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph committed Aug 11, 2020
2 parents 9e34144 + ac8a903 commit 167479f
Show file tree
Hide file tree
Showing 12 changed files with 335 additions and 113 deletions.
33 changes: 0 additions & 33 deletions api/ipcs/chainipc.go

This file was deleted.

95 changes: 28 additions & 67 deletions api/ipcs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,48 +7,42 @@ import (
"fmt"
"net/http"

"nanomsg.org/go/mangos/v2/protocol/pub"

_ "nanomsg.org/go/mangos/v2/transport/ipc" // registers the IPC transport
_ "go.nanomsg.org/mangos/v3/transport/ipc" // registers the IPC transport

"github.com/gorilla/rpc/v2"

"github.com/ava-labs/gecko/api"
"github.com/ava-labs/gecko/chains"
"github.com/ava-labs/gecko/ipcs"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/triggers"
"github.com/ava-labs/gecko/utils/json"
"github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/wrappers"
)

const baseURL = "ipc:///tmp/"

// IPCs maintains the IPCs
type IPCs struct {
log logging.Logger
chainManager chains.Manager
// IPCServer maintains the IPCs
type IPCServer struct {
httpServer *api.Server
events *triggers.EventDispatcher
chains map[[32]byte]*ChainIPC
chainManager chains.Manager
log logging.Logger
ipcs *ipcs.ChainIPCs
}

// NewService returns a new IPCs API service
func NewService(log logging.Logger, chainManager chains.Manager, events *triggers.EventDispatcher, httpServer *api.Server) (*common.HTTPHandler, error) {
newServer := rpc.NewServer()
codec := json.NewCodec()
newServer.RegisterCodec(codec, "application/json")
newServer.RegisterCodec(codec, "application/json;charset=UTF-8")
if err := newServer.RegisterService(&IPCs{
func NewService(log logging.Logger, chainManager chains.Manager, httpServer *api.Server, ipcs *ipcs.ChainIPCs) (*common.HTTPHandler, error) {
ipcServer := &IPCServer{
log: log,
chainManager: chainManager,
httpServer: httpServer,
events: events,
chains: map[[32]byte]*ChainIPC{},
}, "ipcs"); err != nil {
return nil, err

ipcs: ipcs,
}
return &common.HTTPHandler{Handler: newServer}, nil

newServer := rpc.NewServer()
codec := json.NewCodec()
newServer.RegisterCodec(codec, "application/json")
newServer.RegisterCodec(codec, "application/json;charset=UTF-8")

return &common.HTTPHandler{Handler: newServer}, newServer.RegisterService(ipcServer, "ipcs")
}

// PublishBlockchainArgs are the arguments for calling PublishBlockchain
Expand All @@ -58,52 +52,28 @@ type PublishBlockchainArgs struct {

// PublishBlockchainReply are the results from calling PublishBlockchain
type PublishBlockchainReply struct {
URL string `json:"url"`
ConsensusURL string `json:"consensusURL"`
DecisionsURL string `json:"decisionsURL"`
}

// PublishBlockchain publishes the finalized accepted transactions from the blockchainID over the IPC
func (ipc *IPCs) PublishBlockchain(r *http.Request, args *PublishBlockchainArgs, reply *PublishBlockchainReply) error {
func (ipc *IPCServer) PublishBlockchain(r *http.Request, args *PublishBlockchainArgs, reply *PublishBlockchainReply) error {
ipc.log.Info("IPCs: PublishBlockchain called with BlockchainID: %s", args.BlockchainID)
chainID, err := ipc.chainManager.Lookup(args.BlockchainID)
if err != nil {
ipc.log.Error("unknown blockchainID: %s", err)
return err
}

chainIDKey := chainID.Key()
chainIDStr := chainID.String()
url := baseURL + chainIDStr + ".ipc"

reply.URL = url

if _, ok := ipc.chains[chainIDKey]; ok {
ipc.log.Info("returning existing blockchainID %s", chainIDStr)
return nil
}

sock, err := pub.NewSocket()
ipcs, err := ipc.ipcs.Publish(chainID)
if err != nil {
ipc.log.Error("can't get new pub socket: %s", err)
return err
}

if err = sock.Listen(url); err != nil {
ipc.log.Error("can't listen on pub socket: %s", err)
_ = sock.Close() // Return the original error
ipc.log.Error("couldn't publish blockchainID: %s", err)
return err
}

chainIPC := &ChainIPC{
log: ipc.log,
socket: sock,
}
if err := ipc.events.RegisterChain(chainID, "ipc", chainIPC); err != nil {
ipc.log.Error("couldn't register event: %s", err)
_ = sock.Close() // Return the original error
return err
}
reply.ConsensusURL = ipcs.ConsensusURL()
reply.DecisionsURL = ipcs.DecisionsURL()

ipc.chains[chainIDKey] = chainIPC
return nil
}

Expand All @@ -118,28 +88,19 @@ type UnpublishBlockchainReply struct {
}

// UnpublishBlockchain closes publishing of a blockchainID
func (ipc *IPCs) UnpublishBlockchain(r *http.Request, args *UnpublishBlockchainArgs, reply *UnpublishBlockchainReply) error {
func (ipc *IPCServer) UnpublishBlockchain(r *http.Request, args *UnpublishBlockchainArgs, reply *UnpublishBlockchainReply) error {
ipc.log.Info("IPCs: UnpublishBlockchain called with BlockchainID: %s", args.BlockchainID)
chainID, err := ipc.chainManager.Lookup(args.BlockchainID)
if err != nil {
ipc.log.Error("unknown blockchainID %s: %s", args.BlockchainID, err)
return err
}

chainIDKey := chainID.Key()

chain, ok := ipc.chains[chainIDKey]
ok, err := ipc.ipcs.Unpublish(chainID)
if !ok {
return fmt.Errorf("blockchainID not publishing: %s", chainID)
}

errs := wrappers.Errs{}
errs.Add(
chain.Stop(),
ipc.events.DeregisterChain(chainID, "ipc"),
)
delete(ipc.chains, chainIDKey)

reply.Success = true
return errs.Err
return err
}
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ require (
github.com/securego/gosec v0.0.0-20200401082031-e946c8c39989 // indirect
github.com/stretchr/testify v1.4.0
github.com/syndtr/goleveldb v1.0.0
go.nanomsg.org/mangos/v3 v3.0.1
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9
golang.org/x/net v0.0.0-20200625001655-4c5254603344
google.golang.org/grpc v1.30.0
nanomsg.org/go/mangos/v2 v2.0.8
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2
google.golang.org/grpc v1.29.1
)
5 changes: 5 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,9 @@ github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0
github.com/xtaci/kcp-go v5.4.20+incompatible/go.mod h1:bN6vIwHQbfHaHtFpEssmWsN45a+AZwO7eyRCmEIbtvE=
github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.nanomsg.org/mangos v2.0.0+incompatible h1:Ll6GIzeGGld6/bFrVgBB8CjwibhHXZtF5jon+GoH1bE=
go.nanomsg.org/mangos/v3 v3.0.1 h1:xR8nca0ZeAvwsoRWjeEHuR2/B0N+Po/ZJpGNCpDz6To=
go.nanomsg.org/mangos/v3 v3.0.1/go.mod h1:RxVwsn46YtfJ74mF8MeVo+MFjg545KCI50NuZrFXmzc=
go.opencensus.io v0.22.1 h1:8dP3SGL7MPB94crU3bEPplMPe83FI4EouesJUeFHv50=
go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
Expand Down Expand Up @@ -306,6 +309,7 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200222125558-5a598a2470a0/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2 h1:eDrdRpKgkcCqKZQwyZRyeFZgfqt37SL7Kv3tok06cKE=
golang.org/x/net v0.0.0-20200520182314-0ba52f642ac2/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
golang.org/x/net v0.0.0-20200625001655-4c5254603344 h1:vGXIOMxbNfDTk/aXCmfdLgkrSV+Z2tcbze+pEc3v5W4=
golang.org/x/net v0.0.0-20200625001655-4c5254603344/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
Expand Down Expand Up @@ -372,6 +376,7 @@ google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyac
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.1/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.29.1 h1:EC2SB8S04d2r73uptxphDSUG+kTKVgjRPF+N3xpxRB4=
google.golang.org/grpc v1.29.1/go.mod h1:itym6AZVZYACWQqET3MqgPpjcuV5QH3BxFS3IjizoKk=
google.golang.org/grpc v1.30.0 h1:M5a8xTlYTxwMn5ZFkwhRabsygDY5G8TYLyQDBxJNAxE=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
Expand Down
91 changes: 91 additions & 0 deletions ipcs/chainipc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package ipcs

import (
"fmt"
"path"

"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/triggers"
"github.com/ava-labs/gecko/utils/logging"
)

const (
// DefaultBaseURL can be used as a reasonable default value for the base URL
DefaultBaseURL = "/tmp"

ipcIdentifierPrefix = "ipc"
ipcConsensusIdentifier = "consensus"
ipcDecisionsIdentifier = "decisions"
)

type context struct {
log logging.Logger
networkID uint32
path string
}

// ChainIPCs maintains IPCs for a set of chains
type ChainIPCs struct {
context
chains map[[32]byte]*EventSockets
consensusEvents *triggers.EventDispatcher
decisionEvents *triggers.EventDispatcher
}

// NewChainIPCs creates a new *ChainIPCs that writes consensus and decision
// events to IPC sockets
func NewChainIPCs(log logging.Logger, path string, networkID uint32, consensusEvents *triggers.EventDispatcher, decisionEvents *triggers.EventDispatcher, defaultChainIDs []ids.ID) (*ChainIPCs, error) {
cipcs := &ChainIPCs{
context: context{
log: log,
networkID: networkID,
path: path,
},
chains: make(map[[32]byte]*EventSockets),
consensusEvents: consensusEvents,
decisionEvents: decisionEvents,
}
for _, chainID := range defaultChainIDs {
if _, err := cipcs.Publish(chainID); err != nil {
return nil, err
}
}
return cipcs, nil
}

// Publish creates a set of eventSockets for the given chainID
func (cipcs *ChainIPCs) Publish(chainID ids.ID) (*EventSockets, error) {
chainIDKey := chainID.Key()

if es, ok := cipcs.chains[chainIDKey]; ok {
cipcs.log.Info("returning existing blockchainID %s", chainID.String())
return es, nil
}

es, err := newEventSockets(cipcs.context, chainID, cipcs.consensusEvents, cipcs.decisionEvents)
if err != nil {
cipcs.log.Error("can't create ipcs: %s", err)
return nil, err
}

cipcs.chains[chainIDKey] = es
cipcs.log.Info("created IPC sockets for blockchain %s at %s and %s", chainID.String(), es.ConsensusURL(), es.DecisionsURL())
return es, nil
}

// Unpublish stops the eventSocket for the given chain if it exists. It returns
// whether or not the socket existed and errors when trying to close it
func (cipcs *ChainIPCs) Unpublish(chainID ids.ID) (bool, error) {
chainIPCs, ok := cipcs.chains[chainID.Key()]
if !ok {
return false, nil
}
return true, chainIPCs.stop()
}

func ipcURL(ctx context, chainID ids.ID, eventType string) string {
return path.Join(ctx.path, fmt.Sprintf("%d-%s-%s", ctx.networkID, chainID.String(), eventType))
}
Loading

0 comments on commit 167479f

Please sign in to comment.