Skip to content

Commit

Permalink
Telem services have documented health checks (smartcontractkit#8670)
Browse files (browse the repository at this point in the history)
  • Loading branch information
essamhassan authored Mar 13, 2023
1 parent 28306be commit 4d2e267
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 14 deletions.
2 changes: 1 addition & 1 deletion core/services/promreporter/prom_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (pr *promReporter) Name() string {
}

func (pr *promReporter) HealthReport() map[string]error {
return map[string]error{pr.Name(): pr.Healthy()}
return map[string]error{pr.Name(): pr.StartStopOnce.Healthy()}
}

func (pr *promReporter) OnNewLongestChain(ctx context.Context, head *evmtypes.Head) {
Expand Down
10 changes: 7 additions & 3 deletions core/services/synchronization/explorer_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"log"
"net/http"
"net/url"
"sync"
Expand Down Expand Up @@ -150,7 +149,9 @@ func (ec *explorerClient) Name() string {
}

func (ec *explorerClient) HealthReport() map[string]error {
return map[string]error{ec.Name(): ec.Healthy()}
return map[string]error{
ec.Name(): ec.StartStopOnce.Healthy(),
}
}

// Send sends data asynchronously across the websocket if it's open, or
Expand All @@ -169,7 +170,10 @@ func (ec *explorerClient) Send(ctx context.Context, data []byte, messageTypes ..
case ExplorerBinaryMessage:
send = ec.sendBinary
default:
log.Panicf("send on explorer client received unsupported message type %d", messageType)
err := fmt.Errorf("send on explorer client received unsupported message type %d", messageType)
ec.SvcErrBuffer.Append(err)
ec.lggr.Critical(err.Error())
return
}
select {
case send <- data:
Expand Down
5 changes: 2 additions & 3 deletions core/services/synchronization/explorer_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,8 @@ func TestWebSocketClient_Send_Unsupported(t *testing.T) {
explorerClient := newTestExplorerClient(t, wsserver.URL)
require.NoError(t, explorerClient.Start(testutils.Context(t)))

assert.PanicsWithValue(t, "send on explorer client received unsupported message type -1", func() {
explorerClient.Send(testutils.Context(t), []byte(`{"hello": "world"}`), -1)
})
explorerClient.Send(testutils.Context(t), []byte(`{"hello": "world"}`), -1)
require.Contains(t, explorerClient.HealthReport()[explorerClient.Name()].Error(), "send on explorer client received unsupported message type -1")
require.NoError(t, explorerClient.Close())
}

Expand Down
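5 changes: 3 additions & 2 deletions core/services/synchronization/telemetry_ingress_batch_client.go
Original file line number Diff line number Diff line change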
Expand Up @@ -126,6 +126,7 @@ func (tc *telemetryIngressBatchClient) Start(ctx context.Context) error {
tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err)
} else {
tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err)
tc.SvcErrBuffer.Append(err)
}
} else {
tc.telemClient = telemPb.NewTelemClient(conn)
Expand All @@ -137,7 +138,7 @@ func (tc *telemetryIngressBatchClient) Start(ctx context.Context) error {
// Spawns a goroutine that will eventually connect
conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey))
if err != nil {
return fmt.Errorf("Could not start TelemIngressBatchClient, Dial returned error: %v", err)
return fmt.Errorf("could not start TelemIngressBatchClient, Dial returned error: %v", err)
}
tc.telemClient = telemPb.NewTelemClient(conn)
tc.close = func() error { conn.Close(); return nil }
Expand Down Expand Up @@ -165,7 +166,7 @@ func (tc *telemetryIngressBatchClient) Name() string {
}

func (tc *telemetryIngressBatchClient) HealthReport() map[string]error {
return map[string]error{tc.Name(): tc.Healthy()}
return map[string]error{tc.Name(): tc.StartStopOnce.Healthy()}
}

// getCSAPrivateKey gets the client's CSA private key
Expand Down
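3 changes: 3 additions & 0 deletions core/services/synchronization/telemetry_ingress_batch_worker.go
Original file line number Diff line number Diff line change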
Expand Up @@ -6,13 +6,16 @@ import (
"time"

"github.com/smartcontractkit/chainlink/core/logger"
"github.com/smartcontractkit/chainlink/core/services"
telemPb "github.com/smartcontractkit/chainlink/core/services/synchronization/telem"
"github.com/smartcontractkit/chainlink/core/utils"
)

// telemetryIngressBatchWorker pushes telemetry in batches to the ingress server via wsrpc.
// A worker is created per ContractID.
type telemetryIngressBatchWorker struct {
services.ServiceCtx

telemMaxBatchSize uint
telemSendInterval time.Duration
telemSendTimeout time.Duration
Expand Down
10 changes: 7 additions & 3 deletions core/services/synchronization/telemetry_ingress_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (tc *telemetryIngressClient) Name() string {
}

func (tc *telemetryIngressClient) HealthReport() map[string]error {
return map[string]error{tc.Name(): tc.Healthy()}
return map[string]error{tc.Name(): tc.StartStopOnce.Healthy()}
}

func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []byte) {
Expand All @@ -127,8 +127,12 @@ func (tc *telemetryIngressClient) connect(ctx context.Context, clientPrivKey []b

conn, err := wsrpc.DialWithContext(ctx, tc.url.String(), wsrpc.WithTransportCreds(clientPrivKey, serverPubKey))
if err != nil {
tc.lggr.Errorf("Error connecting to telemetry ingress server: %v", err)
return
if ctx.Err() != nil {
tc.lggr.Warnw("gave up connecting to telemetry endpoint", "err", err)
} else {
tc.lggr.Criticalw("telemetry endpoint dial errored unexpectedly", "err", err)
tc.SvcErrBuffer.Append(err)
}
}
defer conn.Close()

Expand Down
2 changes: 1 addition & 1 deletion core/utils/mailbox_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (m *MailboxMonitor) Close() error {
}

func (m *MailboxMonitor) HealthReport() map[string]error {
return map[string]error{m.Name(): m.Healthy()}
return map[string]error{m.Name(): m.StartStopOnce.Healthy()}
}

func (m *MailboxMonitor) monitorLoop(c <-chan time.Time) {
Expand Down
2 changes: 1 addition & 1 deletion core/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -986,7 +986,7 @@ func (once *StartStopOnce) Ready() error {
func (once *StartStopOnce) Healthy() error {
state := once.State()
if state == StartStopOnce_Started {
return nil
return once.SvcErrBuffer.Flush()
}
return &errNotStarted{state: state}
}
Expand Down
8 changes: 8 additions & 0 deletions core/utils/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1037,4 +1037,12 @@ func TestErrorBuffer(t *testing.T) {
assert.Equal(t, err1.Error(), errs[0].Error())
})

t.Run("flushing an empty err buffer is a nil error", func(t *testing.T) {
t.Parallel()
buff := utils.ErrorBuffer{}

combined := buff.Flush()
require.Nil(t, combined)
})

}

0 comments on commit 4d2e267

Please sign in to comment.