Skip to content

Commit

Permalink
fix: use a single gRPC and RPC connection for all Hasura actions (for…
Browse files Browse the repository at this point in the history
…bole#338)

## Description
This PR improves the overall architecture of Hasura actions by having a single `HasuraActionsWorker` that keeps the various connections to the node so that all handlers can share them without having to re-create it every time they are called. 

It should also make it easier to define new handlers in the future if there's the need to. 

---

### Author Checklist

*All items are required. Please add a note to the item if the item is not applicable and
please add links to any relevant follow up issues.*

I have...

- [x] included the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] added `!` to the type prefix if API or client breaking change
- [ ] targeted the correct branch
- [ ] provided a link to the relevant issue or specification
- [ ] added a changelog entry to `CHANGELOG.md`
- [ ] included comments for [documenting Go code](https://blog.golang.org/godoc)
- [ ] updated the relevant documentation or specification
- [ ] reviewed "Files changed" and left comments if necessary
- [ ] confirmed all CI checks have passed

### Reviewers Checklist

*All items are required. Please add a note if the item is not applicable and please add
your handle next to the items reviewed if you only reviewed selected items.*

I have...

- [ ] confirmed the correct [type prefix](https://github.com/commitizen/conventional-commit-types/blob/v3.0.0/index.json) in the PR title
- [ ] confirmed `!` in the type prefix if API or client breaking change
- [ ] confirmed all author checklist items have been addressed
- [ ] reviewed API design and naming
- [ ] reviewed documentation is accurate
- [ ] reviewed tests and test coverage
- [ ] manually tested (if applicable)
  • Loading branch information
RiccardoM authored Feb 24, 2022
1 parent 18c3566 commit d74438b
Show file tree
Hide file tree
Showing 24 changed files with 365 additions and 702 deletions.
125 changes: 99 additions & 26 deletions cmd/actions/actionscmd.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,33 @@
package actions

import (
"fmt"
"log"
"net/http"
"os"
"os/signal"
"sync"
"syscall"

"github.com/rs/zerolog/log"

"github.com/forbole/juno/v2/cmd/parse"
"github.com/forbole/juno/v2/node/builder"
nodeconfig "github.com/forbole/juno/v2/node/config"
"github.com/forbole/juno/v2/node/remote"
"github.com/spf13/cobra"

"github.com/forbole/bdjuno/v2/cmd/actions/handlers"
actionstypes "github.com/forbole/bdjuno/v2/cmd/actions/types"
"github.com/forbole/bdjuno/v2/modules"
)

const (
flagGRPC = "grpc"
flagRPC = "rpc"
flagSecure = "secure"
flagPort = "port"
)

var (
waitGroup sync.WaitGroup
)

// NewActionsCmd returns the Cobra command allowing to activate hasura actions
Expand All @@ -19,47 +37,102 @@ func NewActionsCmd(parseCfg *parse.Config) *cobra.Command {
Short: "Activate hasura actions",
PreRunE: parse.ReadConfig(parseCfg),
RunE: func(cmd *cobra.Command, args []string) error {
parseCtx, err := parse.GetParsingContext(parseCfg)
if err != nil {
return err
}

// Get the flags values
rpc, _ := cmd.Flags().GetString(flagRPC)
gRPC, _ := cmd.Flags().GetString(flagGRPC)
secure, _ := cmd.Flags().GetBool(flagSecure)
port, _ := cmd.Flags().GetUint(flagPort)

log.Info().Str(flagRPC, rpc).Str(flagGRPC, gRPC).Bool(flagSecure, secure).
Msg("Listening to incoming Hasura actions requests....")

fmt.Printf(
"Hasura Action is running on the node(s):\n rpc: %s \n grpc: %s\n secure connection: %v\n",
actionstypes.FlagRPC,
actionstypes.FlagGRPC,
actionstypes.FlagSecure,
// Build a custom node config to make sure it's remote
// TODO: Is this really necessary? Can't we use the default one?
nodeCfg := nodeconfig.NewConfig(
nodeconfig.TypeRemote,
remote.NewDetails(
remote.NewRPCConfig("hasura-actions", rpc, 100),
remote.NewGrpcConfig(gRPC, !secure),
),
)

// HTTP server for the handlers
mux := http.NewServeMux()
// Build the node
node, err := builder.BuildNode(nodeCfg, parseCtx.EncodingConfig)
if err != nil {
return err
}

// End points:
// Build the sources
sources, err := modules.BuildSources(nodeCfg, parseCtx.EncodingConfig)
if err != nil {
return err
}

// Build the worker
context := actionstypes.NewContext(node, sources)
worker := actionstypes.NewActionsWorker(context)

// Register the endpoints

// -- Bank --
mux.HandleFunc("/account_balance", handlers.AccountBalance)
worker.RegisterHandler("/account_balance", handlers.AccountBalanceHandler)

// -- Distribution --
mux.HandleFunc("/delegation_reward", handlers.DelegationReward)
mux.HandleFunc("/delegator_withdraw_address", handlers.DelegatorWithdrawAddress)
mux.HandleFunc("/validator_commission_amount", handlers.ValidatorCommissionAmount)
worker.RegisterHandler("/delegation_reward", handlers.DelegationRewardHandler)
worker.RegisterHandler("/delegator_withdraw_address", handlers.DelegatorWithdrawAddressHandler)
worker.RegisterHandler("/validator_commission_amount", handlers.ValidatorCommissionAmountHandler)

// -- Staking Delegator --
mux.HandleFunc("/delegation", handlers.Delegation)
mux.HandleFunc("/delegation_total", handlers.TotalDelegationAmount)
mux.HandleFunc("/unbonding_delegation", handlers.UnbondingDelegations)
mux.HandleFunc("/unbonding_delegation_total", handlers.UnbondingDelegationsTotal)
mux.HandleFunc("/redelegation", handlers.Redelegation)
worker.RegisterHandler("/delegation", handlers.DelegationHandler)
worker.RegisterHandler("/delegation_total", handlers.TotalDelegationAmountHandler)
worker.RegisterHandler("/unbonding_delegation", handlers.UnbondingDelegationsHandler)
worker.RegisterHandler("/unbonding_delegation_total", handlers.UnbondingDelegationsTotal)
worker.RegisterHandler("/redelegation", handlers.RedelegationHandler)

// -- Staking Validator --
mux.HandleFunc("/validator_delegations", handlers.ValidatorDelegation)
mux.HandleFunc("/validator_redelegations_from", handlers.ValidatorRedelegationsFrom)
mux.HandleFunc("/validator_unbonding_delegations", handlers.ValidatorUnbondingDelegations)
worker.RegisterHandler("/validator_delegations", handlers.ValidatorDelegation)
worker.RegisterHandler("/validator_redelegations_from", handlers.ValidatorRedelegationsFromHandler)
worker.RegisterHandler("/validator_unbonding_delegations", handlers.ValidatorUnbondingDelegationsHandler)

err := http.ListenAndServe(":3000", mux)
log.Fatal(err)
// Listen for and trap any OS signal to gracefully shutdown and exit
trapSignal(parseCtx)

// Start the worker
waitGroup.Add(1)
go worker.Start(port)

// Block main process (signal capture will call WaitGroup's Done)
waitGroup.Wait()
return nil
},
}

actionstypes.AddNodeFlagsToCmd(cmd)
cmd.Flags().String(flagRPC, "http://127.0.0.1:26657", "RPC listen address. Port required")
cmd.Flags().String(flagGRPC, "http://127.0.0.1:9090", "GRPC listen address. Port required")
cmd.Flags().Bool(flagSecure, false, "Activate secure connections")
cmd.Flags().Uint(flagPort, 3000, "Port to be used to expose the service")

return cmd
}

// trapSignal will listen for any OS signal and invoke Done on the main
// WaitGroup allowing the main process to gracefully exit.
func trapSignal(parseCtx *parse.Context) {
var sigCh = make(chan os.Signal)

signal.Notify(sigCh, syscall.SIGTERM)
signal.Notify(sigCh, syscall.SIGINT)

go func() {
sig := <-sigCh
parseCtx.Logger.Info("caught signal; shutting down...", "signal", sig.String())
defer parseCtx.Node.Stop()
defer parseCtx.Database.Close()
defer waitGroup.Done()
}()
}
50 changes: 6 additions & 44 deletions cmd/actions/handlers/account_balance.go
Original file line number Diff line number Diff line change
@@ -1,61 +1,23 @@
package handlers

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

actionstypes "github.com/forbole/bdjuno/v2/cmd/actions/types"
dbtypes "github.com/forbole/bdjuno/v2/database/types"

"github.com/forbole/bdjuno/v2/utils"
)

func AccountBalance(w http.ResponseWriter, r *http.Request) {

w.Header().Set("Content-Type", "application/json")

reqBody, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "invalid payload", http.StatusBadRequest)
return
}

