Skip to content

Commit d3bae01

Browse files
authored
cmd/crowdsec: rename pipeline channels (#4175)
1 parent fdc65de commit d3bae01

4 files changed

Lines changed: 13 additions & 13 deletions

File tree

cmd/crowdsec/crowdsec.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ func startParserRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconf
7474
log.WithField("idx", idx).Info("Starting parser routine")
7575
g.Go(func() error {
7676
defer trace.CatchPanic("crowdsec/runParse/"+strconv.Itoa(idx))
77-
runParse(ctx, inputLineChan, inputEventChan, *parsers.Ctx, parsers.Nodes)
77+
runParse(ctx, logLines, inEvents, *parsers.Ctx, parsers.Nodes)
7878
return nil
7979
})
8080
}
@@ -85,7 +85,7 @@ func startBucketRoutines(ctx context.Context, g *errgroup.Group, cConfig *csconf
8585
log.WithField("idx", idx).Info("Starting bucket routine")
8686
g.Go(func() error {
8787
defer trace.CatchPanic("crowdsec/runPour/"+strconv.Itoa(idx))
88-
runPour(ctx, inputEventChan, holders, buckets, cConfig)
88+
runPour(ctx, inEvents, holders, buckets, cConfig)
8989
return nil
9090
})
9191
}
@@ -101,7 +101,7 @@ func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers
101101
log.WithField("idx", idx).Info("Starting output routine")
102102
outputsTomb.Go(func() error {
103103
defer trace.CatchPanic("crowdsec/runOutput/"+strconv.Itoa(idx))
104-
return runOutput(ctx, inputEventChan, outputEventChan, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient)
104+
return runOutput(ctx, inEvents, outEvents, buckets, *parsers.PovfwCtx, parsers.Povfwnodes, apiClient)
105105
})
106106
}
107107
}
@@ -135,8 +135,8 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
135135

136136
// runCrowdsec starts the log processor service
137137
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
138-
inputEventChan = make(chan pipeline.Event)
139-
inputLineChan = make(chan pipeline.Event)
138+
inEvents = make(chan pipeline.Event)
139+
logLines = make(chan pipeline.Event)
140140

141141
startParserRoutines(ctx, g, cConfig, parsers)
142142
startBucketRoutines(ctx, g, cConfig)
@@ -156,7 +156,7 @@ func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Confi
156156

157157
log.Info("Starting processing data")
158158

159-
if err := acquisition.StartAcquisition(ctx, dataSources, inputLineChan, &acquisTomb); err != nil {
159+
if err := acquisition.StartAcquisition(ctx, dataSources, logLines, &acquisTomb); err != nil {
160160
return fmt.Errorf("starting acquisition error: %w", err)
161161
}
162162

cmd/crowdsec/main.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,9 +44,9 @@ var (
4444
holders []leakybucket.BucketFactory
4545
buckets *leakybucket.Buckets
4646

47-
inputLineChan chan pipeline.Event
48-
inputEventChan chan pipeline.Event
49-
outputEventChan chan pipeline.Event // the buckets init returns its own chan that is used for multiplexing
47+
logLines chan pipeline.Event
48+
inEvents chan pipeline.Event
49+
outEvents chan pipeline.Event // the buckets init returns its own chan that is used for multiplexing
5050
pluginBroker csplugin.PluginBroker
5151
)
5252

@@ -59,7 +59,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
5959

6060
log.Infof("Loading %d scenario files", len(scenarios))
6161

62-
holders, outputEventChan, err = leakybucket.LoadBuckets(cConfig.Crowdsec, hub, scenarios, buckets, flags.OrderEvent)
62+
holders, outEvents, err = leakybucket.LoadBuckets(cConfig.Crowdsec, hub, scenarios, buckets, flags.OrderEvent)
6363
if err != nil {
6464
return err
6565
}

cmd/crowdsec/parse.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func parseEvent(
2424
- one that is treated as a log and can go to scenarios
2525
- another one that will go directly to LAPI*/
2626
if event.Type == pipeline.APPSEC {
27-
outputEventChan <- event
27+
outEvents <- event
2828
return nil
2929
}
3030
if event.Line.Module == "" {

cmd/crowdsec/serve.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ func ShutdownCrowdsecRoutines(cancel context.CancelFunc, g *errgroup.Group) erro
105105
if len(dataSources) > 0 {
106106
acquisTomb.Kill(nil)
107107
log.Debugf("waiting for acquisition to finish")
108-
drainChan(inputLineChan)
108+
drainChan(logLines)
109109

110110
if err := acquisTomb.Wait(); err != nil {
111111
log.Warningf("Acquisition returned error : %s", err)
@@ -114,7 +114,7 @@ func ShutdownCrowdsecRoutines(cancel context.CancelFunc, g *errgroup.Group) erro
114114
}
115115

116116
log.Debugf("acquisition is finished, wait for parser/bucket/ouputs.")
117-
drainChan(inputEventChan)
117+
drainChan(inEvents)
118118

119119
outputsTomb.Kill(nil)
120120

0 commit comments

Comments
 (0)