Skip to content

Commit

Permalink
Add Support for Cache and S3 related metrics in Prometheus endpoint (m…
Browse files Browse the repository at this point in the history
…inio#8591)

This PR adds support below metrics

- Cache Hit Count
- Cache Miss Count
- Data served from Cache (in Bytes)
- Bytes received from AWS S3
- Bytes sent to AWS S3
- Number of requests sent to AWS S3

Fixes minio#8549
  • Loading branch information
nitisht authored and harshavardhana committed Dec 6, 2019
1 parent d2dc964 commit 3df7285
Show file tree
Hide file tree
Showing 32 changed files with 400 additions and 86 deletions.
2 changes: 1 addition & 1 deletion cmd/admin-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1028,7 +1028,7 @@ func mustTrace(entry interface{}, trcAll, errOnly bool) bool {
if !ok {
return false
}
trace := trcAll || !hasPrefix(trcInfo.ReqInfo.Path, minioReservedBucketPath+SlashSeparator)
trace := trcAll || !HasPrefix(trcInfo.ReqInfo.Path, minioReservedBucketPath+SlashSeparator)
if errOnly {
return trace && trcInfo.RespInfo.StatusCode >= http.StatusBadRequest
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/api-headers.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func setObjectHeaders(w http.ResponseWriter, objInfo ObjectInfo, rs *HTTPRangeSp

// Set all other user defined metadata.
for k, v := range objInfo.UserDefined {
if hasPrefix(k, ReservedMetadataPrefix) {
if HasPrefix(k, ReservedMetadataPrefix) {
// Do not need to send any internal metadata
// values to client.
continue
Expand Down
2 changes: 1 addition & 1 deletion cmd/api-response.go
Original file line number Diff line number Diff line change
Expand Up @@ -526,7 +526,7 @@ func generateListObjectsV2Response(bucket, prefix, token, nextToken, startAfter,
if metadata {
content.UserMetadata = make(StringMap)
for k, v := range CleanMinioInternalMetadataKeys(object.UserDefined) {
if hasPrefix(k, ReservedMetadataPrefix) {
if HasPrefix(k, ReservedMetadataPrefix) {
// Do not need to send any internal metadata
// values to client.
continue
Expand Down
4 changes: 2 additions & 2 deletions cmd/bucket-handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ func (api objectAPIHandlers) ListMultipartUploadsHandler(w http.ResponseWriter,

if keyMarker != "" {
// Marker not common with prefix is not implemented.
if !hasPrefix(keyMarker, prefix) {
if !HasPrefix(keyMarker, prefix) {
writeErrorResponse(ctx, w, errorCodes.ToAPIErr(ErrNotImplemented), r.URL, guessIsBrowserReq(r))
return
}
Expand Down Expand Up @@ -750,7 +750,7 @@ func (api objectAPIHandlers) PostPolicyBucketHandler(w http.ResponseWriter, r *h
return
}
if objectAPI.IsEncryptionSupported() {
if crypto.IsRequested(formValues) && !hasSuffix(object, SlashSeparator) { // handle SSE requests
if crypto.IsRequested(formValues) && !HasSuffix(object, SlashSeparator) { // handle SSE requests
if crypto.SSECopy.IsRequested(r.Header) {
writeErrorResponse(ctx, w, toAPIError(ctx, errInvalidEncryptionParameters), r.URL, guessIsBrowserReq(r))
return
Expand Down
6 changes: 3 additions & 3 deletions cmd/disk-cache-backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,14 +334,14 @@ func (c *diskCache) updateMetadataIfChanged(ctx context.Context, bucket, object
bkMeta := make(map[string]string)
cacheMeta := make(map[string]string)
for k, v := range bkObjectInfo.UserDefined {
if hasPrefix(k, ReservedMetadataPrefix) {
if HasPrefix(k, ReservedMetadataPrefix) {
// Do not need to send any internal metadata
continue
}
bkMeta[http.CanonicalHeaderKey(k)] = v
}
for k, v := range cacheObjInfo.UserDefined {
if hasPrefix(k, ReservedMetadataPrefix) {
if HasPrefix(k, ReservedMetadataPrefix) {
// Do not need to send any internal metadata
continue
}
Expand Down Expand Up @@ -602,7 +602,7 @@ func (c *diskCache) Get(ctx context.Context, bucket, object string, rs *HTTPRang

var nsUnlocker = func() {}
// For a directory, we need to send an reader that returns no bytes.
if hasSuffix(object, SlashSeparator) {
if HasSuffix(object, SlashSeparator) {
// The lock taken above is released when
// objReader.Close() is called by the caller.
return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
Expand Down
64 changes: 64 additions & 0 deletions cmd/disk-cache-stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmd

import (
"go.uber.org/atomic"
)

// CacheStats - represents bytes served from cache,
// cache hits and cache misses.
type CacheStats struct {
BytesServed atomic.Uint64
Hits atomic.Uint64
Misses atomic.Uint64
}

// Increase total bytes served from cache
func (s *CacheStats) incBytesServed(n int64) {
s.BytesServed.Add(uint64(n))
}

// Increase cache hit by 1
func (s *CacheStats) incHit() {
s.Hits.Add(uint64(1))
}

// Increase cache miss by 1
func (s *CacheStats) incMiss() {
s.Misses.Add(uint64(1))
}

// Get total bytes served
func (s *CacheStats) getBytesServed() uint64 {
return s.BytesServed.Load()
}

// Get total cache hits
func (s *CacheStats) getHits() uint64 {
return s.Hits.Load()
}

// Get total cache misses
func (s *CacheStats) getMisses() uint64 {
return s.Misses.Load()
}

// Prepare new CacheStats structure
func newCacheStats() *CacheStats {
return &CacheStats{}
}
36 changes: 29 additions & 7 deletions cmd/disk-cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type CacheObjectLayer interface {
PutObject(ctx context.Context, bucket, object string, data *PutObjReader, opts ObjectOptions) (objInfo ObjectInfo, err error)
// Storage operations.
StorageInfo(ctx context.Context) CacheStorageInfo
CacheStats() *CacheStats
}

// Abstracts disk caching - used by the S3 layer
Expand All @@ -74,6 +75,9 @@ type cacheObjects struct {
// nsMutex namespace lock
nsMutex *nsLockMap

// Cache stats
cacheStats *CacheStats

// Object functions pointing to the corresponding functions of backend implementation.
NewNSLockFn func(ctx context.Context, bucket, object string) RWLocker
GetObjectNInfoFn func(ctx context.Context, bucket, object string, rs *HTTPRangeSpec, h http.Header, lockType LockType, opts ObjectOptions) (gr *GetObjectReader, err error)
Expand Down Expand Up @@ -181,11 +185,17 @@ func (c *cacheObjects) GetObjectNInfo(ctx context.Context, bucket, object string
cacheReader, cacheErr := c.get(ctx, dcache, bucket, object, rs, h, opts)
if cacheErr == nil {
cc = cacheControlOpts(cacheReader.ObjInfo)
if !cc.isEmpty() && !cc.isStale(cacheReader.ObjInfo.ModTime) {
if !cc.isStale(cacheReader.ObjInfo.ModTime) {
// This is a cache hit, mark it so
c.cacheStats.incHit()
c.cacheStats.incBytesServed(cacheReader.ObjInfo.Size)
return cacheReader, nil
}
}

// Reaching here implies cache miss
c.cacheStats.incMiss()

objInfo, err := c.GetObjectInfoFn(ctx, bucket, object, opts)
if backendDownError(err) && cacheErr == nil {
return cacheReader, nil
Expand Down Expand Up @@ -282,10 +292,16 @@ func (c *cacheObjects) GetObjectInfo(ctx context.Context, bucket, object string,
cachedObjInfo, cerr := c.stat(ctx, dcache, bucket, object)
if cerr == nil {
cc = cacheControlOpts(cachedObjInfo)
if !cc.isEmpty() && !cc.isStale(cachedObjInfo.ModTime) {
if !cc.isStale(cachedObjInfo.ModTime) {
// This is a cache hit, mark it so
c.cacheStats.incHit()
return cachedObjInfo, nil
}
}

// Reaching here implies cache miss
c.cacheStats.incMiss()

objInfo, err := getObjectInfoFn(ctx, bucket, object, opts)
if err != nil {
if _, ok := err.(ObjectNotFound); ok {
Expand Down Expand Up @@ -332,6 +348,11 @@ func (c *cacheObjects) StorageInfo(ctx context.Context) (cInfo CacheStorageInfo)
}
}

// CacheStats - returns underlying storage statistics.
func (c *cacheObjects) CacheStats() (cs *CacheStats) {
return c.cacheStats
}

// skipCache() returns true if cache migration is in progress
func (c *cacheObjects) skipCache() bool {
c.migMutex.Lock()
Expand Down Expand Up @@ -572,11 +593,12 @@ func newServerCacheObjects(ctx context.Context, config cache.Config) (CacheObjec
}

c := &cacheObjects{
cache: cache,
exclude: config.Exclude,
migrating: migrateSw,
migMutex: sync.Mutex{},
nsMutex: newNSLock(false),
cache: cache,
exclude: config.Exclude,
migrating: migrateSw,
migMutex: sync.Mutex{},
nsMutex: newNSLock(false),
cacheStats: newCacheStats(),
GetObjectInfoFn: func(ctx context.Context, bucket, object string, opts ObjectOptions) (ObjectInfo, error) {
return newObjectLayerFn().GetObjectInfo(ctx, bucket, object, opts)
},
Expand Down
2 changes: 1 addition & 1 deletion cmd/disk-usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func walk(ctx context.Context, path string, usageFn usageFunc) error {
return err
}

if !hasSuffix(path, SlashSeparator) {
if !HasSuffix(path, SlashSeparator) {
return nil
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/fs-v1-metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (m fsMetaV1) ToObjectInfo(bucket, object string, fi os.FileInfo) ObjectInfo
m.Meta["content-type"] = mimedb.TypeByExtension(pathutil.Ext(object))
}

if hasSuffix(object, SlashSeparator) {
if HasSuffix(object, SlashSeparator) {
m.Meta["etag"] = emptyETag // For directories etag is d41d8cd98f00b204e9800998ecf8427e
m.Meta["content-type"] = "application/octet-stream"
}
Expand Down
12 changes: 9 additions & 3 deletions cmd/fs-v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,7 @@ func (fs *FSObjects) GetObjectNInfo(ctx context.Context, bucket, object string,
return nil, toObjectErr(err, bucket, object)
}
// For a directory, we need to send an reader that returns no bytes.
if hasSuffix(object, SlashSeparator) {
if HasSuffix(object, SlashSeparator) {
// The lock taken above is released when
// objReader.Close() is called by the caller.
return NewGetObjectReaderFromReader(bytes.NewBuffer(nil), objInfo, opts.CheckCopyPrecondFn, nsUnlocker)
Expand Down Expand Up @@ -605,7 +605,7 @@ func (fs *FSObjects) getObject(ctx context.Context, bucket, object string, offse
}

// If its a directory request, we return an empty body.
if hasSuffix(object, SlashSeparator) {
if HasSuffix(object, SlashSeparator) {
_, err = writer.Write([]byte(""))
logger.LogIf(ctx, err)
return toObjectErr(err, bucket, object)
Expand Down Expand Up @@ -699,7 +699,7 @@ func (fs *FSObjects) defaultFsJSON(object string) fsMetaV1 {
// getObjectInfo - wrapper for reading object metadata and constructs ObjectInfo.
func (fs *FSObjects) getObjectInfo(ctx context.Context, bucket, object string) (oi ObjectInfo, e error) {
fsMeta := fsMetaV1{}
if hasSuffix(object, SlashSeparator) {
if HasSuffix(object, SlashSeparator) {
fi, err := fsStatDir(ctx, pathJoin(fs.fsPath, bucket, object))
if err != nil {
return oi, err
Expand Down Expand Up @@ -1167,6 +1167,12 @@ func (fs *FSObjects) ListObjectsHeal(ctx context.Context, bucket, prefix, marker
return ListObjectsInfo{}, NotImplemented{}
}

// GetMetrics - no op
func (fs *FSObjects) GetMetrics(ctx context.Context) (*Metrics, error) {
logger.LogIf(ctx, NotImplemented{})
return &Metrics{}, NotImplemented{}
}

// SetBucketPolicy sets policy on bucket
func (fs *FSObjects) SetBucketPolicy(ctx context.Context, bucket string, policy *policy.Policy) error {
return savePolicyConfig(ctx, fs, bucket, policy)
Expand Down
1 change: 0 additions & 1 deletion cmd/gateway-main.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,6 @@ func StartGateway(ctx *cli.Context, gw Gateway) {
globalHTTPServer.Shutdown()
logger.FatalIf(err, "Unable to initialize gateway backend")
}

newObject = NewGatewayLayerWithLocker(newObject)

// Re-enable logging
Expand Down
79 changes: 79 additions & 0 deletions cmd/gateway-metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* MinIO Cloud Storage, (C) 2019 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cmd

import (
"sync"

"go.uber.org/atomic"
)

// Metrics - represents bytes served from backend
// only implemented for S3 Gateway
type Metrics struct {
BytesReceived atomic.Uint64
BytesSent atomic.Uint64
RequestStats map[string]int
sync.RWMutex
}

// IncBytesReceived - Increase total bytes received from gateway backend
func (s *Metrics) IncBytesReceived(n int64) {
s.BytesReceived.Add(uint64(n))
}

// GetBytesReceived - Get total bytes received from gateway backend
func (s *Metrics) GetBytesReceived() uint64 {
return s.BytesReceived.Load()
}

// IncBytesSent - Increase total bytes sent to gateway backend
func (s *Metrics) IncBytesSent(n int64) {
s.BytesSent.Add(uint64(n))
}

// GetBytesSent - Get total bytes received from gateway backend
func (s *Metrics) GetBytesSent() uint64 {
return s.BytesSent.Load()
}

// IncRequests - Increase request sent to gateway backend by 1
func (s *Metrics) IncRequests(method string) {
s.Lock()
defer s.Unlock()
if s == nil {
return
}
if s.RequestStats == nil {
s.RequestStats = make(map[string]int)
}
if _, ok := s.RequestStats[method]; ok {
s.RequestStats[method]++
return
}
s.RequestStats[method] = 1
}

// GetRequests - Get total number of requests sent to gateway backend
func (s *Metrics) GetRequests() map[string]int {
return s.RequestStats
}

// NewMetrics - Prepare new Metrics structure
func NewMetrics() *Metrics {
return &Metrics{}
}
6 changes: 6 additions & 0 deletions cmd/gateway-unsupported.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ func (a GatewayUnsupported) CopyObject(ctx context.Context, srcBucket string, sr
return objInfo, NotImplemented{}
}

// GetMetrics - no op
func (a GatewayUnsupported) GetMetrics(ctx context.Context) (*Metrics, error) {
logger.LogIf(ctx, NotImplemented{})
return &Metrics{}, NotImplemented{}
}

// IsNotificationSupported returns whether bucket notification is applicable for this layer.
func (a GatewayUnsupported) IsNotificationSupported() bool {
return false
Expand Down
Loading

0 comments on commit 3df7285

Please sign in to comment.