Skip to content

Commit

Permalink
hubble: Refactor payload parser to return v1.Event
Browse files Browse the repository at this point in the history
This changes the signature of the monitor payload parser to return a
`v1.Event` instead of `flowpb.Flow` type. This will enable us to return
non-flow events in the parser.

Signed-off-by: Sebastian Wicki <[email protected]>
  • Loading branch information
gandro authored and qmonnet committed Jul 14, 2020
1 parent cecf5a7 commit 79e91ee
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 32 deletions.
42 changes: 15 additions & 27 deletions pkg/hubble/observer/local_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,32 +153,31 @@ nextEvent:
}
}

flow, err := decodeFlow(s.payloadParser, pl)
ev, err := s.payloadParser.Decode(pl)
if err != nil {
if !parserErrors.IsErrInvalidType(err) {
s.log.WithError(err).WithField("data", pl.Data).Debug("failed to decode payload")
}
continue
}

for _, f := range s.opts.OnDecodedFlow {
stop, err := f.OnDecodedFlow(ctx, flow)
if err != nil {
s.log.WithError(err).WithField("data", pl.Data).Info("failed in OnDecodedFlow")
}
if stop {
continue nextEvent
if flow, ok := ev.Event.(*flowpb.Flow); ok {
for _, f := range s.opts.OnDecodedFlow {
stop, err := f.OnDecodedFlow(ctx, flow)
if err != nil {
s.log.WithError(err).WithField("data", pl.Data).Info("failed in OnDecodedFlow")
}
if stop {
continue nextEvent
}
}
}

atomic.AddUint64(&s.numObservedFlows, 1)
// FIXME: Convert metrics into an OnDecodedFlow function
metrics.ProcessFlow(flow)
atomic.AddUint64(&s.numObservedFlows, 1)
// FIXME: Convert metrics into an OnDecodedFlow function
metrics.ProcessFlow(flow)
}

s.GetRingBuffer().Write(&v1.Event{
Timestamp: pl.Time,
Event: flow,
})
s.GetRingBuffer().Write(ev)
}
close(s.GetStopped())
}
Expand Down Expand Up @@ -334,17 +333,6 @@ func logFilters(filters []*flowpb.FlowFilter) string {
return "{" + strings.Join(s, ",") + "}"
}

func decodeFlow(payloadParser *parser.Parser, pl *flowpb.Payload) (*flowpb.Flow, error) {
// TODO: Pool these instead of allocating new flows each time.
f := &flowpb.Flow{}
err := payloadParser.Decode(pl, f)
if err != nil {
return nil, err
}

return f, nil
}

// flowsReader reads flows using a RingReader. It applies the flow request
// criteria (blacklist, whitelist, follow, ...) before returning flows.
type flowsReader struct {
Expand Down
24 changes: 19 additions & 5 deletions pkg/hubble/parser/new.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package parser

import (
pb "github.com/cilium/cilium/api/v1/flow"
v1 "github.com/cilium/cilium/pkg/hubble/api/v1"
"github.com/cilium/cilium/pkg/hubble/parser/errors"
"github.com/cilium/cilium/pkg/hubble/parser/getters"
"github.com/cilium/cilium/pkg/hubble/parser/options"
Expand Down Expand Up @@ -60,20 +61,33 @@ func New(
}

// Decode decodes the data from 'payload' into 'decoded'
func (p *Parser) Decode(payload *pb.Payload, decoded *pb.Flow) error {
func (p *Parser) Decode(payload *pb.Payload) (*v1.Event, error) {
if payload == nil || len(payload.Data) == 0 {
return errors.ErrEmptyData
return nil, errors.ErrEmptyData
}

// TODO: Pool decoded flows instead of allocating new objects each time.
ev := &v1.Event{
Timestamp: payload.Time,
}

eventType := payload.Data[0]
switch eventType {
case monitorAPI.MessageTypeDrop,
monitorAPI.MessageTypeTrace,
monitorAPI.MessageTypePolicyVerdict:
return p.l34.Decode(payload, decoded)
ev.Event = &pb.Flow{}
if err := p.l34.Decode(payload, ev.Event.(*pb.Flow)); err != nil {
return nil, err
}
return ev, nil
case monitorAPI.MessageTypeAccessLog:
return p.l7.Decode(payload, decoded)
ev.Event = &pb.Flow{}
if err := p.l7.Decode(payload, ev.Event.(*pb.Flow)); err != nil {
return nil, err
}
return ev, nil
default:
return errors.NewErrInvalidType(eventType)
return nil, errors.NewErrInvalidType(eventType)
}
}

0 comments on commit 79e91ee

Please sign in to comment.