diff --git a/core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala b/core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala index 9a8d9222b7..4ecba80ff9 100644 --- a/core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala +++ b/core/src/main/scala/kafka/server/DisklessFetchOffsetRouter.scala @@ -79,6 +79,8 @@ class DisklessFetchOffsetRouter( * @param classicLogStartOffsetProvider returns the local UnifiedLog's `logStartOffset` for the * given partition, used to decide whether the EARLIEST * timestamp can be served by the classic path. + * @param hasCompleteClassicPrefix returns whether the local UnifiedLog's high watermark has + * reached the classic-to-diskless switch offset. * @param classicFetchOffset runs the standard Kafka classic-path lookup for the * given `(topicPartition, partition, allowFromFollower)`. */ @@ -90,20 +92,26 @@ class DisklessFetchOffsetRouter( replicaId: Int, version: Short, classicLogStartOffsetProvider: TopicPartition => Option[Long], + hasCompleteClassicPrefix: (TopicPartition, Long) => Boolean, classicFetchOffset: (TopicPartition, ListOffsetsPartition, Boolean) => ListOffsetsPartitionStatus ): ListOffsetsPartitionStatus = { val classicToDisklessStartOffset = inklessMetadataView.getClassicToDisklessStartOffset(topicPartition) val switchPending = classicToDisklessStartOffset == PartitionRegistration.CLASSIC_TO_DISKLESS_SWITCH_PENDING - val isSwitchedWithClassicAccess = (classicToDisklessStartOffset > 0 && disklessManagedReplicasEnabled) + val hasCommittedSwitchOffset = classicToDisklessStartOffset > 0 + val isSwitchedWithClassicAccess = hasCommittedSwitchOffset && disklessManagedReplicasEnabled val isConsolidatingPartition = disklessConsolidationEnabled && inklessMetadataView.isConsolidatingDisklessTopic(topicPartition.topic) // Switched partitions seal their classic local log: once classicToDisklessStartOffset is - // committed the LEO can no longer grow and every ISR replica has the same data on disk. - // Any replica can therefore safely answer ListOffsets from its local log, so we let - // followers serve the classic-side query as well. - // Since consolidating partitions contain only data that has been stored in the diskless - // coordinator and its offsets won't change, we can allow follower requests. - val allowFromFollower = isSwitchedWithClassicAccess || isConsolidatingPartition + // committed the LEO can no longer grow. Any replica whose local HW has reached the seal + // can therefore safely answer ListOffsets from its local log, so we let those followers + // serve the classic-side query as well. + // Consolidating partitions can also allow follower requests, except when they are switched + // and this broker has not caught up to the sealed classic prefix. + val switchedAllowsFollower = + isSwitchedWithClassicAccess && hasCompleteClassicPrefix(topicPartition, classicToDisklessStartOffset) + val consolidatingAllowsFollower = + isConsolidatingPartition && (!hasCommittedSwitchOffset || hasCompleteClassicPrefix(topicPartition, classicToDisklessStartOffset)) + val allowFromFollower = switchedAllowsFollower || consolidatingAllowsFollower val isFollowerRequest = replicaId >= 0 def classicLookup(): ListOffsetsPartitionStatus = classicFetchOffset(topicPartition, partition, allowFromFollower) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 14774ea32a..9242945a81 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -1711,6 +1711,8 @@ class ReplicaManager(val config: KafkaConfig, classicFetchOffset(tp, p, replicaId, isolationLevel, version, correlationId, clientId, buildErrorResponse, allowFromFollower = allowFromFollower) val classicLogStart: TopicPartition => Option[Long] = tp => logManager.getLog(tp).map(_.logStartOffset) + val hasCompleteClassicPrefix: (TopicPartition, Long) => Boolean = + (tp, classicToDisklessStartOffset) => logManager.getLog(tp).exists(_.highWatermark >= classicToDisklessStartOffset) topics.foreach { topic => topic.partitions.asScala.foreach { partition => @@ -1726,7 +1728,7 @@ class ReplicaManager(val config: KafkaConfig, } else if (maybeFetchOffsetJob.exists(_.mustHandle(topic.name))) { statusByPartition += topicPartition -> disklessFetchOffsetRouter.route(maybeFetchOffsetJob.get, () => inklessFetchOffsetHandler.get.createJob(), - topicPartition, partition, replicaId, version, classicLogStart, classicFetch) + topicPartition, partition, replicaId, version, classicLogStart, hasCompleteClassicPrefix, classicFetch) } else { statusByPartition += topicPartition -> classicFetch(topicPartition, partition, false) } @@ -2801,7 +2803,11 @@ class ReplicaManager(val config: KafkaConfig, lazy val inklessFetchOffsetHandlerJob: Option[FetchOffsetHandler.Job] = inklessFetchOffsetHandler.map(_.createJob()) var disklessOffsetForLeaderEpochRequested = false - def localOffsetForLeaderEpoch(topicPartition: TopicPartition, offsetForLeaderPartition: OffsetForLeaderPartition): EpochEndOffset = { + def localOffsetForLeaderEpoch( + topicPartition: TopicPartition, + offsetForLeaderPartition: OffsetForLeaderPartition, + fetchOnlyFromLeader: Boolean = true + ): EpochEndOffset = { getPartition(topicPartition) match { case HostedPartition.Online(partition) => val currentLeaderEpochOpt = @@ -2813,7 +2819,7 @@ class ReplicaManager(val config: KafkaConfig, partition.lastOffsetForLeaderEpoch( currentLeaderEpochOpt, offsetForLeaderPartition.leaderEpoch, - fetchOnlyFromLeader = true) + fetchOnlyFromLeader = fetchOnlyFromLeader) case HostedPartition.Offline(_) => new EpochEndOffset() @@ -2891,7 +2897,17 @@ class ReplicaManager(val config: KafkaConfig, disklessOffsetForLeaderEpoch(topicPartition, offsetForLeaderPartition) case classicToDisklessStartOffset if classicToDisklessStartOffset >= 0L => - val localResult = localOffsetForLeaderEpoch(topicPartition, offsetForLeaderPartition) + // The classic prefix is sealed at the switch offset, so any replica with the + // complete local classic log can answer epoch lookups for that prefix. + val hasCompleteLocalClassicPrefix = getPartition(topicPartition) match { + case HostedPartition.Online(partition) => + partition.log.exists(_.highWatermark >= classicToDisklessStartOffset) + case _ => false + } + val localResult = localOffsetForLeaderEpoch( + topicPartition, + offsetForLeaderPartition, + fetchOnlyFromLeader = !hasCompleteLocalClassicPrefix) val localError = Errors.forCode(localResult.errorCode) if (localError != Errors.NONE) { () => localResult diff --git a/core/src/test/scala/unit/kafka/server/DisklessFetchOffsetRouterTest.scala b/core/src/test/scala/unit/kafka/server/DisklessFetchOffsetRouterTest.scala index be3e0eb6dc..8a42c9293a 100644 --- a/core/src/test/scala/unit/kafka/server/DisklessFetchOffsetRouterTest.scala +++ b/core/src/test/scala/unit/kafka/server/DisklessFetchOffsetRouterTest.scala @@ -94,6 +94,7 @@ class DisklessFetchOffsetRouterTest { replicaId: Int = consumerReplicaId, version: Short = 7, classicLogStartOffset: Option[Long] = None, + hasCompleteClassicPrefix: Boolean = true, classicResult: ListOffsetsPartitionStatus = defaultClassicResult, newJob: () => FetchOffsetHandler.Job = () => throw new AssertionError("newJob() should not be called by this routing path")): ListOffsetsPartitionStatus = { @@ -105,6 +106,7 @@ class DisklessFetchOffsetRouterTest { replicaId = replicaId, version = version, classicLogStartOffsetProvider = _ => classicLogStartOffset, + hasCompleteClassicPrefix = (_, _) => hasCompleteClassicPrefix, classicFetchOffset = (tpArg, partition, allow) => { classicCalls += ((tpArg, partition, allow)) classicResult @@ -179,6 +181,21 @@ class DisklessFetchOffsetRouterTest { verify(job, never()).add(any(), any()) } + @Test + def routesToClassicWithoutFollowerAccessWhenSwitchedFollowerClassicPrefixIsIncomplete(): Unit = { + when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L) + + val status = route( + newRouter(), + timestamp = ListOffsetsRequest.LATEST_TIMESTAMP, + replicaId = followerReplicaId, + hasCompleteClassicPrefix = false) + + assertSame(defaultClassicResult, status) + assertClassicCalledWith(allowFromFollower = false) + verify(job, never()).add(any(), any()) + } + // --------------------------------------------------------------------------- // Case 2: hybrid routing by timestamp. // --------------------------------------------------------------------------- @@ -480,6 +497,23 @@ class DisklessFetchOffsetRouterTest { assertNotSame(noMatch, status) } + @Test + def hybridConsolidatingWithCommittedBoundaryRequiresCompleteClassicPrefixForFollowerAccess(): Unit = { + when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L) + when(inklessMetadataView.isConsolidatingDisklessTopic(tp.topic)).thenReturn(true) + + val status = route( + newRouter(disklessConsolidationEnabled = true), + timestamp = ListOffsetsRequest.EARLIEST_TIMESTAMP, + classicLogStartOffset = Some(0L), + hasCompleteClassicPrefix = false + ) + + assertSame(defaultClassicResult, status) + assertClassicCalledWith(allowFromFollower = false) + verify(job, never()).add(any(), any()) + } + @Test def hybridConsolidatingEarliestReturnsClassicWhenClassicHitsEvenIfLogPastBoundary(): Unit = { when(inklessMetadataView.getClassicToDisklessStartOffset(tp)).thenReturn(100L) diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index a79f75a494..178086288d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -8284,6 +8284,117 @@ class ReplicaManagerTest { } } + @Test + def testLastOffsetForLeaderEpochHybridAtSwitchBoundaryUsesFollowerLocalLog(): Unit = { + val jobMock = Mockito.mock(classOf[FetchOffsetHandler.Job]) + when(jobMock.mustHandle(any())).thenReturn(true) + doNothing().when(jobMock).start() + + val fetchOffsetHandlerCtorInit: MockedConstruction.MockInitializer[FetchOffsetHandler] = { + case (handlerMock, _) => + when(handlerMock.createJob()).thenReturn(jobMock) + } + val fetchOffsetHandlerCtor = mockConstruction(classOf[FetchOffsetHandler], fetchOffsetHandlerCtorInit) + + val replicaManager = try { + createReplicaManager(List(disklessTopicPartition.topic()), disklessManagedReplicasEnabled = true) + } finally { + fetchOffsetHandlerCtor.close() + } + try { + val partition = setupHybridLeaderPartition(replicaManager, disklessTopicPartition, localEndOffset = 101L) + val remoteLeaderId = replicaManager.config.brokerId + 1 + partition.makeFollower( + partitionRegistration( + remoteLeaderId, + leaderEpoch = 1, + isr = Array(remoteLeaderId, replicaManager.config.brokerId), + partitionEpoch = 1, + replicas = Array(replicaManager.config.brokerId, remoteLeaderId)), + isNew = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), + Some(disklessTopicPartition.topicId())) + assertFalse(partition.isLeader) + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(101L) + + val requestedEpochInfo = Seq( + new OffsetForLeaderTopic() + .setTopic(disklessTopicPartition.topic()) + .setPartitions(util.List.of( + new OffsetForLeaderPartition() + .setPartition(disklessTopicPartition.partition()) + .setLeaderEpoch(0) + )) + ) + + val result = replicaManager.lastOffsetForLeaderEpoch(requestedEpochInfo) + + val partitionResult = result.head.partitions().get(0) + assertEquals(Errors.NONE.code, partitionResult.errorCode()) + assertEquals(101L, partitionResult.endOffset()) + verify(jobMock, never()).add(any(), any()) + verify(jobMock, never()).start() + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + + @Test + def testLastOffsetForLeaderEpochHybridAtSwitchBoundaryRejectsLaggingFollowerLocalLog(): Unit = { + val jobMock = Mockito.mock(classOf[FetchOffsetHandler.Job]) + when(jobMock.mustHandle(any())).thenReturn(true) + doNothing().when(jobMock).start() + + val fetchOffsetHandlerCtorInit: MockedConstruction.MockInitializer[FetchOffsetHandler] = { + case (handlerMock, _) => + when(handlerMock.createJob()).thenReturn(jobMock) + } + val fetchOffsetHandlerCtor = mockConstruction(classOf[FetchOffsetHandler], fetchOffsetHandlerCtorInit) + + val replicaManager = try { + createReplicaManager(List(disklessTopicPartition.topic()), disklessManagedReplicasEnabled = true) + } finally { + fetchOffsetHandlerCtor.close() + } + try { + val partition = setupHybridLeaderPartition(replicaManager, disklessTopicPartition, localEndOffset = 50L) + val remoteLeaderId = replicaManager.config.brokerId + 1 + partition.makeFollower( + partitionRegistration( + remoteLeaderId, + leaderEpoch = 1, + isr = Array(remoteLeaderId), + partitionEpoch = 1, + replicas = Array(replicaManager.config.brokerId, remoteLeaderId)), + isNew = false, + new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints.asJava), + Some(disklessTopicPartition.topicId())) + assertFalse(partition.isLeader) + when(replicaManager.inklessMetadataView().getClassicToDisklessStartOffset(disklessTopicPartition.topicPartition())) + .thenReturn(101L) + + val requestedEpochInfo = Seq( + new OffsetForLeaderTopic() + .setTopic(disklessTopicPartition.topic()) + .setPartitions(util.List.of( + new OffsetForLeaderPartition() + .setPartition(disklessTopicPartition.partition()) + .setLeaderEpoch(0) + )) + ) + + val result = replicaManager.lastOffsetForLeaderEpoch(requestedEpochInfo) + + val partitionResult = result.head.partitions().get(0) + assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, partitionResult.errorCode()) + verify(jobMock, never()).add(any(), any()) + verify(jobMock, never()).start() + } finally { + replicaManager.shutdown(checkpointHW = false) + } + } + @Test def testLastOffsetForLeaderEpochHybridFallsBackToDisklessWhenClassicLogCannotAnswer(): Unit = { val disklessResult = new OffsetResultHolder.FileRecordsOrError(