feat(inkless): Introduce CREATE_TOPICS config interceptor framework and DisklessForceCreateTopicInterceptor#614
Merged
Conversation
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces a chainable interceptor framework for mutating topic configs during CREATE_TOPICS requests, and uses it to implement a new diskless “force enable” behavior (regex allow-list), while refactoring the existing classic remote-storage forcing logic into the same framework.
Changes:
- Added
CreateTopicConfigInterceptor+CreateTopicConfigInterceptors(first-match-wins chain) and integrated it intoReplicationControlManager’s topic-creation flow. - Refactored classic remote storage forcing into
ClassicTopicRemoteStorageForceCreateTopicInterceptor. - Added
DisklessForceCreateTopicInterceptorwith new server configs (diskless.force.enable,diskless.force.include.topic.regexes) plus unit tests.
Reviewed changes
Copilot reviewed 12 out of 12 changed files in this pull request and generated 1 comment.
Show a summary per file
| File | Description |
|---|---|
| server-common/src/main/java/org/apache/kafka/server/config/ServerConfigs.java | Adds new controller/server configs for enabling diskless forcing and defining the allow-list regexes. |
| core/src/main/scala/kafka/server/KafkaConfig.scala | Wires new diskless force configs into KafkaConfig. |
| core/src/main/scala/kafka/server/ControllerServer.scala | Passes diskless force settings into the controller builder. |
| metadata/src/main/java/org/apache/kafka/controller/QuorumController.java | Threads diskless force config through QuorumController → ReplicationControlManager. |
| metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java | Replaces the old policy call-sites with interceptor-chain application during create-topics processing/validation/response building. |
| metadata/src/main/java/org/apache/kafka/controller/CreateTopicConfigInterceptor.java | Introduces the interceptor interface used for CREATE_TOPICS config mutation. |
| metadata/src/main/java/org/apache/kafka/controller/CreateTopicConfigInterceptors.java | Implements chaining + ordering (diskless first, then remote storage) with first-match-wins semantics. |
| metadata/src/main/java/org/apache/kafka/controller/DisklessForceCreateTopicInterceptor.java | New interceptor that forces diskless.enable=true for allow-listed topics with exclusions. |
| metadata/src/main/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForceCreateTopicInterceptor.java | Refactors classic remote storage forcing into the new interceptor model. |
| metadata/src/test/java/org/apache/kafka/controller/CreateTopicConfigInterceptorsTest.java | Adds unit tests for chaining behavior and priority. |
| metadata/src/test/java/org/apache/kafka/controller/DisklessForceCreateTopicInterceptorTest.java | Adds unit tests for diskless forcing, exclusions, and __ prefix behavior. |
| metadata/src/test/java/org/apache/kafka/controller/ClassicTopicRemoteStorageForceCreateTopicInterceptorTest.java | Updates tests to reflect the refactor from policy to interceptor. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Implement a generic way to intercept and mutate CREATE_TOPICS configs via a chain of interceptors with first-match-wins semantics. - Add CreateTopicConfigInterceptor interface (returns boolean to indicate whether it matched) - Add CreateTopicConfigInterceptors container that chains interceptors and stops at the first match - Refactor ClassicTopicRemoteStorageForcePolicy into ClassicTopicRemoteStorageForceCreateTopicInterceptor implementing the new interface - Update ReplicationControlManager to use the new abstraction The interceptor chain is designed so that additional interceptors (e.g. diskless force) can be added with clear priority ordering. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Introduce a new CREATE_TOPICS config interceptor that forces diskless.enable=true for topics matching a configurable allow list of regexes. Behavior: - Forces diskless.enable=true when topic name matches at least one regex in diskless.force.include.topic.regexes - Excludes system topics (ReplicationControlManager.isSystemTopic) - Excludes compacted topics (cleanup.policy contains "compact") - Validates at construction that regexes cannot match "__"-prefixed names Interceptor chaining: - DisklessForceCreateTopicInterceptor has higher priority (tried first) - ClassicTopicRemoteStorageForceCreateTopicInterceptor is tried only if diskless interceptor did not match (first-match-wins semantics) - Both can be enabled simultaneously; users should configure regexes to avoid overlap New configs: - diskless.force.enable (boolean, default false) - diskless.force.include.topic.regexes (list, default empty) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
…erceptor The diskless force interceptor is now only activated when the system-level diskless storage is enabled (diskless.storage.system.enable=true). If diskless.force.enable=true but system-level diskless is not enabled, a warning is logged and the interceptor is not added to the chain. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
0b8370a to
6ccb30a
Compare
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
giuseppelillo
approved these changes
May 29, 2026
tvainika
pushed a commit
that referenced
this pull request
May 29, 2026
… regexes (#622) The diskless.force.include.topic.regexes LIST config added in #614 was missing a case in testFromPropsInvalid(), causing the nightly CI to fail when the catch-all tested "not_a_number" (valid for a list type). Also adds KafkaConfigTest to the regular Inkless PR CI workflow. Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
giuseppelillo
pushed a commit
that referenced
this pull request
May 29, 2026
…nd DisklessForceCreateTopicInterceptor (#614) * Introduce CREATE_TOPICS config interceptor framework Implement a generic way to intercept and mutate CREATE_TOPICS configs via a chain of interceptors with first-match-wins semantics. - Add CreateTopicConfigInterceptor interface (returns boolean to indicate whether it matched) - Add CreateTopicConfigInterceptors container that chains interceptors and stops at the first match - Refactor ClassicTopicRemoteStorageForcePolicy into ClassicTopicRemoteStorageForceCreateTopicInterceptor implementing the new interface - Update ReplicationControlManager to use the new abstraction The interceptor chain is designed so that additional interceptors (e.g. diskless force) can be added with clear priority ordering. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Add DisklessForceCreateTopicInterceptor Introduce a new CREATE_TOPICS config interceptor that forces diskless.enable=true for topics matching a configurable allow list of regexes. Behavior: - Forces diskless.enable=true when topic name matches at least one regex in diskless.force.include.topic.regexes - Excludes system topics (ReplicationControlManager.isSystemTopic) - Excludes compacted topics (cleanup.policy contains "compact") - Validates at construction that regexes cannot match "__"-prefixed names Interceptor chaining: - DisklessForceCreateTopicInterceptor has higher priority (tried first) - ClassicTopicRemoteStorageForceCreateTopicInterceptor is tried only if diskless interceptor did not match (first-match-wins semantics) - Both can be enabled simultaneously; users should configure regexes to avoid overlap New configs: - diskless.force.enable (boolean, default false) - diskless.force.include.topic.regexes (list, default empty) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Require system-level diskless to activate DisklessForceCreateTopicInterceptor The diskless force interceptor is now only activated when the system-level diskless storage is enabled (diskless.storage.system.enable=true). If diskless.force.enable=true but system-level diskless is not enabled, a warning is logged and the interceptor is not added to the chain. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Document "__" prefix exclusion Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
giuseppelillo
pushed a commit
that referenced
this pull request
May 29, 2026
… regexes (#622) The diskless.force.include.topic.regexes LIST config added in #614 was missing a case in testFromPropsInvalid(), causing the nightly CI to fail when the catch-all tested "not_a_number" (valid for a list type). Also adds KafkaConfigTest to the regular Inkless PR CI workflow. Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
giuseppelillo
pushed a commit
that referenced
this pull request
May 29, 2026
…nd DisklessForceCreateTopicInterceptor (#614) * Introduce CREATE_TOPICS config interceptor framework Implement a generic way to intercept and mutate CREATE_TOPICS configs via a chain of interceptors with first-match-wins semantics. - Add CreateTopicConfigInterceptor interface (returns boolean to indicate whether it matched) - Add CreateTopicConfigInterceptors container that chains interceptors and stops at the first match - Refactor ClassicTopicRemoteStorageForcePolicy into ClassicTopicRemoteStorageForceCreateTopicInterceptor implementing the new interface - Update ReplicationControlManager to use the new abstraction The interceptor chain is designed so that additional interceptors (e.g. diskless force) can be added with clear priority ordering. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Add DisklessForceCreateTopicInterceptor Introduce a new CREATE_TOPICS config interceptor that forces diskless.enable=true for topics matching a configurable allow list of regexes. Behavior: - Forces diskless.enable=true when topic name matches at least one regex in diskless.force.include.topic.regexes - Excludes system topics (ReplicationControlManager.isSystemTopic) - Excludes compacted topics (cleanup.policy contains "compact") - Validates at construction that regexes cannot match "__"-prefixed names Interceptor chaining: - DisklessForceCreateTopicInterceptor has higher priority (tried first) - ClassicTopicRemoteStorageForceCreateTopicInterceptor is tried only if diskless interceptor did not match (first-match-wins semantics) - Both can be enabled simultaneously; users should configure regexes to avoid overlap New configs: - diskless.force.enable (boolean, default false) - diskless.force.include.topic.regexes (list, default empty) Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Require system-level diskless to activate DisklessForceCreateTopicInterceptor The diskless force interceptor is now only activated when the system-level diskless storage is enabled (diskless.storage.system.enable=true). If diskless.force.enable=true but system-level diskless is not enabled, a warning is logged and the interceptor is not added to the chain. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> * Document "__" prefix exclusion Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --------- Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
giuseppelillo
pushed a commit
that referenced
this pull request
May 29, 2026
… regexes (#622) The diskless.force.include.topic.regexes LIST config added in #614 was missing a case in testFromPropsInvalid(), causing the nightly CI to fail when the catch-all tested "not_a_number" (valid for a list type). Also adds KafkaConfigTest to the regular Inkless PR CI workflow. Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Introduce CREATE_TOPICS config interceptor framework and DisklessForceCreateTopicInterceptor
Summary
Introduce a generic interceptor framework for mutating topic configs during CREATE_TOPICS API requests, then use it to add a new diskless force interceptor.
Motivation
ClassicTopicRemoteStorageForcePolicywas implemented to automatically setremote.storage.enable=trueon topic creation. For Diskless-enabled-by-default, we need a similar mechanism that forcesdiskless.enable=true. Rather than adding another one-off policy, this PR refactors the approach into a generic, chainable interceptor pattern.Changes
Commit 1: Interceptor framework
CreateTopicConfigInterceptor— interface withintercept()methods returningboolean(true = matched)CreateTopicConfigInterceptors— container that chains interceptors with first-match-wins semanticsClassicTopicRemoteStorageForcePolicy→ClassicTopicRemoteStorageForceCreateTopicInterceptorReplicationControlManagerto use the new abstractionCommit 2: Diskless force interceptor
DisklessForceCreateTopicInterceptor— forcesdiskless.enable=truefor topics matching an allow-list of regexes__prefixNew configs
diskless.force.enablefalsediskless.force.include.topic.regexes[]diskless.enable=trueExample configuration
Testing
DisklessForceCreateTopicInterceptor(regex matching, exclusions,__prefix skip)CreateTopicConfigInterceptors(chaining, first-match-wins, empty chain)ClassicTopicRemoteStorageForceCreateTopicInterceptorTestcontinues to pass