Skip to content

Commit 40e9fcd

Browse files
authored
KAFKA-20119 Clarify that Consumer#unsubscribe does not trigger auto-commit (#21424)
Document that unsubscribe() doesn't commit offsets even with auto-commit enabled, and add test to verify this behavior Reviewers: Lianet Magrans <lmagrans@confluent.io>, David Jacot <djacot@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>
1 parent b954b35 commit 40e9fcd

4 files changed

Lines changed: 141 additions & 0 deletions

File tree

clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/PlaintextConsumerTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1823,6 +1823,49 @@ private ConsumerRecords<byte[], byte[]> awaitNonEmptyRecords(
18231823
return result.get();
18241824
}
18251825

1826+
@ClusterTest
1827+
public void testClassicConsumerUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled() throws Exception {
1828+
testUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled(GroupProtocol.CLASSIC);
1829+
}
1830+
1831+
@ClusterTest
1832+
public void testAsyncConsumerUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled() throws Exception {
1833+
testUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled(GroupProtocol.CONSUMER);
1834+
}
1835+
1836+
/**
1837+
* Verify that {@link Consumer#unsubscribe()} does not commit offsets even when
1838+
* {@code enable.auto.commit} is enabled. A second consumer using the same group ID
1839+
* should see no committed offsets after the first consumer unsubscribes.
1840+
*/
1841+
private void testUnsubscribeDoesNotCommitOffsetsWithAutoCommitEnabled(GroupProtocol groupProtocol) throws Exception {
1842+
var numRecords = 10;
1843+
var groupId = "unsubscribe-no-commit-test";
1844+
sendRecords(cluster, TP, numRecords);
1845+
1846+
// Consumer 1: subscribe, consume records, then unsubscribe (without explicit commit)
1847+
Map<String, Object> config = new HashMap<>();
1848+
config.put(GROUP_PROTOCOL_CONFIG, groupProtocol.name().toLowerCase(Locale.ROOT));
1849+
config.put(GROUP_ID_CONFIG, groupId);
1850+
config.put(ENABLE_AUTO_COMMIT_CONFIG, true);
1851+
1852+
try (Consumer<byte[], byte[]> consumer1 = cluster.consumer(config)) {
1853+
consumer1.subscribe(List.of(TOPIC));
1854+
consumeRecords(consumer1, numRecords);
1855+
1856+
// Unsubscribe - this should NOT commit offsets even though auto-commit is enabled
1857+
consumer1.unsubscribe();
1858+
}
1859+
1860+
// Consumer 2: use the same group ID to check committed offsets
1861+
try (Consumer<byte[], byte[]> consumer2 = cluster.consumer(config)) {
1862+
consumer2.subscribe(List.of(TOPIC));
1863+
OffsetAndMetadata committed = consumer2.committed(Set.of(TP)).get(TP);
1864+
assertNull(committed,
1865+
"unsubscribe() should not commit offsets even when auto-commit is enabled");
1866+
}
1867+
}
1868+
18261869
private void awaitMetricsCleanup(
18271870
Consumer<?, ?> consumer,
18281871
String metricName,

clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -832,6 +832,10 @@ public void subscribe(SubscriptionPattern pattern) {
832832
/**
833833
* Unsubscribe from topics currently subscribed with {@link #subscribe(Collection)} or {@link #subscribe(Pattern)}.
834834
* This also clears any partitions directly assigned through {@link #assign(Collection)}.
835+
* <p>
836+
* <b>Note:</b> Unlike {@link #close()}, this method does not commit the pending offsets before
837+
* unsubscribing, even if {@code enable.auto.commit} is enabled. To avoid duplicate processing upon re-joining,
838+
* it is recommended to explicitly call {@link #commitSync()} before invoking this method.
835839
*
836840
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors (e.g. rebalance callback errors)
837841
*/

clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1790,6 +1790,68 @@ public void testSubscriptionChangesWithAutoCommitDisabled(GroupProtocol groupPro
17901790
client.requests().clear();
17911791
}
17921792

1793+
/**
1794+
* Verify that unsubscribe() does not commit offsets even when auto-commit is enabled.
1795+
* This ensures users are aware that they need to explicitly call commitSync() before
1796+
* unsubscribing to avoid duplicate processing upon re-joining the group.
1797+
*/
1798+
@ParameterizedTest
1799+
@EnumSource(value = GroupProtocol.class, names = "CLASSIC")
1800+
@SuppressWarnings("unchecked")
1801+
public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled(GroupProtocol groupProtocol) {
1802+
ConsumerMetadata metadata = createMetadata(subscription);
1803+
MockClient client = new MockClient(time, metadata);
1804+
1805+
Map<String, Integer> tpCounts = new HashMap<>();
1806+
tpCounts.put(topic, 1);
1807+
initMetadata(client, tpCounts);
1808+
Node node = metadata.fetch().nodes().get(0);
1809+
1810+
ConsumerPartitionAssignor assignor = new RangeAssignor();
1811+
1812+
// Create consumer with auto-commit enabled
1813+
consumer = newConsumer(groupProtocol, time, client, subscription, metadata, assignor, true, groupInstanceId);
1814+
1815+
initializeSubscriptionWithSingleTopic(consumer, getConsumerRebalanceListener(consumer));
1816+
1817+
// Mock rebalance responses
1818+
prepareRebalance(client, node, assignor, List.of(tp0), null);
1819+
1820+
consumer.updateAssignmentMetadataIfNeeded(time.timer(Long.MAX_VALUE));
1821+
consumer.poll(Duration.ZERO);
1822+
1823+
// Verify that subscription are set up correctly
1824+
assertEquals(Set.of(topic), consumer.subscription());
1825+
1826+
// Mock a fetch response so that we have consumed some data
1827+
Map<TopicPartition, FetchInfo> fetches = new HashMap<>();
1828+
fetches.put(tp0, new FetchInfo(0, 10));
1829+
client.respondFrom(fetchResponse(fetches), node);
1830+
client.poll(0, time.milliseconds());
1831+
1832+
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(Duration.ofMillis(1));
1833+
assertEquals(10, records.count());
1834+
assertEquals(10L, consumer.position(tp0));
1835+
1836+
// Clear previous requests to focus on unsubscribe behavior
1837+
client.requests().clear();
1838+
1839+
// Call unsubscribe - this should NOT commit offsets even though auto-commit is enabled
1840+
consumer.unsubscribe();
1841+
1842+
// Verify that subscription and assignment are both cleared
1843+
assertEquals(Collections.emptySet(), consumer.subscription());
1844+
assertEquals(Collections.emptySet(), consumer.assignment());
1845+
1846+
// Verify that no offset commit request was sent despite auto-commit being enabled
1847+
for (ClientRequest req : client.requests()) {
1848+
assertNotSame(ApiKeys.OFFSET_COMMIT, req.requestBuilder().apiKey(),
1849+
"unsubscribe() should not commit offsets even when auto-commit is enabled");
1850+
}
1851+
1852+
client.requests().clear();
1853+
}
1854+
17931855
// TODO: this test requires rebalance logic which is not yet implemented in the CONSUMER group protocol.
17941856
// Once it is implemented, this should use both group protocols.
17951857
@ParameterizedTest

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1935,6 +1935,38 @@ public void testUnsubscribeWithoutGroupId() {
19351935
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
19361936
}
19371937

1938+
/**
1939+
* Verify that unsubscribe() does not commit offsets even when auto-commit is enabled.
1940+
* This ensures users are aware that they need to explicitly call commitSync() before
1941+
* unsubscribing to avoid duplicate processing upon re-joining the group.
1942+
*/
1943+
@Test
1944+
public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled() {
1945+
Properties props = requiredConsumerConfigAndGroupId("test-group");
1946+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
1947+
1948+
consumer = newConsumer(props);
1949+
1950+
// Subscribe to a topic
1951+
completeTopicSubscriptionChangeEventSuccessfully();
1952+
consumer.subscribe(singleton("topic"));
1953+
1954+
// Clear any previous invocations to focus on unsubscribe behavior
1955+
clearInvocations(applicationEventHandler);
1956+
1957+
// Call unsubscribe - this should NOT commit offsets even though auto-commit is enabled
1958+
completeUnsubscribeApplicationEventSuccessfully();
1959+
consumer.unsubscribe();
1960+
1961+
// Verify that UnsubscribeEvent was sent
1962+
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
1963+
1964+
// Verify that no commit event (sync or async) was sent despite auto-commit being enabled
1965+
verify(applicationEventHandler, never()).add(ArgumentMatchers.isA(SyncCommitEvent.class));
1966+
verify(applicationEventHandler, never()).add(ArgumentMatchers.isA(AsyncCommitEvent.class));
1967+
verify(applicationEventHandler, never()).add(ArgumentMatchers.isA(CommitOnCloseEvent.class));
1968+
}
1969+
19381970
@Test
19391971
public void testSeekToBeginning() {
19401972
Collection<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));

0 commit comments

Comments
 (0)