Skip to content

Commit c946802

Browse files
authored
KAFKA-20506 kafka-configs.sh can't delete the config from a offline broker when using bootstrap controller (#22104)
Ref #22070 (comment) Remove the pre-flight DescribeConfigs existence check in `alterResourceConfig()` since deleting a non-existent config is idempotent, and the check causes a timeout when the target broker is offline. Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
1 parent e1cfea3 commit c946802

4 files changed

Lines changed: 44 additions & 108 deletions

File tree

docs/getting-started/upgrade.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ type: docs
4040

4141
### Notable changes in 4.3.0
4242

43+
* `kafka-configs.sh --alter --delete-config` no longer requires the specified config keys to exist on the target resource. Previously, attempting to delete a non-existent config key raised an `InvalidConfigurationException`. The deletion is now a no-op when the key does not exist, which allows managing configs for offline brokers via `--bootstrap-controller`. For further details, please refer to [KAFKA-20506](https://issues.apache.org/jira/browse/KAFKA-20506).
4344
* Support dynamically changing configs for dynamic quorum controllers. Previously only brokers and static quorum controllers were supported. For further details, please refer to [KAFKA-18928](https://issues.apache.org/jira/browse/KAFKA-18928).
4445
* Two new configs have been introduced: `group.coordinator.cached.buffer.max.bytes` and `share.coordinator.cached.buffer.max.bytes`. They allow the respective coordinators to set the maximum buffer size retained for reuse. For further details, please refer to [KIP-1196](https://cwiki.apache.org/confluence/x/hA5JFg).
4546
* The new config have been introduced: `remote.log.metadata.topic.min.isr` with 2 as default value. You can correct the min.insync.replicas for the existed __remote_log_metadata topic via kafka-configs.sh if needed. For further details, please refer to [KIP-1235](https://cwiki.apache.org/confluence/x/yommFw).

tools/src/main/java/org/apache/kafka/tools/ConfigCommand.java

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -249,12 +249,15 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
249249
} else if (CLIENT_METRICS_TYPE.equals(entityType)) {
250250
configResourceType = ConfigResource.Type.CLIENT_METRICS;
251251
} else if (BROKER_TYPE.equals(entityType)) {
252+
if (!BROKER_DEFAULT_ENTITY_NAME.equals(entityName)) {
253+
validateBrokerId(entityName, entityType);
254+
}
252255
configResourceType = ConfigResource.Type.BROKER;
253256
} else {
254257
configResourceType = ConfigResource.Type.GROUP;
255258
}
256259
try {
257-
alterResourceConfig(adminClient, entityType, entityName, configsToBeDeleted, configsToBeAdded, configResourceType);
260+
alterResourceConfig(adminClient, entityName, configsToBeDeleted, configsToBeAdded, configResourceType);
258261
} catch (ExecutionException ee) {
259262
if (ee.getCause() instanceof UnsupportedVersionException) {
260263
throw new UnsupportedVersionException("The " + ApiKeys.INCREMENTAL_ALTER_CONFIGS + " API is not supported by the cluster. The API is supported starting from version 2.3.0."
@@ -263,7 +266,7 @@ static void alterConfig(Admin adminClient, ConfigCommandOptions opts) throws IOE
263266
throw ee;
264267
}
265268
} else if (BROKER_LOGGER_CONFIG_TYPE.equals(entityType)) {
266-
List<String> validLoggers = getResourceConfig(adminClient, entityType, entityName, true, false).stream().map(ConfigEntry::name).toList();
269+
List<String> validLoggers = getResourceConfig(adminClient, entityType, entityName, false, false).stream().map(ConfigEntry::name).toList();
267270
// fail the command if any of the configured broker loggers do not exist
268271
List<String> invalidBrokerLoggers = Stream.concat(
269272
configsToBeDeleted.stream().filter(c -> !validLoggers.contains(c)),
@@ -577,18 +580,7 @@ private static void describeResourceConfig(Admin adminClient, String entityType,
577580
}
578581
}
579582

580-
private static void alterResourceConfig(Admin adminClient, String entityTypeHead, String entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry> configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException {
581-
Map<String, ConfigEntry> oldConfig = getResourceConfig(adminClient, entityTypeHead, entityNameHead, false, false)
582-
.stream()
583-
.collect(Collectors.toMap(ConfigEntry::name, entry -> entry));
584-
585-
// fail the command if any of the configs to be deleted does not exist
586-
List<String> invalidConfigs = configsToBeDeleted.stream()
587-
.filter(config -> !oldConfig.containsKey(config))
588-
.toList();
589-
if (!invalidConfigs.isEmpty())
590-
throw new InvalidConfigurationException("Invalid config(s): " + String.join(",", invalidConfigs));
591-
583+
private static void alterResourceConfig(Admin adminClient, String entityNameHead, List<String> configsToBeDeleted, Map<String, ConfigEntry> configsToBeAdded, ConfigResource.Type resourceType) throws ExecutionException, InterruptedException, TimeoutException {
592584
ConfigResource configResource = new ConfigResource(resourceType, entityNameHead);
593585
AlterConfigsOptions alterOptions = new AlterConfigsOptions().timeoutMs(30000).validateOnly(false);
594586
List<AlterConfigOp> addEntries = configsToBeAdded.values().stream().map(k -> new AlterConfigOp(k, AlterConfigOp.OpType.SET)).toList();

tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,29 @@ public void testUpdateInvalidTopicConfigs() throws ExecutionException, Interrupt
621621
}
622622
}
623623

624+
@ClusterTest
625+
public void testDeleteNonExistentConfigIsIdempotent() throws Exception {
626+
String topicName = "test-delete-nonexistent-topic";
627+
try (Admin client = cluster.admin()) {
628+
client.createTopics(List.of(new NewTopic(topicName, 1, (short) 1))).all().get();
629+
630+
ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray(
631+
List.of("--bootstrap-server", cluster.bootstrapServers(),
632+
"--entity-type", "topics", "--entity-name", topicName,
633+
"--alter", "--delete-config", "non.existent.config"))));
634+
635+
ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray(
636+
List.of("--bootstrap-server", cluster.bootstrapServers(),
637+
"--entity-type", "brokers", "--entity-name", defaultBrokerId,
638+
"--alter", "--delete-config", "non.existent.config"))));
639+
640+
ConfigCommand.alterConfig(client, new ConfigCommand.ConfigCommandOptions(toArray(
641+
List.of("--bootstrap-server", cluster.bootstrapServers(),
642+
"--entity-type", "brokers", "--entity-default",
643+
"--alter", "--delete-config", "non.existent.config"))));
644+
}
645+
}
646+
624647
// Test case from KAFKA-13788
625648
@ClusterTest(serverProperties = {
626649
// Must be at greater than 1MB per cleaner thread, set to 2M+2 so that we can set 2 cleaner threads.

tools/src/test/java/org/apache/kafka/tools/ConfigCommandTest.java

Lines changed: 14 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -805,30 +805,13 @@ public void shouldAlterTopicConfig(boolean file) throws Exception {
805805
"--delete-config", "unclean.leader.election.enable"));
806806
AtomicBoolean alteredConfigs = new AtomicBoolean();
807807

808-
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
809-
List<ConfigEntry> configEntries = List.of(newConfigEntry("min.insync.replicas", "1"), newConfigEntry("unclean.leader.election.enable", "1"));
810-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
811-
future.complete(Map.of(resource, new Config(configEntries)));
812-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
813-
when(describeResult.all()).thenReturn(future);
814-
815808
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
816809
alterFuture.complete(null);
817810
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
818811
when(alterResult.all()).thenReturn(alterFuture);
819812

820813
Node node = new Node(1, "localhost", 9092);
821814
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
822-
@Override
823-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
824-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
825-
assertEquals(1, resources.size());
826-
ConfigResource res = resources.iterator().next();
827-
assertEquals(ConfigResource.Type.TOPIC, res.type());
828-
assertEquals(resourceName, res.name());
829-
return describeResult;
830-
}
831-
832815
@Override
833816
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
834817
assertEquals(1, configs.size());
@@ -858,7 +841,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
858841
};
859842
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
860843
assertTrue(alteredConfigs.get());
861-
verify(describeResult).all();
844+
verify(alterResult).all();
862845
}
863846

