Skip to content

Commit

Permalink
Ensures that if the log-sink's log writer fails, we output
Browse files Browse the repository at this point in the history
socket-related error information if it is at hand.
  • Loading branch information
manadart committed Feb 10, 2022
1 parent a860d0c commit 57618a8
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 25 deletions.
50 changes: 39 additions & 11 deletions api/logsender/logsender.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,33 +47,61 @@ func (api *API) LogWriter() (LogWriter, error) {
if err != nil {
return nil, errors.Annotatef(err, "cannot connect to /logsink")
}
logWriter := writer{conn}
go logWriter.readLoop()
logWriter := newWriter(conn)
return logWriter, nil
}

type writer struct {
conn base.Stream
conn base.Stream
readErrs chan error
}

func newWriter(conn base.Stream) *writer {
w := &writer{
conn: conn,
readErrs: make(chan error, 1),
}

go w.readLoop()
return w
}

// readLoop is necessary for the client to process websocket control messages.
// If we get an error, enqueue it so that if a subsequent call to WriteLog
// fails do to our closure of the socket, we can enhance the resulting error.
// Close() is safe to call concurrently.
func (w writer) readLoop() {
func (w *writer) readLoop() {
for {
if _, _, err := w.conn.NextReader(); err != nil {
w.conn.Close()
select {
case w.readErrs <- err:
default:
}

_ = w.conn.Close()
break
}
}
}

func (w writer) WriteLog(m *params.LogRecord) error {
// Note: due to the fire-and-forget nature of the
// logsink API, it is possible that when the
// connection dies, any logs that were "in-flight"
// will not be recorded on the server side.
// WriteLog streams the log record as JSON to the logsink endpoint.
// Upon error, check to see if there is an enqueued read error that
// we can use to enhance the output.
func (w *writer) WriteLog(m *params.LogRecord) error {
// Note: due to the fire-and-forget nature of the logsink API,
// it is possible that when the connection dies, any logs that
// were "in-flight" will not be recorded on the server side.
if err := w.conn.WriteJSON(m); err != nil {
return errors.Annotatef(err, "cannot send log message")
var readErr error
select {
case readErr, _ = <-w.readErrs:
default:
}

if readErr != nil {
err = errors.Annotate(err, readErr.Error())
}
return errors.Annotate(err, "sending log message")
}
return nil
}
Expand Down
66 changes: 52 additions & 14 deletions api/logsender/logsender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,22 @@ func (s *LogSenderSuite) TestNewAPIWriteError(c *gc.C) {
c.Assert(err, gc.IsNil)

err = w.WriteLog(new(params.LogRecord))
c.Assert(err, gc.ErrorMatches, "cannot send log message: foo")
c.Assert(err, gc.ErrorMatches, "sending log message: foo")
c.Assert(conn.written, gc.HasLen, 0)
}

func (s *LogSenderSuite) TestNewAPIReadError(c *gc.C) {
conn := &mockConnector{
c: c,
readError: errors.New("read foo"),
writeError: errors.New("closed yo"),
}
a := logsender.NewAPI(conn)
w, err := a.LogWriter()
c.Assert(err, gc.IsNil)

err = w.WriteLog(new(params.LogRecord))
c.Assert(err, gc.ErrorMatches, "sending log message: read foo: closed yo")
c.Assert(conn.written, gc.HasLen, 0)
}

Expand All @@ -75,9 +90,10 @@ type mockConnector struct {

connectError error
writeError error
readError error
written []interface{}

closeCount int
readDone chan struct{}
closeCount int
}

func (c *mockConnector) ConnectStream(path string, values url.Values) (base.Stream, error) {
Expand All @@ -86,37 +102,59 @@ func (c *mockConnector) ConnectStream(path string, values url.Values) (base.Stre
"jujuclientversion": []string{version.Current.String()},
"version": []string{"1"},
})

if c.connectError != nil {
return nil, c.connectError
}

c.readDone = make(chan struct{}, 1)
return mockStream{c}, nil
}

type mockStream struct {
conn *mockConnector
}

func (s mockStream) NextReader() (messageType int, r io.Reader, err error) {
defer func() {
select {
case s.conn.readDone <- struct{}{}:
default:
}
}()

// NextReader is now called by the read loop thread.
// Wait a bit before returning, so it doesn't sit in a very tight loop.
time.Sleep(time.Millisecond)

if s.conn.readError != nil {
return 0, nil, s.conn.readError
}
return 0, nil, nil
}

func (s mockStream) WriteJSON(v interface{}) error {
// Wait for a NextReader call in case the test
// orchestration is for an error there.
select {
case <-s.conn.readDone:
case <-time.After(coretesting.LongWait):
s.conn.c.Errorf("timed out waiting for read")
}

if s.conn.writeError != nil {
return s.conn.writeError
}
s.conn.written = append(s.conn.written, v)
return nil
}

func (s mockStream) ReadJSON(v interface{}) error {
s.conn.c.Errorf("ReadJSON called unexpectedly")
func (s mockStream) Close() error {
s.conn.closeCount++
return nil
}

func (s mockStream) NextReader() (messageType int, r io.Reader, err error) {
// NextReader is now called by the read loop thread.
// So just wait a bit and return so it doesn't sit in a very tight loop.
time.Sleep(time.Millisecond)
return 0, nil, nil
}

func (s mockStream) Close() error {
s.conn.closeCount++
func (s mockStream) ReadJSON(v interface{}) error {
s.conn.c.Errorf("ReadJSON called unexpectedly")
return nil
}

0 comments on commit 57618a8

Please sign in to comment.