Skip to content

Commit 67ae18e

Browse files
lianetm and Copilot committed
KAFKA-20428: Fix unsubscribe failure with assignment updates (#22011)
Fix to ensure that unsubscribe does not apply any pending assignment update that may exist in the background queue (e.g, if a reconciliation completed right before unsubscribing). Fix by filtering out the assignment update events on unsubscribe (same approach already done for filtering out error events that the unsubscribe should not process). This issue and fix only affects unsubscribe (not close), as unsubscribe is the only one, other than poll, that processes background events. Reviewers: Kirk True <kirk@kirktrue.pro>, Andrew Schofield <aschofield@confluent.io> --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
1 parent 5e8dc88 commit 67ae18e

4 files changed

Lines changed: 165 additions & 10 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 50 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1917,7 +1917,11 @@ public void unsubscribe() {
19171917
try {
19181918
// If users have fatal error, they will get some exceptions in the background queue.
19191919
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
1920-
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException));
1920+
// We also skip processing assignment events (PARTITIONS_ASSIGNED, STREAMS_TASKS_ASSIGNED) because
1921+
// they are not relevant anymore (consumer already unsubscribing).
1922+
processBackgroundEvents(unsubscribeEvent.future(), timer,
1923+
e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException),
1924+
true);
19211925
log.info("Unsubscribed all topics or patterns and assigned partitions");
19221926
} catch (TimeoutException e) {
19231927
log.error("Failed while waiting for the unsubscribe event to complete");
@@ -2299,6 +2303,31 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
22992303
* Visible for testing.
23002304
*/
23012305
boolean processBackgroundEvents() {
    // Delegate with skipAssignmentEvents=false: callers such as poll() must apply
    // reconciled assignment updates (PARTITIONS_ASSIGNED / STREAMS_TASKS_ASSIGNED);
    // only unsubscribe() passes true to filter them out.
    return processBackgroundEvents(false);
}
2308+
2309+
/**
2310+
* Checks if the given background event is an assignment update event.
2311+
* Those are to update reconciled assignments, so should only be processed from poll() and not from unsubscribe().
2312+
*/
2313+
private static boolean isAssignmentEvent(BackgroundEvent event) {
2314+
return event.type() == BackgroundEvent.Type.PARTITIONS_ASSIGNED ||
2315+
event.type() == BackgroundEvent.Type.STREAMS_TASKS_ASSIGNED;
2316+
}
2317+
2318+
/**
2319+
* Process the events produced by the background thread.
2320+
* It is possible that {@link ErrorEvent an error}
2321+
* could occur when processing the events. In such cases, the processor will take a reference to the first
2322+
* error, continue to process the remaining events, and then throw the first error that occurred.
2323+
* Visible for testing.
2324+
*
2325+
* @param skipAssignmentEvents If true, skip processing events that update a new assignment after a reconciliation
2326+
* (PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED)
2327+
* These events should only be processed from poll(), not from unsubscribe().
2328+
* @return true if any events were drained from the queue
2329+
*/
2330+
boolean processBackgroundEvents(boolean skipAssignmentEvents) {
23022331
AtomicReference<KafkaException> firstError = new AtomicReference<>();
23032332

23042333
List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
@@ -2310,6 +2339,18 @@ boolean processBackgroundEvents() {
23102339
if (event instanceof CompletableEvent)
23112340
backgroundEventReaper.add((CompletableEvent<?>) event);
23122341

2342+
// Skip assignment events if requested (e.g., during unsubscribe).
2343+
// These events should only be processed from poll().
2344+
// Complete them exceptionally to unblock the reconciliation in the background.
2345+
if (skipAssignmentEvents && isAssignmentEvent(event)) {
2346+
if (event instanceof CompletableEvent) {
2347+
((CompletableEvent<?>) event).future().completeExceptionally(
2348+
new KafkaException("Assignment event skipped because consumer is unsubscribing"));
2349+
}
2350+
log.debug("Skipped processing {} during unsubscribe", event.type());
2351+
continue;
2352+
}
2353+
23132354
backgroundEventProcessor.process(event);
23142355
} catch (Throwable t) {
23152356
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
@@ -2367,14 +2408,19 @@ boolean processBackgroundEvents() {
23672408
* @param timer Overall timer that bounds how long to wait for the event to complete
23682409
* @param ignoreErrorEventException Predicate to ignore background errors.
23692410
* Any exceptions found while processing background events that match the predicate won't be propagated.
2370-
* @return {@code true} if the event completed within the timeout, {@code false} otherwise
2411+
* @param skipAssignmentEvents If true, skip processing PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED
2412+
* events and complete them exceptionally. These events should only be
2413+
* processed from poll(), not from unsubscribe() or other operations.
2414+
* @return the completed result of the supplied {@code future}
2415+
* @throws TimeoutException if the operation does not complete before the timer expires
23712416
*/
23722417
// Visible for testing
2373-
<T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) {
2418+
<T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException,
2419+
boolean skipAssignmentEvents) {
23742420
do {
23752421
boolean hadEvents = false;
23762422
try {
2377-
hadEvents = processBackgroundEvents();
2423+
hadEvents = processBackgroundEvents(skipAssignmentEvents);
23782424
} catch (Exception e) {
23792425
if (!ignoreErrorEventException.test(e))
23802426
throw e;

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1774,7 +1774,7 @@ public void testLongPollWaitIsLimited() {
17741774
}
17751775

17761776
/**
1777-
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
1777+
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
17781778
* handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.
17791779
*/
17801780
@Test
@@ -1800,14 +1800,14 @@ public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
18001800
return null;
18011801
}).when(future).get(any(Long.class), any(TimeUnit.class));
18021802

1803-
consumer.processBackgroundEvents(future, timer, e -> false);
1803+
consumer.processBackgroundEvents(future, timer, e -> false, false);
18041804

18051805
// 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
18061806
assertEquals(800, timer.remainingMs());
18071807
}
18081808

18091809
/**
1810-
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
1810+
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
18111811
* handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
18121812
*/
18131813
@Test
@@ -1818,15 +1818,15 @@ public void testProcessBackgroundEventsWithoutDelay() {
18181818
// Create a future that is already completed.
18191819
CompletableFuture<?> future = CompletableFuture.completedFuture(null);
18201820

1821-
consumer.processBackgroundEvents(future, timer, e -> false);
1821+
consumer.processBackgroundEvents(future, timer, e -> false, false);
18221822

18231823
// Because we didn't need to perform a timed get, we should still have every last millisecond
18241824
// of our initial timeout.
18251825
assertEquals(1000, timer.remainingMs());
18261826
}
18271827

18281828
/**
1829-
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
1829+
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
18301830
* handles the case where the {@link Future} does not complete within the timeout.
18311831
*/
18321832
@Test
@@ -1841,7 +1841,7 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
18411841
throw new java.util.concurrent.TimeoutException("Intentional timeout");
18421842
}).when(future).get(any(Long.class), any(TimeUnit.class));
18431843

1844-
assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false));
1844+
assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false, false));
18451845

