Added discard filter #186
base: master
Conversation
For filter execution priority, I prefer control by the user's config instead of FilterPos. In your case, you can suggest in your README that users put the filter at the beginning.
Hi, I think I did not explain this well. FilterPos was a way to serve two purposes:
For my number two change I am working on a filter that is a bit like logstash aggregate. I am running this filter in test now, but currently as a standalone program (gogstash instance 1 -> my filter -> gogstash instance 2). This filter looks at similar events (based on the value of a configurable key) and queues them up in memory; if there is more than one event with the same key, only the last is kept and the others are discarded. After some configured time of inactivity (no new events with that key value) the event is sent back into gogstash. A workflow like this is not possible in gogstash without a mechanism like FilterPos. It is not my intention to use this field to jump over or skip parts of the chain; for that we have cond.
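To make that concrete, here is a minimal, self-contained sketch of the keep-only-the-last-event-per-key idea; all names are hypothetical, and `emit` stands in for sending the surviving event back into gogstash:

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// debouncer keeps only the most recent event per key and emits it after
// flushAfter of inactivity on that key.
type debouncer struct {
	mu         sync.Mutex
	last       map[string]map[string]any
	timers     map[string]*time.Timer
	flushAfter time.Duration
	emit       func(map[string]any) // where surviving events are sent
}

func newDebouncer(flushAfter time.Duration, emit func(map[string]any)) *debouncer {
	return &debouncer{
		last:       make(map[string]map[string]any),
		timers:     make(map[string]*time.Timer),
		flushAfter: flushAfter,
		emit:       emit,
	}
}

// Add records an event; earlier queued events with the same key are
// overwritten, so only the last one survives.
func (d *debouncer) Add(key string, event map[string]any) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.last[key] = event
	if t, ok := d.timers[key]; ok {
		t.Reset(d.flushAfter) // activity on the key postpones the flush
		return
	}
	d.timers[key] = time.AfterFunc(d.flushAfter, func() { d.flush(key) })
}

// flush emits the surviving event once the key has been quiet long enough.
func (d *debouncer) flush(key string) {
	d.mu.Lock()
	event := d.last[key]
	delete(d.last, key)
	delete(d.timers, key)
	d.mu.Unlock()
	if event != nil {
		d.emit(event)
	}
}

func main() {
	d := newDebouncer(200*time.Millisecond, func(e map[string]any) {
		fmt.Println("flushed:", e["seq"]) // only the last event per key
	})
	for i := 0; i < 3; i++ {
		d.Add("host-1", map[string]any{"seq": i})
	}
	time.Sleep(time.Second) // in a real filter this is gogstash's lifetime
}
```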
Thinking about this kind of filter:
The way the filter flow works is that it is single threaded (unless you have more workers), so I need a way to signal back that I want a message dropped. I cannot hold the queue; all filters have to do their work as quickly as possible. The way you described above is exactly the way I planned to implement it. Step 3 is the hardest: to be able to flush I need a mechanism to send an event into the next filter in the filter chain. I can't see any good way to handle this other than being able to insert an event into the chain at a configurable spot.
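For what it is worth, here is a toy sketch of what a configurable injection spot could look like if the chain were wired as one channel per filter. This is illustrative only, not gogstash's actual internals; `Event`, `Filter`, and `tag` are stand-ins:

```go
package main

import "fmt"

// Event is a stand-in for logevent.LogEvent.
type Event map[string]any

// Filter is a simplified stand-in for a gogstash filter.
type Filter interface {
	Process(Event) Event
}

type tag struct{ name string }

func (t tag) Process(e Event) Event { e[t.name] = true; return e }

func main() {
	filters := []Filter{tag{"filter1"}, tag{"filter2"}}

	// One channel per stage; ch[len(filters)] feeds the outputs.
	ch := make([]chan Event, len(filters)+1)
	for i := range ch {
		ch[i] = make(chan Event, 16)
	}
	for i, f := range filters {
		go func(i int, f Filter) {
			for ev := range ch[i] {
				ch[i+1] <- f.Process(ev) // normal flow: hand to next stage
			}
		}(i, f)
	}

	// A stateful filter at position 1 re-injects a flushed event so that
	// only the remaining filters (here: filter2) process it.
	ch[1] <- Event{"flushed": true}
	fmt.Println("to outputs:", <-ch[len(filters)])
}
```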
There is only one input channel. Flush example: by FIFO:

```go
var eventPool ThreadSafeTreeWithTimeKey // member of filter

timer := time.NewTimer(config.FlushDuration)
for {
	select {
	case <-timer.C:
		// flush everything that has been idle long enough, oldest first
		processBeforeTime := time.Now().Add(-config.FlushBeforeDuration)
		for {
			eventPool.lock()
			event := eventPool.first()
			if event == nil || event.Time.After(processBeforeTime) {
				eventPool.unlock()
				break
			}
			eventPool.remove(event)
			eventPool.unlock()
			chFilterOut <- event
		}
		timer.Reset(config.FlushDuration)
	}
}
```
From the code snippet above I see that you are sending the event straight to the outputs. I was more in line with letting all filters handle the event. Below is an example of a pipeline that could be something I want to do, assuming that the input codec prepared all the fields for me.

Filter 1: discard
Look into the event and see if I want to continue with it or discard it. If I discard it, no further processing is done on the server.

Filter 2: aggregate
This filter will look at each event and see if it is a continuation of a previous event. If it is, update the state. If not, create a new state. A dispatcher will scan for events that are ready to be sent further into the chain. The dispatched event could be sent back to gogstash for further processing from the next filter. With my PR I want to use the discard option to discard the event so it is not processed any further.

Filter 3: ip2location
Get IP information from the event.

Filter 4: hash
Make a hash of the event.
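To make the ordering concrete, here is a toy, self-contained sketch of that four-stage flow with the discard short-circuit; everything here (stage names aside) is illustrative, not gogstash code:

```go
package main

import "fmt"

type Event map[string]any

// stage is a toy filter: it returns the (possibly modified) event and
// whether the pipeline should keep processing it.
type stage struct {
	name string
	run  func(Event) (Event, bool)
}

func main() {
	pipeline := []stage{
		{"discard", func(e Event) (Event, bool) {
			// Drop events we are not interested in; nothing later runs.
			return e, e["interesting"] == true
		}},
		{"aggregate", func(e Event) (Event, bool) {
			// Real version: update per-key state, discard intermediates,
			// and let a dispatcher re-inject the survivor after a quiet
			// period.
			return e, true
		}},
		{"ip2location", func(e Event) (Event, bool) {
			e["geo"] = "lookup-result" // placeholder for the IP lookup
			return e, true
		}},
		{"hash", func(e Event) (Event, bool) {
			e["hash"] = fmt.Sprintf("%x", len(e)) // placeholder hash
			return e, true
		}},
	}

	ev := Event{"interesting": true}
	for _, s := range pipeline {
		next, keep := s.run(ev)
		if !keep {
			fmt.Println("discarded at:", s.name)
			return // discarded: no further processing on this event
		}
		ev = next
	}
	fmt.Println("to outputs:", ev)
}
```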
In your change, the flow control could be carried by a return struct:

```go
type FilterReturn struct {
	event   logevent.LogEvent
	stop    bool // do not process the next filter
	discard bool // discard the event
	// more flow control fields
}
```

```diff
-func (TypeFilterConfig).Event(context.Context, logevent.LogEvent) (logevent.LogEvent, bool)
+func (TypeFilterConfig).Event(context.Context, logevent.LogEvent) (FilterReturn)
```

Why not extend the control properties in FilterReturn?
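One possible reading of that proposal, as a sketch of the loop that would consume `FilterReturn`; `runFilters` is not an existing gogstash function, and the body assumes the struct and signature above:

```go
// runFilters pushes one event through the ordered filter list, honouring
// the proposed flow-control flags.
func runFilters(ctx context.Context, filters []TypeFilterConfig, ev logevent.LogEvent) (logevent.LogEvent, bool) {
	for _, f := range filters {
		ret := f.Event(ctx, ev)
		if ret.discard {
			return ev, false // drop the event; outputs never see it
		}
		ev = ret.event
		if ret.stop {
			break // keep the event but skip the remaining filters
		}
	}
	return ev, true // true: forward the event to the outputs
}
```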
In this PR I have added a discard filter. This filter allows me to discard events so that they are not processed any further.
I also rewrote the filter handling routine to allow an event to start at a specific filter, instead of at the beginning of the list of filters. I think there are many good ways to use this. I am currently working on a filter where my goal is to identify and remove all events of a kind except the last one. To do this I need to discard all but the last event, which I then inject for further processing from the next filter in the pipeline.
Other good examples of uses for the discard code are logstash aggregate or logstash throttle.
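For illustration, a minimal sketch of what a discard-style filter's `Event` method could look like under the current two-value signature, assuming the boolean return is wired up as the keep/discard flag; the `Key`/`Value` config fields are hypothetical, not this PR's actual options:

```go
package filterdiscard

import (
	"context"

	"github.com/tsaikd/gogstash/config"
	"github.com/tsaikd/gogstash/config/logevent"
)

// FilterConfig is a hypothetical discard filter: it drops an event when a
// configured field matches a configured value.
type FilterConfig struct {
	config.FilterConfig
	Key   string `json:"key"`   // field to inspect
	Value string `json:"value"` // discard when the field equals this
}

// Event keeps or discards the event; returning false signals that no
// further filters or outputs should process it.
func (f *FilterConfig) Event(ctx context.Context, event logevent.LogEvent) (logevent.LogEvent, bool) {
	if event.GetString(f.Key) == f.Value {
		return event, false // discard
	}
	return event, true // keep
}
```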