Skip to content

Commit b954b35

Browse files
authored
KAFKA-20332: Fix to ensure app thread not collecting records for partitions being revoked (#21897)
This addresses race conditions where the app thread could collect/return records for revoked partitions. The fix ensures that the app thread does not return buffered records until the background thread has checked pending reconciliations for the in-flight poll event. Once that check has completed, we know that partitions being revoked have been marked as non-fetchable, so that is the point at which the app thread can safely move on to fetching/collecting. It also ensures that background reconciliations do not trigger revocations (the app thread could already have records in memory, collected from the buffer, for those partitions, which would lead to the consumer returning records for revoked partitions if the background completes the revocation before the app thread returns). With these fixes we are sure that the app thread only collects/returns records after revoked partitions have been marked as non-fetchable. This fix applies to the consumer only (the share consumer remains unchanged with this PR and can still trigger full reconciliation & assignment update from the background). Reviewers: Andrew Schofield <aschofield@confluent.io>, nileshkumar3 <nileshkumar3@gmail.com>, PoAn Yang <payang@apache.org>, Chia-Ping Tsai <chia7712@gmail.com>, Kirk True <ktrue@confluent.io>
1 parent 5a2dcf8 commit b954b35

6 files changed

Lines changed: 148 additions & 7 deletions

File tree

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractMembershipManager.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -838,9 +838,6 @@ public void maybeReconcile(boolean canCommit) {
838838
return;
839839
}
840840

841-
if (autoCommitEnabled && !canCommit) return;
842-
markReconciliationInProgress();
843-
844841
// Keep copy of assigned TopicPartitions created from the TopicIdPartitions that are
845842
// being reconciled. Needed for interactions with the centralized subscription state that
846843
// does not support topic IDs yet, and for the callbacks.
@@ -858,6 +855,14 @@ public void maybeReconcile(boolean canCommit) {
858855
revokedPartitions.addAll(ownedPartitions);
859856
revokedPartitions.removeAll(assignedTopicPartitions);
860857

858+
// If canCommit is false (called from background poll(), not from AsyncPollEvent), skip
859+
// reconciliation if it would involve revocation or auto-commit.
860+
// Reconciliations revoking partitions cannot be triggered from the background because the app thread could be returning records for those partitions already.
861+
// Reconciliations just adding new partitions are safe to trigger from the background thread since new partitions won't have buffered records.
862+
if (!canCommit && (autoCommitEnabled || !revokedPartitions.isEmpty())) return;
863+
864+
markReconciliationInProgress();
865+
861866
log.info("Reconciling assignment with local epoch {}\n" +
862867
"\tMember: {}\n" +
863868
"\tAssigned partitions: {}\n" +

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1958,16 +1958,16 @@ WakeupTrigger wakeupTrigger() {
19581958
}
19591959

19601960
private Fetch<K, V> pollForFetches(Timer timer) {
1961-
long pollTimeout = isCommittedOffsetsManagementEnabled()
1962-
? Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs())
1963-
: timer.remainingMs();
19641961

19651962
// if data is available already, return it immediately
19661963
final Fetch<K, V> fetch = collectFetch();
19671964
if (!fetch.isEmpty()) {
19681965
return fetch;
19691966
}
19701967

1968+
long pollTimeout = isCommittedOffsetsManagementEnabled()
1969+
? Math.min(applicationEventHandler.maximumTimeToWait(), timer.remainingMs())
1970+
: timer.remainingMs();
19711971
// With the non-blocking poll design, it's possible that at this point the background thread is
19721972
// concurrently working to update positions. Therefore, a _copy_ of the current assignment is retrieved
19731973
// and iterated looking for any partitions with invalid positions. This is done to avoid being stuck
@@ -2019,6 +2019,28 @@ private Fetch<K, V> pollForFetches(Timer timer) {
20192019
* for returning.
20202020
*/
20212021
private Fetch<K, V> collectFetch() {
2022+
// Do not return buffered records if the background hasn't checked for pending reconciliations
2023+
// for the inflight poll event.
2024+
// This is key because partitions may need revocation, so we need to wait for the reconciliation check
2025+
// that triggers commits and marks partitions as pending revocation, before we can
2026+
// safely collect records from the buffer.
2027+
if (inflightPoll != null && !inflightPoll.isReconciliationCheckComplete()) {
2028+
// If the background hasn't had the time to check for pending reconciliation,
2029+
// we need to wait for that check before moving on (instead of returning empty right away,
2030+
// which will lead to blocking on buffer data)
2031+
long timeoutMs = inflightPoll.deadlineMs() - time.milliseconds();
2032+
if (timeoutMs > 0) {
2033+
try {
2034+
ConsumerUtils.getResult(inflightPoll.reconciliationCheckFuture(), timeoutMs);
2035+
} catch (TimeoutException e) {
2036+
return Fetch.empty();
2037+
}
2038+
} else {
2039+
// No time to wait and reconciliation check not complete
2040+
return Fetch.empty();
2041+
}
2042+
}
2043+
20222044
// With the non-blocking async poll, it's critical that the application thread wait until the background
20232045
// thread has completed the stage of validating positions. This prevents a race condition where both
20242046
// threads may attempt to update the SubscriptionState.position() for a given partition. So if the background

clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareMembershipManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.kafka.clients.Metadata;
2020
import org.apache.kafka.clients.consumer.ConsumerConfig;
21+
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate.PollResult;
2122
import org.apache.kafka.clients.consumer.internals.metrics.ShareRebalanceMetricsManager;
2223
import org.apache.kafka.common.TopicPartition;
2324
import org.apache.kafka.common.Uuid;
@@ -199,4 +200,16 @@ public int joinGroupEpoch() {
199200
public int leaveGroupEpoch() {
200201
return ShareGroupHeartbeatRequest.LEAVE_GROUP_MEMBER_EPOCH;
201202
}
203+
204+
/**
205+
* {@inheritDoc}
206+
* <p>
207+
* For the ShareConsumer, full reconciliations can always be triggered from the background thread
208+
* (fully updates assignment).
209+
*/
210+
@Override
211+
public PollResult poll(final long currentTimeMs) {
212+
maybeReconcile(true);
213+
return PollResult.EMPTY;
214+
}
202215
}

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,10 @@ private void process(final AsyncPollEvent event) {
760760
requestManagers.consumerMembershipManager.ifPresent(consumerMembershipManager ->
761761
consumerMembershipManager.maybeReconcile(true));
762762

763+
// We completed checking pending reconciliations (commits triggered, revoked partitions marked to prevent fetching)
764+
// so the application thread poll loop can safely continue progress now (fetching)
765+
event.markReconciliationCheckComplete();
766+
763767
if (requestManagers.commitRequestManager.isPresent()) {
764768
CommitRequestManager commitRequestManager = requestManagers.commitRequestManager.get();
765769
commitRequestManager.updateTimerAndMaybeCommit(event.pollTimeMs());

clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/AsyncPollEvent.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import java.time.Duration;
2727
import java.util.Optional;
28+
import java.util.concurrent.CompletableFuture;
2829

2930
/**
3031
* This class represents the non-blocking event that executes logic functionally equivalent to the following:
@@ -47,6 +48,7 @@ public class AsyncPollEvent extends ApplicationEvent implements MetadataErrorNot
4748
private volatile KafkaException error;
4849
private volatile boolean isComplete;
4950
private volatile boolean isValidatePositionsComplete;
51+
private final CompletableFuture<Void> reconciliationCheckFuture = new CompletableFuture<>();
5052

5153
/**
5254
* Creates a new event to signify a multi-stage processing of {@link Consumer#poll(Duration)} logic.
@@ -85,15 +87,47 @@ public void markValidatePositionsComplete() {
8587
this.isValidatePositionsComplete = true;
8688
}
8789

90+
/**
91+
* @return the future that completes when the background thread has checked any pending reconciliation
92+
* for this poll event. Once complete, revocations have been handled (commit triggered and partitions
93+
* marked as pending revocation), so the app thread can safely proceed to fetch/collect records.
94+
*/
95+
public CompletableFuture<Void> reconciliationCheckFuture() {
96+
return reconciliationCheckFuture;
97+
}
98+
99+
/**
100+
* @return true if the background already checked any pending reconciliation when processing this poll event.
101+
* If it completed the check, we know that revocations were handled (commit triggered and partitions marked as pending revocation),
102+
* so the app thread can safely proceed to fetch/collect records.
103+
*/
104+
public boolean isReconciliationCheckComplete() {
105+
return reconciliationCheckFuture.isDone();
106+
}
107+
108+
/**
109+
* Mark that reconciliation check is complete for this poll event.
110+
* This should be called after the background has checked pending reconciliations when processing this poll event
111+
* (triggered commits, and marked partitions as pending revocation if needed)
112+
*/
113+
public void markReconciliationCheckComplete() {
114+
reconciliationCheckFuture.complete(null);
115+
}
116+
88117
public boolean isComplete() {
89118
return isComplete;
90119
}
91120

92121
public void completeSuccessfully() {
122+
// Complete reconciliation future as safety net in case it wasn't already marked complete
123+
reconciliationCheckFuture.complete(null);
93124
isComplete = true;
94125
}
95126

96127
public void completeExceptionally(KafkaException e) {
128+
// Complete reconciliation future to unblock any waiters - the error will be surfaced
129+
// through the normal checkInflightPoll() mechanism via the error field
130+
reconciliationCheckFuture.complete(null);
97131
error = e;
98132
isComplete = true;
99133
}
@@ -110,6 +144,7 @@ protected String toStringBase() {
110144
", pollTimeMs=" + pollTimeMs +
111145
", error=" + error +
112146
", isComplete=" + isComplete +
113-
", isValidatePositionsComplete=" + isValidatePositionsComplete;
147+
", isValidatePositionsComplete=" + isValidatePositionsComplete +
148+
", isReconciliationCheckComplete=" + isReconciliationCheckComplete();
114149
}
115150
}

clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -535,6 +535,68 @@ public void testClearWakeupTriggerAfterPoll() {
535535
assertDoesNotThrow(() -> consumer.poll(Duration.ZERO));
536536
}
537537

538+
/**
539+
* Test that poll() does not return records until the reconciliation check is complete.
540+
* This prevents a race condition where records could be returned for partitions that
541+
* are being revoked (see KAFKA-20332).
542+
*/
543+
@Test
544+
public void testPollWaitsForReconciliationCheckComplete() {
545+
final String topicName = "foo";
546+
final int partition = 3;
547+
final TopicPartition tp = new TopicPartition(topicName, partition);
548+
final List<ConsumerRecord<String, String>> records = asList(
549+
new ConsumerRecord<>(topicName, partition, 2, "key1", "value1"),
550+
new ConsumerRecord<>(topicName, partition, 3, "key2", "value2")
551+
);
552+
553+
SubscriptionState subscriptions = new SubscriptionState(new LogContext(), AutoOffsetResetStrategy.NONE);
554+
consumer = newConsumer(
555+
mock(FetchBuffer.class),
556+
new ConsumerInterceptors<>(Collections.emptyList(), metrics),
557+
mock(ConsumerRebalanceListenerInvoker.class),
558+
subscriptions);
559+
560+
doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any());
561+
// PositionsValidator starts with metadataUpdateVersion=-1. Stub metadata.updateVersion() to match,
562+
// so canSkipUpdateFetchPositions() passes and we test the reconciliation check path.
563+
doReturn(-1).when(metadata).updateVersion();
564+
565+
completeTopicSubscriptionChangeEventSuccessfully();
566+
consumer.subscribe(singleton(topicName), mock(ConsumerRebalanceListener.class));
567+
// Simulate partition assignment from group coordinator
568+
subscriptions.assignFromSubscribed(singleton(tp));
569+
570+
// Set up position so canSkipUpdateFetchPositions() returns true (partition in FETCHING state)
571+
completeSeekUnvalidatedEventSuccessfully();
572+
subscriptions.seek(tp, 0);
573+
574+
// Set up fetch collector to return records when called
575+
doReturn(Fetch.forPartition(tp, records, true, new OffsetAndMetadata(4, Optional.of(0), "")))
576+
.when(fetchCollector).collectFetch(any(FetchBuffer.class));
577+
578+
// Capture the AsyncPollEvent to manually control when reconciliation check is marked complete
579+
AtomicReference<AsyncPollEvent> capturedEvent = new AtomicReference<>();
580+
doAnswer(invocation -> {
581+
AsyncPollEvent event = invocation.getArgument(0);
582+
assertTrue(capturedEvent.compareAndSet(null, event));
583+
// Do NOT mark reconciliation check complete - simulating background hasn't processed it yet
584+
return null;
585+
}).when(applicationEventHandler).add(ArgumentMatchers.isA(AsyncPollEvent.class));
586+
587+
// Poll should return empty because reconciliation check is not complete.
588+
ConsumerRecords<?, ?> result1 = consumer.poll(Duration.ZERO);
589+
assertTrue(result1.isEmpty(), "Poll should not return records if it hasn't completed checking and triggering pending reconciliations.");
590+
591+
// Now mark reconciliation check complete on the captured event
592+
assertNotNull(capturedEvent.get(), "AsyncPollEvent should have been captured");
593+
capturedEvent.get().markReconciliationCheckComplete();
594+
595+
// Next poll should return the records since reconciliation check is now complete
596+
ConsumerRecords<?, ?> result2 = consumer.poll(Duration.ZERO);
597+
assertEquals(2, result2.count(), "Expected 2 records after reconciliation check is complete");
598+
}
599+
538600
@Test
539601
public void testEnsureCallbackExecutedByApplicationThread() {
540602
consumer = newConsumer();

0 commit comments

Comments
 (0)