chia7712 commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1737601713


##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -355,6 +355,10 @@ public boolean isKRaftSupported() {
         return this.featureLevel > 0;
     }
 
+    public boolean isEarliestLocalOffsetSupported() {

Review Comment:
   please revert this change



##########
server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java:
##########
@@ -383,6 +383,12 @@ public void testIsElrSupported(MetadataVersion 
metadataVersion) {
         assertEquals(metadataVersion.isAtLeast(IBP_4_0_IV1), 
metadataVersion.isElrSupported());
     }
 
+    @ParameterizedTest
+    @EnumSource(value = MetadataVersion.class)
+    public void testIsEarliestLocalOffsetSupported(MetadataVersion 
metadataVersion) {

Review Comment:
   Ditto: please revert this change as well.



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1104,35 +1104,41 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseTopics = authorizedRequestInfo.map { topic =>
       val responsePartitions = topic.partitions.asScala.map { partition =>
-        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
-
-        try {
-          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-            topicPartition = topicPartition,
-            timestamp = partition.timestamp,
-            maxNumOffsets = partition.maxNumOffsets,
-            isFromConsumer = offsetRequest.replicaId == 
ListOffsetsRequest.CONSUMER_REPLICA_ID,
-            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+        if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {

Review Comment:
   Could you please add comments explaining this condition?



##########
core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala:
##########
@@ -104,6 +104,9 @@ class RemoteLeaderEndPoint(logPrefix: String,
   }
 
   override def fetchEarliestLocalOffset(topicPartition: TopicPartition, 
currentLeaderEpoch: Int): OffsetAndEpoch = {
+    val metadataVersion = metadataVersionSupplier()
+    if (!metadataVersion.isEarliestLocalOffsetSupported)

Review Comment:
   Please revert this change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to