864847
public ConfigEntry newConfigEntry(String name, String value) {
@@ -971,16 +954,16 @@ public void shouldRaiseInvalidConfigurationExceptionWhenAddingInvalidBrokerLogge
971954
@Test
972955
public void shouldAddDefaultBrokerDynamicConfig() throws Exception {
973956
Node node = new Node(1, "localhost", 9092);
974-
verifyAlterBrokerConfig(node, "", List.of("--entity-default"));
957+
verifyAlterBrokerConfig(node, List.of("--entity-default"));
975958
}
976959

977960
@Test
978961
public void shouldAddBrokerDynamicConfig() throws Exception {
979962
Node node = new Node(1, "localhost", 9092);
980-
verifyAlterBrokerConfig(node, "1", List.of("--entity-name", "1"));
963+
verifyAlterBrokerConfig(node, List.of("--entity-name", "1"));
981964
}
982965

983-
public void verifyAlterBrokerConfig(Node node, String resourceName, List<String> resourceOpts) throws Exception {
966+
public void verifyAlterBrokerConfig(Node node, List<String> resourceOpts) throws Exception {
984967
String[] optsList = toArray(List.of("--bootstrap-server", "localhost:9092",
985968
"--entity-type", "brokers",
986969
"--alter",
@@ -989,29 +972,12 @@ public void verifyAlterBrokerConfig(Node node, String resourceName, List<String>
989972
Map<String, String> brokerConfigs = new HashMap<>();
990973
brokerConfigs.put("num.io.threads", "5");
991974

992-
ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, resourceName);
993-
List<ConfigEntry> configEntries = List.of(new ConfigEntry("num.io.threads", "5"));
994-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
995-
future.complete(Map.of(resource, new Config(configEntries)));
996-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
997-
when(describeResult.all()).thenReturn(future);
998-
999975
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
1000976
alterFuture.complete(null);
1001977
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
1002978
when(alterResult.all()).thenReturn(alterFuture);
1003979

1004980
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
1005-
@Override
1006-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1007-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
1008-
assertEquals(1, resources.size());
1009-
ConfigResource res = resources.iterator().next();
1010-
assertEquals(ConfigResource.Type.BROKER, res.type());
1011-
assertEquals(resourceName, res.name());
1012-
return describeResult;
1013-
}
1014-
1015981
@Override
1016982
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
1017983
assertEquals(1, configs.size());
@@ -1029,7 +995,7 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
1029995
expected.put("num.io.threads", "5");
1030996
expected.put("leader.replication.throttled.rate", "10");
1031997
assertEquals(expected, brokerConfigs);
1032-
verify(describeResult).all();
998+
verify(alterResult).all();
1033999
}
10341000

10351001
@Test
@@ -1158,35 +1124,29 @@ public void shouldNotUpdateBrokerConfigIfMalformedBracketConfig() {
11581124
}
11591125

11601126
@Test
1161-
public void shouldNotUpdateConfigIfNonExistingConfigIsDeleted() {
1127+
public void shouldAllowDeletingNonExistingConfig() throws Exception {
11621128
String resourceName = "my-topic";
11631129
ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray("--bootstrap-server", "localhost:9092",
11641130
"--entity-name", resourceName,
11651131
"--entity-type", "topics",
11661132
"--alter",
11671133
"--delete-config", "missing_config1, missing_config2"));
11681134

1169-
ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, resourceName);
1170-
List<ConfigEntry> configEntries = List.of();
1171-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
1172-
future.complete(Map.of(resource, new Config(configEntries)));
1173-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1174-
when(describeResult.all()).thenReturn(future);
1135+
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
1136+
alterFuture.complete(null);
1137+
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
1138+
when(alterResult.all()).thenReturn(alterFuture);
11751139

11761140
Node node = new Node(1, "localhost", 9092);
11771141
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
11781142
@Override
1179-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1180-
assertEquals(1, resources.size());
1181-
ConfigResource res = resources.iterator().next();
1182-
assertEquals(ConfigResource.Type.TOPIC, res.type());
1183-
assertEquals(resourceName, res.name());
1184-
return describeResult;
1143+
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
1144+
return alterResult;
11851145
}
11861146
};
11871147

1188-
assertThrows(InvalidConfigurationException.class, () -> ConfigCommand.alterConfig(mockAdminClient, createOpts));
1189-
verify(describeResult).all();
1148+
ConfigCommand.alterConfig(mockAdminClient, createOpts);
1149+
verify(alterResult).all();
11901150
}
11911151

