From af182cc7e8b5c260786846406218c748c383c13d Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Thu, 28 May 2026 11:31:05 +0200 Subject: [PATCH 1/2] fix(inkless): allow replicas to handle OffsetsForLeaderEpoch for switched topics --- .../scala/kafka/server/ReplicaManager.scala | 20 +++- .../kafka/server/ReplicaManagerTest.scala | 111 ++++++++++++++++++ 2 files changed, 128 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 14774ea32a..81657c9f81 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2801,7 +2801,11 @@ class ReplicaManager(val config: KafkaConfig, lazy val inklessFetchOffsetHandlerJob: Option[FetchOffsetHandler.Job] = inklessFetchOffsetHandler.map(_.createJob()) var disklessOffsetForLeaderEpochRequested = false - def localOffsetForLeaderEpoch(topicPartition: TopicPartition, offsetForLeaderPartition: OffsetForLeaderPartition): EpochEndOffset = { + def localOffsetForLeaderEpoch( + topicPartition: TopicPartition, + offsetForLeaderPartition: OffsetForLeaderPartition, + fetchOnlyFromLeader: Boolean = true + ): EpochEndOffset = { getPartition(topicPartition) match { case HostedPartition.Online(partition) => val currentLeaderEpochOpt = @@ -2813,7 +2817,7 @@ class ReplicaManager(val config: KafkaConfig, partition.lastOffsetForLeaderEpoch( currentLeaderEpochOpt, offsetForLeaderPartition.leaderEpoch, - fetchOnlyFromLeader = true) + fetchOnlyFromLeader = fetchOnlyFromLeader) case HostedPartition.Offline(_) => new EpochEndOffset() @@ -2891,7 +2895,17 @@ class ReplicaManager(val config: KafkaConfig, disklessOffsetForLeaderEpoch(topicPartition, offsetForLeaderPartition) case classicToDisklessStartOffset if classicToDisklessStartOffset >= 0L => - val localResult = localOffsetForLeaderEpoch(topicPartition, offsetForLeaderPartition) + // The classic prefix is sealed at the switch 
offset, so any replica with the + // complete local classic log can answer epoch lookups for that prefix. + val hasCompleteLocalClassicPrefix = getPartition(topicPartition) match { + case HostedPartition.Online(partition) => + partition.log.exists(_.highWatermark >= classicToDisklessStartOffset) + case _ => false + } + val localResult = localOffsetForLeaderEpoch( + topicPartition, + offsetForLeaderPartition, + fetchOnlyFromLeader = !hasCompleteLocalClassicPrefix) val localError = Errors.forCode(localResult.errorCode) if (localError != Errors.NONE) { () => localResult diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a79f75a494..178086288d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -8284,6 +8284,117 @@ class ReplicaManagerTest { } } + @Test + def testLastOffsetForLeaderEpochHybridAtSwitchBoundaryUsesFollowerLocalLog(): Unit = { + val jobMock = Mockito.mock(classOf[FetchOffsetHandler.Job]) + when(jobMock.mustHandle(any())).thenReturn(true) + doNothing().when(jobMock).start() + + val fetchOffsetHandlerCtorInit: MockedConstruction.MockInitializer[FetchOffsetHandler] = { + case (handlerMock, _) => + when(handlerMock.createJob()).thenReturn(jobMock) + } + val fetchOffsetHandlerCtor = mockConstruction(classOf[FetchOffsetHandler], fetchOffsetHandlerCtorInit) + + val replicaManager = try { + createReplicaManager(List(disklessTopicPartition.topic()), disklessManagedReplicasEnabled = true) + } finally { + fetchOffsetHandlerCtor.close() + } + try { + val partition = setupHybridLeaderPartition(replicaManager, disklessTopicPartition, localEndOffset = 101L) + val remoteLeaderId = replicaManager.config.brokerId + 1 + partition.makeFollower( + partitionRegistration( + remoteLeaderId, + leaderEpoch = 1, + isr = Array(remoteLeaderId, replicaManager.config.brokerId), + partitionEpoch = 
1, + replicas = Array(replicaManager.config.brokerId, remoteLeaderId)), + isNew = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), + Some(disklessTopicPartition.topicId())) + assertFalse(partition.isLeader) + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(101L) + + val requestedEpochInfo = Seq( + new OffsetForLeaderTopic() + .setTopic(disklessTopicPartition.topic()) + .setPartitions(util.List.of( + new OffsetForLeaderPartition() + .setPartition(disklessTopicPartition.partition()) + .setLeaderEpoch(0) + )) + ) + + val result = replicaManager.lastOffsetForLeaderEpoch(requestedEpochInfo) + + val partitionResult = result.head.partitions().get(0) + assertEquals(Errors.NONE.code, partitionResult.errorCode()) + assertEquals(101L, partitionResult.endOffset()) + verify(jobMock, never()).add(any(), any()) + verify(jobMock, never()).start() + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testLastOffsetForLeaderEpochHybridAtSwitchBoundaryRejectsLaggingFollowerLocalLog(): Unit = { + val jobMock = Mockito.mock(classOf[FetchOffsetHandler.Job]) + when(jobMock.mustHandle(any())).thenReturn(true) + doNothing().when(jobMock).start() + + val fetchOffsetHandlerCtorInit: MockedConstruction.MockInitializer[FetchOffsetHandler] = { + case (handlerMock, _) => + when(handlerMock.createJob()).thenReturn(jobMock) + } + val fetchOffsetHandlerCtor = mockConstruction(classOf[FetchOffsetHandler], fetchOffsetHandlerCtorInit) + + val replicaManager = try { + createReplicaManager(List(disklessTopicPartition.topic()), disklessManagedReplicasEnabled = true) + } finally { + fetchOffsetHandlerCtor.close() + } + try { + val partition = setupHybridLeaderPartition(replicaManager, disklessTopicPartition, localEndOffset = 50L) + val remoteLeaderId = replicaManager.config.brokerId + 1 + partition.makeFollower( + partitionRegistration( + remoteLeaderId, + 
leaderEpoch = 1, + isr = Array(remoteLeaderId), + partitionEpoch = 1, + replicas = Array(replicaManager.config.brokerId, remoteLeaderId)), + isNew = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), + Some(disklessTopicPartition.topicId())) + assertFalse(partition.isLeader) + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(101L) + + val requestedEpochInfo = Seq( + new OffsetForLeaderTopic() + .setTopic(disklessTopicPartition.topic()) + .setPartitions(util.List.of( + new OffsetForLeaderPartition() + .setPartition(disklessTopicPartition.partition()) + .setLeaderEpoch(0) + )) + ) + + val result = replicaManager.lastOffsetForLeaderEpoch(requestedEpochInfo) + + val partitionResult = result.head.partitions().get(0) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionResult.errorCode()) + verify(jobMock, never()).add(any(), any()) + verify(jobMock, never()).start() + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test def testLastOffsetForLeaderEpochHybridFallsBackToDisklessWhenClassicLogCannotAnswer(): Unit = { val disklessResult = new OffsetResultHolder.FileRecordsOrError( From 6c75b5d28d71121ea9b39abd6a8cde246c0e7a21 Mon Sep 17 00:00:00 2001 From: Giuseppe Lillo Date: Thu, 28 May 2026 12:03:04 +0200 Subject: [PATCH 2/2] fix(inkless): fix conditions for allowing ListOffsets to be handled by replicas Replicas of switched topics from classic to diskless should handle ListOffsets searches only if the UnifiedLog is fully replicated from the leader. For example, new replicas that have just been added should not handle ListOffsets requests.
--- .../server/DisklessFetchOffsetRouter.scala | 22 ++++++++---- .../scala/kafka/server/ReplicaManager.scala | 4 ++- .../DisklessFetchOffsetRouterTest.scala | 34 +++++++++++++++++++ 3 files changed, 52 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala b/core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala index 9a8d9222b7..4ecba80ff9 100644 --- a/core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala +++ b/core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala @@ -79,6 +79,8 @@ class DisklessFetchOffsetRouter( * @param classicLogStartOffsetProvider returns the local UnifiedLog's `logStartOffset` for the * given partition, used to decide whether the EARLIEST * timestamp can be served by the classic path. + * @param hasCompleteClassicPrefix returns whether the local UnifiedLog's high watermark has + * reached the classic-to-diskless switch offset. * @param classicFetchOffset runs the standard Kafka classic-path lookup for the * given `(topicPartition, partition, allowFromFollower)`. 
*/ @@ -90,20 +92,26 @@ class DisklessFetchOffsetRouter( replicaId: Int, version: Short, classicLogStartOffsetProvider: TopicPartition => Option[Long], + hasCompleteClassicPrefix: (TopicPartition, Long) => Boolean, classicFetchOffset: (TopicPartition, ListOffsetsPartition, Boolean) => ListOffsetsPartitionStatus ): ListOffsetsPartitionStatus = { val classicToDisklessStartOffset = inklessMetadataView.getClassicToDisklessStartOffset(topicPartition) val switchPending = classicToDisklessStartOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING - val isSwitchedWithClassicAccess = (classicToDisklessStartOffset > 0 && disklessManagedReplicasEnabled) + val hasCommittedSwitchOffset = classicToDisklessStartOffset > 0 + val isSwitchedWithClassicAccess = hasCommittedSwitchOffset && disklessManagedReplicasEnabled val isConsolidatingPartition = disklessConsolidationEnabled && inklessMetadataView.isConsolidatingDisklessTopic(topicPartition.topic) // Switched partitions seal their classic local log: once classicToDisklessStartOffset is - // committed the LEO can no longer grow and every ISR replica has the same data on disk. - // Any replica can therefore safely answer ListOffsets from its local log, so we let - // followers serve the classic-side query as well. - // Since consolidating partitions contain only data that has been stored in the diskless - // coordinator and its offsets won't change, we can allow follower requests. - val allowFromFollower = isSwitchedWithClassicAccess || isConsolidatingPartition + // committed the LEO can no longer grow. Any replica whose local HW has reached the seal + // can therefore safely answer ListOffsets from its local log, so we let those followers + // serve the classic-side query as well. + // Consolidating partitions can also allow follower requests, except when they are switched + // and this broker has not caught up to the sealed classic prefix. 
+ val switchedAllowsFollower = + isSwitchedWithClassicAccess && hasCompleteClassicPrefix(topicPartition, classicToDisklessStartOffset) + val consolidatingAllowsFollower = + isConsolidatingPartition && (!hasCommittedSwitchOffset || hasCompleteClassicPrefix(topicPartition, classicToDisklessStartOffset)) + val allowFromFollower = switchedAllowsFollower || consolidatingAllowsFollower val isFollowerRequest = replicaId >= 0 def classicLookup(): ListOffsetsPartitionStatus = classicFetchOffset(topicPartition, partition, allowFromFollower) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 81657c9f81..9242945a81 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1711,6 +1711,8 @@ class ReplicaManager(val config: KafkaConfig, classicFetchOffset(tp, p, replicaId, isolationLevel, version, correlationId, clientId, buildErrorResponse, allowFromFollower = allowFromFollower) val classicLogStart: TopicPartition => Option[Long] = tp => logManager.getLog(tp).map(_.logStartOffset) + val hasCompleteClassicPrefix: (TopicPartition, Long) => Boolean = + (tp, classicToDisklessStartOffset) => logManager.getLog(tp).exists(_.highWatermark >= classicToDisklessStartOffset) topics.foreach { topic => topic.partitions.asScala.foreach { partition => @@ -1726,7 +1728,7 @@ class ReplicaManager(val config: KafkaConfig, } else if (maybeFetchOffsetJob.exists(_.mustHandle(topic.name))) { statusByPartition += topicPartition -> disklessFetchOffsetRouter.route(maybeFetchOffsetJob.get, () => inklessFetchOffsetHandler.get.createJob(), - topicPartition, partition, replicaId, version, classicLogStart, classicFetch) + topicPartition, partition, replicaId, version, classicLogStart, hasCompleteClassicPrefix, classicFetch) } else { statusByPartition += topicPartition -> classicFetch(topicPartition, partition, false) } diff --git 
a/core/src/test/scala/unit/kafka/server/DisklessFetchOffsetRouterTest.scala b/core/src/test/scala/unit/kafka/server/DisklessFetchOffsetRouterTest.scala index be3e0eb6dc..8a42c9293a 100644 --- a/core/src/test/scala/unit/kafka/server/DisklessFetchOffsetRouterTest.scala +++ b/core/src/test/scala/unit/kafka/server/DisklessFetchOffsetRouterTest.scala @@ -94,6 +94,7 @@ class DisklessFetchOffsetRouterTest { replicaId: Int = consumerReplicaId, version: Short = 7, classicLogStartOffset: Option[Long] = None, + hasCompleteClassicPrefix: Boolean = true, classicResult: ListOffsetsPartitionStatus = defaultClassicResult, newJob: () => FetchOffsetHandler.Job = () => throw new AssertionError("newJob() should not be called by this routing path")): ListOffsetsPartitionStatus = { @@ -105,6 +106,7 @@ class DisklessFetchOffsetRouterTest { replicaId = replicaId, version = version, classicLogStartOffsetProvider = _ => classicLogStartOffset, + hasCompleteClassicPrefix = (_, _) => hasCompleteClassicPrefix, classicFetchOffset = (tpArg, partition, allow) => { classicCalls += ((tpArg, partition, allow)) classicResult @@ -179,6 +181,21 @@ class DisklessFetchOffsetRouterTest { verify(job, never()).add(any(), any()) } + @Test + def routesToClassicWithoutFollowerAccessWhenSwitchedFollowerClassicPrefixIsIncomplete(): Unit = { + when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L) + + val status = route( + newRouter(), + timestamp = ListOffsetsRequest.LATEST_TIMESTAMP, + replicaId = followerReplicaId, + hasCompleteClassicPrefix = false) + + assertSame(defaultClassicResult, status) + assertClassicCalledWith(allowFromFollower = false) + verify(job, never()).add(any(), any()) + } + // --------------------------------------------------------------------------- // Case 2: hybrid routing by timestamp. 
// --------------------------------------------------------------------------- @@ -480,6 +497,23 @@ class DisklessFetchOffsetRouterTest { assertNotSame(noMatch, status) } + @Test + def hybridConsolidatingWithCommittedBoundaryRequiresCompleteClassicPrefixForFollowerAccess(): Unit = { + when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L) + when(inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)).thenReturn(true) + + val status = route( + newRouter(disklessConsolidationEnabled = true), + timestamp = ListOffsetsRequest.EARLIEST_TIMESTAMP, + classicLogStartOffset = Some(0L), + hasCompleteClassicPrefix = false + ) + + assertSame(defaultClassicResult, status) + assertClassicCalledWith(allowFromFollower = false) + verify(job, never()).add(any(), any()) + } + @Test def hybridConsolidatingEarliestReturnsClassicWhenClassicHitsEvenIfLogPastBoundary(): Unit = { when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L)