18461846
// Because we forced our mocked future to continuously time out, we should have no time remaining.
18471847
assertEquals(0, timer.remainingMs());
@@ -1910,6 +1910,40 @@ public void testUnsubscribeWithoutGroupId() {
19101910
verify(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class));
19111911
}
19121912

1913+
private static Stream<CompletableBackgroundEvent<?>> assignmentEventsSource() {
1914+
return Stream.of(
1915+
new PartitionsAssignedEvent(Set.of(), new TreeSet<>(TOPIC_PARTITION_COMPARATOR)),
1916+
new StreamsTasksAssignedEvent(
1917+
new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
1918+
new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
1919+
new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), true))
1920+
);
1921+
}
1922+
1923+
/**
1924+
* Test to ensure that assignment updates are not applied while unsubscribing
1925+
* (it would cause an IllegalArgumentException when calling unsubscribe()).
1926+
* Validates the fix for KAFKA-20428.
1927+
*/
1928+
@ParameterizedTest
1929+
@MethodSource("assignmentEventsSource")
1930+
public void testUnsubscribeWithPendingAssignmentEvent(CompletableBackgroundEvent<?> assignedEvent) {
1931+
consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroup"));
1932+
completeTopicSubscriptionChangeEventSuccessfully();
1933+
consumer.subscribe(singletonList("topic"));
1934+
completeUnsubscribeApplicationEventSuccessfully();
1935+
1936+
// Add assignment event to the background queue (simulating an ongoing reconciliation
1937+
// that completed just before unsubscribe was called)
1938+
backgroundEventQueue.add(assignedEvent);
1939+
1940+
// The call to unsubscribe should complete successfully (assignment event not processed and completed exceptionally)
1941+
assertDoesNotThrow(() -> consumer.unsubscribe());
1942+
verify(applicationEventHandler, never().description("Reconciled assignment updates shouldn't be processed while unsubscribing"))
1943+
.addAndGet(any(ApplyAssignmentEvent.class));
1944+
assertTrue(assignedEvent.future().isCompletedExceptionally());
1945+
}
1946+
19131947
@Test
19141948
public void testSeekToBeginning() {
19151949
Collection<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));

clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1248,6 +1248,45 @@ public void testLeaveGroupWhenMemberIsStale() {
12481248
assertEquals(MemberState.STALE, membershipManager.state());
12491249
}
12501250

1251+
/**
1252+
* Test that when unsubscribe/leaveGroup is called during an ongoing reconciliation and the pending
1253+
* assignment event is completed exceptionally, the member can still rejoin and start
1254+
* a new reconciliation.
1255+
*/
1256+
@Test
1257+
public void testLeaveGroupDuringReconciliationThenRejoin() {
1258+
Uuid topicId = Uuid.randomUuid();
1259+
String topicName = "topic1";
1260+
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
1261+
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
1262+
1263+
// Start reconciliation - assignment event is pending
1264+
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
1265+
membershipManager.maybeReconcile(true);
1266+
PartitionsAssignedEvent pendingAssignmentEvent = (PartitionsAssignedEvent) backgroundEventQueue.poll();
1267+
assertNotNull(pendingAssignmentEvent);
1268+
1269+
// Call leaveGroup while reconciliation is in progress
1270+
mockLeaveGroup();
1271+
membershipManager.leaveGroup();
1272+
assertEquals(MemberState.LEAVING, membershipManager.state());
1273+
1274+
// Complete the pending assignment event exceptionally (simulating unsubscribe skipping it)
1275+
pendingAssignmentEvent.future().completeExceptionally(
1276+
new KafkaException("Assignment event skipped because consumer is unsubscribing"));
1277+
1278+
// Complete leave and rejoin
1279+
membershipManager.onHeartbeatRequestGenerated();
1280+
clearInvocations(membershipManager);
1281+
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
1282+
membershipManager.transitionToJoining();
1283+
1284+
// Receive assignment - verify new reconciliation starts
1285+
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
1286+
membershipManager.maybeReconcile(true);
1287+
verifyReconciliationTriggered(membershipManager);
1288+
}
1289+
12511290
@Test
12521291
public void testFatalFailureWhenStateIsUnjoined() {
12531292
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();

clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManagerTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,6 +1343,42 @@ public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
13431343
assertFalse(onGroupLeft.isCompletedExceptionally());
13441344
}
13451345

1346+
/**
1347+
* Test that when unsubscribe/leaveGroup is called during an ongoing reconciliation and the pending
1348+
* assignment event is completed exceptionally, the member can still rejoin and start
1349+
* a new reconciliation.
1350+
*/
1351+
@Test
1352+
public void testLeaveGroupDuringReconciliationThenRejoin() {
1353+
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
1354+
final Set<StreamsRebalanceData.TaskId> activeTasks =
1355+
Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
1356+
when(subscriptionState.assignedPartitions()).thenReturn(Set.of());
1357+
joining();
1358+
1359+
// Start reconciliation - assignment event is pending
1360+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
1361+
final StreamsTasksAssignedEvent pendingAssignmentEvent =
1362+
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
1363+
1364+
// Call leaveGroup while reconciliation is in progress
1365+
membershipManager.leaveGroup();
1366+
1367+
// Complete the pending assignment event exceptionally (simulating unsubscribe skipping it)
1368+
pendingAssignmentEvent.future().completeExceptionally(
1369+
new KafkaException("Assignment event skipped because consumer is unsubscribing"));
1370+
1371+
// Complete leave and rejoin
1372+
membershipManager.onHeartbeatRequestGenerated();
1373+
Mockito.clearInvocations(backgroundEventHandler);
1374+
tasksAssignedAddCount = 0;
1375+
joining();
1376+
1377+
// Receive assignment - verify new reconciliation starts
1378+
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
1379+
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
1380+
}
1381+
13461382
@Test
13471383
public void testOnHeartbeatRequestSkippedWhenInLeaving() {
13481384
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic");

0 commit comments

Comments
 (0)