Skip to content

Commit

Permalink
plugins: auto-recovery (smartcontractkit#9146)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 authored May 16, 2023
1 parent cf3bb91 commit 71aef60
Show file tree
Hide file tree
Showing 21 changed files with 134 additions and 257 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ env:
ENV_JOB_IMAGE: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }}.dkr.ecr.${{ secrets.QA_AWS_REGION }}.amazonaws.com/chainlink-tests:${{ github.sha }}
CHAINLINK_IMAGE: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }}.dkr.ecr.${{ secrets.QA_AWS_REGION }}.amazonaws.com/chainlink
TEST_SUITE: smoke
TEST_ARGS: -test.timeout 30m
TEST_ARGS: -test.timeout 12m
INTERNAL_DOCKER_REPO: ${{ secrets.QA_AWS_ACCOUNT_NUMBER }}.dkr.ecr.${{ secrets.QA_AWS_REGION }}.amazonaws.com

jobs:
Expand Down
5 changes: 2 additions & 3 deletions core/cmd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,12 +248,11 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
if err != nil {
return nil, fmt.Errorf("failed to register Solana LOOP plugin: %w", err)
}
chainPluginService := loop.NewRelayerService(solLggr, func() *exec.Cmd {
chains.Solana = loop.NewRelayerService(solLggr, func() *exec.Cmd {
cmd := exec.Command(cmdName)
plugins.SetCmdEnvFromConfig(cmd, solLoop.EnvCfg)
return cmd
}, string(tomls), &keystore.SolanaSigner{keyStore.Solana()})
chains.Solana = chainPluginService
} else {
opts := solana.ChainSetOpts{
Logger: solLggr,
Expand All @@ -264,7 +263,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
if err != nil {
return nil, errors.Wrap(err, "failed to load Solana chainset")
}
chains.Solana = relay.NewLocalRelayerService(pkgsolana.NewRelayer(solLggr, chainSet), chainSet)
chains.Solana = relay.NewRelayerAdapter(pkgsolana.NewRelayer(solLggr, chainSet), chainSet)
}
}

Expand Down
2 changes: 1 addition & 1 deletion core/internal/cltest/cltest.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ func NewApplicationWithConfig(t testing.TB, cfg chainlink.GeneralConfig, flagsAn
lggr.Fatal(err)
}

chains.Solana = relay.NewLocalRelayerService(pkgsolana.NewRelayer(solLggr, chainSet), chainSet)
chains.Solana = relay.NewRelayerAdapter(pkgsolana.NewRelayer(solLggr, chainSet), chainSet)
}
if cfg.StarkNetEnabled() {
starkLggr := lggr.Named("StarkNet")
Expand Down
6 changes: 3 additions & 3 deletions core/scripts/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ require (
github.com/hashicorp/go-hclog v1.4.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-plugin v1.4.8 // indirect
github.com/hashicorp/go-plugin v1.4.9 // indirect
github.com/hashicorp/golang-lru v0.6.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/yamux v0.0.0-20200609203250-aecfd211c9ce // indirect
Expand Down Expand Up @@ -286,7 +286,7 @@ require (
github.com/shirou/gopsutil/v3 v3.22.12 // indirect
github.com/sirupsen/logrus v1.9.0 // indirect
github.com/smartcontractkit/chainlink-cosmos v0.4.0 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230505214134-c890447508f9 // indirect
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230516203704-babbd747746c // indirect
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230516001004-0216997a2508 // indirect
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.0-20230424184429-bfdf6bddb239 // indirect
github.com/smartcontractkit/wsrpc v0.7.1 // indirect
Expand Down Expand Up @@ -332,7 +332,7 @@ require (
go.uber.org/zap v1.24.0 // indirect
golang.org/x/arch v0.0.0-20210923205945-b76863e36670 // indirect
golang.org/x/crypto v0.7.0 // indirect
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 // indirect
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 // indirect
golang.org/x/mod v0.10.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sync v0.2.0 // indirect
Expand Down
12 changes: 6 additions & 6 deletions core/scripts/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -602,8 +602,8 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM=
github.com/hashicorp/go-plugin v1.4.8 h1:CHGwpxYDOttQOY7HOWgETU9dyVjOXzniXDqJcYJE1zM=
github.com/hashicorp/go-plugin v1.4.8/go.mod h1:viDMjcLJuDui6pXb8U4HVfb8AamCWhHGUjr2IrTF67s=
github.com/hashicorp/go-plugin v1.4.9 h1:ESiK220/qE0aGxWdzKIvRH69iLiuN/PjoLTm69RoWtU=
github.com/hashicorp/go-plugin v1.4.9/go.mod h1:viDMjcLJuDui6pXb8U4HVfb8AamCWhHGUjr2IrTF67s=
github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs=
github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
Expand Down Expand Up @@ -1372,8 +1372,8 @@ github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0
github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/smartcontractkit/chainlink-cosmos v0.4.0 h1:xYLAcJJIm0cyMtYtMaosO45bykEwcwQOmZz3mz46Y2A=
github.com/smartcontractkit/chainlink-cosmos v0.4.0/go.mod h1:938jBqOrhdCq4A8enUiBliiDLBndAXebHIitKsDVqY0=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230505214134-c890447508f9 h1:Bb9jYDlgjlqbbvTOt/Nwkka81sT7ewPsCol7oWNRnX0=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230505214134-c890447508f9/go.mod h1:9Gav0RjnEtcyNw+Vrvy6Y3veazNgkPNFPw8bXEzHOs0=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230516203704-babbd747746c h1:DsPkKgQDAOJF4aQqP7bA7oWaAO/x6EwiKLayC9hRTYg=
github.com/smartcontractkit/chainlink-relay v0.1.7-0.20230516203704-babbd747746c/go.mod h1:f7/HKcJWWFzINrkDDlpKsFaU6D+8D4B4OrQxkkCX4oc=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230516001004-0216997a2508 h1:hsCcqkK7ZgABURMRYbNgRdVbNJv7JJPGlgKJbukjMsk=
github.com/smartcontractkit/chainlink-solana v1.0.3-0.20230516001004-0216997a2508/go.mod h1:CUmP50gxZsjwEYA7balCV3mhvX0CrR/a01X6jGBZR8I=
github.com/smartcontractkit/chainlink-starknet/relayer v0.0.0-20230424184429-bfdf6bddb239 h1:YmYkbdM5YXVV+IWgp+5ctfplMEoXamT0LR0qsnM4eps=
Expand Down Expand Up @@ -1650,8 +1650,8 @@ golang.org/x/exp v0.0.0-20191227195350-da58074b4299/go.mod h1:2RIsYlXP63K8oxa1u0
golang.org/x/exp v0.0.0-20200119233911-0405dc783f0a/go.mod h1:2RIsYlXP63K8oxa1u096TMicItID8zy7Y6sNkU49FU4=
golang.org/x/exp v0.0.0-20200207192155-f17229e696bd/go.mod h1:J/WKrq2StrnmMY6+EHIKF9dgMWnmCNThgcyBT1FY9mM=
golang.org/x/exp v0.0.0-20200224162631-6cc2880d07d6/go.mod h1:3jZMyOhIsHpP37uCMkUooju7aAi5cS1Q23tOzKc+0MU=
golang.org/x/exp v0.0.0-20230307190834-24139beb5833 h1:SChBja7BCQewoTAU7IgvucQKMIXrEpFxNMs0spT3/5s=
golang.org/x/exp v0.0.0-20230307190834-24139beb5833/go.mod h1:CxIveKay+FTh1D0yPZemJVgC/95VzuuOLq5Qi4xnoYc=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53 h1:5llv2sWeaMSnA3w2kS57ouQQ4pudlXrR0dCgw51QK9o=
golang.org/x/exp v0.0.0-20230425010034-47ecfdc1ba53/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w=
golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js=
golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
Expand Down
19 changes: 8 additions & 11 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ type ApplicationOpts struct {
// Chains holds a ChainSet for each type of chain.
type Chains struct {
EVM evm.ChainSet
Cosmos cosmos.ChainSet // nil if disabled
Solana relay.RelayerService // nil if disabled
StarkNet starkchain.ChainSet // nil if disabled
Cosmos cosmos.ChainSet // nil if disabled
Solana loop.Relayer // nil if disabled
StarkNet starkchain.ChainSet // nil if disabled
}

func (c *Chains) services() (s []services.ServiceCtx) {
Expand Down Expand Up @@ -396,27 +396,24 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
}
if cfg.FeatureOffchainReporting2() {
globalLogger.Debug("Off-chain reporting v2 enabled")
relayers := make(map[relay.Network]func() (loop.Relayer, error))
relayers := make(map[relay.Network]loop.Relayer)
if cfg.EVMEnabled() {
lggr := globalLogger.Named("EVM")
evmRelayer := evmrelay.NewRelayer(db, chains.EVM, lggr, cfg, keyStore)
relayer := relay.RelayerAdapter{Relayer: evmRelayer, RelayerExt: chains.EVM}
relayers[relay.EVM] = func() (loop.Relayer, error) { return &relayer, nil }
relayers[relay.EVM] = relay.NewRelayerAdapter(evmRelayer, chains.EVM)
}
if cfg.CosmosEnabled() {
lggr := globalLogger.Named("Cosmos.Relayer")
cosmosRelayer := pkgcosmos.NewRelayer(lggr, chains.Cosmos)
relayer := relay.RelayerAdapter{Relayer: cosmosRelayer, RelayerExt: chains.Cosmos}
relayers[relay.Cosmos] = func() (loop.Relayer, error) { return &relayer, nil }
relayers[relay.Cosmos] = relay.NewRelayerAdapter(cosmosRelayer, chains.Cosmos)
}
if cfg.SolanaEnabled() {
relayers[relay.Solana] = chains.Solana.Relayer
relayers[relay.Solana] = chains.Solana
}
if cfg.StarkNetEnabled() {
lggr := globalLogger.Named("StarkNet.Relayer")
starknetRelayer := starknetrelay.NewRelayer(lggr, chains.StarkNet)
relayer := relay.RelayerAdapter{Relayer: starknetRelayer, RelayerExt: chains.StarkNet}
relayers[relay.StarkNet] = func() (loop.Relayer, error) { return &relayer, nil }
relayers[relay.StarkNet] = relay.NewRelayerAdapter(starknetRelayer, chains.StarkNet)
}
registrarConfig := plugins.NewRegistrarConfig(cfg, opts.LoopRegistry.Register)
ocr2DelegateConfig := ocr2.NewDelegateConfig(cfg, registrarConfig)
Expand Down
5 changes: 2 additions & 3 deletions core/services/job/spawner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,10 +272,9 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) {
orm := NewTestORM(t, db, cc, pipeline.NewORM(db, lggr, config), bridges.NewORM(db, lggr, config), keyStore, config)
mailMon := srvctest.Start(t, utils.NewMailboxMonitor(t.Name()))

relayers := make(map[relay.Network]func() (loop.Relayer, error))
relayers := make(map[relay.Network]loop.Relayer)
evmRelayer := evmrelay.NewRelayer(db, cc, lggr, config, keyStore)
relayer := relay.RelayerAdapter{Relayer: evmRelayer, RelayerExt: cc}
relayers[relay.EVM] = func() (loop.Relayer, error) { return &relayer, nil }
relayers[relay.EVM] = relay.NewRelayerAdapter(evmRelayer, cc)

processConfig := plugins.NewRegistrarConfig(config, func(name string, cfg plugins.LoggingConfig) (*plugins.RegisteredLoop, error) { return nil, nil })
ocr2DelegateConfig := ocr2.NewDelegateConfig(config, processConfig)
Expand Down
11 changes: 3 additions & 8 deletions core/services/ocr2/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ type Delegate struct {
dkgSignKs keystore.DKGSign
dkgEncryptKs keystore.DKGEncrypt
ethKs keystore.Eth
relayers map[relay.Network]func() (loop.Relayer, error)
relayers map[relay.Network]loop.Relayer
isNewlyCreatedJob bool // Set to true if this is a new job freshly added, false if job was present already on node boot.
mailMon *utils.MailboxMonitor
}
Expand Down Expand Up @@ -100,7 +100,7 @@ func NewDelegate(
dkgSignKs keystore.DKGSign,
dkgEncryptKs keystore.DKGEncrypt,
ethKs keystore.Eth,
relayers map[relay.Network]func() (loop.Relayer, error),
relayers map[relay.Network]loop.Relayer,
mailMon *utils.MailboxMonitor,
) *Delegate {
return &Delegate{
Expand Down Expand Up @@ -213,15 +213,10 @@ func (d *Delegate) ServicesForSpec(jb job.Job) ([]job.ServiceCtx, error) {
return nil, errors.Errorf("expected a transmitterID to be specified")
}
transmitterID := spec.TransmitterID.String
relayerFn, exists := d.relayers[spec.Relay]
relayer, exists := d.relayers[spec.Relay]
if !exists {
return nil, errors.Errorf("%s relay does not exist is it enabled?", spec.Relay)
}
relayer, err := relayerFn()
if err != nil {
// TODO defer in order to retry https://smartcontract-it.atlassian.net/browse/BCF-2112
return nil, fmt.Errorf("failed to get relayer: %w", err)
}
effectiveTransmitterID := transmitterID

ctxVals := loop.ContextValues{
Expand Down
129 changes: 35 additions & 94 deletions core/services/ocr2/plugins/median/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"os/exec"
"time"

"github.com/hashicorp/go-plugin"

libocr2 "github.com/smartcontractkit/libocr/offchainreporting2"
"github.com/smartcontractkit/libocr/offchainreporting2/reportingplugin/median"
ocrtypes "github.com/smartcontractkit/libocr/offchainreporting2/types"
Expand Down Expand Up @@ -73,7 +71,7 @@ func NewMedianServices(ctx context.Context,
return
}
spec := jb.OCR2OracleSpec
//TODO retry https://smartcontract-it.atlassian.net/browse/BCF-2112

provider, err := relayer.NewMedianProvider(ctx, types.RelayArgs{
ExternalJobID: jb.ExternalJobID,
JobID: spec.ID,
Expand Down Expand Up @@ -104,17 +102,25 @@ func NewMedianServices(ctx context.Context,

var median loop.PluginMedian
if cmdName := v2.EnvMedianPluginCmd.Get(); cmdName != "" {
ms := NewPluginMedianService(cmdName, lggr, cfg)
if err = ms.Launch(); err != nil {
medianLggr := lggr.Named("Median")
var registeredLoop *plugins.RegisteredLoop
// use logger name to ensure unique naming
registeredLoop, err = cfg.RegisterLOOP(medianLggr.Name())
if err != nil {
err = fmt.Errorf("failed to register loop: %w", err)
abort()
return
}
median = ms
srvs = append(srvs, ms)
median = loop.NewMedianService(lggr, func() *exec.Cmd {
cmd := exec.Command(cmdName)
plugins.SetCmdEnvFromConfig(cmd, registeredLoop.EnvCfg)
return cmd
})
} else {
median = NewPlugin(lggr, ctx.Done())
median = NewPlugin(lggr)
}
argsNoPlugin.ReportingPluginFactory, err = median.NewMedianPluginFactory(ctx, provider, ocrcommon.NewDataSourceV2(pipelineRunner,
srvs = append(srvs, median)
argsNoPlugin.ReportingPluginFactory, err = median.NewMedianFactory(ctx, provider, ocrcommon.NewDataSourceV2(pipelineRunner,
jb,
*jb.PipelineSpec,
lggr,
Expand Down Expand Up @@ -151,15 +157,26 @@ func NewMedianServices(ctx context.Context,
}

type Plugin struct {
utils.StartStopOnce
lggr logger.Logger
stop utils.StopRChan
stop utils.StopChan
}

// Name returns the name of the plugin's logger.
func (m *Plugin) Name() string { return m.lggr.Name() }

// Start runs StartOnce under the name "PluginMedian". The start callback does
// no work and always returns nil; ctx is not used.
func (m *Plugin) Start(ctx context.Context) error {
return m.StartOnce("PluginMedian", func() error { return nil })
}

// HealthReport returns a single-entry map keyed by the plugin's Name, whose
// value is the result of Healthy.
func (m *Plugin) HealthReport() map[string]error {
return map[string]error{m.Name(): m.Healthy()}
}

func NewPlugin(lggr logger.Logger, stop utils.StopRChan) *Plugin {
return &Plugin{lggr: lggr, stop: stop}
func NewPlugin(lggr logger.Logger) *Plugin {
return &Plugin{lggr: lggr, stop: make(utils.StopChan)}
}

func (m *Plugin) NewMedianPluginFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog loop.ErrorLog) (ocrtypes.ReportingPluginFactory, error) {
func (m *Plugin) NewMedianFactory(ctx context.Context, provider types.MedianProvider, dataSource, juelsPerFeeCoin median.DataSource, errorLog loop.ErrorLog) (ocrtypes.ReportingPluginFactory, error) {
var ctxVals loop.ContextValues
ctxVals.SetValues(ctx)
lggr := m.lggr.With(ctxVals.Args()...)
Expand All @@ -180,88 +197,12 @@ func (m *Plugin) NewMedianPluginFactory(ctx context.Context, provider types.Medi
return factory, nil
}

var _ services.ServiceCtx = (*medianService)(nil)

type medianService struct {
utils.StartStopOnce

lggr logger.Logger
cfg plugins.RegistrarConfig
cmdName string

client *plugin.Client
cp plugin.ClientProtocol
loop.PluginMedian
}

func NewPluginMedianService(cmdName string, lggr logger.Logger, cfg plugins.RegistrarConfig) *medianService {
return &medianService{cmdName: cmdName, lggr: lggr.Named("PluginMedianService"), cfg: cfg}
}

func (m *medianService) Start(ctx context.Context) error {
return m.StartOnce("PluginMedianService", func() error {
if m.PluginMedian != nil {
return nil
}
return m.Launch()
})
}

// Launch launches the plugin, and sets the backing [loop.PluginMedian]. If this is called directly, then Start() will be a no-op.
func (m *medianService) Launch() error {
cc := loop.PluginMedianClientConfig(m.lggr)
cc.Cmd = exec.Command(m.cmdName) //nolint:gosec
func (m *Plugin) Close() error {
return m.StopOnce("PluginMedian", func() (err error) {
m.lggr.Debug("Stopping")

// use logger name to ensure unique naming
registeredLoop, err := m.cfg.RegisterLOOP(m.lggr.Name())
if err != nil {
return fmt.Errorf("failed to register loop: %w", err)
}
plugins.SetCmdEnvFromConfig(cc.Cmd, registeredLoop.EnvCfg)
client := plugin.NewClient(cc)
cp, err := client.Client()
if err != nil {
client.Kill()
return fmt.Errorf("failed to create plugin Client: %w", err)
}
abort := func() {
if cerr := cp.Close(); cerr != nil {
m.lggr.Errorw("Error closing ClientProtocol", "err", cerr)
}
client.Kill()
}
i, err := cp.Dispense(loop.PluginMedianName)
if err != nil {
abort()
return fmt.Errorf("failed to Dispense %q plugin: %w", loop.PluginMedianName, err)
}
plug, ok := i.(loop.PluginMedian)
if !ok {
abort()
return fmt.Errorf("expected PluginMedian but got %T", i)
}
m.client = client
m.cp = cp
m.PluginMedian = plug
return nil
}
close(m.stop)

func (m *medianService) Close() error {
return m.StopOnce("PluginMedianService", func() error {
err := m.cp.Close()
m.client.Kill()
return err
return
})
}

func (m *medianService) Name() string { return m.lggr.Name() }

func (m *medianService) HealthReport() map[string]error {
return map[string]error{m.lggr.Name(): m.Healthy()}
}

func (m *medianService) ping() error { return m.cp.Ping() }

func (m *medianService) Ready() error { return m.ping() }

func (m *medianService) Healthy() error { return m.ping() }
Loading

0 comments on commit 71aef60

Please sign in to comment.