Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ All notable changes to this project will be documented in this file.
The `.clusterConfig.metadataStorageDatabase` has subfields according to the supported db types: `postgresql`, `mysql` and `derby`.
- BREAKING: The `.clusterConfig.metadataStorageDatabase` field has been renamed to `.clusterConfig.metadataDatabase` for consistency ([#814]).
- Document Helm deployed RBAC permissions and remove unnecessary permissions ([#810]).
- Internal operator refactoring: introduce dereference() and validate() steps in the reconciler ([#824]).

### Deleted

Expand All @@ -25,6 +26,7 @@ All notable changes to this project will be documented in this file.
[#813]: https://github.com/stackabletech/druid-operator/pull/813
[#814]: https://github.com/stackabletech/druid-operator/pull/814
[#818]: https://github.com/stackabletech/druid-operator/pull/818
[#824]: https://github.com/stackabletech/druid-operator/pull/824

## [26.3.0] - 2026-03-16

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use product_config::{
types::PropertyNameKind,
writer::{PropertiesWriterError, to_java_properties_string},
};
use snafu::{OptionExt, ResultExt, Snafu};
use snafu::{ResultExt, Snafu};
use stackable_operator::{
builder::{
self,
Expand All @@ -26,11 +26,7 @@ use stackable_operator::{
},
cli::OperatorEnvironmentOptions,
cluster_resources::{ClusterResourceApplyStrategy, ClusterResources},
commons::{
opa::OpaApiVersion,
product_image_selection::{self, ResolvedProductImage},
rbac::build_rbac_resources,
},
commons::{product_image_selection::ResolvedProductImage, rbac::build_rbac_resources},
constants::RESTART_CONTROLLER_ENABLED_LABEL,
crd::s3,
database_connections::drivers::jdbc::JdbcDatabaseConnection as _,
Expand All @@ -49,7 +45,6 @@ use stackable_operator::{
},
kvp::{KeyValuePairError, LabelError, LabelValueError, Labels},
logging::controller::ReconcilerError,
product_config_utils::{transform_all_roles_to_config, validate_all_roles_and_groups_config},
product_logging::{
self,
framework::LoggingError,
Expand Down Expand Up @@ -77,7 +72,6 @@ use crate::{
LOG_CONFIG_DIRECTORY, MAX_DRUID_LOG_FILES_SIZE, METRICS_PORT, METRICS_PORT_NAME,
OPERATOR_NAME, RUNTIME_PROPS, RW_CONFIG_DIRECTORY, S3_ACCESS_KEY, S3_ENDPOINT_URL,
S3_PATH_STYLE_ACCESS, S3_SECRET_KEY, STACKABLE_LOG_DIR, ZOOKEEPER_CONNECTION_STRING,
authentication::AuthenticationClassesResolved, authorization::DruidAuthorization,
build_recommended_labels, build_string_list, security::DruidTlsSecurity, v1alpha1,
},
discovery::{self, build_discovery_configmaps},
Expand All @@ -92,10 +86,13 @@ use crate::{
service::{build_rolegroup_headless_service, build_rolegroup_metrics_service},
};

mod dereference;
mod validate;

pub const DRUID_CONTROLLER_NAME: &str = "druidcluster";
pub const FULL_CONTROLLER_NAME: &str = concatcp!(DRUID_CONTROLLER_NAME, '.', OPERATOR_NAME);

const CONTAINER_IMAGE_BASE_NAME: &str = "druid";
pub(super) const CONTAINER_IMAGE_BASE_NAME: &str = "druid";

// volume names
const DRUID_CONFIG_VOLUME_NAME: &str = "config";
Expand Down Expand Up @@ -139,63 +136,19 @@ pub enum Error {
rolegroup: RoleGroupRef<v1alpha1::DruidCluster>,
},

#[snafu(display("invalid product configuration"))]
InvalidProductConfig {
source: stackable_operator::product_config_utils::Error,
},

#[snafu(display("invalid authentication configuration"))]
InvalidDruidAuthenticationConfig {
source: crate::authentication::Error,
},

#[snafu(display("object is missing metadata to build owner reference"))]
ObjectMissingMetadataForOwnerRef {
source: stackable_operator::builder::meta::Error,
},

#[snafu(display(
"failed to get ZooKeeper discovery config map for cluster: {}",
cm_name
))]
GetZookeeperConnStringConfigMap {
source: stackable_operator::client::Error,
cm_name: String,
},

#[snafu(display(
"failed to get OPA discovery config map and/or connection string for cluster: {}",
cm_name
))]
GetOpaConnString {
source: stackable_operator::commons::opa::Error,
cm_name: String,
},

#[snafu(display("failed to get valid S3 connection"))]
GetS3Connection { source: crate::crd::Error },
#[snafu(display("failed to dereference cluster objects"))]
Dereference { source: dereference::Error },

#[snafu(display("failed to configure S3 connection"))]
ConfigureS3 {
source: stackable_operator::crd::s3::v1alpha1::ConnectionError,
},

#[snafu(display("failed to get deep storage bucket"))]
GetDeepStorageBucket {
source: stackable_operator::crd::s3::v1alpha1::BucketError,
},

#[snafu(display(
"failed to get ZooKeeper connection string from config map {}",
cm_name
))]
MissingZookeeperConnString { cm_name: String },

#[snafu(display("failed to transform configs"))]
ProductConfigTransform {
source: stackable_operator::product_config_utils::Error,
},

#[snafu(display("failed to format runtime properties"))]
PropertiesWriteError { source: PropertiesWriterError },

Expand Down Expand Up @@ -245,17 +198,9 @@ pub enum Error {
name: String,
},

#[snafu(display("object defines no namespace"))]
ObjectHasNoNamespace,

#[snafu(display("failed to initialize security context"))]
FailedToInitializeSecurityContext { source: crate::crd::security::Error },

#[snafu(display("failed to retrieve AuthenticationClass"))]
AuthenticationClassRetrieval {
source: crate::crd::authentication::Error,
},

#[snafu(display("failed to get JVM config"))]
GetJvmConfig { source: crate::config::jvm::Error },

Expand Down Expand Up @@ -363,10 +308,8 @@ pub enum Error {
#[snafu(display("failed to configure service"))]
ServiceConfiguration { source: crate::service::Error },

#[snafu(display("failed to resolve product image"))]
ResolveProductImage {
source: product_image_selection::Error,
},
#[snafu(display("failed to validate cluster"))]
ValidateCluster { source: validate::Error },

#[snafu(display("invalid metadata database connection"))]
InvalidMetadataDatabaseConnection {
Expand Down Expand Up @@ -394,89 +337,18 @@ pub async fn reconcile_druid(
.context(InvalidDruidClusterSnafu)?;

let client = &ctx.client;
let namespace = &druid
.metadata
.namespace
.clone()
.with_context(|| ObjectHasNoNamespaceSnafu {})?;
let resolved_product_image = druid
.spec
.image
.resolve(
CONTAINER_IMAGE_BASE_NAME,
&ctx.operator_environment.image_repository,
crate::built_info::PKG_VERSION,
)
.context(ResolveProductImageSnafu)?;

let zk_confmap = druid.spec.cluster_config.zookeeper_config_map_name.clone();
let zk_connstr = client
.get::<ConfigMap>(&zk_confmap, namespace)
let dereferenced_objects = dereference::dereference(client, druid)
.await
.context(GetZookeeperConnStringConfigMapSnafu {
cm_name: zk_confmap.clone(),
})?
.data
.and_then(|mut data| data.remove("ZOOKEEPER"))
.context(MissingZookeeperConnStringSnafu {
cm_name: zk_confmap.clone(),
})?;
.context(DereferenceSnafu)?;

// Assemble the OPA connection string from the discovery and the given path, if a spec is given.
let opa_connstr = if let Some(DruidAuthorization { opa: opa_config }) =
&druid.spec.cluster_config.authorization
{
Some(
opa_config
.full_document_url_from_config_map(client, druid, Some("allow"), &OpaApiVersion::V1)
.await
.context(GetOpaConnStringSnafu {
cm_name: opa_config.config_map_name.clone(),
})?,
)
} else {
None
};

// Get the s3 connection if one is defined
let s3_conn = druid
.get_s3_connection(client)
.await
.context(GetS3ConnectionSnafu)?;

let deep_storage_bucket_name = match &druid.spec.cluster_config.deep_storage {
DeepStorageSpec::S3(s3_spec) => Some(
s3_spec
.bucket
.clone()
.resolve(client, namespace)
.await
.context(GetDeepStorageBucketSnafu)?
.bucket_name,
),
_ => None,
};

let resolved_auth_classes =
AuthenticationClassesResolved::from(&druid.spec.cluster_config, client)
.await
.context(AuthenticationClassRetrievalSnafu)?;

let druid_tls_security =
DruidTlsSecurity::new_from_druid_cluster(druid, &resolved_auth_classes);

let druid_auth_config = DruidAuthenticationConfig::try_from(resolved_auth_classes)
.context(InvalidDruidAuthenticationConfigSnafu)?;

let role_config = transform_all_roles_to_config(druid, &druid.build_role_properties());
let validated_role_config = validate_all_roles_and_groups_config(
&resolved_product_image.product_version,
&role_config.context(ProductConfigTransformSnafu)?,
let validated = validate::validate(
druid,
&dereferenced_objects,
&ctx.operator_environment,
&ctx.product_config,
false,
false,
)
.context(InvalidProductConfigSnafu)?;
.context(ValidateClusterSnafu)?;

let mut cluster_resources = ClusterResources::new(
APP_NAME,
Expand Down Expand Up @@ -510,7 +382,7 @@ pub async fn reconcile_druid(

let mut ss_cond_builder = StatefulSetConditionBuilder::default();

for (role_name, role_config) in validated_role_config.iter() {
for (role_name, role_config) in validated.validated_role_config.iter() {
let druid_role = DruidRole::from_str(role_name).context(UnidentifiedDruidRoleSnafu {
role: role_name.to_string(),
})?;
Expand All @@ -533,7 +405,7 @@ pub async fn reconcile_druid(
let role_group_service_recommended_labels = build_recommended_labels(
druid,
DRUID_CONTROLLER_NAME,
&resolved_product_image.app_version_label_value,
&validated.resolved_product_image.app_version_label_value,
&rolegroup.role,
&rolegroup.role_group,
);
Expand All @@ -548,7 +420,7 @@ pub async fn reconcile_druid(

let rg_headless_service = build_rolegroup_headless_service(
druid,
&druid_tls_security,
&validated.druid_tls_security,
&druid_role,
&rolegroup,
role_group_service_recommended_labels.clone(),
Expand All @@ -565,27 +437,27 @@ pub async fn reconcile_druid(

let rg_configmap = build_rolegroup_config_map(
druid,
&resolved_product_image,
&validated.resolved_product_image,
&rolegroup,
rolegroup_config,
&merged_rolegroup_config,
&zk_connstr,
opa_connstr.as_deref(),
s3_conn.as_ref(),
deep_storage_bucket_name.as_deref(),
&druid_tls_security,
&druid_auth_config,
&dereferenced_objects.zookeeper_connection_string,
dereferenced_objects.opa_connection_string.as_deref(),
dereferenced_objects.s3_connection.as_ref(),
dereferenced_objects.deep_storage_bucket_name.as_deref(),
&validated.druid_tls_security,
&validated.druid_auth_config,
)?;
let rg_statefulset = build_rolegroup_statefulset(
druid,
&resolved_product_image,
&validated.resolved_product_image,
&druid_role,
&rolegroup,
rolegroup_config,
&merged_rolegroup_config,
s3_conn.as_ref(),
&druid_tls_security,
&druid_auth_config,
dereferenced_objects.s3_connection.as_ref(),
&validated.druid_tls_security,
&validated.druid_auth_config,
&rbac_sa,
)?;

Expand Down Expand Up @@ -628,14 +500,14 @@ pub async fn reconcile_druid(
build_recommended_labels(
druid,
DRUID_CONTROLLER_NAME,
&resolved_product_image.app_version_label_value,
&validated.resolved_product_image.app_version_label_value,
role_name,
"none",
),
listener_class.to_string(),
listener_group_name,
&druid_role,
&druid_tls_security,
&validated.druid_tls_security,
)
.context(ListenerConfigurationSnafu)?;

Expand All @@ -649,8 +521,8 @@ pub async fn reconcile_druid(
for discovery_cm in build_discovery_configmaps(
druid,
druid,
&resolved_product_image,
&druid_tls_security,
&validated.resolved_product_image,
&validated.druid_tls_security,
listener,
)
.await
Expand Down Expand Up @@ -1387,9 +1259,12 @@ pub fn error_policy(
mod test {
use product_config::{ProductConfigManager, writer};
use rstest::*;
use stackable_operator::product_config_utils::{
transform_all_roles_to_config, validate_all_roles_and_groups_config,
};

use super::*;
use crate::crd::PROP_SEGMENT_CACHE_LOCATIONS;
use crate::crd::{PROP_SEGMENT_CACHE_LOCATIONS, authentication::AuthenticationClassesResolved};

#[derive(Snafu, Debug, EnumDiscriminants)]
#[strum_discriminants(derive(IntoStaticStr))]
Expand Down
Loading
Loading