var actionPayload actionstypes.Payload
err = json.Unmarshal(reqBody, &actionPayload)
if err != nil {
http.Error(w, "invalid payload: failed to unmarshal json", http.StatusInternalServerError)
return
}

result, err := getAccountBalance(actionPayload.Input)
if err != nil {
errorHandler(w, err)
return
}

data, _ := json.Marshal(result)
w.Write(data)
}

func getAccountBalance(input actionstypes.PayloadArgs) (response actionstypes.Balance, err error) {
parseCtx, sources, err := getCtxAndSources()
if err != nil {
return response, err
}

height, err := utils.GetHeight(parseCtx, input.Height)
func AccountBalanceHandler(ctx *actionstypes.Context, payload *actionstypes.Payload) (interface{}, error) {
height, err := ctx.GetHeight(payload)
if err != nil {
return response, fmt.Errorf("error while getting height: %s", err)
return nil, err
}

balance, err := sources.BankSource.GetAccountBalance(input.Address, height)
balance, err := ctx.Sources.BankSource.GetAccountBalance(payload.GetAddress(), height)
if err != nil {
return response, err
return nil, fmt.Errorf("error while getting account balance: %s", err)
}

return actionstypes.Balance{
Coins: dbtypes.NewDbCoins(balance),
Coins: actionstypes.ConvertCoins(balance),
}, nil
}
57 changes: 6 additions & 51 deletions cmd/actions/handlers/delegation.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,31 @@
package handlers

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

sdk "github.com/cosmos/cosmos-sdk/types"

"github.com/cosmos/cosmos-sdk/types/query"
actionstypes "github.com/forbole/bdjuno/v2/cmd/actions/types"
dbtypes "github.com/forbole/bdjuno/v2/database/types"

"github.com/forbole/bdjuno/v2/utils"
)

