Skip to content

Commit

Permalink
[apache#9177] add metrics server to go function (apache#9318)
Browse files Browse the repository at this point in the history
Fixes apache#9177

### Motivation

go function added metrics collector by apache#6105, but havnt pass `metricsPort` to go function, also not init & start prometheus http server. As the result, function worker will keep trying to access to the metrics port to collect data, which will cause massive log errors in log history.

### Modifications

- expose `metricsPort` to go function
- add prometheus http server to go function

### Verifying this change

- [x] Make sure that the change passes the CI checks.
  • Loading branch information
freeznet authored Jan 30, 2021
1 parent 88f8fa4 commit 211a125
Show file tree
Hide file tree
Showing 14 changed files with 97 additions and 5 deletions.
2 changes: 2 additions & 0 deletions pulsar-function-go/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ type Conf struct {
DeadLetterTopic string `json:"deadLetterTopic" yaml:"deadLetterTopic"`
ExpectedHealthCheckInterval int32 `json:"expectedHealthCheckInterval" yaml:"expectedHealthCheckInterval"`
UserConfig string `json:"userConfig" yaml:"userConfig"`
//metrics config
MetricsPort int `json:"metricsPort" yaml:"metricsPort"`
}

var (
Expand Down
2 changes: 2 additions & 0 deletions pulsar-function-go/conf/conf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,5 @@ disk: 0
maxMessageRetries: 0
deadLetterTopic: ""
expectedHealthCheckInterval: 3
# metrics config
metricsPort: 50001
2 changes: 1 addition & 1 deletion pulsar-function-go/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ require (
github.com/sirupsen/logrus v1.4.2
github.com/stretchr/testify v1.4.0
google.golang.org/grpc v1.27.0
google.golang.org/protobuf v1.25.0 // indirect
google.golang.org/protobuf v1.25.0
gopkg.in/yaml.v2 v2.3.0
)

Expand Down
6 changes: 6 additions & 0 deletions pulsar-function-go/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ github.com/apache/pulsar-client-go v0.3.1-0.20201201083639-154bff0bb825 h1:Rfvcn
github.com/apache/pulsar-client-go v0.3.1-0.20201201083639-154bff0bb825/go.mod h1:pTmScVVHRhbB8wh0J+m5ZzHI0Lyfe0TwfPEbYEh+JUw=
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb h1:E1P0FudxDdj2RhbveZC9i3PwukLCA/4XQSkBS/dw6/I=
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20200715083626-b9f8c5cedefb/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY=
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd h1:P5kM7jcXJ7TaftX0/EMKiSJgvQc/ct+Fw0KMvcH3WuY=
github.com/apache/pulsar-client-go/oauth2 v0.0.0-20201120111947-b8bd55bc02bd/go.mod h1:0UtvvETGDdvXNDCHa8ZQpxl+w3HbdFtfYZvDHLgWGTY=
github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
Expand All @@ -33,6 +34,7 @@ github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:QWqadCIHYA5zja4b6h9uGQn93u1vL+G/aewImumdg/M=
github.com/datadog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:inRp+etsHuvVqMPNTXaFlpf/Tj7wqviBtdJoPVrPEFQ=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand Down Expand Up @@ -76,6 +78,7 @@ github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3 h1:JjCZWpVbqXDqFVmTfYWEVTMIYrL/NPdPSCHPJ0T/raM=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
Expand Down Expand Up @@ -113,6 +116,7 @@ github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/linkedin/goavro/v2 v2.9.8 h1:jN50elxBsGBDGVDEKqUlDuU1cFwJ11K/yrJCBMe/7Wg=
github.com/linkedin/goavro/v2 v2.9.8/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
Expand Down Expand Up @@ -253,11 +257,13 @@ google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg=
google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.27.0 h1:rRYRFMVgRv6E0D70Skyfsr28tDXIuuPZyWGMPdMcnXg=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
Expand Down
4 changes: 4 additions & 0 deletions pulsar-function-go/pf/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,11 @@ func (c *FunctionContext) SetCurrentRecord(record pulsar.Message) {
// GetCurrentRecord gets the current message from the function context
func (c *FunctionContext) GetCurrentRecord() pulsar.Message {
return c.record
}

//GetMetricsPort returns the port the pulsar function metrics listen on
func (c *FunctionContext) GetMetricsPort() int {
return c.instanceConf.metricsPort
}

// An unexported type to be used as the key for types in this package. This
Expand Down
3 changes: 3 additions & 0 deletions pulsar-function-go/pf/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ func (gi *goInstance) startFunction(function function) error {
servicer := InstanceControlServicer{goInstance: gi}
servicer.serve(gi)

metricsServicer := NewMetricsServicer(gi)
metricsServicer.serve()
defer metricsServicer.close()
CLOSE:
for {
idleTimer.Reset(idleDuration)
Expand Down
2 changes: 2 additions & 0 deletions pulsar-function-go/pf/instanceConf.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type instanceConf struct {
pulsarServiceURL string
killAfterIdle time.Duration
expectedHealthCheckInterval int32
metricsPort int
}

func newInstanceConf() *instanceConf {
Expand All @@ -59,6 +60,7 @@ func newInstanceConf() *instanceConf {
pulsarServiceURL: cfg.PulsarServiceURL,
killAfterIdle: cfg.KillAfterIdleMs,
expectedHealthCheckInterval: cfg.ExpectedHealthCheckInterval,
metricsPort: cfg.MetricsPort,
funcDetails: pb.FunctionDetails{
Tenant: cfg.Tenant,
Namespace: cfg.NameSpace,
Expand Down
1 change: 1 addition & 0 deletions pulsar-function-go/pf/instanceConf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func Test_newInstanceConf(t *testing.T) {
pulsarServiceURL: "pulsar://localhost:6650",
killAfterIdle: 50000,
expectedHealthCheckInterval: 3,
metricsPort: 50001,
funcDetails: pb.FunctionDetails{Tenant: "",
Namespace: "",
Name: "go-function",
Expand Down
47 changes: 46 additions & 1 deletion pulsar-function-go/pf/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
package pf

import (
"fmt"
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"

log "github.com/apache/pulsar/pulsar-function-go/logutil"
"github.com/prometheus/client_golang/prometheus"
prometheus_client "github.com/prometheus/client_model/go"
)

Expand Down Expand Up @@ -120,6 +124,11 @@ var (
Help: "Exception from system code."}, exceptionMetricsLabelNames)
)

type MetricsServicer struct {
goInstance *goInstance
server *http.Server
}

var reg *prometheus.Registry

func init() {
Expand Down Expand Up @@ -304,3 +313,39 @@ func (stat *StatWithLabelValues) reset() {
stat.statTotalSysExceptions1min.Set(0.0)
stat.statTotalReceived1min.Set(0.0)
}

func NewMetricsServicer(goInstance *goInstance) *MetricsServicer {
serveMux := http.NewServeMux()
serveMux.Handle("/metrics", promhttp.HandlerFor(
reg,
promhttp.HandlerOpts{
EnableOpenMetrics: true,
},
))
server := &http.Server{
Addr: fmt.Sprintf(":%d", goInstance.context.GetMetricsPort()),
Handler: serveMux,
}
return &MetricsServicer{
goInstance,
server,
}
}

func (s *MetricsServicer) serve() {
go func() {
// create a listener on metrics port
log.Infof("Starting metrics server on port %d", s.goInstance.context.GetMetricsPort())
err := s.server.ListenAndServe()
if err != nil {
log.Fatalf("failed to start metrics server: %v", err)
}
}()
}

func (s *MetricsServicer) close() {
err := s.server.Close()
if err != nil {
log.Fatalf("failed to close metrics server: %v", err)
}
}
18 changes: 18 additions & 0 deletions pulsar-function-go/pf/stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
package pf

import (
"fmt"
"io/ioutil"
"math"
"net/http"
"testing"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -184,3 +187,18 @@ func TestExampleSummaryVec_Pulsar(t *testing.T) {
assert.Equal(t, 61925, int(*sum))
assert.Equal(t, 2000, int(*count))
}

func TestMetricsServer(t *testing.T) {
gi := newGoInstance()
metricsServicer := NewMetricsServicer(gi)
metricsServicer.serve()
gi.stats.incrTotalReceived()

resp, err := http.Get(fmt.Sprintf("http://localhost:%d/metrics", gi.context.GetMetricsPort()))
assert.Equal(t, nil, err)
assert.Equal(t, 200, resp.StatusCode)
body, err := ioutil.ReadAll(resp.Body)
assert.Equal(t, nil, err)
assert.NotEmpty(t, body)
resp.Body.Close()
}
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,6 @@ public class GoInstanceConfig {

private int maxMessageRetries;
private String deadLetterTopic = "";

private int metricsPort;
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,8 @@ public static List<String> getArgsBeforeCmd(InstanceConfig instanceConfig, Strin
public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
String originalCodeFileName,
String pulsarServiceUrl,
boolean k8sRuntime) throws IOException {
boolean k8sRuntime,
int metricsPort) throws IOException {
final List<String> args = new LinkedList<>();
GoInstanceConfig goInstanceConfig = new GoInstanceConfig();

Expand Down Expand Up @@ -219,6 +220,10 @@ public static List<String> getGoInstanceCmd(InstanceConfig instanceConfig,
goInstanceConfig.setMaxMessageRetries(instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
}

if (metricsPort > 0 && metricsPort < 65536) {
goInstanceConfig.setMetricsPort(metricsPort);
}

goInstanceConfig.setKillAfterIdleMs(0);
goInstanceConfig.setPort(instanceConfig.getPort());

Expand Down Expand Up @@ -261,7 +266,7 @@ public static List<String> getCmd(InstanceConfig instanceConfig,
final List<String> args = new LinkedList<>();

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime);
return getGoInstanceCmd(instanceConfig, originalCodeFileName, pulsarServiceUrl, k8sRuntime, metricsPort);
}

if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.JAVA) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {

instanceConfig.setFunctionDetails(functionDetails);

List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650", k8sRuntime);
List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650", k8sRuntime, 60000);
if (k8sRuntime) {
goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
} else {
Expand Down Expand Up @@ -151,6 +151,7 @@ public void getGoInstanceCmd(boolean k8sRuntime) throws IOException {
Assert.assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
Assert.assertEquals(goInstanceConfig.get("deadLetterTopic"), "go-func-deadletter");
Assert.assertEquals(goInstanceConfig.get("userConfig"), userConfig.toString());
Assert.assertEquals(goInstanceConfig.get("metricsPort"), 60000);
}

@DataProvider(name = "k8sRuntime")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -849,6 +849,7 @@ private void verifyGolangInstance(InstanceConfig config) throws Exception {
assertEquals(goInstanceConfig.get("name"), TEST_NAME);
assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
assertEquals(goInstanceConfig.get("deadLetterTopic"), "");
assertEquals(goInstanceConfig.get("metricsPort"), 4331);

// check padding and xmx
V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), RESOURCES);
Expand Down

0 comments on commit 211a125

Please sign in to comment.