Commit 5034288

Merge pull request moby#30891 from mixja/awslogs-multiline-support

Add awslogs multiline support

cpuguy83 authored May 17, 2017
2 parents 77c9728 + dc87490 commit 5034288

Showing 2 changed files with 450 additions and 51 deletions.
218 changes: 168 additions & 50 deletions daemon/logger/awslogs/cloudwatchlogs.go
@@ -3,9 +3,9 @@ package awslogs

import (
"bytes"
"errors"
"fmt"
"os"
"regexp"
"runtime"
"sort"
"strconv"
@@ -24,6 +24,7 @@ import (
"github.com/docker/docker/daemon/logger/loggerutils"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/pkg/templates"
"github.com/pkg/errors"
)

const (
@@ -34,6 +35,8 @@ const (
logStreamKey = "awslogs-stream"
logCreateGroupKey = "awslogs-create-group"
tagKey = "tag"
datetimeFormatKey = "awslogs-datetime-format"
multilinePatternKey = "awslogs-multiline-pattern"
batchPublishFrequency = 5 * time.Second

// See: http://docs.aws.amazon.com/AmazonCloudWatchLogs/latest/APIReference/API_PutLogEvents.html
@@ -53,14 +56,15 @@
)

type logStream struct {
logStreamName string
logGroupName string
logCreateGroup bool
multilinePattern *regexp.Regexp
client api
messages chan *logger.Message
lock sync.RWMutex
closed bool
sequenceToken *string
}

type api interface {
@@ -91,7 +95,8 @@ func init() {

// New creates an awslogs logger using the configuration passed in on the
// context. Supported context configuration variables are awslogs-region,
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-multiline-pattern
// and awslogs-datetime-format. When available, configuration is
// also taken from environment variables AWS_REGION, AWS_ACCESS_KEY_ID,
// AWS_SECRET_ACCESS_KEY, the shared credentials file (~/.aws/credentials), and
// the EC2 Instance Metadata Service.
@@ -112,16 +117,23 @@ func New(info logger.Info) (logger.Logger, error) {
if info.Config[logStreamKey] != "" {
logStreamName = info.Config[logStreamKey]
}

multilinePattern, err := parseMultilineOptions(info)
if err != nil {
return nil, err
}

client, err := newAWSLogsClient(info)
if err != nil {
return nil, err
}
containerStream := &logStream{
logStreamName: logStreamName,
logGroupName: logGroupName,
logCreateGroup: logCreateGroup,
multilinePattern: multilinePattern,
client: client,
messages: make(chan *logger.Message, 4096),
}
err = containerStream.create()
if err != nil {
@@ -132,6 +144,56 @@ func New(info logger.Info) (logger.Logger, error) {
return containerStream, nil
}
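For orientation, here is a minimal sketch of how a caller could exercise the new options through New. It is not part of the change; the logger.Info fields beyond Config, and all option values, are illustrative.

```go
package main

import (
	"log"

	"github.com/docker/docker/daemon/logger"
	"github.com/docker/docker/daemon/logger/awslogs"
)

func main() {
	// Configure the driver the way "docker run --log-opt" would.
	info := logger.Info{
		ContainerID: "example-container-id",
		Config: map[string]string{
			"awslogs-region":          "us-east-1",
			"awslogs-group":           "my-log-group",
			"awslogs-datetime-format": "%Y-%m-%d %H:%M:%S",
		},
	}
	// ValidateLogOpt rejects unknown keys and the invalid combination of
	// awslogs-datetime-format with awslogs-multiline-pattern.
	if err := awslogs.ValidateLogOpt(info.Config); err != nil {
		log.Fatal(err)
	}
	l, err := awslogs.New(info)
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()
}
```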

// parseMultilineOptions parses the awslogs-multiline-pattern and
// awslogs-datetime-format options.
// If awslogs-datetime-format is present, its strftime format is converted
// to a regular expression, compiled, and returned.
// If awslogs-multiline-pattern is present, it is compiled as a regular
// expression and returned.
func parseMultilineOptions(info logger.Info) (*regexp.Regexp, error) {
dateTimeFormat := info.Config[datetimeFormatKey]
multilinePatternValue := info.Config[multilinePatternKey]
// strftime input is parsed into a regular expression
if dateTimeFormat != "" {
// %. matches each strftime format sequence and ReplaceAllStringFunc
// looks up each format sequence in the conversion table strftimeToRegex
// to replace with a defined regular expression
r := regexp.MustCompile("%.")
multilinePatternValue = r.ReplaceAllStringFunc(dateTimeFormat, func(s string) string {
return strftimeToRegex[s]
})
}
if multilinePatternValue != "" {
multilinePattern, err := regexp.Compile(multilinePatternValue)
if err != nil {
return nil, errors.Wrapf(err, "awslogs could not parse multiline pattern key %q", multilinePatternValue)
}
return multilinePattern, nil
}
return nil, nil
}
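To make the strftime conversion concrete, here is a standalone sketch of the same ReplaceAllStringFunc lookup, using a three-entry subset of the strftimeToRegex table defined below:

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Subset of the conversion table; see strftimeToRegex below.
	strftimeToRegex := map[string]string{
		`%Y`: `\d{4}`,
		`%m`: `(?:0[1-9]|1[0-2])`,
		`%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
	}
	// "%." matches each two-character strftime sequence.
	r := regexp.MustCompile("%.")
	pattern := r.ReplaceAllStringFunc("%Y-%m-%d", func(s string) string {
		return strftimeToRegex[s]
	})
	fmt.Println(pattern)
	// Output: \d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12][0-9]|3[01])
}
```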

// strftimeToRegex maps strftime format strings to regular expressions
var strftimeToRegex = map[string]string{
/*weekdayShort */ `%a`: `(?:Mon|Tue|Wed|Thu|Fri|Sat|Sun)`,
/*weekdayFull */ `%A`: `(?:Monday|Tuesday|Wednesday|Thursday|Friday|Saturday|Sunday)`,
/*weekdayZeroIndex */ `%w`: `[0-6]`,
/*dayZeroPadded */ `%d`: `(?:0[1-9]|[12][0-9]|3[01])`,
/*monthShort */ `%b`: `(?:Jan|Feb|Mar|Apr|May|Jun|Jul|Aug|Sep|Oct|Nov|Dec)`,
/*monthFull */ `%B`: `(?:January|February|March|April|May|June|July|August|September|October|November|December)`,
/*monthZeroPadded */ `%m`: `(?:0[1-9]|1[0-2])`,
/*yearCentury */ `%Y`: `\d{4}`,
/*yearZeroPadded */ `%y`: `\d{2}`,
/*hour24ZeroPadded */ `%H`: `(?:[01][0-9]|2[0-3])`,
/*hour12ZeroPadded */ `%I`: `(?:0[0-9]|1[0-2])`,
/*AM or PM */ `%p`: `[AP]M`,
/*minuteZeroPadded */ `%M`: `[0-5][0-9]`,
/*secondZeroPadded */ `%S`: `[0-5][0-9]`,
/*microsecondZeroPadded */ `%f`: `\d{6}`,
/*utcOffset */ `%z`: `[+-]\d{4}`,
/*tzName */ `%Z`: `[A-Z]{1,4}T`,
/*dayOfYearZeroPadded */ `%j`: `(?:0[0-9][1-9]|[12][0-9][0-9]|3[0-5][0-9]|36[0-6])`,
/*milliseconds */ `%L`: `\.\d{3}`,
}
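A usage illustration (assumed, not part of the change): a pattern derived from "%Y-%m-%d %H:%M:%S" matches lines that start a new log event, while continuation lines such as stack-trace frames do not match and are appended to the current event buffer.

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Regex produced from "%Y-%m-%d %H:%M:%S" via the table above.
	p := regexp.MustCompile(`\d{4}-(?:0[1-9]|1[0-2])-(?:0[1-9]|[12][0-9]|3[01]) (?:[01][0-9]|2[0-3]):[0-5][0-9]:[0-5][0-9]`)

	fmt.Println(p.MatchString("2017-05-17 08:15:30 ERROR request failed")) // true: starts a new event
	fmt.Println(p.MatchString("    at com.example.Main(Main.java:42)"))    // false: buffered as a continuation line
}
```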

func parseLogGroup(info logger.Info, groupTemplate string) (string, error) {
tmpl, err := templates.NewParse("log-group", groupTemplate)
if err != nil {
@@ -297,60 +359,108 @@ var newTicker = func(freq time.Duration) *time.Ticker {
}

// collectBatch executes as a goroutine to perform batching of log events for
// submission to the log stream. If the awslogs-multiline-pattern or
// awslogs-datetime-format options have been configured, multiline processing
// is enabled, where log messages are stored in an event buffer until a multiline
// pattern match is found, at which point the messages in the event buffer are
// pushed to CloudWatch Logs as a single log event. Multiline messages are
// processed according to the maximumBytesPerEvent constraint, and the
// implementation only allows for messages to be buffered for a maximum of
// 2*batchPublishFrequency. When events are ready to be processed for
// submission to CloudWatch Logs, the processEvent method is called. If a
// multiline pattern is not configured, log events are submitted to the
// processEvent method immediately.
func (l *logStream) collectBatch() {
timer := newTicker(batchPublishFrequency)
var events []wrappedEvent
var eventBuffer []byte
var eventBufferTimestamp int64
for {
select {
case t := <-timer.C:
// If event buffer is older than batch publish frequency flush the event buffer
if eventBufferTimestamp > 0 && len(eventBuffer) > 0 {
eventBufferAge := t.UnixNano()/int64(time.Millisecond) - eventBufferTimestamp
eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
eventBufferNegative := eventBufferAge < 0
if eventBufferExpired || eventBufferNegative {
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
}
}
l.publishBatch(events)
events = events[:0]
case msg, more := <-l.messages:
if !more {
// Flush event buffer
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
l.publishBatch(events)
return
}
if eventBufferTimestamp == 0 {
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
}
unprocessedLine := msg.Line
if l.multilinePattern != nil {
if l.multilinePattern.Match(unprocessedLine) {
// This is a new log event so flush the current eventBuffer to events
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBufferTimestamp = msg.Timestamp.UnixNano() / int64(time.Millisecond)
eventBuffer = eventBuffer[:0]
}
// If we will exceed max bytes per event flush the current event buffer before appending
if len(eventBuffer)+len(unprocessedLine) > maximumBytesPerEvent {
events = l.processEvent(events, eventBuffer, eventBufferTimestamp)
eventBuffer = eventBuffer[:0]
}
// Append new line
processedLine := append(unprocessedLine, "\n"...)
eventBuffer = append(eventBuffer, processedLine...)
logger.PutMessage(msg)
} else {
events = l.processEvent(events, unprocessedLine, msg.Timestamp.UnixNano()/int64(time.Millisecond))
logger.PutMessage(msg)
}
}
}
}
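The timer branch above is what bounds buffering at just under two publish intervals: a partial event is flushed only once its age exceeds batchPublishFrequency, and age is checked only when the ticker fires. A small self-contained sketch of the age arithmetic (values illustrative):

```go
package main

import (
	"fmt"
	"time"
)

const batchPublishFrequency = 5 * time.Second

func main() {
	// A message buffered 6 seconds ago, checked on the current tick.
	eventBufferTimestamp := time.Now().Add(-6*time.Second).UnixNano() / int64(time.Millisecond)
	now := time.Now().UnixNano() / int64(time.Millisecond)

	eventBufferAge := now - eventBufferTimestamp
	eventBufferExpired := eventBufferAge > int64(batchPublishFrequency)/int64(time.Millisecond)
	fmt.Println(eventBufferAge, eventBufferExpired) // ~6000 true -> flushed on this tick
}
```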

// processEvent processes log events that are ready for submission to CloudWatch
// Logs. Batching is performed on time- and size-bases. Time-based batching
// occurs at a 5 second interval (defined in the batchPublishFrequency const).
// Size-based batching is performed on the maximum number of events per batch
// (defined in maximumLogEventsPerPut) and the maximum number of total bytes in a
// batch (defined in maximumBytesPerPut). Log messages are split by the maximum
// bytes per event (defined in maximumBytesPerEvent). There is a fixed per-event
// byte overhead (defined in perEventBytes) which is accounted for in split- and
// batch-calculations.
func (l *logStream) processEvent(events []wrappedEvent, unprocessedLine []byte, timestamp int64) []wrappedEvent {
bytes := 0
for len(unprocessedLine) > 0 {
// Split line length so it does not exceed the maximum
lineBytes := len(unprocessedLine)
if lineBytes > maximumBytesPerEvent {
lineBytes = maximumBytesPerEvent
}
line := unprocessedLine[:lineBytes]
unprocessedLine = unprocessedLine[lineBytes:]
if (len(events) >= maximumLogEventsPerPut) || (bytes+lineBytes+perEventBytes > maximumBytesPerPut) {
// Publish an existing batch if it's already over the maximum number of events or if adding this
// event would push it over the maximum number of total bytes.
l.publishBatch(events)
events = events[:0]
bytes = 0
}
events = append(events, wrappedEvent{
inputLogEvent: &cloudwatchlogs.InputLogEvent{
Message: aws.String(string(line)),
Timestamp: aws.Int64(timestamp),
},
insertOrder: len(events),
})
bytes += (lineBytes + perEventBytes)
}
return events
}
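To illustrate the splitting arithmetic, assuming the constants elided from this diff follow the CloudWatch Logs PutLogEvents limits (a 262,144-byte event cap that includes 26 bytes of per-event overhead, so maximumBytesPerEvent would be 262,118), a 600,000-byte line is chunked into three events:

```go
package main

import "fmt"

// Assumed values matching the CloudWatch Logs limits; the real constants are
// defined earlier in this file but elided from the diff.
const (
	perEventBytes        = 26
	maximumBytesPerEvent = 262144 - perEventBytes
)

func main() {
	// Mirror of the chunking loop in processEvent.
	line := 600000
	events := 0
	for line > 0 {
		chunk := line
		if chunk > maximumBytesPerEvent {
			chunk = maximumBytesPerEvent
		}
		line -= chunk
		events++
	}
	fmt.Println(events) // 3 (262118 + 262118 + 75764 bytes)
}
```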

// publishBatch calls PutLogEvents for a given set of InputLogEvents,
// accounting for sequencing requirements (each request must reference the
// sequence token returned by the previous request).
@@ -419,7 +529,8 @@ func (l *logStream) putLogEvents(events []*cloudwatchlogs.InputLogEvent, sequenc
}

// ValidateLogOpt looks for awslogs-specific log options awslogs-region,
// awslogs-group, awslogs-stream, awslogs-create-group, awslogs-datetime-format,
// and awslogs-multiline-pattern.
func ValidateLogOpt(cfg map[string]string) error {
for key := range cfg {
switch key {
@@ -428,6 +539,8 @@ func ValidateLogOpt(cfg map[string]string) error {
case logCreateGroupKey:
case regionKey:
case tagKey:
case datetimeFormatKey:
case multilinePatternKey:
default:
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
}
@@ -440,6 +553,11 @@ func ValidateLogOpt(cfg map[string]string) error {
return fmt.Errorf("must specify valid value for log opt '%s': %v", logCreateGroupKey, err)
}
}
_, datetimeFormatKeyExists := cfg[datetimeFormatKey]
_, multilinePatternKeyExists := cfg[multilinePatternKey]
if datetimeFormatKeyExists && multilinePatternKeyExists {
return fmt.Errorf("you cannot configure log opt '%s' and '%s' at the same time", datetimeFormatKey, multilinePatternKey)
}
return nil
}
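A quick check of the new mutual-exclusion rule; the error text comes straight from the branch above.

```go
package main

import (
	"fmt"

	"github.com/docker/docker/daemon/logger/awslogs"
)

func main() {
	err := awslogs.ValidateLogOpt(map[string]string{
		"awslogs-datetime-format":   "%Y-%m-%d",
		"awslogs-multiline-pattern": "^INFO",
	})
	fmt.Println(err)
	// you cannot configure log opt 'awslogs-datetime-format' and
	// 'awslogs-multiline-pattern' at the same time
}
```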
