Skip to content

Commit

Permalink
Update go-concert to 0.1.0 (elastic#23770)
Browse files Browse the repository at this point in the history
  • Loading branch information
Steffen Siering authored Feb 15, 2021
1 parent 6839307 commit 342a845
Show file tree
Hide file tree
Showing 12 changed files with 73 additions and 87 deletions.
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6336,11 +6336,11 @@ SOFTWARE

--------------------------------------------------------------------------------
Dependency : github.com/elastic/go-concert
Version: v0.0.4
Version: v0.1.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/go-concert@v0.0.4/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/go-concert@v0.1.0/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
68 changes: 25 additions & 43 deletions filebeat/input/filestream/filestream.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@
package filestream

import (
"context"
"errors"
"io"
"os"
"time"

"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/timed"
"github.com/elastic/go-concert/unison"

input "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/common/backoff"
"github.com/elastic/beats/v7/libbeat/common/file"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert/ctxtool"
"github.com/elastic/go-concert/unison"
)

var (
Expand All @@ -39,10 +40,9 @@ var (

// logFile contains all log related data
type logFile struct {
file *os.File
log *logp.Logger
ctx context.Context
cancelReading context.CancelFunc
file *os.File
log *logp.Logger
readerCtx ctxtool.CancelContext

closeAfterInterval time.Duration
closeOnEOF bool
Expand All @@ -55,7 +55,7 @@ type logFile struct {
offset int64
lastTimeRead time.Time
backoff backoff.Backoff
tg unison.TaskGroup
tg *unison.TaskGroup
}

// newFileReader creates a new log instance to read log sources
Expand All @@ -71,6 +71,9 @@ func newFileReader(
return nil, err
}

readerCtx := ctxtool.WithCancelContext(ctxtool.FromCanceller(canceler))
tg := unison.TaskGroupWithCancel(readerCtx)

l := &logFile{
file: f,
log: log,
Expand All @@ -83,16 +86,10 @@ func newFileReader(
offset: offset,
lastTimeRead: time.Now(),
backoff: backoff.NewExpBackoff(canceler.Done(), config.Backoff.Init, config.Backoff.Max),
tg: unison.TaskGroup{},
readerCtx: readerCtx,
tg: tg,
}

l.ctx, l.cancelReading = ctxtool.WithFunc(ctxtool.FromCanceller(canceler), func() {
err := l.tg.Stop()
if err != nil {
l.log.Errorf("Error while stopping filestream logFile reader: %v", err)
}
})

l.startFileMonitoringIfNeeded()

return l, nil
Expand All @@ -103,7 +100,7 @@ func newFileReader(
func (f *logFile) Read(buf []byte) (int, error) {
totalN := 0

for f.ctx.Err() == nil {
for f.readerCtx.Err() == nil {
n, err := f.file.Read(buf)
if n > 0 {
f.offset += int64(n)
Expand Down Expand Up @@ -154,35 +151,18 @@ func (f *logFile) startFileMonitoringIfNeeded() {
}

func (f *logFile) closeIfTimeout(ctx unison.Canceler) {
timer := time.NewTimer(f.closeAfterInterval)
defer timer.Stop()

for {
select {
case <-ctx.Done():
return
case <-timer.C:
f.cancelReading()
return
}
if err := timed.Wait(ctx, f.closeAfterInterval); err == nil {
f.readerCtx.Cancel()
}
}

func (f *logFile) periodicStateCheck(ctx unison.Canceler) {
ticker := time.NewTicker(f.checkInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if f.shouldBeClosed() {
f.cancelReading()
return
}
timed.Periodic(ctx, f.checkInterval, func() error {
if f.shouldBeClosed() {
f.readerCtx.Cancel()
}
}
return nil
})
}

func (f *logFile) shouldBeClosed() bool {
Expand Down Expand Up @@ -267,6 +247,8 @@ func (f *logFile) handleEOF() error {

// Close
func (f *logFile) Close() error {
f.cancelReading()
return f.file.Close()
f.readerCtx.Cancel()
err := f.file.Close()
f.tg.Stop() // Wait until all resources are released for sure.
return err
}
2 changes: 1 addition & 1 deletion filebeat/input/filestream/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (inp *filestream) Run(
return err
}

_, streamCancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
_, streamCancel := ctxtool.WithFunc(ctx.Cancelation, func() {
log.Debug("Closing reader of filestream")
err := r.Close()
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package compat

import (
"context"
"fmt"
"sync"

Expand All @@ -31,7 +32,7 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/go-concert"
"github.com/elastic/go-concert/ctxtool"
)

// factory implements the cfgfile.RunnerFactory interface and wraps the
Expand All @@ -52,7 +53,7 @@ type runner struct {
log *logp.Logger
agent *beat.Info
wg sync.WaitGroup
sig *concert.OnceSignaler
sig ctxtool.CancelContext
input v2.Input
connector beat.PipelineConnector
}
Expand Down Expand Up @@ -94,7 +95,7 @@ func (f *factory) Create(
id: id,
log: f.log.Named(input.Name()),
agent: &f.info,
sig: concert.NewOnceSignaler(),
sig: ctxtool.WithCancelContext(context.Background()),
input: input,
connector: p,
}, nil
Expand Down Expand Up @@ -126,7 +127,7 @@ func (r *runner) Start() {
}

func (r *runner) Stop() {
r.sig.Trigger()
r.sig.Cancel()
r.wg.Wait()
r.log.Infof("Input '%v' stopped", r.input.Name())
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/input/winlog/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func (eventlogRunner) Run(

// setup closing the API if either the run function is signaled asynchronously
// to shut down or when returning after io.EOF
cancelCtx, cancelFn := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
cancelCtx, cancelFn := ctxtool.WithFunc(ctx.Cancelation, func() {
if err := api.Close(); err != nil {
log.Errorf("Error while closing Windows Eventlog Access: %v", err)
}
Expand Down
41 changes: 23 additions & 18 deletions filebeat/inputsource/common/dgram/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,26 +72,29 @@ func (l *Listener) Run(ctx context.Context) error {
l.log.Info("Started listening for " + l.family.String() + " connection")

for ctx.Err() == nil {
conn, err := l.listener()
if err != nil {
l.log.Debugw("Cannot connect", "error", err)
continue
}
connCtx, connCancel := ctxtool.WithFunc(ctx, func() {
conn.Close()
})

err = l.run(connCtx, conn)
if err != nil {
l.log.Debugw("Error while processing input", "error", err)
connCancel()
continue
}
connCancel()
l.doRun(ctx)
}
return nil
}

func (l *Listener) doRun(ctx context.Context) {
conn, err := l.listener()
if err != nil {
l.log.Debugw("Cannot connect", "error", err)
return
}

connCtx, connCancel := ctxtool.WithFunc(ctx, func() {
conn.Close()
})
defer connCancel()

err = l.connectAndRun(connCtx, conn)
if err != nil {
l.log.Debugw("Error while processing input", "error", err)
}
}

func (l *Listener) Start() error {
l.log.Info("Started listening for " + l.family.String() + " connection")

Expand All @@ -106,12 +109,14 @@ func (l *Listener) Start() error {
})
defer connCancel()

return l.run(ctxtool.FromCanceller(connCtx), conn)
return l.connectAndRun(ctxtool.FromCanceller(connCtx), conn)
})
return nil
}

func (l *Listener) run(ctx context.Context, conn net.PacketConn) error {
func (l *Listener) connectAndRun(ctx context.Context, conn net.PacketConn) error {
defer l.log.Recover("Panic handling datagram")

handler := l.connect(*l.config)
for ctx.Err() == nil {
err := handler(ctx, conn)
Expand Down
9 changes: 4 additions & 5 deletions filebeat/inputsource/common/streaming/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ type Listener struct {
family inputsource.Family
wg sync.WaitGroup
log *logp.Logger
ctx context.Context
cancel context.CancelFunc
ctx ctxtool.CancelContext
clientsCount atomic.Int
handlerFactory HandlerFactory
listenerFactory ListenerFactory
Expand Down Expand Up @@ -111,9 +110,9 @@ func (l *Listener) initListen(ctx context.Context) error {
return err
}

l.ctx, l.cancel = ctxtool.WithFunc(ctx, func() {
l.ctx = ctxtool.WrapCancel(ctxtool.WithFunc(ctx, func() {
l.Listener.Close()
})
}))
return nil
}

Expand Down Expand Up @@ -171,7 +170,7 @@ func (l *Listener) run() {
// Stop stops accepting new incoming connections and Close any active clients
func (l *Listener) Stop() {
l.log.Info("Stopping" + l.family.String() + "server")
l.cancel()
l.ctx.Cancel()
l.wg.Wait()
l.log.Info(l.family.String() + " server stopped")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ require (
github.com/eclipse/paho.mqtt.golang v1.2.1-0.20200121105743-0d940dd29fd2
github.com/elastic/ecs v1.6.0
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a
github.com/elastic/go-concert v0.0.4
github.com/elastic/go-concert v0.1.0
github.com/elastic/go-libaudit/v2 v2.1.0
github.com/elastic/go-licenser v0.3.1
github.com/elastic/go-lookslike v0.3.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -253,8 +253,8 @@ github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a h1
github.com/elastic/elastic-agent-client/v7 v7.0.0-20200709172729-d43b7ad5833a/go.mod h1:uh/Gj9a0XEbYoM4NYz4LvaBVARz3QXLmlNjsrKY9fTc=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270 h1:cWPqxlPtir4RoQVCpGSRXmLqjEHpJKbR60rxh1nQZY4=
github.com/elastic/fsevents v0.0.0-20181029231046-e1d381a4d270/go.mod h1:Msl1pdboCbArMF/nSCDUXgQuWTeoMmE/z8607X+k7ng=
github.com/elastic/go-concert v0.0.4 h1:pzgYCmJ/xMJsW8PSk33inAWZ065hrwSeP79TpwAbsLE=
github.com/elastic/go-concert v0.0.4/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM=
github.com/elastic/go-concert v0.1.0 h1:gz/yvA3bseuHzoF/lNMltkL30XdPqMo+bg5o2mBx2EE=
github.com/elastic/go-concert v0.1.0/go.mod h1:9MtFarjXroUgmm0m6HY3NSe1XiKhdktiNRRj9hWvIaM=
github.com/elastic/go-libaudit/v2 v2.1.0 h1:yWSKoGaoWLGFPjqWrQ4gwtuM77pTk7K4CsPxXss8he4=
github.com/elastic/go-libaudit/v2 v2.1.0/go.mod h1:MM/l/4xV7ilcl+cIblL8Zn448J7RZaDwgNLE4gNKYPg=
github.com/elastic/go-licenser v0.3.1 h1:RmRukU/JUmts+rpexAw0Fvt2ly7VVu6mw8z4HrEzObU=
Expand Down
13 changes: 7 additions & 6 deletions journalbeat/reader/journal.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,23 +39,23 @@ type Reader struct {
r *journalread.Reader
journal *sdjournal.Journal
config Config
done chan struct{}
ctx ctxtool.CancelContext
logger *logp.Logger
backoff backoff.Backoff
}

// New creates a new journal reader and moves the FP to the configured position.
func New(c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
func New(c Config, done <-chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
return newReader(c.Path, c, done, state, logger)
}

// NewLocal creates a reader to read form the local journal and moves the FP
// to the configured position.
func NewLocal(c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
func NewLocal(c Config, done <-chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
return newReader(LocalSystemJournalID, c, done, state, logger)
}

func newReader(path string, c Config, done chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
func newReader(path string, c Config, done <-chan struct{}, state checkpoint.JournalState, logger *logp.Logger) (*Reader, error) {
logger = logger.With("path", path)
backoff := backoff.NewExpBackoff(done, c.Backoff, c.MaxBackoff)

Expand All @@ -79,7 +79,7 @@ func newReader(path string, c Config, done chan struct{}, state checkpoint.Journ
r: r,
journal: journal,
config: c,
done: done,
ctx: ctxtool.WithCancelContext(ctxtool.FromChannel(done)),
logger: logger,
backoff: backoff,
}, nil
Expand All @@ -100,13 +100,14 @@ func seekBy(log *logp.Logger, c Config, state checkpoint.JournalState) (journalr
// Close closes the underlying journal reader.
func (r *Reader) Close() {
instance.StopMonitoringJournal(r.config.Path)
r.ctx.Cancel()
r.r.Close()
}

// Next waits until a new event shows up and returns it.
// It blocks until an event is returned or an error occurs.
func (r *Reader) Next() (*beat.Event, error) {
entry, err := r.r.Next(ctxtool.FromChannel(r.done))
entry, err := r.r.Next(r.ctx)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/cloudfoundry/v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error {
return errors.Wrapf(err, "initializing doppler consumer")
}

stopCtx, cancel := ctxtool.WithFunc(ctxtool.FromCanceller(ctx.Cancelation), func() {
stopCtx, cancel := ctxtool.WithFunc(ctx.Cancelation, func() {
// wait stops the consumer and waits for all internal go-routines to be stopped.
consumer.Wait()
})
Expand Down
Loading

0 comments on commit 342a845

Please sign in to comment.