kamalcph commented on code in PR #20428:
URL: https://github.com/apache/kafka/pull/20428#discussion_r2317900129
##########
core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala:
##########
@@ -135,6 +135,35 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
new OffsetAndEpoch(localLogStartOffset, epoch.orElse(0))
}
+ override def fetchEarliestPendingUploadOffset(topicPartition:
TopicPartition, currentLeaderEpoch: Int): OffsetAndEpoch = {
+ val partition = replicaManager.getPartitionOrException(topicPartition)
+ val log = partition.localLogOrException
+
+ if (!log.remoteLogEnabled())
+ return new OffsetAndEpoch(-1L, -1)
+
+ val highestRemoteOffset = log.highestOffsetInRemoteStorage()
+
+ if (highestRemoteOffset == -1L) {
+ val localLogStartOffset = fetchEarliestLocalOffset(topicPartition,
currentLeaderEpoch)
+ val logStartOffset = fetchEarliestOffset(topicPartition,
currentLeaderEpoch)
+
+ if (localLogStartOffset.offset() == logStartOffset.offset()) {
+ // No segments have been uploaded yet
+ return logStartOffset;
+ } else {
+ // Leader currently does not know about the already uploaded segments
+ return new OffsetAndEpoch(-1L, -1);
+ }
+ }
+
+ val logStartOffset = fetchEarliestOffset(topicPartition,
currentLeaderEpoch)
+ val earliestPendingUploadOffset = Math.max(highestRemoteOffset + 1,
logStartOffset.offset())
+ val epoch =
log.leaderEpochCache.epochForOffset(earliestPendingUploadOffset)
+
+ new OffsetAndEpoch(earliestPendingUploadOffset, epoch.orElse(0))
Review Comment:
Epoch 0 is valid. If the epoch is empty, shall we return -1?
##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##########
@@ -818,4 +820,96 @@ class ReplicaFetcherThreadTest {
when(replicaManager.localLogOrException(t2p1)).thenReturn(log)
when(replicaManager.getPartitionOrException(t2p1)).thenReturn(partition)
}
+
+ @ParameterizedTest
+ @CsvSource(Array(
+ "false, false, compact, 0, 0, false",
+ "false, false, compact, 5, 0, false",
+ "false, false, compact, 5, 1, false",
+ "false, false, delete, 0, 0, false",
+ "false, false, delete, 5, 0, false",
+ "false, false, delete, 5, 1, false",
+ "false, true, compact, 0, 0, false",
+ "false, true, compact, 5, 0, false",
+ "false, true, compact, 5, 1, false",
+ "false, true, delete, 0, 0, false",
+ "false, true, delete, 5, 0, false",
+ "false, true, delete, 5, 1, false",
+ "true, false, compact, 0, 0, false",
+ "true, false, compact, 5, 0, false",
+ "true, false, compact, 5, 1, false",
+ "true, false, delete, 0, 0, false",
+ "true, false, delete, 5, 0, false",
+ "true, false, delete, 5, 1, false",
+ "true, true, compact, 0, 0, false",
+ "true, true, compact, 5, 0, false",
+ "true, true, compact, 5, 1, false",
+ "true, true, delete, 0, 0, false",
+ "true, true, delete, 5, 0, true",
+ "true, true, delete, 5, 1, false"))
+ def testShouldFetchFromLastTieredOffset(enableLastTieredOffsetFetch: Boolean,
+ remoteStorageEnabled: Boolean,
+ cleanUpPolicy: String,
+ leaderEndOffset: Long,
+ replicaEndOffset: Long,
+ expected: Boolean): Unit = {
+ val tp = new TopicPartition("t", 0)
+
+ def runScenario(enableLastTieredOffsetFetch: Boolean,
Review Comment:
do we need this internal `runScenario` method? Can this be flattened?
```java
assertEquals(
expected,
thread.callShouldFetchFromLastTieredOffset(tp, leaderEndOffset,
replicaEndOffset)
)
```
##########
core/src/main/java/kafka/server/TierStateMachine.java:
##########
@@ -182,7 +179,7 @@ private void buildProducerSnapshotFile(UnifiedLog
unifiedLog,
* fetching records from the leader. The return value is the next offset
to fetch from the leader, which is the
* next offset following the end offset of the remote log portion.
*/
- private Long buildRemoteLogAuxState(TopicPartition topicPartition,
+ protected Long buildRemoteLogAuxState(TopicPartition topicPartition,
Review Comment:
nit: fix alignment
##########
core/src/test/scala/unit/kafka/server/MockLeaderEndPoint.scala:
##########
@@ -135,6 +135,16 @@ class MockLeaderEndPoint(sourceBroker: BrokerEndPoint =
new BrokerEndPoint(1, "l
new OffsetAndEpoch(leaderState.localLogStartOffset,
leaderState.leaderEpoch)
}
+ override def fetchEarliestPendingUploadOffset(topicPartition:
TopicPartition, leaderEpoch: Int): OffsetAndEpoch = {
+ val leaderState = leaderPartitionState(topicPartition)
+ checkLeaderEpochAndThrow(leaderEpoch, leaderState)
+ val offsetAndEpoch = leaderState.earliestPendingUploadOffset match {
+ case -1L => (-1L, -1)
Review Comment:
can the OffsetAndEpoch be returned directly?
```
leaderState.earliestPendingUploadOffset match {
case -1L => new OffsetAndEpoch(-1L, -1)
case _ => new
OffsetAndEpoch(math.max(leaderState.earliestPendingUploadOffset,
leaderState.logStartOffset), leaderState.leaderEpoch)
}
```
##########
core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala:
##########
@@ -818,4 +820,96 @@ class ReplicaFetcherThreadTest {
when(replicaManager.localLogOrException(t2p1)).thenReturn(log)
when(replicaManager.getPartitionOrException(t2p1)).thenReturn(partition)
}
+
+ @ParameterizedTest
+ @CsvSource(Array(
+ "false, false, compact, 0, 0, false",
+ "false, false, compact, 5, 0, false",
+ "false, false, compact, 5, 1, false",
+ "false, false, delete, 0, 0, false",
+ "false, false, delete, 5, 0, false",
+ "false, false, delete, 5, 1, false",
+ "false, true, compact, 0, 0, false",
+ "false, true, compact, 5, 0, false",
+ "false, true, compact, 5, 1, false",
+ "false, true, delete, 0, 0, false",
+ "false, true, delete, 5, 0, false",
+ "false, true, delete, 5, 1, false",
+ "true, false, compact, 0, 0, false",
+ "true, false, compact, 5, 0, false",
+ "true, false, compact, 5, 1, false",
+ "true, false, delete, 0, 0, false",
+ "true, false, delete, 5, 0, false",
+ "true, false, delete, 5, 1, false",
+ "true, true, compact, 0, 0, false",
+ "true, true, compact, 5, 0, false",
+ "true, true, compact, 5, 1, false",
+ "true, true, delete, 0, 0, false",
+ "true, true, delete, 5, 0, true",
+ "true, true, delete, 5, 1, false"))
+ def testShouldFetchFromLastTieredOffset(enableLastTieredOffsetFetch: Boolean,
+ remoteStorageEnabled: Boolean,
+ cleanUpPolicy: String,
+ leaderEndOffset: Long,
+ replicaEndOffset: Long,
+ expected: Boolean): Unit = {
+ val tp = new TopicPartition("t", 0)
+
+ def runScenario(enableLastTieredOffsetFetch: Boolean,
+ cleanupPolicy: String,
+ remoteStorageEnabled: Boolean,
+ leaderEndOffset: Long,
+ replicaEndOffset: Long): Boolean = {
+ val props = TestUtils.createBrokerConfig(1)
+
props.put(ReplicationConfigs.FOLLOWER_FETCH_LAST_TIERED_OFFSET_ENABLE_CONFIG,
String.valueOf(enableLastTieredOffsetFetch))
+ val config = KafkaConfig.fromProps(props)
+
+ val mockBlockingSend: BlockingSend = mock(classOf[BlockingSend])
+ when(mockBlockingSend.brokerEndPoint()).thenReturn(brokerEndPoint)
+
+ val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+ when(replicaManager.brokerTopicStats).thenReturn(new BrokerTopicStats)
+
+ val lcOverrides = new Properties()
+ lcOverrides.put(TopicConfig.CLEANUP_POLICY_CONFIG, cleanupPolicy)
+ val logConfig = LogConfig.fromProps(config.extractLogConfigMap,
lcOverrides)
+
+ val log: UnifiedLog = mock(classOf[UnifiedLog])
+ when(log.config).thenReturn(logConfig)
+ when(log.remoteLogEnabled()).thenReturn(remoteStorageEnabled)
+ when(replicaManager.localLog(tp)).thenReturn(Some(log))
+
+ val logContext = new LogContext(s"[ReplicaFetcher
replicaId=${config.brokerId}, leaderId=${mockBlockingSend.brokerEndPoint().id},
fetcherId=0] ")
+ val fetchSessionHandler = new FetchSessionHandler(logContext,
mockBlockingSend.brokerEndPoint().id)
+ val leader = new RemoteLeaderEndPoint(logContext.logPrefix,
mockBlockingSend, fetchSessionHandler,
+ config, replicaManager, UNBOUNDED_QUOTA, () =>
MetadataVersion.MINIMUM_VERSION, () => 1)
+
+ val thread = new MockReplicaFetcherThread("test-fetcher", leader,
config, failedPartitions, replicaManager, UNBOUNDED_QUOTA, logContext.logPrefix)
+ thread.callShouldFetchFromLastTieredOffset(tp, leaderEndOffset,
replicaEndOffset)
+ }
+
+ assertEquals(
+ expected,
+ runScenario(
+ enableLastTieredOffsetFetch = enableLastTieredOffsetFetch,
+ remoteStorageEnabled = remoteStorageEnabled,
+ cleanupPolicy = cleanUpPolicy,
+ leaderEndOffset = leaderEndOffset,
+ replicaEndOffset = replicaEndOffset
+ )
+ )
+ }
+
+ private class MockReplicaFetcherThread(name: String,
Review Comment:
Can we remove this `MockReplicaFetcherThread` and expose
`shouldFetchFromLastTieredOffset` in RFT to the server package?
```scala
override protected[server] def
shouldFetchFromLastTieredOffset(topicPartition: TopicPartition,
leaderEndOffset: Long, replicaEndOffset: Long): Boolean
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]