diff --git a/internal/cli/agent.go b/internal/cli/agent.go index af3ce610d..f835855e4 100644 --- a/internal/cli/agent.go +++ b/internal/cli/agent.go @@ -165,16 +165,22 @@ func init() { "RPC certification file's path", ) cmd.Flags().StringVar( - &conf.RPC.CertFile, + &conf.RPC.KeyFile, "rpc-key-file", "", "RPC key file's path", ) cmd.Flags().IntVar( - &conf.Metrics.Port, - "metrics-port", - yorkie.DefaultMetricsPort, - "Metrics port", + &conf.Profiling.Port, + "profiling-port", + yorkie.DefaultProfilingPort, + "Profiling port", + ) + cmd.Flags().BoolVar( + &conf.Profiling.EnablePprof, + "enable-pprof", + false, + "Enable runtime profiling data via HTTP server.", ) cmd.Flags().DurationVar( &mongoConnectionTimeout, diff --git a/test/helper/helper.go b/test/helper/helper.go index e2f09eed4..46a49e156 100644 --- a/test/helper/helper.go +++ b/test/helper/helper.go @@ -29,7 +29,7 @@ import ( "github.com/yorkie-team/yorkie/yorkie/backend" "github.com/yorkie-team/yorkie/yorkie/backend/db/mongo" "github.com/yorkie-team/yorkie/yorkie/backend/sync/etcd" - "github.com/yorkie-team/yorkie/yorkie/metrics/prometheus" + "github.com/yorkie-team/yorkie/yorkie/profiling" "github.com/yorkie-team/yorkie/yorkie/rpc" ) @@ -38,7 +38,7 @@ var testStartedAt int64 // Below are the values of the Yorkie config used in the test. const ( RPCPort = 21101 - MetricsPort = 21102 + ProfilingPort = 21102 MongoConnectionURI = "mongodb://localhost:27017" MongoConnectionTimeout = "5s" MongoPingTimeout = "5s" @@ -121,8 +121,8 @@ func TestConfig(authWebhook string) *yorkie.Config { RPC: &rpc.Config{ Port: RPCPort + portOffset, }, - Metrics: &prometheus.Config{ - Port: MetricsPort + portOffset, + Profiling: &profiling.Config{ + Port: ProfilingPort + portOffset, }, Backend: &backend.Config{ SnapshotThreshold: SnapshotThreshold, diff --git a/yorkie/backend/backend.go b/yorkie/backend/backend.go index ace6504e9..22f92f79f 100644 --- a/yorkie/backend/backend.go +++ b/yorkie/backend/backend.go @@ -30,7 +30,7 @@ import ( "github.com/yorkie-team/yorkie/yorkie/backend/sync" "github.com/yorkie-team/yorkie/yorkie/backend/sync/etcd" "github.com/yorkie-team/yorkie/yorkie/backend/sync/memory" - "github.com/yorkie-team/yorkie/yorkie/metrics" + "github.com/yorkie-team/yorkie/yorkie/profiling" ) const authWebhookCacheSize = 5000 @@ -43,7 +43,7 @@ type Backend struct { DB db.DB Coordinator sync.Coordinator - Metrics metrics.Metrics + Metrics profiling.Metrics AuthWebhookCache *cache.LRUExpireCache // closing is closed by backend close. @@ -63,7 +63,7 @@ func New( mongoConf *mongo.Config, etcdConf *etcd.Config, rpcAddr string, - met metrics.Metrics, + met profiling.Metrics, ) (*Backend, error) { hostname, err := os.Hostname() if err != nil { diff --git a/yorkie/backend/sync/memory/pubsub.go b/yorkie/backend/sync/memory/pubsub.go index 91bffb675..d2db057d8 100644 --- a/yorkie/backend/sync/memory/pubsub.go +++ b/yorkie/backend/sync/memory/pubsub.go @@ -18,6 +18,7 @@ package memory import ( gosync "sync" + gotime "time" "go.uber.org/zap" @@ -201,7 +202,18 @@ func (m *PubSub) Publish( sub.SubscriberID(), ) } - sub.Events() <- event + + // NOTE: When a subscription is being closed by a subscriber, + // the subscriber may not receive messages. + select { + case sub.Events() <- event: + case <-gotime.After(100 * gotime.Millisecond): + log.Logger.Warn( + `Publish(%s,%s) to %s timeout`, + k, + publisherID.String(), + sub.SubscriberID()) + } } } if log.Core.Enabled(zap.DebugLevel) { diff --git a/yorkie/config.go b/yorkie/config.go index 79d256842..91cd828cb 100644 --- a/yorkie/config.go +++ b/yorkie/config.go @@ -28,14 +28,14 @@ import ( "github.com/yorkie-team/yorkie/yorkie/backend" "github.com/yorkie-team/yorkie/yorkie/backend/db/mongo" "github.com/yorkie-team/yorkie/yorkie/backend/sync/etcd" - "github.com/yorkie-team/yorkie/yorkie/metrics/prometheus" + "github.com/yorkie-team/yorkie/yorkie/profiling" "github.com/yorkie-team/yorkie/yorkie/rpc" ) // Below are the values of the default values of Yorkie config. const ( - DefaultRPCPort = 11101 - DefaultMetricsPort = 11102 + DefaultRPCPort = 11101 + DefaultProfilingPort = 11102 DefaultMongoConnectionURI = "mongodb://localhost:27017" DefaultMongoConnectionTimeout = 5 * time.Second @@ -53,17 +53,17 @@ const ( // Config is the configuration for creating a Yorkie instance. type Config struct { - RPC *rpc.Config `yaml:"RPC"` - Metrics *prometheus.Config `yaml:"Metrics"` - Backend *backend.Config `yaml:"Backend"` - Mongo *mongo.Config `yaml:"Mongo"` - ETCD *etcd.Config `yaml:"ETCD"` + RPC *rpc.Config `yaml:"RPC"` + Profiling *profiling.Config `yaml:"Profiling"` + Backend *backend.Config `yaml:"Backend"` + Mongo *mongo.Config `yaml:"Mongo"` + ETCD *etcd.Config `yaml:"ETCD"` } // NewConfig returns a Config struct that contains reasonable defaults // for most of the configurations. func NewConfig() *Config { - return newConfig(DefaultRPCPort, DefaultMetricsPort, DefaultMongoYorkieDatabase) + return newConfig(DefaultRPCPort, DefaultProfilingPort, DefaultMongoYorkieDatabase) } // NewConfigFromFile returns a Config struct for the given conf file. @@ -95,7 +95,7 @@ func (c *Config) Validate() error { return err } - if err := c.Metrics.Validate(); err != nil { + if err := c.Profiling.Validate(); err != nil { return err } @@ -120,8 +120,8 @@ func (c *Config) ensureDefaultValue() { c.RPC.Port = DefaultRPCPort } - if c.Metrics.Port == 0 { - c.Metrics.Port = DefaultMetricsPort + if c.Profiling.Port == 0 { + c.Profiling.Port = DefaultProfilingPort } if c.Mongo.ConnectionTimeout == "" { @@ -175,13 +175,13 @@ func (c *Config) ensureDefaultValue() { } } -func newConfig(port int, metricsPort int, dbName string) *Config { +func newConfig(port int, profilingPort int, dbName string) *Config { return &Config{ RPC: &rpc.Config{ Port: port, }, - Metrics: &prometheus.Config{ - Port: metricsPort, + Profiling: &profiling.Config{ + Port: profilingPort, }, Backend: &backend.Config{ SnapshotThreshold: DefaultSnapshotThreshold, diff --git a/yorkie/config.sample.yml b/yorkie/config.sample.yml index 072f59b04..f0835fa27 100644 --- a/yorkie/config.sample.yml +++ b/yorkie/config.sample.yml @@ -9,10 +9,14 @@ RPC: # KeyFile is the file containing the TLS private key. KeyFile: "" -Metrics: - # Port to listen on for Prometheus metrics `/metrics` (default: 11102). +# Profiling is the configuration for the profiling server. +Profiling: + # Port is the port to listen on for serving metrics `/metrics` and pprof (default: 11102). Port: 11102 + # EnablePprof is whether to enable the pprof `/debug/pprof` endpoint. + EnablePprof: false + # Backend is the configuration for the backend of Yorkie. Backend: # SnapshotThreshold is the threshold that determines if changes should be diff --git a/yorkie/metrics/prometheus/server.go b/yorkie/metrics/prometheus/server.go deleted file mode 100644 index 7e7ca16f7..000000000 --- a/yorkie/metrics/prometheus/server.go +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Copyright 2021 The Yorkie Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package prometheus - -import ( - "context" - "fmt" - "net/http" - - "github.com/prometheus/client_golang/prometheus/promhttp" - - "github.com/yorkie-team/yorkie/internal/log" -) - -// Server provides application-specific and Go metrics. -type Server struct { - conf *Config - metrics *Metrics - metricsServer *http.Server -} - -// NewServer creates an instance of Server. -func NewServer(conf *Config, met *Metrics) *Server { - return &Server{ - conf: conf, - metrics: met, - metricsServer: &http.Server{ - Addr: fmt.Sprintf(":%d", conf.Port), - }, - } -} - -func (s *Server) listenAndServe() error { - go func() { - log.Logger.Infof(fmt.Sprintf("serving Metrics on %d", s.conf.Port)) - serveMux := http.NewServeMux() - serveMux.Handle("/metrics", promhttp.HandlerFor(s.metrics.Registry(), promhttp.HandlerOpts{})) - s.metricsServer.Handler = serveMux - if err := s.metricsServer.ListenAndServe(); err != http.ErrServerClosed { - log.Logger.Error("HTTP server ListenAndServe: %v", err) - } - }() - return nil -} - -// Start registers application-specific metrics and starts the HTTP server. -func (s *Server) Start() error { - return s.listenAndServe() -} - -// Shutdown closes the server. -func (s *Server) Shutdown(graceful bool) { - if graceful { - if err := s.metricsServer.Shutdown(context.Background()); err != nil { - log.Logger.Error("HTTP server Shutdown: %v", err) - } - return - } - - if err := s.metricsServer.Close(); err != nil { - log.Logger.Error("HTTP server Close: %v", err) - } -} diff --git a/yorkie/metrics/prometheus/config.go b/yorkie/profiling/config.go similarity index 79% rename from yorkie/metrics/prometheus/config.go rename to yorkie/profiling/config.go index 9f34ba7a7..71c27c345 100644 --- a/yorkie/metrics/prometheus/config.go +++ b/yorkie/profiling/config.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package prometheus +package profiling import ( "errors" @@ -22,19 +22,20 @@ import ( ) var ( - // ErrInvalidMetricPort occurs when the port in the config is invalid. - ErrInvalidMetricPort = errors.New("invalid port number for metric server") + // ErrInvalidProfilingPort occurs when the port in the config is invalid. + ErrInvalidProfilingPort = errors.New("invalid port number for profiling server") ) // Config is the configuration for creating a Server instance. type Config struct { - Port int + Port int + EnablePprof bool } // Validate validates the port number. func (c *Config) Validate() error { if c.Port < 1 || 65535 < c.Port { - return fmt.Errorf("must be between 1 and 65535, given %d: %w", c.Port, ErrInvalidMetricPort) + return fmt.Errorf("must be between 1 and 65535, given %d: %w", c.Port, ErrInvalidProfilingPort) } return nil diff --git a/yorkie/metrics/prometheus/config_test.go b/yorkie/profiling/config_test.go similarity index 72% rename from yorkie/metrics/prometheus/config_test.go rename to yorkie/profiling/config_test.go index 094352720..3b2648312 100644 --- a/yorkie/metrics/prometheus/config_test.go +++ b/yorkie/profiling/config_test.go @@ -14,24 +14,24 @@ * limitations under the License. */ -package prometheus_test +package profiling_test import ( "testing" "github.com/stretchr/testify/assert" - "github.com/yorkie-team/yorkie/yorkie/metrics/prometheus" + "github.com/yorkie-team/yorkie/yorkie/profiling" ) func TestConfig(t *testing.T) { scenarios := []*struct { - config *prometheus.Config + config *profiling.Config expected error }{ - {config: &prometheus.Config{Port: -1}, expected: prometheus.ErrInvalidMetricPort}, - {config: &prometheus.Config{Port: 0}, expected: prometheus.ErrInvalidMetricPort}, - {config: &prometheus.Config{Port: 11102}, expected: nil}, + {config: &profiling.Config{Port: -1}, expected: profiling.ErrInvalidProfilingPort}, + {config: &profiling.Config{Port: 0}, expected: profiling.ErrInvalidProfilingPort}, + {config: &profiling.Config{Port: 11102}, expected: nil}, } for _, scenario := range scenarios { assert.ErrorIs(t, scenario.config.Validate(), scenario.expected, "provided config: %#v", scenario.config) diff --git a/yorkie/metrics/metrics.go b/yorkie/profiling/metrics.go similarity index 98% rename from yorkie/metrics/metrics.go rename to yorkie/profiling/metrics.go index e9ebdca2d..e0053b47e 100644 --- a/yorkie/metrics/metrics.go +++ b/yorkie/profiling/metrics.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package metrics +package profiling import "google.golang.org/grpc" diff --git a/yorkie/metrics/prometheus/metrics.go b/yorkie/profiling/prometheus/metrics.go similarity index 100% rename from yorkie/metrics/prometheus/metrics.go rename to yorkie/profiling/prometheus/metrics.go diff --git a/yorkie/profiling/server.go b/yorkie/profiling/server.go new file mode 100644 index 000000000..c50083e40 --- /dev/null +++ b/yorkie/profiling/server.go @@ -0,0 +1,95 @@ +/* + * Copyright 2021 The Yorkie Authors. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package profiling + +import ( + "context" + "fmt" + "net/http" + "net/http/pprof" + + "github.com/yorkie-team/yorkie/internal/log" +) + +const httpPrefixPProf = "/debug/pprof" + +// Server serves information for profiling, such as metrics and pprof information. +type Server struct { + conf *Config + serveMux *http.ServeMux + httpServer *http.Server +} + +// NewServer creates an instance of Server. +func NewServer(conf *Config) *Server { + serveMux := http.NewServeMux() + if conf.EnablePprof { + serveMux.Handle(httpPrefixPProf+"/", http.HandlerFunc(pprof.Index)) + serveMux.Handle(httpPrefixPProf+"/profile", http.HandlerFunc(pprof.Profile)) + serveMux.Handle(httpPrefixPProf+"/symbol", http.HandlerFunc(pprof.Symbol)) + serveMux.Handle(httpPrefixPProf+"/cmdline", http.HandlerFunc(pprof.Cmdline)) + serveMux.Handle(httpPrefixPProf+"/trace", http.HandlerFunc(pprof.Trace)) + serveMux.Handle(httpPrefixPProf+"/heap", pprof.Handler("heap")) + serveMux.Handle(httpPrefixPProf+"/goroutine", pprof.Handler("goroutine")) + serveMux.Handle(httpPrefixPProf+"/threadcreate", pprof.Handler("threadcreate")) + serveMux.Handle(httpPrefixPProf+"/block", pprof.Handler("block")) + serveMux.Handle(httpPrefixPProf+"/mutex", pprof.Handler("mutex")) + } + + return &Server{ + conf: conf, + serveMux: serveMux, + httpServer: &http.Server{ + Addr: fmt.Sprintf(":%d", conf.Port), + }, + } +} + +// Handle handles the given handler. +func (s *Server) Handle(pattern string, handler http.Handler) { + s.serveMux.Handle(pattern, handler) +} + +// Start starts the server. +func (s *Server) Start() error { + return s.listenAndServe() +} + +// Shutdown closes the server. +func (s *Server) Shutdown(graceful bool) { + if graceful { + if err := s.httpServer.Shutdown(context.Background()); err != nil { + log.Logger.Error("HTTP server Shutdown: %v", err) + } + return + } + + if err := s.httpServer.Close(); err != nil { + log.Logger.Error("HTTP server Close: %v", err) + } +} + +func (s *Server) listenAndServe() error { + go func() { + log.Logger.Infof(fmt.Sprintf("serving profiling on %d", s.conf.Port)) + s.httpServer.Handler = s.serveMux + if err := s.httpServer.ListenAndServe(); err != http.ErrServerClosed { + log.Logger.Error("HTTP server ListenAndServe: %v", err) + } + }() + return nil +} diff --git a/yorkie/metrics/prometheus/server_test.go b/yorkie/profiling/server_test.go similarity index 71% rename from yorkie/metrics/prometheus/server_test.go rename to yorkie/profiling/server_test.go index 2275867c6..82eee6128 100644 --- a/yorkie/metrics/prometheus/server_test.go +++ b/yorkie/profiling/server_test.go @@ -14,7 +14,7 @@ * limitations under the License. */ -package prometheus_test +package profiling_test import ( "testing" @@ -22,23 +22,19 @@ import ( "github.com/stretchr/testify/assert" "github.com/yorkie-team/yorkie/test/helper" - "github.com/yorkie-team/yorkie/yorkie/metrics/prometheus" + "github.com/yorkie-team/yorkie/yorkie/profiling" ) const ( - // to avoid conflict with metrics port used for client test - testMetricsPort = helper.MetricsPort + 100 + // to avoid conflict with profiling port used for client test + testProfilingPort = helper.ProfilingPort + 100 ) func TestMetricsServer(t *testing.T) { t.Run("new server test", func(t *testing.T) { - met, err := prometheus.NewMetrics() - assert.NoError(t, err) - server := prometheus.NewServer( - &prometheus.Config{Port: testMetricsPort}, - met, + server := profiling.NewServer( + &profiling.Config{Port: testProfilingPort}, ) - assert.NoError(t, err) assert.NotNil(t, server) server.Shutdown(true) }) diff --git a/yorkie/rpc/server.go b/yorkie/rpc/server.go index 49719fe36..0d45e2da7 100644 --- a/yorkie/rpc/server.go +++ b/yorkie/rpc/server.go @@ -112,7 +112,7 @@ func (s *Server) listenAndServeGRPC() error { } go func() { - log.Logger.Infof("serving API on %d", s.conf.Port) + log.Logger.Infof("serving RPC on %d", s.conf.Port) if err := s.grpcServer.Serve(lis); err != nil { if err != grpc.ErrServerStopped { diff --git a/yorkie/rpc/server_test.go b/yorkie/rpc/server_test.go index 4ecf59cf4..9f0181377 100644 --- a/yorkie/rpc/server_test.go +++ b/yorkie/rpc/server_test.go @@ -18,7 +18,7 @@ import ( "github.com/yorkie-team/yorkie/yorkie/backend" "github.com/yorkie-team/yorkie/yorkie/backend/db/mongo" "github.com/yorkie-team/yorkie/yorkie/backend/sync/etcd" - "github.com/yorkie-team/yorkie/yorkie/metrics/prometheus" + "github.com/yorkie-team/yorkie/yorkie/profiling/prometheus" "github.com/yorkie-team/yorkie/yorkie/rpc" ) diff --git a/yorkie/yorkie.go b/yorkie/yorkie.go index f519d0879..9a349cf7a 100644 --- a/yorkie/yorkie.go +++ b/yorkie/yorkie.go @@ -19,10 +19,13 @@ package yorkie import ( gosync "sync" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/yorkie-team/yorkie/internal/log" "github.com/yorkie-team/yorkie/yorkie/backend" "github.com/yorkie-team/yorkie/yorkie/backend/sync" - "github.com/yorkie-team/yorkie/yorkie/metrics/prometheus" + "github.com/yorkie-team/yorkie/yorkie/profiling" + "github.com/yorkie-team/yorkie/yorkie/profiling/prometheus" "github.com/yorkie-team/yorkie/yorkie/rpc" ) @@ -32,10 +35,10 @@ import ( type Yorkie struct { lock gosync.Mutex - conf *Config - backend *backend.Backend - rpcServer *rpc.Server - metricsServer *prometheus.Server + conf *Config + backend *backend.Backend + rpcServer *rpc.Server + profilingServer *profiling.Server shutdown bool shutdownCh chan struct{} @@ -47,7 +50,7 @@ func New(conf *Config) (*Yorkie, error) { return nil, err } - met, err := prometheus.NewMetrics() + metrics, err := prometheus.NewMetrics() if err != nil { return nil, err } @@ -57,7 +60,7 @@ func New(conf *Config) (*Yorkie, error) { conf.Mongo, conf.ETCD, conf.RPCAddr(), - met, + metrics, ) if err != nil { return nil, err @@ -68,16 +71,17 @@ func New(conf *Config) (*Yorkie, error) { return nil, err } - var metricsServer *prometheus.Server - if conf.Metrics != nil { - metricsServer = prometheus.NewServer(conf.Metrics, met) + var profilingServer *profiling.Server + if conf.Profiling != nil { + profilingServer = profiling.NewServer(conf.Profiling) + profilingServer.Handle("/metrics", promhttp.HandlerFor(metrics.Registry(), promhttp.HandlerOpts{})) } return &Yorkie{ - conf: conf, - backend: be, - rpcServer: rpcServer, - metricsServer: metricsServer, + conf: conf, + backend: be, + rpcServer: rpcServer, + profilingServer: profilingServer, shutdownCh: make(chan struct{}), }, nil @@ -88,8 +92,8 @@ func (r *Yorkie) Start() error { r.lock.Lock() defer r.lock.Unlock() - if r.metricsServer != nil { - err := r.metricsServer.Start() + if r.profilingServer != nil { + err := r.profilingServer.Start() if err != nil { log.Logger.Error(err) return err @@ -107,8 +111,8 @@ func (r *Yorkie) Shutdown(graceful bool) error { } r.rpcServer.Shutdown(graceful) - if r.metricsServer != nil { - r.metricsServer.Shutdown(graceful) + if r.profilingServer != nil { + r.profilingServer.Shutdown(graceful) } if err := r.backend.Close(); err != nil {