11921152
@Test
@@ -1207,31 +1167,12 @@ private void verifyAlterClientMetricsConfig(Node node, String resourceName, List
12071167
"match=[client_software_name=kafka.python,client_software_version=1\\.2\\..*]"), resourceOpts);
12081168
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));
12091169

1210-
ConfigResource resource = new ConfigResource(ConfigResource.Type.CLIENT_METRICS, resourceName);
1211-
List<ConfigEntry> configEntries = List.of(new ConfigEntry("interval.ms", "1000",
1212-
ConfigEntry.ConfigSource.DYNAMIC_CLIENT_METRICS_CONFIG, false, false, List.of(),
1213-
ConfigEntry.ConfigType.UNKNOWN, null));
1214-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
1215-
future.complete(Map.of(resource, new Config(configEntries)));
1216-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1217-
when(describeResult.all()).thenReturn(future);
1218-
12191170
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
12201171
alterFuture.complete(null);
12211172
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
12221173
when(alterResult.all()).thenReturn(alterFuture);
12231174

12241175
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
1225-
@Override
1226-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1227-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
1228-
assertEquals(1, resources.size());
1229-
ConfigResource res = resources.iterator().next();
1230-
assertEquals(ConfigResource.Type.CLIENT_METRICS, res.type());
1231-
assertEquals(resourceName, res.name());
1232-
return describeResult;
1233-
}
1234-
12351176
@Override
12361177
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
12371178
assertEquals(1, configs.size());
@@ -1255,7 +1196,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
12551196
}
12561197
};
12571198
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
1258-
verify(describeResult).all();
12591199
verify(alterResult).all();
12601200
}
12611201

