Skip to content

Commit d472a2f

Browse files
authored
refact cmd/crowdsec: remove globals ParseDump, BucketPourTrack (#4184)
1 parent 097c242 commit d472a2f

9 files changed

Lines changed: 19 additions & 25 deletions

File tree

cmd/crowdsec/main.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/crowdsecurity/crowdsec/pkg/fflag"
2525
"github.com/crowdsecurity/crowdsec/pkg/leakybucket"
2626
"github.com/crowdsecurity/crowdsec/pkg/logging"
27-
"github.com/crowdsecurity/crowdsec/pkg/parser"
2827
"github.com/crowdsecurity/crowdsec/pkg/pipeline"
2928
)
3029

@@ -115,11 +114,6 @@ func LoadConfig(configFile string, disableAgent bool, disableAPI bool, quiet boo
115114
}
116115
}
117116

118-
if flags.DumpDir != "" {
119-
parser.ParseDump = true
120-
leakybucket.BucketPourTrack = true
121-
}
122-
123117
if flags.haveTimeMachine() {
124118
// in time-machine mode, we want to see what's happening
125119
cConfig.Common.LogMedia = "stdout"

cmd/crowdsec/output.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ func runOutput(
114114
}
115115

116116
/* process post overflow parser nodes */
117-
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes)
117+
dump := flags.DumpDir != ""
118+
event, err := parser.Parse(postOverflowCTX, event, postOverflowNodes, dump)
118119
if err != nil {
119120
return fmt.Errorf("postoverflow failed: %w", err)
120121
}

