Skip to content

Commit

Permalink
Add global metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Lazar Cvetković <[email protected]>
  • Loading branch information
cvetkovic authored and francois141 committed Mar 6, 2024
1 parent 43058af commit de9738f
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 11 deletions.
38 changes: 36 additions & 2 deletions internal/data_plane/function_metadata/statistics.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
package function_metadata

import "sync/atomic"
import (
"sync"
"sync/atomic"
)

type FunctionStatistics struct {
SuccessfulInvocations int64 `json:"SuccessfulInvocations"`
Inflight int64 `json:"Inflight"`
QueueDepth int64 `json:"QueueDepth"`
}

func (fs *FunctionStatistics) IncrementSuccessInvocations() {
func (fs *FunctionStatistics) GetSuccessfulInvocations() int64 {
return atomic.LoadInt64(&fs.SuccessfulInvocations)
}

func (fs *FunctionStatistics) IncrementSuccessfulInvocations() {
atomic.AddInt64(&fs.SuccessfulInvocations, 1)
}

func (fs *FunctionStatistics) GetInflight() int64 {
return atomic.LoadInt64(&fs.Inflight)
}

func (fs *FunctionStatistics) IncrementInflight() {
atomic.AddInt64(&fs.Inflight, 1)
}
Expand All @@ -20,10 +31,33 @@ func (fs *FunctionStatistics) DecrementInflight() {
atomic.AddInt64(&fs.Inflight, -1)
}

func (fs *FunctionStatistics) GetQueueDepth() int64 {
return atomic.LoadInt64(&fs.QueueDepth)
}

func (fs *FunctionStatistics) IncrementQueueDepth() {
atomic.AddInt64(&fs.QueueDepth, 1)
}

func (fs *FunctionStatistics) DecrementQueueDepth() {
atomic.AddInt64(&fs.QueueDepth, -1)
}

func AggregateStatistics(functions []*FunctionMetadata) *FunctionStatistics {
aggregate := &FunctionStatistics{}
wg := &sync.WaitGroup{}

wg.Add(len(functions))
for _, f := range functions {
go func(metadata *FunctionMetadata) {
defer wg.Done()

atomic.AddInt64(&aggregate.SuccessfulInvocations, metadata.GetStatistics().GetSuccessfulInvocations())
atomic.AddInt64(&aggregate.Inflight, metadata.GetStatistics().GetInflight())
atomic.AddInt64(&aggregate.QueueDepth, metadata.GetStatistics().GetQueueDepth())
}(f)
}
wg.Wait()

return aggregate
}
18 changes: 10 additions & 8 deletions internal/data_plane/proxy/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,21 @@ import (

func CreateMetricsHandler(deployments *function_metadata.Deployments) func(writer http.ResponseWriter, r *http.Request) {
return func(writer http.ResponseWriter, r *http.Request) {
var statistics *function_metadata.FunctionStatistics

service := r.URL.Query().Get("service")
if service == "" {
writer.WriteHeader(http.StatusBadRequest)
return
}
statistics = function_metadata.AggregateStatistics(deployments.ListDeployments())
} else {
metadata, _ := deployments.GetDeployment(service)
if metadata == nil {
writer.WriteHeader(http.StatusBadRequest)
return
}

metadata, _ := deployments.GetDeployment(service)
if metadata == nil {
writer.WriteHeader(http.StatusBadRequest)
return
statistics = metadata.GetStatistics()
}

statistics := metadata.GetStatistics()
data, err := json.Marshal(*statistics)
if err != nil {
writer.WriteHeader(http.StatusInternalServerError)
Expand Down
2 changes: 1 addition & 1 deletion internal/data_plane/proxy/sync_proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,6 @@ func (ps *ProxyingService) createInvocationHandler(next http.Handler) http.Handl
Proxying: time.Since(startProxy),
PersistenceLayer: 0,
}
metadata.GetStatistics().IncrementSuccessInvocations()
metadata.GetStatistics().IncrementSuccessfulInvocations()
}
}

0 comments on commit de9738f

Please sign in to comment.