Skip to content

Commit

Permalink
[apache#30703][prism] Update logging handling (apache#32826)
Browse files Browse the repository at this point in the history
* Migrate to standard library slog package

* Add dev logger dependency for pre printed development logs

* Improve logging output for prism and user side logs, and emit container logs.

* Fix missed lines from artifact and worker.

---------

Co-authored-by: lostluck <[email protected]>
  • Loading branch information
lostluck and lostluck authored Oct 20, 2024
1 parent a163dd0 commit 1ba33b8
Show file tree
Hide file tree
Showing 21 changed files with 153 additions and 65 deletions.
4 changes: 3 additions & 1 deletion sdks/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// directory.
module github.com/apache/beam/sdks/v2

go 1.21
go 1.21.0

require (
cloud.google.com/go/bigquery v1.63.1
Expand Down Expand Up @@ -69,6 +69,8 @@ require (
require (
github.com/avast/retry-go/v4 v4.6.0
github.com/fsouza/fake-gcs-server v1.49.2
github.com/golang-cz/devslog v0.0.11
github.com/golang/protobuf v1.5.4
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
)

Expand Down
2 changes: 2 additions & 0 deletions sdks/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,8 @@ github.com/goccy/go-json v0.10.2 h1:CrxCmQqYDkv1z7lO7Wbh2HN93uovUHgrECaO5ZrCXAU=
github.com/goccy/go-json v0.10.2/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I=
github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
github.com/golang-cz/devslog v0.0.11 h1:v4Yb9o0ZpuZ/D8ZrtVw1f9q5XrjnkxwHF1XmWwO8IHg=
github.com/golang-cz/devslog v0.0.11/go.mod h1:bSe5bm0A7Nyfqtijf1OMNgVJHlWEuVSXnkuASiE1vV8=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/glog v1.0.0/go.mod h1:EWib/APOK0SL3dFbYqvxE3UYd8E6s1ouQ7iEp/0LWV4=
Expand Down
47 changes: 47 additions & 0 deletions sdks/go/cmd/prism/prism.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,14 @@ import (
"flag"
"fmt"
"log"
"log/slog"
"os"
"strings"
"time"

jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism"
"github.com/golang-cz/devslog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
Expand All @@ -37,10 +42,52 @@ var (
idleShutdownTimeout = flag.Duration("idle_shutdown_timeout", -1, "duration that prism will wait for a new job before shutting itself down. Negative durations disable auto shutdown. Defaults to never shutting down.")
)

// Logging flags
var (
debug = flag.Bool("debug", false,
"Enables full verbosity debug logging from the runner by default. Used to build SDKs or debug Prism itself.")
logKind = flag.String("log_kind", "dev",
"Determines the format of prism's logging to std err: valid values are `dev', 'json', or 'text'. Default is `dev`.")
)

var logLevel = new(slog.LevelVar)

func main() {
flag.Parse()
ctx, cancel := context.WithCancelCause(context.Background())

var logHandler slog.Handler
loggerOutput := os.Stderr
handlerOpts := &slog.HandlerOptions{
Level: logLevel,
AddSource: *debug,
}
if *debug {
logLevel.Set(slog.LevelDebug)
// Print the Prism source line for a log in debug mode.
handlerOpts.AddSource = true
}
switch strings.ToLower(*logKind) {
case "dev":
logHandler =
devslog.NewHandler(loggerOutput, &devslog.Options{
TimeFormat: "[" + time.RFC3339Nano + "]",
StringerFormatter: true,
HandlerOptions: handlerOpts,
StringIndentation: false,
NewLineAfterLog: true,
MaxErrorStackTrace: 3,
})
case "json":
logHandler = slog.NewJSONHandler(loggerOutput, handlerOpts)
case "text":
logHandler = slog.NewTextHandler(loggerOutput, handlerOpts)
default:
log.Fatalf("Invalid value for log_kind: %v, must be 'dev', 'json', or 'text'", *logKind)
}

slog.SetDefault(slog.New(logHandler))

cli, err := makeJobClient(ctx,
prism.Options{
Port: *jobPort,
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/metricsx/metricsx.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ import (
"bytes"
"fmt"
"log"
"log/slog"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"golang.org/x/exp/slog"
)

// FromMonitoringInfos extracts metrics from monitored states and
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"log/slog"
"strings"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
Expand All @@ -28,7 +29,6 @@ import (
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/encoding/prototext"
)

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/engine/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package engine
import (
"bytes"
"fmt"
"log/slog"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/coder"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/exec"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"golang.org/x/exp/slog"
)

// StateData is a "union" between Bag state and MultiMap state to increase common code.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"context"
"fmt"
"io"
"log/slog"
"sort"
"strings"
"sync"
Expand All @@ -36,7 +37,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/internal/errors"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
)

type element struct {
Expand Down Expand Up @@ -1607,7 +1607,7 @@ func (ss *stageState) bundleReady(em *ElementManager, emNow mtime.Time) (mtime.T
inputW := ss.input
_, upstreamW := ss.UpstreamWatermark()
if inputW == upstreamW {
slog.Debug("bundleReady: insufficient upstream watermark",
slog.Debug("bundleReady: unchanged upstream watermark",
slog.String("stage", ss.ID),
slog.Group("watermark",
slog.Any("upstream", upstreamW),
Expand Down
32 changes: 25 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/environments.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (
"context"
"fmt"
"io"
"log/slog"
"os"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/jobservices"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/slog"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
Expand All @@ -42,7 +42,7 @@ import (
// TODO move environment handling to the worker package.

func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *worker.W) error {
logger := slog.With(slog.String("envID", wk.Env))
logger := j.Logger.With(slog.String("envID", wk.Env))
// TODO fix broken abstraction.
// We're starting a worker pool here, because that's the loopback environment.
// It's sort of a mess, largely because of loopback, which has
Expand All @@ -56,7 +56,7 @@ func runEnvironment(ctx context.Context, j *jobservices.Job, env string, wk *wor
}
go func() {
externalEnvironment(ctx, ep, wk)
slog.Debug("environment stopped", slog.String("job", j.String()))
logger.Debug("environment stopped", slog.String("job", j.String()))
}()
return nil
case urns.EnvDocker:
Expand Down Expand Up @@ -129,6 +129,8 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
credEnv := fmt.Sprintf("%v=%v", gcloudCredsEnv, dockerGcloudCredsFile)
envs = append(envs, credEnv)
}
} else {
logger.Debug("local GCP credentials environment variable not found")
}
if _, _, err := cli.ImageInspectWithRaw(ctx, dp.GetContainerImage()); err != nil {
// We don't have a local image, so we should pull it.
Expand All @@ -140,6 +142,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
logger.Warn("unable to pull image and it's not local", "error", err)
}
}
logger.Debug("creating container", "envs", envs, "mounts", mounts)

ccr, err := cli.ContainerCreate(ctx, &container.Config{
Image: dp.GetContainerImage(),
Expand Down Expand Up @@ -169,17 +172,32 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
return fmt.Errorf("unable to start container image %v with docker for env %v, err: %w", dp.GetContainerImage(), wk.Env, err)
}

logger.Debug("container started")

// Start goroutine to wait on container state.
go func() {
defer cli.Close()
defer wk.Stop()
defer func() {
logger.Debug("container stopped")
}()

statusCh, errCh := cli.ContainerWait(ctx, containerID, container.WaitConditionNotRunning)
bgctx := context.Background()
statusCh, errCh := cli.ContainerWait(bgctx, containerID, container.WaitConditionNotRunning)
select {
case <-ctx.Done():
// Can't use command context, since it's already canceled here.
err := cli.ContainerKill(context.Background(), containerID, "")
rc, err := cli.ContainerLogs(bgctx, containerID, container.LogsOptions{Details: true, ShowStdout: true, ShowStderr: true})
if err != nil {
logger.Error("error fetching container logs error on context cancellation", "error", err)
}
if rc != nil {
defer rc.Close()
var buf bytes.Buffer
stdcopy.StdCopy(&buf, &buf, rc)
logger.Info("container being killed", slog.Any("cause", context.Cause(ctx)), slog.Any("containerLog", buf))
}
// Can't use command context, since it's already canceled here.
if err := cli.ContainerKill(bgctx, containerID, ""); err != nil {
logger.Error("docker container kill error", "error", err)
}
case err := <-errCh:
Expand All @@ -189,7 +207,7 @@ func dockerEnvironment(ctx context.Context, logger *slog.Logger, dp *pipepb.Dock
case resp := <-statusCh:
logger.Info("docker container has self terminated", "status_code", resp.StatusCode)

rc, err := cli.ContainerLogs(ctx, containerID, container.LogsOptions{Details: true, ShowStdout: true, ShowStderr: true})
rc, err := cli.ContainerLogs(bgctx, containerID, container.LogsOptions{Details: true, ShowStdout: true, ShowStderr: true})
if err != nil {
logger.Error("docker container logs error", "error", err)
}
Expand Down
14 changes: 7 additions & 7 deletions sdks/go/pkg/beam/runners/prism/internal/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"errors"
"fmt"
"io"
"log/slog"
"sort"
"sync/atomic"
"time"
Expand All @@ -34,7 +35,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
"golang.org/x/sync/errgroup"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -311,7 +311,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
return fmt.Errorf("prism error building stage %v: \n%w", stage.ID, err)
}
stages[stage.ID] = stage
slog.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
j.Logger.Debug("pipelineBuild", slog.Group("stage", slog.String("ID", stage.ID), slog.String("transformName", t.GetUniqueName())))
outputs := maps.Keys(stage.OutputsToCoders)
sort.Strings(outputs)
em.AddStage(stage.ID, []string{stage.primaryInput}, outputs, stage.sideInputs)
Expand All @@ -322,9 +322,7 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
em.StageProcessingTimeTimers(stage.ID, stage.processingTimeTimers)
}
default:
err := fmt.Errorf("unknown environment[%v]", t.GetEnvironmentId())
slog.Error("Execute", err)
return err
return fmt.Errorf("unknown environment[%v]", t.GetEnvironmentId())
}
}

Expand All @@ -344,11 +342,13 @@ func executePipeline(ctx context.Context, wks map[string]*worker.W, j *jobservic
for {
select {
case <-ctx.Done():
return context.Cause(ctx)
err := context.Cause(ctx)
j.Logger.Debug("context canceled", slog.Any("cause", err))
return err
case rb, ok := <-bundles:
if !ok {
err := eg.Wait()
slog.Debug("pipeline done!", slog.String("job", j.String()), slog.Any("error", err))
j.Logger.Debug("pipeline done!", slog.String("job", j.String()), slog.Any("error", err), slog.Any("topo", topo))
return err
}
eg.Go(func() error {
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/handlerunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"bytes"
"fmt"
"io"
"log/slog"
"reflect"
"sort"

Expand All @@ -31,7 +32,6 @@ import (
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/engine"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/worker"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@ import (
"context"
"fmt"
"io"
"log/slog"

jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/encoding/prototext"
)

Expand Down Expand Up @@ -77,7 +77,7 @@ func (s *Server) ReverseArtifactRetrievalService(stream jobpb.ArtifactStagingSer

case *jobpb.ArtifactResponseWrapper_ResolveArtifactResponse:
err := fmt.Errorf("unexpected ResolveArtifactResponse to GetArtifact: %v", in.GetResponse())
slog.Error("GetArtifact failure", err)
slog.Error("GetArtifact failure", slog.Any("error", err))
return err
}
}
Expand Down
4 changes: 3 additions & 1 deletion sdks/go/pkg/beam/runners/prism/internal/jobservices/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ package jobservices
import (
"context"
"fmt"
"log/slog"
"sort"
"strings"
"sync"
Expand All @@ -37,7 +38,6 @@ import (
jobpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/jobmanagement_v1"
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/types/known/structpb"
)

Expand Down Expand Up @@ -88,6 +88,8 @@ type Job struct {
// Context used to terminate this job.
RootCtx context.Context
CancelFn context.CancelCauseFunc
// Logger for this job.
Logger *slog.Logger

metrics metricsStore
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"errors"
"fmt"
"log/slog"
"sync"
"sync/atomic"

Expand All @@ -27,7 +28,6 @@ import (
pipepb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/pipeline_v1"
"github.com/apache/beam/sdks/v2/go/pkg/beam/runners/prism/internal/urns"
"golang.org/x/exp/maps"
"golang.org/x/exp/slog"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -92,6 +92,7 @@ func (s *Server) Prepare(ctx context.Context, req *jobpb.PrepareJobRequest) (_ *
cancelFn(err)
terminalOnceWrap()
},
Logger: s.logger, // TODO substitute with a configured logger.
artifactEndpoint: s.Endpoint(),
}
// Stop the idle timer when a new job appears.
Expand Down
Loading

0 comments on commit 1ba33b8

Please sign in to comment.