Merged
@@ -1928,7 +1928,11 @@ public void unsubscribe() {
try {
// If users have fatal error, they will get some exceptions in the background queue.
// When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully.
processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException));
// We also skip processing assignment events (PARTITIONS_ASSIGNED, STREAMS_TASKS_ASSIGNED) because
// they are not relevant anymore (consumer already unsubscribing).
processBackgroundEvents(unsubscribeEvent.future(), timer,
e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException),
true);
Contributor

Since we're already using an anonymous function for Predicate<Exception> ignoreErrorEventException, could we also do that instead of a flag, i.e.:

Suggested change
true);
e -> (event.type() == BackgroundEvent.Type.PARTITIONS_ASSIGNED || event.type() == BackgroundEvent.Type.STREAMS_TASKS_ASSIGNED));

Then we can delete isAssignmentEvent() and the event filtering becomes a little more generalized.

Member Author

Agree that it would give some consistency in the params, but at the price of complexity in a case where, it seems to me, we don't really need the flexibility of a predicate?

This is only used from the unsubscribe to "skip assignment", not different callers filtering different kinds of events. So I would lean towards simple/YAGNI in this case, makes sense?
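The trade-off being discussed can be illustrated with a minimal standalone sketch (hypothetical simplified event types, not the actual Kafka classes): a `Predicate<Event>` generalizes the boolean flag into arbitrary per-caller filtering, while the flag hardcodes the one rule the unsubscribe path needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class EventFilterSketch {
    enum Type { PARTITIONS_ASSIGNED, STREAMS_TASKS_ASSIGNED, ERROR }

    record Event(Type type) { }

    // Predicate variant (reviewer's suggestion): each caller decides what to skip.
    static List<Event> process(List<Event> queue, Predicate<Event> skip) {
        List<Event> processed = new ArrayList<>();
        for (Event e : queue)
            if (!skip.test(e))
                processed.add(e);
        return processed;
    }

    // Flag variant (merged version): a single hardcoded skip rule behind a boolean.
    static List<Event> process(List<Event> queue, boolean skipAssignmentEvents) {
        return process(queue, e -> skipAssignmentEvents
                && (e.type() == Type.PARTITIONS_ASSIGNED || e.type() == Type.STREAMS_TASKS_ASSIGNED));
    }

    public static void main(String[] args) {
        List<Event> queue = List.of(new Event(Type.PARTITIONS_ASSIGNED), new Event(Type.ERROR));
        // With skipping enabled, only the ERROR event survives.
        System.out.println(process(queue, true).size()); // prints 1
    }
}
```

Both variants behave identically for the single existing caller; the predicate only pays off if other callers ever need different filters, which is the YAGNI point made above.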

log.info("Unsubscribed all topics or patterns and assigned partitions");
} catch (TimeoutException e) {
log.error("Failed while waiting for the unsubscribe event to complete");
@@ -2310,6 +2314,31 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal
* Visible for testing.
*/
boolean processBackgroundEvents() {
return processBackgroundEvents(false);
}

/**
* Checks if the given background event is an assignment update event.
* Those are to update reconciled assignments, so should only be processed from poll() and not from unsubscribe().
*/
private static boolean isAssignmentEvent(BackgroundEvent event) {
return event.type() == BackgroundEvent.Type.PARTITIONS_ASSIGNED ||
event.type() == BackgroundEvent.Type.STREAMS_TASKS_ASSIGNED;
}

/**
* Process the events produced by the background thread.
* It is possible that {@link ErrorEvent an error}
* could occur when processing the events. In such cases, the processor will take a reference to the first
* error, continue to process the remaining events, and then throw the first error that occurred.
* Visible for testing.
*
* @param skipAssignmentEvents If true, skip processing events that update a new assignment after a reconciliation
(PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED).
* These events should only be processed from poll(), not from unsubscribe().
* @return true if any events were drained from the queue
*/
boolean processBackgroundEvents(boolean skipAssignmentEvents) {
AtomicReference<KafkaException> firstError = new AtomicReference<>();

List<BackgroundEvent> events = backgroundEventHandler.drainEvents();
@@ -2321,6 +2350,18 @@ boolean processBackgroundEvents() {
if (event instanceof CompletableEvent)
backgroundEventReaper.add((CompletableEvent<?>) event);

// Skip assignment events if requested (e.g., during unsubscribe).
// These events should only be processed from poll().
// Complete them exceptionally to unblock the reconciliation in the background.
if (skipAssignmentEvents && isAssignmentEvent(event)) {
if (event instanceof CompletableEvent) {
((CompletableEvent<?>) event).future().completeExceptionally(
new KafkaException("Assignment event skipped because consumer is unsubscribing"));
lianetm marked this conversation as resolved.
}
Comment on lines +2357 to +2360
Contributor

Both of these events are CompletableEvents. I guess this is defensive?

Member Author

Yes, just an extra check, because at this level we don't know about the specific events.
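The unblocking behavior this code relies on can be sketched with a plain `CompletableFuture` (no Kafka types, hypothetical thread names): completing the skipped event's future exceptionally wakes up whatever is blocked on it in the background, so the reconciliation does not hang forever.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class SkipAndUnblockSketch {
    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Void> assignmentApplied = new CompletableFuture<>();

        // Background thread blocked until the assignment event is handled.
        Thread reconciler = new Thread(() -> {
            try {
                assignmentApplied.get();
                System.out.println("assignment applied");
            } catch (ExecutionException e) {
                // Event was skipped during unsubscribe: reconciliation gives up cleanly.
                System.out.println("reconciliation unblocked: " + e.getCause().getMessage());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reconciler.start();

        // Application thread skips the event and completes it exceptionally.
        assignmentApplied.completeExceptionally(
                new RuntimeException("Assignment event skipped because consumer is unsubscribing"));
        reconciler.join();
    }
}
```

If the future were simply dropped instead of completed, the waiter would block indefinitely; completing it exceptionally is what lets the member later rejoin and start a fresh reconciliation, as the new membership-manager tests below verify.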

log.debug("Skipped processing {} during unsubscribe", event.type());
continue;
}

backgroundEventProcessor.process(event);
} catch (Throwable t) {
KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t);
@@ -2378,14 +2419,19 @@ boolean processBackgroundEvents() {
* @param timer Overall timer that bounds how long to wait for the event to complete
* @param ignoreErrorEventException Predicate to ignore background errors.
* Any exceptions found while processing background events that match the predicate won't be propagated.
* @return {@code true} if the event completed within the timeout, {@code false} otherwise
* @param skipAssignmentEvents If true, skip processing PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED
* events and complete them exceptionally. These events should only be
* processed from poll(), not from unsubscribe() or other operations.
* @return the completed result of the supplied {@code future}
* @throws TimeoutException if the operation does not complete before the timer expires
*/
// Visible for testing
<T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) {
<T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException,
boolean skipAssignmentEvents) {
do {
boolean hadEvents = false;
try {
hadEvents = processBackgroundEvents();
hadEvents = processBackgroundEvents(skipAssignmentEvents);
} catch (Exception e) {
if (!ignoreErrorEventException.test(e))
throw e;
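The waiting loop above can be sketched in isolation (hypothetical simplified signature: a plain `CompletableFuture` and an unbounded loop instead of the real `Timer`-bounded `future.get`): drain background errors while waiting on the future, propagating only errors the predicate does not ignore.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.function.Predicate;

public class WaitAndDrainSketch {
    // Drain queued background errors while waiting for `future` to complete.
    // Errors matching `ignore` are swallowed so the operation can still finish,
    // mirroring how unsubscribe ignores authorization errors in this PR.
    static <T> T waitCompleting(CompletableFuture<T> future,
                                Queue<RuntimeException> backgroundErrors,
                                Predicate<Exception> ignore) {
        do {
            RuntimeException e = backgroundErrors.poll();
            if (e != null && !ignore.test(e))
                throw e; // first non-ignored background error wins
        } while (!future.isDone());
        return future.join();
    }

    public static void main(String[] args) {
        Queue<RuntimeException> errors = new ArrayDeque<>();
        // Stand-in for a GroupAuthorizationException sitting in the background queue.
        errors.add(new IllegalStateException("authorization failure"));
        CompletableFuture<String> unsubscribed = CompletableFuture.completedFuture("done");
        String result = waitCompleting(unsubscribed, errors, e -> e instanceof IllegalStateException);
        System.out.println(result); // prints "done": the ignorable error did not abort the wait
    }
}
```

The real method additionally bounds the wait with a `Timer` and throws `TimeoutException` when it expires; this sketch omits the timeout to keep the drain-and-filter shape visible.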
@@ -1825,7 +1825,7 @@ public void testPollDoesNotAddNewAsyncPollEventWhenOneIsAlreadyInFlight() {
}

/**
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
* handles the case where the {@link Future} takes a bit of time to complete, but does within the timeout.
*/
@Test
@@ -1851,14 +1851,14 @@ public void testProcessBackgroundEventsWithInitialDelay() throws Exception {
return null;
}).when(future).get(any(Long.class), any(TimeUnit.class));

consumer.processBackgroundEvents(future, timer, e -> false);
consumer.processBackgroundEvents(future, timer, e -> false, false);

// 800 is the 1000 ms timeout (above) minus the 200 ms delay for the two incremental timeouts/retries.
assertEquals(800, timer.remainingMs());
}

/**
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
* handles the case where the {@link Future} is already complete when invoked, so it doesn't have to wait.
*/
@Test
@@ -1869,15 +1869,15 @@ public void testProcessBackgroundEventsWithoutDelay() {
// Create a future that is already completed.
CompletableFuture<?> future = CompletableFuture.completedFuture(null);

consumer.processBackgroundEvents(future, timer, e -> false);
consumer.processBackgroundEvents(future, timer, e -> false, false);

// Because we didn't need to perform a timed get, we should still have every last millisecond
// of our initial timeout.
assertEquals(1000, timer.remainingMs());
}

/**
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate) processBackgroundEvents}
* Tests {@link AsyncKafkaConsumer#processBackgroundEvents(Future, Timer, Predicate, boolean) processBackgroundEvents}
* handles the case where the {@link Future} does not complete within the timeout.
*/
@Test
@@ -1892,7 +1892,7 @@ public void testProcessBackgroundEventsTimesOut() throws Exception {
throw new java.util.concurrent.TimeoutException("Intentional timeout");
}).when(future).get(any(Long.class), any(TimeUnit.class));

assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false));
assertThrows(TimeoutException.class, () -> consumer.processBackgroundEvents(future, timer, e -> false, false));

// Because we forced our mocked future to continuously time out, we should have no time remaining.
assertEquals(0, timer.remainingMs());
@@ -1993,6 +1993,40 @@ public void testUnsubscribeDoesNotCommitOffsetsEvenWithAutoCommitEnabled() {
verify(applicationEventHandler, never()).add(ArgumentMatchers.isA(CommitOnCloseEvent.class));
}

private static Stream<CompletableBackgroundEvent<?>> assignmentEventsSource() {
return Stream.of(
new PartitionsAssignedEvent(Set.of(), new TreeSet<>(TOPIC_PARTITION_COMPARATOR)),
new StreamsTasksAssignedEvent(
new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
new TreeSet<>(TOPIC_PARTITION_COMPARATOR),
new StreamsRebalanceData.Assignment(Set.of(), Set.of(), Set.of(), true))
);
}

/**
* Test to ensure that assignment updates are not applied while unsubscribing
* (it would cause an IllegalArgumentException when calling unsubscribe()).
* Validates the fix for KAFKA-20428.
*/
@ParameterizedTest
@MethodSource("assignmentEventsSource")
public void testUnsubscribeWithPendingAssignmentEvent(CompletableBackgroundEvent<?> assignedEvent) {
consumer = newConsumer(requiredConsumerConfigAndGroupId("consumerGroup"));
completeTopicSubscriptionChangeEventSuccessfully();
consumer.subscribe(singletonList("topic"));
completeUnsubscribeApplicationEventSuccessfully();

// Add assignment event to the background queue (simulating an ongoing reconciliation
// that completed just before unsubscribe was called)
backgroundEventQueue.add(assignedEvent);

// The call to unsubscribe should complete successfully (assignment event not processed and completed exceptionally)
assertDoesNotThrow(() -> consumer.unsubscribe());
verify(applicationEventHandler, never().description("Reconciled assignment updates shouldn't be processed while unsubscribing"))
.addAndGet(any(ApplyAssignmentEvent.class));
assertTrue(assignedEvent.future().isCompletedExceptionally());
}

@Test
public void testSeekToBeginning() {
Collection<TopicPartition> topics = Collections.singleton(new TopicPartition("test", 0));
@@ -1248,6 +1248,45 @@ public void testLeaveGroupWhenMemberIsStale() {
assertEquals(MemberState.STALE, membershipManager.state());
}

/**
* Test that when unsubscribe/leaveGroup is called during an ongoing reconciliation and the pending
* assignment event is completed exceptionally, the member can still rejoin and start
* a new reconciliation.
*/
@Test
public void testLeaveGroupDuringReconciliationThenRejoin() {
Uuid topicId = Uuid.randomUuid();
String topicName = "topic1";
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());

// Start reconciliation - assignment event is pending
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
membershipManager.maybeReconcile(true);
PartitionsAssignedEvent pendingAssignmentEvent = (PartitionsAssignedEvent) backgroundEventQueue.poll();
assertNotNull(pendingAssignmentEvent);

// Call leaveGroup while reconciliation is in progress
mockLeaveGroup();
membershipManager.leaveGroup();
assertEquals(MemberState.LEAVING, membershipManager.state());

// Complete the pending assignment event exceptionally (simulating unsubscribe skipping it)
pendingAssignmentEvent.future().completeExceptionally(
new KafkaException("Assignment event skipped because consumer is unsubscribing"));

// Complete leave and rejoin
membershipManager.onHeartbeatRequestGenerated();
clearInvocations(membershipManager);
mockOwnedPartitionAndAssignmentReceived(membershipManager, topicId, topicName, Collections.emptyList());
membershipManager.transitionToJoining();

// Receive assignment - verify new reconciliation starts
receiveAssignment(topicId, Collections.singletonList(0), membershipManager);
membershipManager.maybeReconcile(true);
verifyReconciliationTriggered(membershipManager);
}

@Test
public void testFatalFailureWhenStateIsUnjoined() {
ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
@@ -1343,6 +1343,42 @@ public void testLeaveGroupOnCloseWhenInGroupWithAssignment() {
assertFalse(onGroupLeft.isCompletedExceptionally());
}

/**
* Test that when unsubscribe/leaveGroup is called during an ongoing reconciliation and the pending
* assignment event is completed exceptionally, the member can still rejoin and start
* a new reconciliation.
*/
@Test
public void testLeaveGroupDuringReconciliationThenRejoin() {
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, TOPIC_0);
final Set<StreamsRebalanceData.TaskId> activeTasks =
Set.of(new StreamsRebalanceData.TaskId(SUBTOPOLOGY_ID_0, PARTITION_0));
when(subscriptionState.assignedPartitions()).thenReturn(Set.of());
joining();

// Start reconciliation - assignment event is pending
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
final StreamsTasksAssignedEvent pendingAssignmentEvent =
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());

// Call leaveGroup while reconciliation is in progress
membershipManager.leaveGroup();

// Complete the pending assignment event exceptionally (simulating unsubscribe skipping it)
pendingAssignmentEvent.future().completeExceptionally(
new KafkaException("Assignment event skipped because consumer is unsubscribing"));

// Complete leave and rejoin
membershipManager.onHeartbeatRequestGenerated();
Mockito.clearInvocations(backgroundEventHandler);
tasksAssignedAddCount = 0;
joining();

// Receive assignment - verify new reconciliation starts
reconcile(makeHeartbeatResponseWithActiveTasks(SUBTOPOLOGY_ID_0, List.of(PARTITION_0)));
verifyOnTasksAssignedCallbackNeededEventAddedToBackgroundEventHandler(activeTasks, Set.of(), Set.of());
}

@Test
public void testOnHeartbeatRequestSkippedWhenInLeaving() {
setupStreamsRebalanceDataWithOneSubtopologyOneSourceTopic(SUBTOPOLOGY_ID_0, "topic");