Skip to content

Commit 69c0962

Browse files
authored
pkg/acquisition/registry, move datasource registration to avoid dependency (#4189)
* pkg/acquisition/types * pkg/application/registry * move datasource registration to pkg/application/modules * reformat * move datasource factory & getter * RegisterTestFactory() * register datasources for tests too
1 parent e325e2c commit 69c0962

50 files changed

Lines changed: 591 additions & 442 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cmd/crowdsec-cli/clisetup/setup/acquisition.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,9 @@ import (
1212
goccyyaml "github.com/goccy/go-yaml"
1313
"gopkg.in/yaml.v3"
1414

15-
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
15+
"github.com/crowdsecurity/crowdsec/pkg/acquisition/registry"
1616
"github.com/crowdsecurity/crowdsec/pkg/acquisition/configuration"
17+
_ "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules" // register all datasources
1718
)
1819

1920
var (
@@ -48,11 +49,13 @@ func (d DatasourceConfig) Validate() error {
4849
return ErrMissingSourceField
4950
}
5051

51-
ds, err := acquisition.GetDataSourceIface(commonDS.Source)
52+
factory, err := registry.LookupFactory(commonDS.Source)
5253
if err != nil {
5354
return err
5455
}
5556

57+
ds := factory()
58+
5659
// validate the rest of the fields with the concrete implementation
5760

5861
err = ds.UnmarshalConfig(body)

cmd/crowdsec/crowdsec.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"github.com/crowdsecurity/go-cs-lib/trace"
1414

1515
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
16+
acquisitionTypes "github.com/crowdsecurity/crowdsec/pkg/acquisition/types"
1617
"github.com/crowdsecurity/crowdsec/pkg/alertcontext"
1718
"github.com/crowdsecurity/crowdsec/pkg/apiclient"
1819
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
@@ -24,7 +25,7 @@ import (
2425
)
2526

2627
// initCrowdsec prepares the log processor service
27-
func initCrowdsec(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub, testMode bool) (*parser.Parsers, []acquisition.DataSource, error) {
28+
func initCrowdsec(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub, testMode bool) (*parser.Parsers, []acquisitionTypes.DataSource, error) {
2829
var err error
2930
if err = alertcontext.LoadConsoleContext(cConfig, hub); err != nil {
3031
return nil, nil, fmt.Errorf("while loading context: %w", err)
@@ -106,7 +107,7 @@ func startOutputRoutines(ctx context.Context, cConfig *csconfig.Config, parsers
106107
}
107108
}
108109

109-
func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *apiclient.ApiClient, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
110+
func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *apiclient.ApiClient, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource) error {
110111
mp := NewMetricsProvider(
111112
apiClient,
112113
lpMetricsDefaultInterval,
@@ -134,7 +135,7 @@ func startLPMetrics(ctx context.Context, cConfig *csconfig.Config, apiClient *ap
134135
}
135136

136137
// runCrowdsec starts the log processor service
137-
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisition.DataSource) error {
138+
func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource) error {
138139
inEvents = make(chan pipeline.Event)
139140
logLines = make(chan pipeline.Event)
140141

@@ -164,7 +165,7 @@ func runCrowdsec(ctx context.Context, g *errgroup.Group, cConfig *csconfig.Confi
164165
}
165166

166167
// serveCrowdsec wraps the log processor service
167-
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisition.DataSource, agentReady chan bool) {
168+
func serveCrowdsec(ctx context.Context, parsers *parser.Parsers, cConfig *csconfig.Config, hub *cwhub.Hub, datasources []acquisitionTypes.DataSource, agentReady chan bool) {
168169
cctx, cancel := context.WithCancel(ctx)
169170

170171
var g errgroup.Group

cmd/crowdsec/lpmetrics.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import (
1818
"github.com/crowdsecurity/go-cs-lib/trace"
1919
"github.com/crowdsecurity/go-cs-lib/version"
2020

21-
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
21+
acquisitionTypes "github.com/crowdsecurity/crowdsec/pkg/acquisition/types"
2222
"github.com/crowdsecurity/crowdsec/pkg/apiclient"
2323
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
2424
"github.com/crowdsecurity/crowdsec/pkg/fflag"
@@ -87,7 +87,7 @@ func getHubState(hub *cwhub.Hub) models.HubItems {
8787
}
8888

8989
// newStaticMetrics is called when the process starts, or reloads the configuration
90-
func newStaticMetrics(datasources []acquisition.DataSource, hub *cwhub.Hub) staticMetrics {
90+
func newStaticMetrics(datasources []acquisitionTypes.DataSource, hub *cwhub.Hub) staticMetrics {
9191
datasourceMap := map[string]int64{}
9292

9393
for _, ds := range datasources {
@@ -111,7 +111,7 @@ func NewMetricsProvider(
111111
apic *apiclient.ApiClient,
112112
interval time.Duration,
113113
logger *logrus.Entry,
114-
datasources []acquisition.DataSource,
114+
datasources []acquisitionTypes.DataSource,
115115
hub *cwhub.Hub,
116116
) *MetricsProvider {
117117
static := newStaticMetrics(datasources, hub)

cmd/crowdsec/main.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import (
1717
"github.com/crowdsecurity/go-cs-lib/trace"
1818

1919
"github.com/crowdsecurity/crowdsec/pkg/acquisition"
20+
_ "github.com/crowdsecurity/crowdsec/pkg/acquisition/modules" // register all datasources
21+
acquisitionTypes "github.com/crowdsecurity/crowdsec/pkg/acquisition/types"
2022
"github.com/crowdsecurity/crowdsec/pkg/csconfig"
2123
"github.com/crowdsecurity/crowdsec/pkg/csplugin"
2224
"github.com/crowdsecurity/crowdsec/pkg/cwhub"
@@ -38,7 +40,7 @@ var (
3840
flags Flags
3941

4042
// the state of acquisition
41-
dataSources []acquisition.DataSource
43+
dataSources []acquisitionTypes.DataSource
4244
// the state of the buckets
4345
holders []leakybucket.BucketFactory
4446
buckets *leakybucket.Buckets
@@ -72,7 +74,7 @@ func LoadBuckets(cConfig *csconfig.Config, hub *cwhub.Hub) error {
7274
return nil
7375
}
7476

75-
func LoadAcquisition(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub) ([]acquisition.DataSource, error) {
77+
func LoadAcquisition(ctx context.Context, cConfig *csconfig.Config, hub *cwhub.Hub) ([]acquisitionTypes.DataSource, error) {
7678
if flags.SingleFileType != "" && flags.OneShotDSN != "" {
7779
flags.Labels["type"] = flags.SingleFileType
7880

0 commit comments

Comments
 (0)