func Delegation(w http.ResponseWriter, r *http.Request) {

w.Header().Set("Content-Type", "application/json")

reqBody, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "invalid payload", http.StatusBadRequest)
return
}

var actionPayload actionstypes.Payload
err = json.Unmarshal(reqBody, &actionPayload)
func DelegationHandler(ctx *actionstypes.Context, payload *actionstypes.Payload) (interface{}, error) {
height, err := ctx.GetHeight(payload)
if err != nil {
http.Error(w, "invalid payload: failed to unmarshal json", http.StatusInternalServerError)
return
}

result, err := getDelegation(actionPayload.Input)
if err != nil {
errorHandler(w, err)
return
}

data, _ := json.Marshal(result)
w.Write(data)
}

func getDelegation(input actionstypes.PayloadArgs) (actionstypes.DelegationResponse, error) {
parseCtx, sources, err := getCtxAndSources()
if err != nil {
return actionstypes.DelegationResponse{}, err
}

height, err := utils.GetHeight(parseCtx, input.Height)
if err != nil {
return actionstypes.DelegationResponse{}, fmt.Errorf("error while getting height: %s", err)
}

pagination := &query.PageRequest{
Offset: input.Offset,
Limit: input.Limit,
CountTotal: input.CountTotal,
return nil, err
}

// Get delegator's total rewards
res, err := sources.StakingSource.GetDelegationsWithPagination(height, input.Address, pagination)
res, err := ctx.Sources.StakingSource.GetDelegationsWithPagination(height, payload.GetAddress(), payload.GetPagination())
if err != nil {
return actionstypes.DelegationResponse{}, fmt.Errorf("error while getting delegator delegations: %s", err)
return err, fmt.Errorf("error while getting delegator delegations: %s", err)
}

delegations := make([]actionstypes.Delegation, len(res.DelegationResponses))
for index, del := range res.DelegationResponses {
delegations[index] = actionstypes.Delegation{
DelegatorAddress: del.Delegation.DelegatorAddress,
ValidatorAddress: del.Delegation.ValidatorAddress,
Coins: dbtypes.NewDbCoins([]sdk.Coin{del.Balance}),
Coins: actionstypes.ConvertCoins([]sdk.Coin{del.Balance}),
}
}

Expand Down
51 changes: 7 additions & 44 deletions cmd/actions/handlers/delegation_total.go
Original file line number Diff line number Diff line change
@@ -1,60 +1,23 @@
package handlers

import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"

sdk "github.com/cosmos/cosmos-sdk/types"
actionstypes "github.com/forbole/bdjuno/v2/cmd/actions/types"
dbtypes "github.com/forbole/bdjuno/v2/database/types"

"github.com/forbole/bdjuno/v2/utils"
actionstypes "github.com/forbole/bdjuno/v2/cmd/actions/types"
)

