Skip to content

Commit 95d1204

Browse files
committed
fix & tests
1 parent 6f4b47d commit 95d1204

2 files changed

Lines changed: 102 additions & 9 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1928,7 +1928,11 @@ public void unsubscribe() {
19281928
try {
19291929
// If users have fatal error, they will get some exceptions in the background queue.
19301930
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
1931-
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException));
1931+
// We also skip processing assignment events (PARTITIONS_ASSIGNED, STREAMS_TASKS_ASSIGNED) because
1932+
// they are not relevant anymore (consumer already unsubscribing).
1933+
processBackgroundEvents(unsubscribeEvent.future(), timer,
1934+
e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException),
1935+
true);
19321936
log.info("Unsubscribed all topics or patterns and assigned partitions");
19331937
} catch (TimeoutException e) {
19341938
log.error("Failed while waiting for the unsubscribe event to complete");
@@ -2310,6 +2314,31 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
23102314
* Visible for testing.
23112315
*/
23122316
boolean processBackgroundEvents() {
2317+
return processBackgroundEvents(false);
2318+
}
2319+
2320+
/**
2321+
* Checks if the given background event is an assignment update event.
2322+
 * These events update reconciled assignments, so they should only be processed from poll() and not from unsubscribe().
2323+
*/
2324+
private static boolean isAssignmentEvent(BackgroundEvent event) {
2325+
return event.type() == BackgroundEvent.Type.PARTITIONS_ASSIGNED ||
2326+
event.type() == BackgroundEvent.Type.STREAMS_TASKS_ASSIGNED;
2327+
}
2328+
2329+
/**
2330+
* Process the events produced by the background thread.
2331+
* It is possible that {@link ErrorEvent an error}
2332+
* could occur when processing the events. In such cases, the processor will take a reference to the first
2333+
* error, continue to process the remaining events, and then throw the first error that occurred.
2334+
* Visible for testing.
2335+
*
2336+
* @param skipAssignmentEvents If true, skip processing events that update a new assignment after a reconciliation
2337+
 *                             (PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED).
2338+
* These events should only be processed from poll(), not from unsubscribe().
2339+
* @return true if any events were drained from the queue
2340+
*/
2341+
boolean processBackgroundEvents(boolean skipAssignmentEvents) {
23132342
AtomicReference<KafkaException> firstError = new AtomicReference<>();
23142343

23152344
List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
@@ -2321,6 +2350,18 @@ boolean processBackgroundEvents() {
23212350
if (event instanceof CompletableEvent)
23222351
backgroundEventReaper.add((CompletableEvent<?>) event);
23232352

2353+
// Skip assignment events if requested (e.g., during unsubscribe).
2354+
// These events should only be processed from poll().
2355+
// Complete them exceptionally to unblock the reconciliation in the background.
2356+
if (skipAssignmentEvents && isAssignmentEvent(event)) {
2357+
if (event instanceof CompletableEvent) {
2358+
((CompletableEvent<?>) event).future().completeExceptionally(
2359+
new KafkaException("Assignment event skipped because consumer is unsubscribing"));
2360+
}
2361+
log.debug("Skipped processing {} during unsubscribe", event.type());
2362+
continue;
2363+
}
2364+
23242365
backgroundEventProcessor.process(event);
23252366
} catch (Throwable t) {
23262367
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
@@ -2378,14 +2419,18 @@ boolean processBackgroundEvents() {
23782419
* @param timer Overall timer that bounds how long to wait for the event to complete
23792420
* @param ignoreErrorEventException Predicate to ignore background errors.
23802421
* Any exceptions found while processing background events that match the predicate won't be propagated.
2422+
* @param skipAssignmentEvents If true, skip processing PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED
2423+
* events and complete them exceptionally. These events should only be
2424+
* processed from poll(), not from unsubscribe() or other operations.
23812425
* @return {@code true} if the event completed within the timeout, {@code false} otherwise
23822426
*/
23832427
// Visible for testing
2384-
<T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) {
2428+
<T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException,
2429+
boolean skipAssignmentEvents) {
23852430
do {
23862431
boolean hadEvents = false;
23872432
try {
2388-
hadEvents = processBackgroundEvents();
2433+
hadEvents = processBackgroundEvents(skipAssignmentEvents);
23892434
} catch (Exception e) {
23902435
if (!ignoreErrorEventException.test(e))
23912436
throw e;

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

Lines changed: 54 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1825,7 +1825,7 @@ public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() {
18251825
}
18261826

18271827
/**
1828-
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
1828+
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
18291829
* handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.
18301830
*/
18311831
@Test
@@ -1851,14 +1851,14 @@ public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
18511851
return null;
18521852
}).when(future).get(any(Long.class), any(TimeUnit.class));
18531853

1854-
consumer.processBackgroundEvents(future, timer, e -> false);
1854+
consumer.processBackgroundEvents(future, timer, e -> false, false);
18551855

18561856
// 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
18571857
assertEquals(800, timer.remainingMs());
18581858
}
18591859

18601860
/**
1861-
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
1861+
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
18621862
* handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
18631863
*/
18641864
@Test
@@ -1869,15 +1869,15 @@ public void testProcessBackgroundEventsWithoutDelay() {
18691869
// Create a future that is already completed.
18701870
CompletableFuture<?> future = CompletableFuture.completedFuture(null);
18711871

1872-
consumer.processBackgroundEvents(future, timer, e -> false);
1872+
consumer.processBackgroundEvents(future, timer, e -> false, false);
18731873

18741874
// Because we didn't need to perform a timed get, we should still have every last millisecond
18751875
// of our initial timeout.
18761876
assertEquals(1000, timer.remainingMs());
18771877
}
18781878

18791879
/**
1880-
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
1880+
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
18811881
* handles the case where the {@link Future} does not complete within the timeout.
18821882
*/
18831883
@Test
@@ -1892,7 +1892,7 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
18921892
throw new java.util.concurrent.TimeoutException("Intentional timeout");
18931893
}).when(future).get(any(Long.class), any(TimeUnit.class));
18941894

1895-
assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false));
1895+
assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false, false));
18961896

