Skip to content

Commit 5fcc066

Browse files
authored
refact pkg/leakybucket: drop closures (#4178)
* dry assignment * OnBucketPour: drop closure * OnBucketOverflow, AfterBucketPour: drop closure * lint
1 parent d3bae01 commit 5fcc066

9 files changed

Lines changed: 166 additions & 199 deletions

File tree

pkg/leakybucket/bayesian.go

Lines changed: 17 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -60,29 +60,27 @@ func (c *BayesianBucket) OnBucketInit(g *BucketFactory) error {
6060
return err
6161
}
6262

63-
func (c *BayesianBucket) AfterBucketPour(_ *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event {
64-
return func(msg pipeline.Event, l *Leaky) *pipeline.Event {
65-
c.posterior = c.prior
66-
l.logger.Debugf("starting bayesian evaluation with prior: %v", c.posterior)
67-
68-
for _, bevent := range c.bayesianEventArray {
69-
err := bevent.bayesianUpdate(c, msg, l)
70-
if err != nil {
71-
l.logger.Errorf("bayesian update failed for %s with %s", bevent.rawCondition.ConditionalFilterName, err)
72-
}
73-
}
74-
75-
l.logger.Debugf("value of posterior after events : %v", c.posterior)
63+
func (c *BayesianBucket) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, l *Leaky) *pipeline.Event {
64+
c.posterior = c.prior
65+
l.logger.Debugf("starting bayesian evaluation with prior: %v", c.posterior)
7666

77-
if c.posterior > c.threshold {
78-
l.logger.Debugf("Bayesian bucket overflow")
79-
l.Ovflw_ts = l.Last_ts
80-
l.Out <- l.Queue
81-
return nil
67+
for _, bevent := range c.bayesianEventArray {
68+
err := bevent.bayesianUpdate(c, msg, l)
69+
if err != nil {
70+
l.logger.Errorf("bayesian update failed for %s with %s", bevent.rawCondition.ConditionalFilterName, err)
8271
}
72+
}
8373

84-
return &msg
74+
l.logger.Debugf("value of posterior after events : %v", c.posterior)
75+
76+
if c.posterior > c.threshold {
77+
l.logger.Debugf("Bayesian bucket overflow")
78+
l.Ovflw_ts = l.Last_ts
79+
l.Out <- l.Queue
80+
return nil
8581
}
82+
83+
return &msg
8684
}
8785

8886
func (b *BayesianEvent) bayesianUpdate(c *BayesianBucket, msg pipeline.Event, l *Leaky) error {

pkg/leakybucket/blackhole.go

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -30,36 +30,33 @@ func NewBlackhole(bucketFactory *BucketFactory) (*blackhole, error) {
3030
}, nil
3131
}
3232

33-
func (bl *blackhole) OnBucketOverflow(_ *BucketFactory) func(*Leaky, pipeline.RuntimeAlert, *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
34-
return func(leaky *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
35-
var blackholed = false
36-
var tmp []hiddenKey
37-
// search if we are blackholed and refresh the slice
38-
for _, element := range bl.hiddenKeys {
39-
40-
if element.key == leaky.Mapkey {
41-
if element.expiration.After(leaky.Ovflw_ts) {
42-
leaky.logger.Debugf("Overflow discarded, still blackholed for %s", element.expiration.Sub(leaky.Ovflw_ts))
43-
blackholed = true
44-
}
45-
}
46-
33+
func (bl *blackhole) OnBucketOverflow(_ *BucketFactory, leaky *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
34+
var blackholed = false
35+
var tmp []hiddenKey
36+
// search if we are blackholed and refresh the slice
37+
for _, element := range bl.hiddenKeys {
38+
if element.key == leaky.Mapkey {
4739
if element.expiration.After(leaky.Ovflw_ts) {
48-
tmp = append(tmp, element)
49-
} else {
50-
leaky.logger.Debugf("%s left blackhole %s ago", element.key, leaky.Ovflw_ts.Sub(element.expiration))
40+
leaky.logger.Debugf("Overflow discarded, still blackholed for %s", element.expiration.Sub(leaky.Ovflw_ts))
41+
blackholed = true
5142
}
5243
}
53-
bl.hiddenKeys = tmp
5444

55-
if blackholed {
56-
leaky.logger.Tracef("Event is blackholed (%s)", leaky.First_ts)
57-
return pipeline.RuntimeAlert{
58-
Mapkey: leaky.Mapkey,
59-
}, nil
45+
if element.expiration.After(leaky.Ovflw_ts) {
46+
tmp = append(tmp, element)
47+
} else {
48+
leaky.logger.Debugf("%s left blackhole %s ago", element.key, leaky.Ovflw_ts.Sub(element.expiration))
6049
}
61-
bl.hiddenKeys = append(bl.hiddenKeys, hiddenKey{key:leaky.Mapkey, expiration: leaky.Ovflw_ts.Add(bl.duration)})
62-
leaky.logger.Debugf("Adding overflow to blackhole (%s)", leaky.First_ts)
63-
return alert, queue
6450
}
51+
bl.hiddenKeys = tmp
52+
53+
if blackholed {
54+
leaky.logger.Tracef("Event is blackholed (%s)", leaky.First_ts)
55+
return pipeline.RuntimeAlert{
56+
Mapkey: leaky.Mapkey,
57+
}, nil
58+
}
59+
bl.hiddenKeys = append(bl.hiddenKeys, hiddenKey{key:leaky.Mapkey, expiration: leaky.Ovflw_ts.Add(bl.duration)})
60+
leaky.logger.Debugf("Adding overflow to blackhole (%s)", leaky.First_ts)
61+
return alert, queue
6562
}

pkg/leakybucket/bucket.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func LeakRoutine(ctx context.Context, leaky *Leaky) {
196196
case msg := <-leaky.In:
197197
// the msg var use is confusing and is redeclared in a different type :/
198198
for _, processor := range processors {
199-
msg = processor.OnBucketPour(leaky.BucketConfig)(*msg, leaky)
199+
msg = processor.OnBucketPour(leaky.BucketConfig, *msg, leaky)
200200
// if &msg == nil we stop processing
201201
if msg == nil {
202202
if leaky.orderEvent {
@@ -213,7 +213,7 @@ func LeakRoutine(ctx context.Context, leaky *Leaky) {
213213
leaky.Pour(leaky, *msg) // glue for now
214214

215215
for _, processor := range processors {
216-
msg = processor.AfterBucketPour(leaky.BucketConfig)(*msg, leaky)
216+
msg = processor.AfterBucketPour(leaky.BucketConfig, *msg, leaky)
217217
if msg == nil {
218218
if leaky.orderEvent {
219219
orderEvent[leaky.Mapkey].Done()
@@ -270,7 +270,7 @@ func LeakRoutine(ctx context.Context, leaky *Leaky) {
270270
log.Error(err)
271271
}
272272
for _, f := range leaky.BucketConfig.processors {
273-
alert, ofw = f.OnBucketOverflow(leaky.BucketConfig)(leaky, alert, ofw)
273+
alert, ofw = f.OnBucketOverflow(leaky.BucketConfig, leaky, alert, ofw)
274274
if ofw == nil {
275275
leaky.logger.Debugf("Overflow has been discarded (%T)", f)
276276
break
@@ -297,7 +297,6 @@ func LeakRoutine(ctx context.Context, leaky *Leaky) {
297297
}
298298
leaky.AllOut <- pipeline.Event{Type: pipeline.OVFLW, Overflow: pipeline.RuntimeAlert{Mapkey: leaky.Mapkey}}
299299
return
300-
301300
}
302301
End:
303302
}
@@ -332,7 +331,7 @@ func (leaky *Leaky) overflow(ofw *pipeline.Queue) {
332331
}
333332
leaky.logger.Tracef("Overflow hooks time : %v", leaky.BucketConfig.processors)
334333
for _, f := range leaky.BucketConfig.processors {
335-
alert, ofw = f.OnBucketOverflow(leaky.BucketConfig)(leaky, alert, ofw)
334+
alert, ofw = f.OnBucketOverflow(leaky.BucketConfig, leaky, alert, ofw)
336335
if ofw == nil {
337336
leaky.logger.Debugf("Overflow has been discarded (%T)", f)
338337
break

pkg/leakybucket/conditional.go

Lines changed: 29 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ func (c *ConditionalOverflow) OnBucketInit(g *BucketFactory) error {
3232
c.ConditionalFilterRuntime = compiled
3333
} else {
3434
conditionalExprCacheLock.Unlock()
35-
//release the lock during compile
36-
compiledExpr, err = expr.Compile(g.ConditionalOverflow, exprhelpers.GetExprOptions(map[string]interface{}{"queue": &pipeline.Queue{}, "leaky": &Leaky{}, "evt": &pipeline.Event{}})...)
35+
// release the lock during compile
36+
compiledExpr, err = expr.Compile(g.ConditionalOverflow, exprhelpers.GetExprOptions(map[string]any{"queue": &pipeline.Queue{}, "leaky": &Leaky{}, "evt": &pipeline.Event{}})...)
3737
if err != nil {
3838
return fmt.Errorf("conditional compile error : %w", err)
3939
}
@@ -47,36 +47,34 @@ func (c *ConditionalOverflow) OnBucketInit(g *BucketFactory) error {
4747
return err
4848
}
4949

50-
func (c *ConditionalOverflow) AfterBucketPour(b *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event {
51-
return func(msg pipeline.Event, l *Leaky) *pipeline.Event {
52-
var condition, ok bool
53-
54-
if c.ConditionalFilterRuntime != nil {
55-
l.logger.Debugf("Running condition expression : %s", c.ConditionalFilter)
56-
57-
ret, err := exprhelpers.Run(c.ConditionalFilterRuntime,
58-
map[string]any{"evt": &msg, "queue": l.Queue, "leaky": l},
59-
l.logger, b.Debug)
60-
if err != nil {
61-
l.logger.Errorf("unable to run conditional filter : %s", err)
62-
return &msg
63-
}
64-
65-
l.logger.Debugf("Conditional bucket expression returned : %v", ret)
66-
67-
if condition, ok = ret.(bool); !ok {
68-
l.logger.Warningf("overflow condition, unexpected non-bool return : %T", ret)
69-
return &msg
70-
}
71-
72-
if condition {
73-
l.logger.Debugf("Conditional bucket overflow")
74-
l.Ovflw_ts = l.Last_ts
75-
l.Out <- l.Queue
76-
return nil
77-
}
50+
func (c *ConditionalOverflow) AfterBucketPour(b *BucketFactory, msg pipeline.Event, l *Leaky) *pipeline.Event {
51+
var condition, ok bool
52+
53+
if c.ConditionalFilterRuntime != nil {
54+
l.logger.Debugf("Running condition expression : %s", c.ConditionalFilter)
55+
56+
ret, err := exprhelpers.Run(c.ConditionalFilterRuntime,
57+
map[string]any{"evt": &msg, "queue": l.Queue, "leaky": l},
58+
l.logger, b.Debug)
59+
if err != nil {
60+
l.logger.Errorf("unable to run conditional filter : %s", err)
61+
return &msg
62+
}
63+
64+
l.logger.Debugf("Conditional bucket expression returned : %v", ret)
65+
66+
if condition, ok = ret.(bool); !ok {
67+
l.logger.Warningf("overflow condition, unexpected non-bool return : %T", ret)
68+
return &msg
7869
}
7970

80-
return &msg
71+
if condition {
72+
l.logger.Debugf("Conditional bucket overflow")
73+
l.Ovflw_ts = l.Last_ts
74+
l.Out <- l.Queue
75+
return nil
76+
}
8177
}
78+
79+
return &msg
8280
}

pkg/leakybucket/overflow_filter.go

Lines changed: 20 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -28,35 +28,33 @@ func NewOverflowFilter(g *BucketFactory) (*OverflowFilter, error) {
2828
u := OverflowFilter{}
2929
u.Filter = g.OverflowFilter
3030

31-
u.FilterRuntime, err = expr.Compile(u.Filter, exprhelpers.GetExprOptions(map[string]interface{}{"queue": &pipeline.Queue{}, "signal": &pipeline.RuntimeAlert{}, "leaky": &Leaky{}})...)
31+
u.FilterRuntime, err = expr.Compile(u.Filter, exprhelpers.GetExprOptions(map[string]any{"queue": &pipeline.Queue{}, "signal": &pipeline.RuntimeAlert{}, "leaky": &Leaky{}})...)
3232
if err != nil {
3333
g.logger.Errorf("Unable to compile filter : %v", err)
3434
return nil, fmt.Errorf("unable to compile filter : %v", err)
3535
}
3636
return &u, nil
3737
}
3838

39-
func (u *OverflowFilter) OnBucketOverflow(bucket *BucketFactory) func(*Leaky, pipeline.RuntimeAlert, *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
40-
return func(l *Leaky, s pipeline.RuntimeAlert, q *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
41-
el, err := exprhelpers.Run(u.FilterRuntime, map[string]interface{}{
42-
"queue": q, "signal": s, "leaky": l}, l.logger, bucket.Debug)
43-
if err != nil {
44-
l.logger.Errorf("Failed running overflow filter: %s", err)
45-
return s, q
46-
}
47-
element, ok := el.(bool)
48-
if !ok {
49-
l.logger.Errorf("Overflow filter didn't return bool: %s", err)
50-
return s, q
51-
}
52-
/*filter returned false, event is blackholded*/
53-
if !element {
54-
l.logger.Infof("Event is discarded by overflow filter (%s)", u.Filter)
55-
return pipeline.RuntimeAlert{
56-
Mapkey: l.Mapkey,
57-
}, nil
58-
}
59-
l.logger.Tracef("Event is not discarded by overflow filter (%s)", u.Filter)
39+
func (u *OverflowFilter) OnBucketOverflow(bucket *BucketFactory, l *Leaky, s pipeline.RuntimeAlert, q *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
40+
el, err := exprhelpers.Run(u.FilterRuntime, map[string]any{
41+
"queue": q, "signal": s, "leaky": l}, l.logger, bucket.Debug)
42+
if err != nil {
43+
l.logger.Errorf("Failed running overflow filter: %s", err)
44+
return s, q
45+
}
46+
element, ok := el.(bool)
47+
if !ok {
48+
l.logger.Errorf("Overflow filter didn't return bool: %s", err)
6049
return s, q
6150
}
51+
// filter returned false, event is blackholded
52+
if !element {
53+
l.logger.Infof("Event is discarded by overflow filter (%s)", u.Filter)
54+
return pipeline.RuntimeAlert{
55+
Mapkey: l.Mapkey,
56+
}, nil
57+
}
58+
l.logger.Tracef("Event is not discarded by overflow filter (%s)", u.Filter)
59+
return s, q
6260
}

pkg/leakybucket/processor.go

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ import (
66

77
type Processor interface {
88
OnBucketInit(Bucket *BucketFactory) error
9-
OnBucketPour(Bucket *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event
10-
OnBucketOverflow(Bucket *BucketFactory) func(*Leaky, pipeline.RuntimeAlert, *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
9+
OnBucketPour(Bucket *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
10+
OnBucketOverflow(Bucket *BucketFactory, leaky *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue)
1111

12-
AfterBucketPour(Bucket *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event
12+
AfterBucketPour(Bucket *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event
1313
}
1414

1515
type DumbProcessor struct{}
@@ -18,20 +18,14 @@ func (*DumbProcessor) OnBucketInit(_ *BucketFactory) error {
1818
return nil
1919
}
2020

21-
func (*DumbProcessor) OnBucketPour(_ *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event {
22-
return func(msg pipeline.Event, _ *Leaky) *pipeline.Event {
23-
return &msg
24-
}
21+
func (*DumbProcessor) OnBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event {
22+
return &msg
2523
}
2624

27-
func (*DumbProcessor) OnBucketOverflow(_ *BucketFactory) func(*Leaky, pipeline.RuntimeAlert, *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
28-
return func(_ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
29-
return alert, queue
30-
}
25+
func (*DumbProcessor) OnBucketOverflow(_ *BucketFactory, _ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
26+
return alert, queue
3127
}
3228

33-
func (*DumbProcessor) AfterBucketPour(_ *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event {
34-
return func(msg pipeline.Event, _ *Leaky) *pipeline.Event {
35-
return &msg
36-
}
29+
func (*DumbProcessor) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event {
30+
return &msg
3731
}

pkg/leakybucket/reset_filter.go

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -30,41 +30,35 @@ var (
3030
}
3131
)
3232

33-
func (u *CancelOnFilter) OnBucketPour(bucketFactory *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event {
34-
return func(msg pipeline.Event, leaky *Leaky) *pipeline.Event {
35-
var condition, ok bool
36-
if u.CancelOnFilter != nil {
37-
leaky.logger.Tracef("running cancel_on filter")
38-
output, err := exprhelpers.Run(u.CancelOnFilter, map[string]any{"evt": &msg}, leaky.logger, u.Debug)
39-
if err != nil {
40-
leaky.logger.Warningf("cancel_on error : %s", err)
41-
return &msg
42-
}
43-
if condition, ok = output.(bool); !ok {
44-
leaky.logger.Warningf("cancel_on, unexpected non-bool return : %T", output)
45-
return &msg
46-
}
47-
if condition {
48-
leaky.logger.Debugf("reset_filter matched, kill bucket")
49-
leaky.Suicide <- true
50-
return nil //counter intuitively, we need to keep the message so that it doesn't trigger an endless loop
51-
}
52-
leaky.logger.Debugf("reset_filter didn't match")
33+
func (u *CancelOnFilter) OnBucketPour(_ *BucketFactory, msg pipeline.Event, leaky *Leaky) *pipeline.Event {
34+
var condition, ok bool
35+
if u.CancelOnFilter != nil {
36+
leaky.logger.Tracef("running cancel_on filter")
37+
output, err := exprhelpers.Run(u.CancelOnFilter, map[string]any{"evt": &msg}, leaky.logger, u.Debug)
38+
if err != nil {
39+
leaky.logger.Warningf("cancel_on error : %s", err)
40+
return &msg
5341
}
54-
return &msg
42+
if condition, ok = output.(bool); !ok {
43+
leaky.logger.Warningf("cancel_on, unexpected non-bool return : %T", output)
44+
return &msg
45+
}
46+
if condition {
47+
leaky.logger.Debugf("reset_filter matched, kill bucket")
48+
leaky.Suicide <- true
49+
return nil // counter intuitively, we need to keep the message so that it doesn't trigger an endless loop
50+
}
51+
leaky.logger.Debugf("reset_filter didn't match")
5552
}
53+
return &msg
5654
}
5755

58-
func (*CancelOnFilter) OnBucketOverflow(bucketFactory *BucketFactory) func(*Leaky, pipeline.RuntimeAlert, *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
59-
return func(leaky *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
60-
return alert, queue
61-
}
56+
func (*CancelOnFilter) OnBucketOverflow(_ *BucketFactory, _ *Leaky, alert pipeline.RuntimeAlert, queue *pipeline.Queue) (pipeline.RuntimeAlert, *pipeline.Queue) {
57+
return alert, queue
6258
}
6359

64-
func (*CancelOnFilter) AfterBucketPour(bucketFactory *BucketFactory) func(pipeline.Event, *Leaky) *pipeline.Event {
65-
return func(msg pipeline.Event, leaky *Leaky) *pipeline.Event {
66-
return &msg
67-
}
60+
func (*CancelOnFilter) AfterBucketPour(_ *BucketFactory, msg pipeline.Event, _ *Leaky) *pipeline.Event {
61+
return &msg
6862
}
6963

7064
func (u *CancelOnFilter) OnBucketInit(bucketFactory *BucketFactory) error {
@@ -87,7 +81,7 @@ func (u *CancelOnFilter) OnBucketInit(bucketFactory *BucketFactory) error {
8781
}
8882

8983
cancelExprCacheLock.Unlock()
90-
//release the lock during compile
84+
// release the lock during compile
9185

9286
compiledExpr.CancelOnFilter, err = expr.Compile(bucketFactory.CancelOnFilter, exprhelpers.GetExprOptions(map[string]any{"evt": &pipeline.Event{}})...)
9387
if err != nil {

0 commit comments

Comments
 (0)