func TotalDelegationAmount(w http.ResponseWriter, r *http.Request) {

w.Header().Set("Content-Type", "application/json")

reqBody, err := ioutil.ReadAll(r.Body)
if err != nil {
http.Error(w, "invalid payload", http.StatusBadRequest)
return
}

var actionPayload actionstypes.Payload
err = json.Unmarshal(reqBody, &actionPayload)
if err != nil {
http.Error(w, "invalid payload: failed to unmarshal json", http.StatusInternalServerError)
return
}

result, err := getTotalDelegationAmount(actionPayload.Input)
if err != nil {
errorHandler(w, err)
return
}

data, _ := json.Marshal(result)
w.Write(data)
}

func getTotalDelegationAmount(input actionstypes.PayloadArgs) (actionstypes.Balance, error) {
parseCtx, sources, err := getCtxAndSources()
if err != nil {
return actionstypes.Balance{}, err
}

height, err := utils.GetHeight(parseCtx, input.Height)
func TotalDelegationAmountHandler(ctx *actionstypes.Context, payload *actionstypes.Payload) (interface{}, error) {
height, err := ctx.GetHeight(payload)
if err != nil {
return actionstypes.Balance{}, fmt.Errorf("error while getting height: %s", err)
return nil, err
}

// Get all delegations for given delegator address
delegationList, err := sources.StakingSource.GetDelegationsWithPagination(height, input.Address, nil)
delegationList, err := ctx.Sources.StakingSource.GetDelegationsWithPagination(height, payload.GetAddress(), nil)
if err != nil {
return actionstypes.Balance{}, fmt.Errorf("error while getting delegator delegations: %s", err)
return nil, fmt.Errorf("error while getting delegator delegations: %s", err)
}

var coinObject sdk.Coins
Expand All @@ -75,6 +38,6 @@ func getTotalDelegationAmount(input actionstypes.PayloadArgs) (actionstypes.Bala
}

return actionstypes.Balance{
Coins: dbtypes.NewDbCoins(coinObject),
Coins: actionstypes.ConvertCoins(coinObject),
}, nil
}
Loading

0 comments on commit d74438b

Please sign in to comment.