@@ -1317,31 +1257,12 @@ private void verifyAlterGroupConfig(Node node, String resourceName, List<String>
13171257
"--add-config", "consumer.heartbeat.interval.ms=6000"), resourceOpts);
13181258
ConfigCommand.ConfigCommandOptions alterOpts = new ConfigCommand.ConfigCommandOptions(toArray(optsList));
13191259

1320-
ConfigResource resource = new ConfigResource(ConfigResource.Type.GROUP, resourceName);
1321-
List<ConfigEntry> configEntries = List.of(new ConfigEntry("consumer.session.timeout.ms", "45000",
1322-
ConfigEntry.ConfigSource.DYNAMIC_GROUP_CONFIG, false, false, List.of(),
1323-
ConfigEntry.ConfigType.UNKNOWN, null));
1324-
KafkaFutureImpl<Map<ConfigResource, Config>> future = new KafkaFutureImpl<>();
1325-
future.complete(Map.of(resource, new Config(configEntries)));
1326-
DescribeConfigsResult describeResult = mock(DescribeConfigsResult.class);
1327-
when(describeResult.all()).thenReturn(future);
1328-
13291260
KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>();
13301261
alterFuture.complete(null);
13311262
AlterConfigsResult alterResult = mock(AlterConfigsResult.class);
13321263
when(alterResult.all()).thenReturn(alterFuture);
13331264

13341265
MockAdminClient mockAdminClient = new MockAdminClient(List.of(node), node) {
1335-
@Override
1336-
public synchronized DescribeConfigsResult describeConfigs(Collection<ConfigResource> resources, DescribeConfigsOptions options) {
1337-
assertFalse(options.includeSynonyms(), "Config synonyms requested unnecessarily");
1338-
assertEquals(1, resources.size());
1339-
ConfigResource res = resources.iterator().next();
1340-
assertEquals(ConfigResource.Type.GROUP, res.type());
1341-
assertEquals(resourceName, res.name());
1342-
return describeResult;
1343-
}
1344-
13451266
@Override
13461267
public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResource, Collection<AlterConfigOp>> configs, AlterConfigsOptions options) {
13471268
assertEquals(1, configs.size());
@@ -1364,7 +1285,6 @@ public synchronized AlterConfigsResult incrementalAlterConfigs(Map<ConfigResourc
13641285
}
13651286
};
13661287
ConfigCommand.alterConfig(mockAdminClient, alterOpts);
1367-
verify(describeResult).all();
13681288
verify(alterResult).all();
13691289
}
13701290

0 commit comments

Comments
 (0)