junrao commented on code in PR #16873:
URL: https://github.com/apache/kafka/pull/16873#discussion_r1728047972


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1103,35 +1103,41 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     val responseTopics = authorizedRequestInfo.map { topic =>
       val responsePartitions = topic.partitions.asScala.map { partition =>
-        val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
-
-        try {
-          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-            topicPartition = topicPartition,
-            timestamp = partition.timestamp,
-            maxNumOffsets = partition.maxNumOffsets,
-            isFromConsumer = offsetRequest.replicaId == ListOffsetsRequest.CONSUMER_REPLICA_ID,
-            fetchOnlyFromLeader = offsetRequest.replicaId != ListOffsetsRequest.DEBUGGING_REPLICA_ID)
+        if (partition.timestamp() < ListOffsetsRequest.EARLIEST_TIMESTAMP) {
           new ListOffsetsPartitionResponse()
             .setPartitionIndex(partition.partitionIndex)
-            .setErrorCode(Errors.NONE.code)
-            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
-        } catch {
-          // NOTE: UnknownTopicOrPartitionException and NotLeaderOrFollowerException are special cases since these error messages
-          // are typically transient and there is no value in logging the entire stack trace for the same
-          case e @ (_ : UnknownTopicOrPartitionException |
-                    _ : NotLeaderOrFollowerException |
-                    _ : KafkaStorageException) =>
-            debug("Offset request with correlation id %d from client %s on partition %s failed due to %s".format(
-              correlationId, clientId, topicPartition, e.getMessage))
-            new ListOffsetsPartitionResponse()
-              .setPartitionIndex(partition.partitionIndex)
-              .setErrorCode(Errors.forException(e).code)
-          case e: Throwable =>
-            error("Error while responding to offset request", e)
+            .setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+        } else {
+          val topicPartition = new TopicPartition(topic.name, partition.partitionIndex)
+
+          try {
+            val offsets = replicaManager.legacyFetchOffsetsForTimestamp(

Review Comment:
   > IMHO, checking the version when tiered storage sends request is a enough simple solution to backport to 3.9.
   
   This is simple, but I am not sure if this completely solves the problem. 
Note that EARLIEST_LOCAL_TIMESTAMP is not always used by tier storage. So, if 
someone enables tier storage with the wrong MV, the ListOffset issue may show 
up at a much later time. If the broker has the wrong MV, it's probably better 
to prevent enabling tier storage in the first place.
   
   For 3.9, I was thinking that we could change `createRemoteLogManager()` in 
BrokerServer to take the MV into consideration. If the MV is too low, we could 
just return None even if remote storage has been enabled. This is a bit tricky 
since MV is only available after the following statement.
   
   ```
         FutureUtils.waitWithLogging(logger.underlying, logIdent,
           "the initial broker metadata update to be published",
          brokerMetadataPublisher.firstPublishFuture, startupDeadline, time)
   ```
   However, remoteLogManager is created and passed into ReplicaManager before 
that. Maybe we could have a method to set the remoteLogManager instead of 
passing it through the constructor.
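   The setter-based wiring could be sketched roughly as below. This is a minimal illustration, not the actual Kafka classes: `MetadataVersion`, `RemoteLogManager`, and `ReplicaManager` here are simplified stand-ins, and the minimum feature level is an arbitrary illustrative value.
   
   ```scala
   object RemoteLogManagerGating {
     // Stand-in for org.apache.kafka.server.common.MetadataVersion,
     // reduced to a comparable feature level (illustrative only).
     final case class MetadataVersion(featureLevel: Int)
   
     // Hypothetical minimum MV required for tiered storage.
     val MinTieredStorageMV: MetadataVersion = MetadataVersion(14)
   
     class RemoteLogManager
   
     class ReplicaManager {
       // Setter-based wiring instead of a constructor parameter, since the MV
       // is only known after the first broker metadata publish completes.
       @volatile private var remoteLogManagerOpt: Option[RemoteLogManager] = None
   
       def setRemoteLogManager(rlm: Option[RemoteLogManager]): Unit =
         remoteLogManagerOpt = rlm
   
       def remoteLogManager: Option[RemoteLogManager] = remoteLogManagerOpt
     }
   
     // Mirrors the proposed createRemoteLogManager() change: return None when
     // remote storage is disabled or the MV is below the required level.
     def createRemoteLogManager(remoteStorageEnabled: Boolean,
                                mv: MetadataVersion): Option[RemoteLogManager] =
       if (remoteStorageEnabled && mv.featureLevel >= MinTieredStorageMV.featureLevel)
         Some(new RemoteLogManager)
       else
         None
   }
   ```
   
   With this shape, BrokerServer would construct ReplicaManager first, wait for the first metadata publish, and only then call `setRemoteLogManager(createRemoteLogManager(...))` with the now-known MV.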



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
