Skip to content

Commit

Permalink
internal/{fetch,worker}: include DB insertion in fetch metrics
Browse files Browse the repository at this point in the history
Fetch latency and related metrics include the time spent inserting
into the DB, as well as the time to fetch and process the module.

For golang/go#48010

Change-Id: I1d685bd25f1b632b0b20de5b1bfac5003bff0caa
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/346750
Trust: Jonathan Amsterdam <[email protected]>
Run-TryBot: Jonathan Amsterdam <[email protected]>
Reviewed-by: Julie Qiu <[email protected]>
  • Loading branch information
jba committed Sep 1, 2021
1 parent bb6da52 commit ad1f0ca
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 61 deletions.
7 changes: 3 additions & 4 deletions cmd/worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"golang.org/x/pkgsite/internal"
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/dcensus"
"golang.org/x/pkgsite/internal/fetch"
"golang.org/x/pkgsite/internal/index"
"golang.org/x/pkgsite/internal/log"
"golang.org/x/pkgsite/internal/middleware"
Expand Down Expand Up @@ -117,9 +116,9 @@ func main() {
worker.UnprocessedModules,
worker.UnprocessedNewModules,
worker.SheddedFetchCount,
fetch.FetchLatencyDistribution,
fetch.FetchResponseCount,
fetch.FetchPackageCount)
worker.FetchLatencyDistribution,
worker.FetchResponseCount,
worker.FetchPackageCount)
if err := dcensus.Init(cfg, views...); err != nil {
log.Fatal(ctx, err)
}
Expand Down
53 changes: 0 additions & 53 deletions internal/fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@ import (
"io/fs"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"golang.org/x/mod/modfile"
"golang.org/x/pkgsite/internal"
"golang.org/x/pkgsite/internal/dcensus"
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/licenses"
"golang.org/x/pkgsite/internal/log"
Expand All @@ -35,44 +29,6 @@ import (

var ErrModuleContainsNoPackages = errors.New("module contains 0 packages")

var (
fetchLatency = stats.Float64(
"go-discovery/worker/fetch-latency",
"Latency of a fetch request.",
stats.UnitSeconds,
)
fetchedPackages = stats.Int64(
"go-discovery/worker/fetch-package-count",
"Count of successfully fetched packages.",
stats.UnitDimensionless,
)

// FetchLatencyDistribution aggregates frontend fetch request
// latency by status code. It does not count shedded requests.
FetchLatencyDistribution = &view.View{
Name: "go-discovery/worker/fetch-latency",
Measure: fetchLatency,
Aggregation: ochttp.DefaultLatencyDistribution,
Description: "Fetch latency by result status.",
TagKeys: []tag.Key{dcensus.KeyStatus},
}
// FetchResponseCount counts fetch responses by status.
FetchResponseCount = &view.View{
Name: "go-discovery/worker/fetch-count",
Measure: fetchLatency,
Aggregation: view.Count(),
Description: "Fetch request count by result status",
TagKeys: []tag.Key{dcensus.KeyStatus},
}
// FetchPackageCount counts how many packages were successfully fetched.
FetchPackageCount = &view.View{
Name: "go-discovery/worker/fetch-package-count",
Measure: fetchedPackages,
Aggregation: view.Count(),
Description: "Count of packages successfully fetched",
}
)

type FetchResult struct {
ModulePath string
RequestedVersion string
Expand All @@ -96,15 +52,6 @@ type FetchResult struct {
//
// Even if err is non-nil, the result may contain useful information, like the go.mod path.
func FetchModule(ctx context.Context, modulePath, requestedVersion string, mg ModuleGetter, sourceClient *source.Client) (fr *FetchResult) {
start := time.Now()
defer func() {
latency := float64(time.Since(start).Seconds())
dcensus.RecordWithTag(ctx, dcensus.KeyStatus, strconv.Itoa(fr.Status), fetchLatency.M(latency))
if fr.Status < 300 {
stats.Record(ctx, fetchedPackages.M(int64(len(fr.PackageVersionStates))))
}
}()

fr = &FetchResult{
ModulePath: modulePath,
RequestedVersion: requestedVersion,
Expand Down
59 changes: 55 additions & 4 deletions internal/worker/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,22 @@ import (
"math"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"unicode/utf8"

"go.opencensus.io/plugin/ochttp"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opencensus.io/trace"
"golang.org/x/mod/semver"
"golang.org/x/pkgsite/internal"
"golang.org/x/pkgsite/internal/cache"
"golang.org/x/pkgsite/internal/config"
"golang.org/x/pkgsite/internal/dcensus"
"golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/experiment"
"golang.org/x/pkgsite/internal/fetch"
Expand All @@ -39,6 +43,41 @@ var (
"Count of shedded fetches.",
stats.UnitDimensionless,
)
fetchLatency = stats.Float64(
"go-discovery/worker/fetch-latency",
"Latency of a fetch request.",
stats.UnitSeconds,
)
fetchedPackages = stats.Int64(
"go-discovery/worker/fetch-package-count",
"Count of successfully fetched packages.",
stats.UnitDimensionless,
)

// FetchLatencyDistribution aggregates frontend fetch request
// latency by status code. It does not count shedded requests.
FetchLatencyDistribution = &view.View{
Name: "go-discovery/worker/fetch-latency",
Measure: fetchLatency,
Aggregation: ochttp.DefaultLatencyDistribution,
Description: "Fetch latency by result status.",
TagKeys: []tag.Key{dcensus.KeyStatus},
}
// FetchResponseCount counts fetch responses by status.
FetchResponseCount = &view.View{
Name: "go-discovery/worker/fetch-count",
Measure: fetchLatency,
Aggregation: view.Count(),
Description: "Fetch request count by result status",
TagKeys: []tag.Key{dcensus.KeyStatus},
}
// FetchPackageCount counts how many packages were successfully fetched.
FetchPackageCount = &view.View{
Name: "go-discovery/worker/fetch-package-count",
Measure: fetchedPackages,
Aggregation: view.Count(),
Description: "Count of packages successfully fetched",
}

// SheddedFetchCount counts the number of fetches that were shedded.
SheddedFetchCount = &view.View{
Expand Down Expand Up @@ -67,11 +106,22 @@ type Fetcher struct {
// the module_version_states table according to the result. It returns an HTTP
// status code representing the result of the fetch operation, and a non-nil
// error if this status code is not 200.
func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (_ int, resolvedVersion string, err error) {
func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requestedVersion, appVersionLabel string) (status int, resolvedVersion string, err error) {
defer derrors.Wrap(&err, "FetchAndUpdateState(%q, %q, %q)", modulePath, requestedVersion, appVersionLabel)
tctx, span := trace.StartSpan(ctx, "FetchAndUpdateState")
ctx = experiment.NewContext(tctx, experiment.FromContext(ctx).Active()...)
ctx = log.NewContextWithLabel(ctx, "fetch", modulePath+"@"+requestedVersion)

start := time.Now()
var nPackages int64
defer func() {
latency := float64(time.Since(start).Seconds())
dcensus.RecordWithTag(ctx, dcensus.KeyStatus, strconv.Itoa(status), fetchLatency.M(latency))
if status < 300 {
stats.Record(ctx, fetchedPackages.M(nPackages))
}
}()

if !utf8.ValidString(modulePath) {
log.Errorf(ctx, "module path %q is not valid UTF-8", modulePath)
}
Expand Down Expand Up @@ -108,7 +158,8 @@ func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requested
return derrors.ToStatus(err), "", err
}
ft := f.fetchAndInsertModule(ctx, modulePath, requestedVersion, lmv)
span.AddAttributes(trace.Int64Attribute("numPackages", int64(len(ft.PackageVersionStates))))
nPackages = int64(len(ft.PackageVersionStates))
span.AddAttributes(trace.Int64Attribute("numPackages", nPackages))

// If there were any errors processing the module then we didn't insert it.
// Delete it in case we are reprocessing an existing module.
Expand Down Expand Up @@ -160,7 +211,7 @@ func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requested
// action.
// TODO(golang/go#39628): Split UpsertModuleVersionState into
// InsertModuleVersionState and UpdateModuleVersionState.
start := time.Now()
startUpdate := time.Now()
mvs := &postgres.ModuleVersionStateForUpsert{
ModulePath: ft.ModulePath,
Version: ft.ResolvedVersion,
Expand All @@ -172,7 +223,7 @@ func (f *Fetcher) FetchAndUpdateState(ctx context.Context, modulePath, requested
PackageVersionStates: ft.PackageVersionStates,
}
err = f.DB.UpsertModuleVersionState(ctx, mvs)
ft.timings["db.UpsertModuleVersionState"] = time.Since(start)
ft.timings["db.UpsertModuleVersionState"] = time.Since(startUpdate)
if err != nil {
log.Error(ctx, err)
if ft.Error != nil {
Expand Down

0 comments on commit ad1f0ca

Please sign in to comment.