Skip to content

Commit 337e4e3

Browse files
authored
chore: improve processor ordering (influxdata#12308)
1 parent c4dc104 commit 337e4e3

File tree

3 files changed

+60
-66
lines changed

3 files changed

+60
-66
lines changed

config/config.go

+52-27
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ import (
2121
"time"
2222

2323
"github.com/coreos/go-semver/semver"
24-
"github.com/google/uuid"
2524
"github.com/influxdata/toml"
2625
"github.com/influxdata/toml/ast"
2726

@@ -69,15 +68,29 @@ type Config struct {
6968
Outputs []*models.RunningOutput
7069
Aggregators []*models.RunningAggregator
7170
// Processors have a slice wrapper type because they need to be sorted
72-
Processors models.RunningProcessors
73-
AggProcessors models.RunningProcessors
71+
Processors models.RunningProcessors
72+
AggProcessors models.RunningProcessors
73+
fileProcessors OrderedPlugins
74+
fileAggProcessors OrderedPlugins
75+
7476
// Parsers are created by their inputs during gather. Config doesn't keep track of them
7577
// like the other plugins because they need to be garbage collected (See issue #11809)
7678

7779
Deprecations map[string][]int64
7880
version *semver.Version
7981
}
8082

83+
// Ordered plugins used to keep the order in which they appear in a file
84+
type OrderedPlugin struct {
85+
Line int
86+
plugin any
87+
}
88+
type OrderedPlugins []*OrderedPlugin
89+
90+
func (op OrderedPlugins) Len() int { return len(op) }
91+
func (op OrderedPlugins) Swap(i, j int) { op[i], op[j] = op[j], op[i] }
92+
func (op OrderedPlugins) Less(i, j int) bool { return op[i].Line < op[j].Line }
93+
8194
// NewConfig creates a new struct to hold the Telegraf config.
8295
// For historical reasons, It holds the actual instances of the running plugins
8396
// once the configuration is parsed.
@@ -95,14 +108,16 @@ func NewConfig() *Config {
95108
LogfileRotationMaxArchives: 5,
96109
},
97110

98-
Tags: make(map[string]string),
99-
Inputs: make([]*models.RunningInput, 0),
100-
Outputs: make([]*models.RunningOutput, 0),
101-
Processors: make([]*models.RunningProcessor, 0),
102-
AggProcessors: make([]*models.RunningProcessor, 0),
103-
InputFilters: make([]string, 0),
104-
OutputFilters: make([]string, 0),
105-
Deprecations: make(map[string][]int64),
111+
Tags: make(map[string]string),
112+
Inputs: make([]*models.RunningInput, 0),
113+
Outputs: make([]*models.RunningOutput, 0),
114+
Processors: make([]*models.RunningProcessor, 0),
115+
AggProcessors: make([]*models.RunningProcessor, 0),
116+
fileProcessors: make([]*OrderedPlugin, 0),
117+
fileAggProcessors: make([]*OrderedPlugin, 0),
118+
InputFilters: make([]string, 0),
119+
OutputFilters: make([]string, 0),
120+
Deprecations: make(map[string][]int64),
106121
}
107122

108123
// Handle unknown version
@@ -391,14 +406,16 @@ func (c *Config) LoadAll(configFiles ...string) error {
391406
}
392407
}
393408

409+
// Sort the processors according to their `order` setting while
410+
// using a stable sort to keep the file loading / file position order.
411+
sort.Stable(c.Processors)
412+
sort.Stable(c.AggProcessors)
413+
394414
return nil
395415
}
396416