cmd/crowdsec/parse.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@ func parseEvent(
3535

3636
startParsing := time.Now()
3737
/* parse the log using magic */
38-
parsed, err := parser.Parse(parserCTX, event, nodes)
38+
dump := flags.DumpDir != ""
39+
parsed, err := parser.Parse(parserCTX, event, nodes, dump)
3940
if err != nil {
4041
log.Errorf("failed parsing: %v", err)
4142
}

cmd/crowdsec/pour.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ func runPour(ctx context.Context, input chan pipeline.Event, holders []leaky.Buc
5252
triggerGC(parsed, buckets, cConfig)
5353
}
5454
// here we can bucketify with parsed
55-
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets)
55+
track := flags.DumpDir != ""
56+
poured, err := leaky.PourItemToHolders(ctx, parsed, holders, buckets, track)
5657
if err != nil {
5758
log.Warningf("bucketify failed for: %v with %s", parsed, err)
5859
continue

pkg/leakybucket/buckets_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ func testFile(t *testing.T, file string, holders []BucketFactory, response chan
184184
in.ExpectMode = pipeline.TIMEMACHINE
185185
log.Infof("Buckets input : %s", spew.Sdump(in))
186186

187-
ok, err := PourItemToHolders(ctx, in, holders, buckets)
187+
ok, err := PourItemToHolders(ctx, in, holders, buckets, false)
188188
if err != nil {
189189
t.Fatalf("Failed to pour : %s", err)
190190
}
@@ -207,7 +207,7 @@ POLL_AGAIN:
207207
results = append(results, ret)
208208
if ret.Overflow.Reprocess {
209209
log.Errorf("Overflow being reprocessed.")
210-
ok, err := PourItemToHolders(ctx, ret, holders, buckets)
210+
ok, err := PourItemToHolders(ctx, ret, holders, buckets, false)
211211
if err != nil {
212212
t.Fatalf("Failed to pour : %s", err)
213213
}

pkg/leakybucket/manager_run.go

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import (
2020
var (
2121
serialized map[string]Leaky
2222
BucketPourCache map[string][]pipeline.Event = make(map[string][]pipeline.Event)
23-
BucketPourTrack bool
2423
bucketPourMu sync.Mutex
2524
)
2625

@@ -75,7 +74,7 @@ func GarbageCollectBuckets(deadline time.Time, buckets *Buckets) {
7574
}
7675
}
7776

78-
func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *pipeline.Event) (bool, error) {
77+
func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory, buckets *Buckets, parsed *pipeline.Event, track bool) (bool, error) {
7978
var sent bool
8079
var buckey = bucket.Mapkey
8180
var err error
@@ -143,7 +142,7 @@ func PourItemToBucket(ctx context.Context, bucket *Leaky, holder BucketFactory,
143142
select {
144143
case bucket.In <- parsed:
145144
// holder.logger.Tracef("Successfully sent !")
146-
if BucketPourTrack {
145+
if track {
147146
evt := deepcopy.Copy(*parsed).(pipeline.Event)
148147

149148
bucketPourMu.Lock()
@@ -204,10 +203,10 @@ func LoadOrStoreBucketFromHolder(ctx context.Context, partitionKey string, bucke
204203

205204
var orderEvent map[string]*sync.WaitGroup
206205

207-
func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []BucketFactory, buckets *Buckets) (bool, error) {
206+
func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []BucketFactory, buckets *Buckets, track bool) (bool, error) {
208207
var ok, condition, poured bool
209208

210-
if BucketPourTrack {
209+
if track {
211210
evt := deepcopy.Copy(parsed).(pipeline.Event)
212211

213212
bucketPourMu.Lock()
@@ -275,7 +274,7 @@ func PourItemToHolders(ctx context.Context, parsed pipeline.Event, holders []Buc
275274
orderEvent[buckey].Add(1)
276275
}
277276

278-
ok, err := PourItemToBucket(ctx, bucket, holders[idx], buckets, &parsed)
277+
ok, err := PourItemToBucket(ctx, bucket, holders[idx], buckets, &parsed, track)
279278

280279
if bucket.orderEvent {
281280
orderEvent[buckey].Wait()

pkg/leakybucket/manager_run_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func TestGCandDump(t *testing.T) {
8686

8787
in := pipeline.Event{Parsed: map[string]string{"something": "something"}}
8888
// pour an item that will go to leaky + counter
89-
ok, err := PourItemToHolders(ctx, in, Holders, buckets)
89+
ok, err := PourItemToHolders(ctx, in, Holders, buckets, false)
9090
if err != nil {
9191
t.Fatalf("while pouring item: %s", err)
9292
}
@@ -157,7 +157,7 @@ func TestShutdownBuckets(t *testing.T) {
157157
in := pipeline.Event{Parsed: map[string]string{"something": "something"}}
158158
// pour an item that will go to leaky + counter
159159
ctx, cancel := context.WithCancel(t.Context())
160-
ok, err := PourItemToHolders(ctx, in, Holders, buckets)
160+
ok, err := PourItemToHolders(ctx, in, Holders, buckets, false)
161161
if err != nil {
162162
t.Fatalf("while pouring item : %s", err)
163163
}

pkg/parser/parsing_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ func testSubSet(testSet TestFile, pctx UnixParserCtx, nodes []Node) (bool, error
316316
var results []pipeline.Event
317317

318318
for _, in := range testSet.Lines {
319-
out, err := Parse(pctx, in, nodes)
319+
out, err := Parse(pctx, in, nodes, false)
320320
if err != nil {
321321
log.Errorf("Failed to process %s : %v", spew.Sdump(in), err)
322322
}

pkg/parser/runtime.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -242,8 +242,6 @@ func stageidx(stage string, stages []string) int {
242242
return -1
243243
}
244244

245-
var ParseDump bool
246-
247245
var (
248246
StageParseCache dumps.ParserResults = make(dumps.ParserResults)
249247
StageParseMutex sync.Mutex
@@ -254,7 +252,7 @@ var (
254252
})
255253
)
256254

257-
func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node) (pipeline.Event, error) {
255+
func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node, dump bool) (pipeline.Event, error) {
258256
event := xp
259257

260258
/* the stage is undefined, probably line is freshly acquired, set to first stage !*/
@@ -288,7 +286,7 @@ func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node) (pipeline.Event,
288286
log.Tracef("INPUT '%s'", event.Line.Raw)
289287
}
290288

291-
if ParseDump {
289+
if dump {
292290
ensureStageCache()
293291
}
294292

@@ -339,7 +337,7 @@ func Parse(ctx UnixParserCtx, xp pipeline.Event, nodes []Node) (pipeline.Event,
339337

340338
clog.Tracef("node (%s) ret : %v", nodes[idx].rn, ret)
341339

342-
if ParseDump {
340+
if dump {
343341
var parserIdxInStage int
344342

345343
// copy outside of critical section

0 commit comments

Comments
 (0)