-
Notifications
You must be signed in to change notification settings - Fork 15.2k
KAFKA-20428: Fix unsubscribe failure with assignment updates #22011
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1928,7 +1928,11 @@ public void unsubscribe() { | |
| try { | ||
| // If users have fatal error, they will get some exceptions in the background queue. | ||
| // When running unsubscribe, these exceptions should be ignored, or users can't unsubscribe successfully. | ||
| processBackgroundEvents(unsubscribeEvent.future(), timer, e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException)); | ||
| // We also skip processing assignment events (PARTITIONS_ASSIGNED, STREAMS_TASKS_ASSIGNED) because | ||
| // they are not relevant anymore (consumer already unsubscribing). | ||
| processBackgroundEvents(unsubscribeEvent.future(), timer, | ||
| e -> (e instanceof GroupAuthorizationException || e instanceof TopicAuthorizationException), | ||
| true); | ||
| log.info("Unsubscribed all topics or patterns and assigned partitions"); | ||
| } catch (TimeoutException e) { | ||
| log.error("Failed while waiting for the unsubscribe event to complete"); | ||
|
|
@@ -2310,6 +2314,31 @@ private void subscribeInternal(Collection<String> topics, Optional<ConsumerRebal | |
| * Visible for testing. | ||
| */ | ||
| boolean processBackgroundEvents() { | ||
| return processBackgroundEvents(false); | ||
| } | ||
|
|
||
| /** | ||
| * Checks if the given background event is an assignment update event. | ||
| * Those are to update reconciled assignments, so should only be processed from poll() and not from unsubscribe(). | ||
| */ | ||
| private static boolean isAssignmentEvent(BackgroundEvent event) { | ||
| return event.type() == BackgroundEvent.Type.PARTITIONS_ASSIGNED || | ||
| event.type() == BackgroundEvent.Type.STREAMS_TASKS_ASSIGNED; | ||
| } | ||
|
|
||
| /** | ||
| * Process the events produced by the background thread. | ||
| * It is possible that {@link ErrorEvent an error} | ||
| * could occur when processing the events. In such cases, the processor will take a reference to the first | ||
| * error, continue to process the remaining events, and then throw the first error that occurred. | ||
| * Visible for testing. | ||
| * | ||
| * @param skipAssignmentEvents If true, skip processing events that update a new assignment after a reconciliation | ||
| * (PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED) | ||
| * These events should only be processed from poll(), not from unsubscribe(). | ||
| * @return true if any events were drained from the queue | ||
| */ | ||
| boolean processBackgroundEvents(boolean skipAssignmentEvents) { | ||
| AtomicReference<KafkaException> firstError = new AtomicReference<>(); | ||
|
|
||
| List<BackgroundEvent> events = backgroundEventHandler.drainEvents(); | ||
|
|
@@ -2321,6 +2350,18 @@ boolean processBackgroundEvents() { | |
| if (event instanceof CompletableEvent) | ||
| backgroundEventReaper.add((CompletableEvent<?>) event); | ||
|
|
||
| // Skip assignment events if requested (e.g., during unsubscribe). | ||
| // These events should only be processed from poll(). | ||
| // Complete them exceptionally to unblock the reconciliation in the background. | ||
| if (skipAssignmentEvents && isAssignmentEvent(event)) { | ||
| if (event instanceof CompletableEvent) { | ||
| ((CompletableEvent<?>) event).future().completeExceptionally( | ||
| new KafkaException("Assignment event skipped because consumer is unsubscribing")); | ||
|
lianetm marked this conversation as resolved.
|
||
| } | ||
|
Comment on lines
+2357
to
+2360
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Both of these events are
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yes, just extra check because at this level we don't know about the specific events |
||
| log.debug("Skipped processing {} during unsubscribe", event.type()); | ||
| continue; | ||
| } | ||
|
|
||
| backgroundEventProcessor.process(event); | ||
| } catch (Throwable t) { | ||
| KafkaException e = ConsumerUtils.maybeWrapAsKafkaException(t); | ||
|
|
@@ -2378,14 +2419,19 @@ boolean processBackgroundEvents() { | |
| * @param timer Overall timer that bounds how long to wait for the event to complete | ||
| * @param ignoreErrorEventException Predicate to ignore background errors. | ||
| * Any exceptions found while processing background events that match the predicate won't be propagated. | ||
| * @return {@code true} if the event completed within the timeout, {@code false} otherwise | ||
| * @param skipAssignmentEvents If true, skip processing PARTITIONS_ASSIGNED and STREAMS_TASKS_ASSIGNED | ||
| * events and complete them exceptionally. These events should only be | ||
| * processed from poll(), not from unsubscribe() or other operations. | ||
| * @return the completed result of the supplied {@code future} | ||
| * @throws TimeoutException if the operation does not complete before the timer expires | ||
| */ | ||
| // Visible for testing | ||
| <T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException) { | ||
| <T> T processBackgroundEvents(Future<T> future, Timer timer, Predicate<Exception> ignoreErrorEventException, | ||
| boolean skipAssignmentEvents) { | ||
| do { | ||
| boolean hadEvents = false; | ||
| try { | ||
| hadEvents = processBackgroundEvents(); | ||
| hadEvents = processBackgroundEvents(skipAssignmentEvents); | ||
| } catch (Exception e) { | ||
| if (!ignoreErrorEventException.test(e)) | ||
| throw e; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since we're already using an anonymous function for
Predicate<Exception> ignoreErrorEventException, could we also do that instead of a flag, i.e.:Then we can delete
isAssignmentEvent()and the event filtering becomes a little more generalized.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree that it would given some consistency in the params, but at the price of complexity in a case where seems to me we don't really need the flexibitliy of a predicate?
This is only used from the unsubscribe to "skip assignment", not different callers filtering different kinds of events. So I would lean towards simple/YAGNI in this case, makes sense?