
Commit e1cfea3

KAFKA-20505: Moving future completion out of locks for Share Partition (#22126)
As explained in KAFKA-20505, a deadlock can occur when the future for a request is completed while holding the share partition lock, because the subsequent actions try to acquire the purgatory lock (checkAndComplete/triggering waiting requests). Since the share partition lock might not have been released yet, the two can deadlock. This PR moves such future completions out of the lock. I have also reviewed the other future completions and it does not seem that further changes are needed. I tested using the franz-go Kafka test and could not reproduce the issue in 160 continuous runs; previously it was reproducible within 20-50 consecutive runs.

```
=== Run 160 ===
=== RUN TestShareGroupETL
=== PAUSE TestShareGroupETL
=== CONT TestShareGroupETL
[09:59:38.788 1][INFO] producing to a new topic for the first time, fetching metadata to learn its partitions; topic: f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e
...
...
[09:59:43.94 3][INFO] immediate metadata update triggered; why: forced load because we are producing to a topic for the first time
[09:59:43.947 3][INFO] done waiting for metadata for new topic; topic: 6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9
    share_test.go:507: level 1 phase 2: adding consumers after 122923 consumed
[09:59:44.225 3][INFO] flushing
[09:59:44.225 4][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
...
...
[09:59:44.226 5][INFO] beginning to manage the share group lifecycle; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35
[09:59:44.226 10][INFO] beginning to manage the share group lifecycle; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35
[09:59:44.227 3][INFO] leaving share group; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, member_id: LP4lpqzQjAm-QxQdCRkSXA==
[09:59:44.227 7][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[]
[09:59:44.227 4][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[]
...
...
[09:59:49.232 7][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e:[1 2]]
[09:59:49.232 6][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e:[1]]
...
...
[09:59:49.466 18][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
[09:59:49.466 13][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
[09:59:49.466 15][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
[09:59:49.467 16][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
[09:59:49.467 14][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
[09:59:49.467 11][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
...
...
[09:59:49.467 15][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
[09:59:49.468 13][INFO] assigning share partitions; group: 0045957d65fdd11a7bd0dfbc9293c3fd935da4bb359df6fa56aca8dcc119ff35, assignments: map[]
[09:59:49.469 17][INFO] assigning share partitions; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, assignments: map[]
...
...
[09:59:54.472 18][INFO] assigning share partitions; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, assignments: map[5da63960cb4fe97d887d575a73dfbddc51a2eb8071d119b3a5ba5a2b0d87bc7e:[1]]
[09:59:54.485 14][INFO] assigning share partitions; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, assignments: map[6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9:[0]]
...
...
[09:59:54.494 12][INFO] metadata update triggered; why: reload trigger due to produce topic still not known
[09:59:54.495 12][INFO] producer id initialization success; id: 3524, epoch: 0
[09:59:54.5 13][INFO] producing to a new topic for the first time, fetching metadata to learn its partitions; topic: 6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9
[09:59:54.5 13][INFO] immediate metadata update triggered; why: forced load because we are producing to a topic for the first time
...
...
[09:59:54.525 11][INFO] leaving share group; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, member_id: _4Jk9faoDdUlGMlSR9zKmg==
    share_test.go:605: level 2 rebalance 1: killing l2-c1 after 169339 consumed
[09:59:55.101 14][INFO] flushing
[09:59:55.101 19][INFO] immediate metadata update triggered; why: querying metadata for consumer initialization
[09:59:55.102 19][INFO] beginning to manage the share group lifecycle; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6
[09:59:55.103 14][INFO] leaving share group; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, member_id: wJibgxG934tiAuqCPloF_w==
[09:59:55.107 19][INFO] assigning share partitions; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, assignments: map[]
    share_test.go:619: level 2 rebalance 2: killing l2-c3 after 375726 consumed
[09:59:55.401 18][INFO] flushing
...
...
[10:00:00.915 20][INFO] leaving share group; group: d90b64695fc163edc7c1052c412cd0ca4d4dd1696badab2b5a9b5a1e81e000c6, member_id: HjzG4-5QwncfYfr8pSmEUQ==
    share_test.go:377: level 1: 499900 unique keys, 500624 total accepts, 500624 produced, 724 duplicates, 35614 redelivered, max dc 3, consumed 532987
    share_test.go:377: level 2: 499900 unique keys, 501513 total accepts, 501513 produced, 1613 duplicates, 20272 redelivered, max dc 2, consumed 518049
    share_test.go:704: level 1: 100 purely rejected, 35614 redelivered
    share_test.go:60: deleting topic f2c35a17978f41d684a36ab8297dd873138cb3c982f095a7d0e74d7a8589411e
    share_test.go:61: deleting topic f7e388a2de7ef0814328f9186e8c4b73b1f2437490e1b98730af9fb17ee74175
    share_test.go:62: deleting topic 5da63960cb4fe97d887d575a73dfbddc51a2eb8071d119b3a5ba5a2b0d87bc7e
    share_test.go:63: deleting topic 6ce53f956e91c6b106843ff7aa728451f290cbd9064cefc8dc73ee2bcadef7b9
    share_test.go:64: deleting topic 7e74eb054cbb02e0de5da8a8018115dc01094496222039f323841770b11b8a12
    share_test.go:65: deleting topic 4b8c44d4071cd22272ae9ac694342faa3404bd10b479fe88874bdef4a8a4276d
--- PASS: TestShareGroupETL (22.73s)
PASS
ok github.com/twmb/franz-go/pkg/kgo 22.926s
```

Reviewers: Andrew Schofield <aschofield@confluent.io>, Chia-Ping Tsai <chia7712@gmail.com>, PoAn Yang <payang@apache.org>
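The hazard described in the commit message follows from `CompletableFuture` semantics: a dependent action registered before completion typically executes synchronously on the thread that calls `complete()`, i.e. while any locks that thread holds are still held. A minimal, self-contained demonstration of that mechanism (not from the Kafka codebase; class name is hypothetical):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

public class SyncCompletionDemo {
    public static void main(String[] args) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        AtomicBoolean callbackRan = new AtomicBoolean(false);

        // Dependent action registered before completion; for non-async
        // dependents this typically runs on the thread calling complete().
        future.thenRun(() -> {
            callbackRan.set(true);
            System.out.println("callback thread: " + Thread.currentThread().getName());
        });

        System.out.println("completing on: " + Thread.currentThread().getName());
        future.complete(null);

        // By the time complete() returns, the callback has already executed
        // on this same thread. If this thread were still holding a lock the
        // callback needed (e.g. the purgatory lock), it would deadlock.
        System.out.println("callback ran synchronously: " + callbackRan.get());
    }
}
```

This is why completing the future while holding the share partition write lock is unsafe: the chained purgatory actions run inline on the completing thread.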
1 parent 0d95e46 commit e1cfea3

1 file changed

Lines changed: 16 additions & 13 deletions

File tree

core/src/main/java/kafka/server/share/SharePartition.java

```diff
@@ -2484,9 +2484,9 @@ void rollbackOrProcessStateUpdates(
         Throwable throwable,
         List<PersisterBatch> persisterBatches
     ) {
-        lock.writeLock().lock();
-        try {
-            if (throwable != null) {
+        if (throwable != null) {
+            lock.writeLock().lock();
+            try {
                 // Log in DEBUG to avoid flooding of logs for a faulty client.
                 log.debug("Request failed for updating state, rollback any changed state"
                     + " for the share partition: {}-{}", groupId, topicIdPartition);
@@ -2503,16 +2503,16 @@ void rollbackOrProcessStateUpdates(
                         deliveryCompleteCount.addAndGet(-numInFlightRecordsInBatch(persisterBatch.stateBatch.firstOffset(), persisterBatch.stateBatch.lastOffset()));
                     }
                 });
-            future.completeExceptionally(throwable);
-            return;
+            } finally {
+                lock.writeLock().unlock();
             }
+            future.completeExceptionally(throwable);
+            return;
+        }

-        if (persisterBatches.isEmpty()) {
-            future.complete(null);
-            return;
-        }
-        } finally {
-            lock.writeLock().unlock();
+        if (persisterBatches.isEmpty()) {
+            future.complete(null);
+            return;
         }

         writeShareGroupState(persisterBatches.stream().map(PersisterBatch::stateBatch).toList())
@@ -2541,7 +2541,6 @@ void rollbackOrProcessStateUpdates(
                         deliveryCompleteCount.addAndGet(-numInFlightRecordsInBatch(persisterBatch.stateBatch.firstOffset(), persisterBatch.stateBatch.lastOffset()));
                     }
                 });
-                future.completeExceptionally(exception);
                 return;
             }

@@ -2639,9 +2638,13 @@ void rollbackOrProcessStateUpdates(
             });
             // Update the cached state and start and end offsets after acknowledging/releasing the acquired records.
             cacheStateUpdated = maybeUpdateCachedStateAndOffsets();
-            future.complete(null);
         } finally {
             lock.writeLock().unlock();
+            if (exception != null) {
+                future.completeExceptionally(exception);
+            } else {
+                future.complete(null);
+            }
             // Maybe complete the delayed share fetch request if the state has been changed in cache
             // which might have moved start offset ahead. Hence, the pending delayed share fetch
             // request can be completed. The call should be made outside the lock to avoid deadlock.
```
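The restructuring in this commit follows a general pattern: record the outcome while holding the lock, release the lock, then complete the future. A minimal self-contained sketch of that pattern (class and method names are hypothetical, not the actual `SharePartition` code):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch: decide the outcome under the write lock, but only complete the
// future after the lock has been released, so completion callbacks that
// acquire other locks (e.g. purgatory) cannot deadlock.
public class CompleteOutsideLock {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    public void update(CompletableFuture<Void> future, Throwable throwable) {
        Throwable failure;
        lock.writeLock().lock();
        try {
            // ... mutate shared state under the lock; capture the outcome ...
            failure = throwable;
        } finally {
            lock.writeLock().unlock();
        }
        // Completion happens outside the lock; any callback chained on the
        // future may now safely acquire other locks.
        if (failure != null) {
            future.completeExceptionally(failure);
        } else {
            future.complete(null);
        }
    }

    public static void main(String[] args) {
        CompleteOutsideLock s = new CompleteOutsideLock();

        CompletableFuture<Void> ok = new CompletableFuture<>();
        s.update(ok, null);
        System.out.println("completed normally: "
            + (ok.isDone() && !ok.isCompletedExceptionally()));

        CompletableFuture<Void> failed = new CompletableFuture<>();
        s.update(failed, new RuntimeException("boom"));
        System.out.println("completed exceptionally: " + failed.isCompletedExceptionally());
    }
}
```

Note the diff applies the same idea in two ways: the early-return paths complete the future after the explicit unlock, while the main path completes it inside the `finally` block but only after `lock.writeLock().unlock()` has run.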