18971897
// Because we forced our mocked future to continuously time out, we should have no time remaining.
18981898
assertEquals(0, timer.remainingMs());
@@ -1993,6 +1993,54 @@ public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled() {
19931993
verify(applicationEventHandler, never()).add(ArgumentMatchers.isA(CommitOnCloseEvent.class));
19941994
}
19951995

1996+
/**
1997+
* Test to ensure that assignment updates are not applied while unsubscribing
1998+
* (it would cause an IllegalArgumentException when calling unsubscribe()).
1999+
* Validates the fix for KAFKA-20428.
2000+
*/
2001+
@Test
2002+
public void testUnsubscribeWithPendingPartitionsAssignedEvent() {
2003+
consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroup"));
2004+
completeTopicSubscriptionChangeEventSuccessfully();
2005+
consumer.subscribe(singletonList("topic"));
2006+
completeUnsubscribeApplicationEventSuccessfully();
2007+
2008+
// Add PartitionsAssignedEvent to the background queue (simulating an ongoing reconciliation
2009+
// that completed just before unsubscribe was called)
2010+
PartitionsAssignedEvent assignedEvent = new PartitionsAssignedEvent(Set.of(), new TreeSet<>(TOPIC_PARTITION_COMPARATOR));
2011+
backgroundEventQueue.add(assignedEvent);
2012+
2013+
// The call to unsubscribe should complete successfully (PartitionsAssignedEvent not processed and completed exceptionally)
2014+
assertDoesNotThrow(() -> consumer.unsubscribe());
2015+
verify(applicationEventHandler, never().description("Reconciled assignment updates shouldn't be processed while unsubscribing"))
2016+
.addAndGet(any(ApplyAssignmentEvent.class));
2017+
assertTrue(assignedEvent.future().isCompletedExceptionally());
2018+
}
2019+
2020+
/**
2021+
* Streams equivalent of {@link #testUnsubscribeWithPendingPartitionsAssignedEvent()}.
2022+
*/
2023+
@Test
2024+
public void testUnsubscribeWithPendingStreamsTasksAssignedEvent() {
2025+
consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroup"));
2026+
completeTopicSubscriptionChangeEventSuccessfully();
2027+
consumer.subscribe(singletonList("topic"));
2028+
completeUnsubscribeApplicationEventSuccessfully();
2029+
2030+
// Add StreamsTasksAssignedEvent to the background queue (simulating an ongoing reconciliation
2031+
// that completed just before unsubscribe was called)
2032+
StreamsTasksAssignedEvent assignedEvent = new StreamsTasksAssignedEvent(
2033+
new TreeSet<>(TOPIC_PARTITION_COMPARATOR), new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
2034+
new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), true));
2035+
backgroundEventQueue.add(assignedEvent);
2036+
2037+
// The call to unsubscribe should complete successfully (StreamsTasksAssignedEvent not processed and completed exceptionally)
2038+
assertDoesNotThrow(() -> consumer.unsubscribe());
2039+
verify(applicationEventHandler, never().description("Reconciled assignment updates shouldn't be processed while unsubscribing"))
2040+
.addAndGet(any(ApplyAssignmentEvent.class));
2041+
assertTrue(assignedEvent.future().isCompletedExceptionally());
2042+
}
2043+
19962044
@Test
19972045
public void testSeekToBeginning() {
19982046
Collection<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));

0 commit comments

Comments
 (0)