397417
// LoadConfigData loads TOML-formatted config data
398418
func (c *Config) LoadConfigData(data []byte) error {
399-
// Create unique identifier for plugins to identify when using multiple configurations
400-
id := uuid.New()
401-
402419
tbl, err := parseConfig(data)
403420
if err != nil {
404421
return fmt.Errorf("error parsing data: %s", err)
@@ -450,6 +467,10 @@ func (c *Config) LoadConfigData(data []byte) error {
450467
return fmt.Errorf("line %d: configuration specified the fields %q, but they weren't used", tbl.Line, keys(c.UnusedFields))
451468
}
452469

470+
// Initialize the file-sorting slices
471+
c.fileProcessors = make(OrderedPlugins, 0)
472+
c.fileAggProcessors = make(OrderedPlugins, 0)
473+
453474
// Parse all the rest of the plugins:
454475
for name, val := range tbl.Fields {
455476
subTable, ok := val.(*ast.Table)
@@ -510,7 +531,7 @@ func (c *Config) LoadConfigData(data []byte) error {
510531
switch pluginSubTable := pluginVal.(type) {
511532
case []*ast.Table:
512533
for _, t := range pluginSubTable {
513-
if err = c.addProcessor(id.String(), pluginName, t); err != nil {
534+
if err = c.addProcessor(pluginName, t); err != nil {
514535
return fmt.Errorf("error parsing %s, %w", pluginName, err)
515536
}
516537
}
@@ -555,8 +576,16 @@ func (c *Config) LoadConfigData(data []byte) error {
555576
}
556577
}
557578

558-
if len(c.Processors) > 1 {
559-
sort.Sort(c.Processors)
579+
// Sort the processor according to the order they appeared in this file
580+
// In a later stage, we sort them using the `order` option.
581+
sort.Sort(c.fileProcessors)
582+
for _, op := range c.fileProcessors {
583+
c.Processors = append(c.Processors, op.plugin.(*models.RunningProcessor))
584+
}
585+
586+
sort.Sort(c.fileAggProcessors)
587+
for _, op := range c.fileAggProcessors {
588+
c.AggProcessors = append(c.AggProcessors, op.plugin.(*models.RunningProcessor))
560589
}
561590

562591
return nil
@@ -758,7 +787,7 @@ func (c *Config) addParser(parentcategory, parentname string, table *ast.Table)
758787
return running, err
759788
}
760789

761-
func (c *Config) addProcessor(id string, name string, table *ast.Table) error {
790+
func (c *Config) addProcessor(name string, table *ast.Table) error {
762791
creator, ok := processors.Processors[name]
763792
if !ok {
764793
// Handle removed, deprecated plugins
@@ -780,7 +809,7 @@ func (c *Config) addProcessor(id string, name string, table *ast.Table) error {
780809
c.setLocalMissingTomlFieldTracker(missCount)
781810
defer c.resetMissingTomlFieldTracker()
782811

783-
processorConfig, err := c.buildProcessor(id, name, table)
812+
processorConfig, err := c.buildProcessor(name, table)
784813
if err != nil {
785814
return err
786815
}
@@ -791,15 +820,15 @@ func (c *Config) addProcessor(id string, name string, table *ast.Table) error {
791820
return err
792821
}
793822
rf := models.NewRunningProcessor(processorBefore, processorConfig)
794-
c.Processors = append(c.Processors, rf)
823+
c.fileProcessors = append(c.fileProcessors, &OrderedPlugin{table.Line, rf})
795824

796825
// Setup another (new) processor instance running after the aggregator
797826
processorAfter, _, err := c.setupProcessor(processorConfig.Name, creator, table)
798827
if err != nil {
799828
return err
800829
}
801830
rf = models.NewRunningProcessor(processorAfter, processorConfig)
802-
c.AggProcessors = append(c.AggProcessors, rf)
831+
c.fileAggProcessors = append(c.fileAggProcessors, &OrderedPlugin{table.Line, rf})
803832

804833
// Check the number of misses against the threshold
805834
if hasParser {
@@ -1074,12 +1103,8 @@ func (c *Config) buildParser(name string, tbl *ast.Table) *models.ParserConfig {
10741103
// buildProcessor parses Processor specific items from the ast.Table,
10751104
// builds the filter and returns a
10761105
// models.ProcessorConfig to be inserted into models.RunningProcessor
1077-
func (c *Config) buildProcessor(id string, name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
1078-
conf := &models.ProcessorConfig{
1079-
ID: id,
1080-
Name: name,
1081-
Line: tbl.Line,
1082-
}
1106+
func (c *Config) buildProcessor(name string, tbl *ast.Table) (*models.ProcessorConfig, error) {
1107+
conf := &models.ProcessorConfig{Name: name}
10831108

10841109
c.getFieldInt64(tbl, "order", &conf.Order)
10851110
c.getFieldString(tbl, "alias", &conf.Alias)

config/config_test.go

+5-3
Original file line numberDiff line numberDiff line change
@@ -825,9 +825,11 @@ func TestConfig_MultipleProcessorsOrder(t *testing.T) {
825825
for _, test := range tests {
826826
t.Run(test.name, func(t *testing.T) {
827827
c := NewConfig()
828-
for _, f := range test.filename {
829-
require.NoError(t, c.LoadConfig(filepath.Join("./testdata/processor_order", f)))
828+
filenames := make([]string, 0, len(test.filename))
829+
for _, fn := range test.filename {
830+
filenames = append(filenames, filepath.Join("./testdata/processor_order", fn))
830831
}
832+
require.NoError(t, c.LoadAll(filenames...))
831833

832834
require.Equal(t, len(test.expectedOrder), len(c.Processors))
833835

@@ -862,7 +864,7 @@ func TestConfig_ProcessorsWithParsers(t *testing.T) {
862864
}
863865

864866
c := NewConfig()
865-
require.NoError(t, c.LoadConfig("./testdata/processors_with_parsers.toml"))
867+
require.NoError(t, c.LoadAll("./testdata/processors_with_parsers.toml"))
866868
require.Len(t, c.Processors, len(formats))
867869

868870
override := map[string]struct {

models/running_processor.go

+3-36
Original file line numberDiff line numberDiff line change
@@ -16,48 +16,15 @@ type RunningProcessor struct {
1616

1717
type RunningProcessors []*RunningProcessor
1818

19-
func (rp RunningProcessors) Len() int {
20-
return len(rp)
21-
}
22-
func (rp RunningProcessors) Swap(i, j int) {
23-
rp[i], rp[j] = rp[j], rp[i]
24-
}
25-
func (rp RunningProcessors) Less(i, j int) bool {
26-
// If the processors are defined in separate files only sort based on order
27-
if rp[i].Config.ID != rp[j].Config.ID {
28-
return rp[i].Config.Order < rp[j].Config.Order
29-
}
30-
31-
// If Order is defined for both processors, sort according to the number set
32-
if rp[i].Config.Order != 0 && rp[j].Config.Order != 0 {
33-
// If both orders are equal, ensure config order is maintained
34-
if rp[i].Config.Order == rp[j].Config.Order {
35-
return rp[i].Config.Line < rp[j].Config.Line
36-
}
37-
38-
return rp[i].Config.Order < rp[j].Config.Order
39-
}
40-
41-
// If "Order" is defined for one processor but not another,
42-
// the processor without an "Order" will always take precedence.
43-
// This adheres to the original implementation.
44-
if rp[i].Config.Order != 0 {
45-
return false
46-
}
47-
if rp[j].Config.Order != 0 {
48-
return true
49-
}
50-
51-
return rp[i].Config.Line < rp[j].Config.Line
52-
}
19+
func (rp RunningProcessors) Len() int { return len(rp) }
20+
func (rp RunningProcessors) Swap(i, j int) { rp[i], rp[j] = rp[j], rp[i] }
21+
func (rp RunningProcessors) Less(i, j int) bool { return rp[i].Config.Order < rp[j].Config.Order }
5322

5423
// ProcessorConfig containing a name and filter
5524
type ProcessorConfig struct {
56-
ID string
5725
Name string
5826
Alias string
5927
Order int64
60-
Line int
6128
Filter Filter
6229
}
6330

0 commit comments

Comments
 (0)