zheguang commented on code in PR #20553:
URL: https://github.com/apache/kafka/pull/20553#discussion_r2409002908


##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
     public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
         int brokerId, UnsupportedVersionException exception, 
Set<TopicPartition> keys
     ) {
-        log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", 
brokerId);
-        Map<TopicPartition, Throwable> maxTimestampPartitions = new 
HashMap<>();
+        log.warn("Broker {} does not support {} offset specs", brokerId, 
timestampToString(currentUnsupportedVersion));
+        Map<TopicPartition, Throwable> supportedTimestampPartitions = new 
HashMap<>();
+
         for (TopicPartition topicPartition : keys) {
             Long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
-            if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
-                maxTimestampPartitions.put(topicPartition, exception);
+            if (offsetTimestamp != null && offsetTimestamp == 
currentUnsupportedVersion && 
!accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+                supportedTimestampPartitions.put(topicPartition, exception);
+                accUnsupportedTimestampPartition.put(topicPartition, 
exception);
             }
         }
-        // If there are no partitions with MAX_TIMESTAMP specs the 
UnsupportedVersionException cannot be handled
+
+        // If there are no partitions with support specs the 
UnsupportedVersionException cannot be handled
         // and all partitions should be failed here.
-        // Otherwise, just the partitions with MAX_TIMESTAMP specs should be 
failed here and the fulfillment stage
-        // will later be retried for the potentially empty set of partitions 
with non-MAX_TIMESTAMP specs.
-        if (maxTimestampPartitions.isEmpty()) {
+        // Otherwise, just the partitions with support specs should be failed 
here and the fulfillment stage
+        // will later be retried for the potentially empty set of partitions 
with non-support specs.
+        if (unsupportedVersionRetry == maxUnsupportedVersionRetry && 
accUnsupportedTimestampPartition.isEmpty()) {
             return keys.stream().collect(Collectors.toMap(k -> k, k -> 
exception));
         } else {
-            return maxTimestampPartitions;
+            return supportedTimestampPartitions;
+        }
+    }
+
+    public void downgradeOffsetTimestampVersion() {
+        if (unsupportedVersionRetry >= maxUnsupportedVersionRetry) {
+            throw new IllegalStateException("Index should not larger than " + 
maxUnsupportedVersionRetry);
         }
+
+        unsupportedVersionRetry += 1;
+        currentUnsupportedVersion = 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(unsupportedVersionRetry);
+    }
+
+    public void reset() {
+        currentUnsupportedVersion = 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(0);
+        unsupportedVersionRetry = 0;
+        accUnsupportedTimestampPartition.clear();
+    }
+
+    public boolean isOldestTimstamp() {

Review Comment:
   Typo: `isOldestTimstamp` ~> `isOldestTimestamp` 



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
     public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
         int brokerId, UnsupportedVersionException exception, 
Set<TopicPartition> keys
     ) {
-        log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", 
brokerId);
-        Map<TopicPartition, Throwable> maxTimestampPartitions = new 
HashMap<>();
+        log.warn("Broker {} does not support {} offset specs", brokerId, 
timestampToString(currentUnsupportedVersion));
+        Map<TopicPartition, Throwable> supportedTimestampPartitions = new 
HashMap<>();
+
         for (TopicPartition topicPartition : keys) {
             Long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
-            if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
-                maxTimestampPartitions.put(topicPartition, exception);
+            if (offsetTimestamp != null && offsetTimestamp == 
currentUnsupportedVersion && 
!accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+                supportedTimestampPartitions.put(topicPartition, exception);
+                accUnsupportedTimestampPartition.put(topicPartition, 
exception);
             }
         }
-        // If there are no partitions with MAX_TIMESTAMP specs the 
UnsupportedVersionException cannot be handled
+
+        // If there are no partitions with support specs the 
UnsupportedVersionException cannot be handled
         // and all partitions should be failed here.
-        // Otherwise, just the partitions with MAX_TIMESTAMP specs should be 
failed here and the fulfillment stage
-        // will later be retried for the potentially empty set of partitions 
with non-MAX_TIMESTAMP specs.
-        if (maxTimestampPartitions.isEmpty()) {
+        // Otherwise, just the partitions with support specs should be failed 
here and the fulfillment stage
+        // will later be retried for the potentially empty set of partitions 
with non-support specs.
+        if (unsupportedVersionRetry == maxUnsupportedVersionRetry && 
accUnsupportedTimestampPartition.isEmpty()) {
             return keys.stream().collect(Collectors.toMap(k -> k, k -> 
exception));
         } else {
-            return maxTimestampPartitions;
+            return supportedTimestampPartitions;
+        }
+    }
+
+    public void downgradeOffsetTimestampVersion() {
+        if (unsupportedVersionRetry >= maxUnsupportedVersionRetry) {
+            throw new IllegalStateException("Index should not larger than " + 
maxUnsupportedVersionRetry);
         }
+
+        unsupportedVersionRetry += 1;
+        currentUnsupportedVersion = 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(unsupportedVersionRetry);
+    }
+
+    public void reset() {
+        currentUnsupportedVersion = 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(0);
+        unsupportedVersionRetry = 0;
+        accUnsupportedTimestampPartition.clear();
+    }
+
+    public boolean isOldestTimstamp() {
+        return currentUnsupportedVersion == 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(

Review Comment:
   This `get(size() - 1)` call could be simplified to `getLast()`.
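
One caveat on this suggestion: `List.getLast()` was added in Java 21 (through `SequencedCollection`), so it compiles only if the module targets Java 21 or newer, which is not stated in this thread. Below is a minimal sketch of an equivalent that also works on older targets. `LastElementSketch`, `TIMESTAMPS`, `lastOf`, and the placeholder values are hypothetical stand-ins, not names or values from the PR.

```java
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical stand-in for the handler; not the PR's code.
final class LastElementSketch {
    // Placeholder values standing in for ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.
    static final List<Long> TIMESTAMPS = List.of(-5L, -4L, -3L);

    // Same result as list.getLast() on Java 21+, but usable on earlier versions.
    static <T> T lastOf(List<T> list) {
        if (list.isEmpty()) {
            throw new NoSuchElementException("list is empty");
        }
        return list.get(list.size() - 1);
    }

    // Mirrors the shape of the check under review: is `current` the final entry?
    static boolean isOldestTimestamp(long current) {
        return current == lastOf(TIMESTAMPS);
    }
}
```

If the build does target Java 21 or newer, `lastOf(TIMESTAMPS)` can be replaced with `TIMESTAMPS.getLast()` directly.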



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminApiDriver.java:
##########
@@ -284,6 +284,17 @@ public void onFailure(
                         (UnsupportedVersionException) t,
                         spec.keys);
                 completeExceptionally(unrecoverableFailures);
+                if (handler instanceof ListOffsetsHandler) {
+                    // We dont need to do other operations because 
completeExceptionally
+                    // help us to remove lookup and fulfillmentMap, and the 
downgraded
+                    // offsetStamp still in fulfillmentMap so it will generate 
next request.
+                    ListOffsetsHandler listOffsetsHandler = 
(ListOffsetsHandler) handler;
+                    if (!listOffsetsHandler.isOldestTimstamp()) {

Review Comment:
   Typo: `isOldestTimstamp` ~> `isOldestTimestamp`



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
     public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
         int brokerId, UnsupportedVersionException exception, 
Set<TopicPartition> keys
     ) {
-        log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", 
brokerId);
-        Map<TopicPartition, Throwable> maxTimestampPartitions = new 
HashMap<>();
+        log.warn("Broker {} does not support {} offset specs", brokerId, 
timestampToString(currentUnsupportedVersion));
+        Map<TopicPartition, Throwable> supportedTimestampPartitions = new 
HashMap<>();
+
         for (TopicPartition topicPartition : keys) {
             Long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
-            if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
-                maxTimestampPartitions.put(topicPartition, exception);
+            if (offsetTimestamp != null && offsetTimestamp == 
currentUnsupportedVersion && 
!accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+                supportedTimestampPartitions.put(topicPartition, exception);
+                accUnsupportedTimestampPartition.put(topicPartition, 
exception);
             }
         }
-        // If there are no partitions with MAX_TIMESTAMP specs the 
UnsupportedVersionException cannot be handled
+
+        // If there are no partitions with support specs the 
UnsupportedVersionException cannot be handled
         // and all partitions should be failed here.
-        // Otherwise, just the partitions with MAX_TIMESTAMP specs should be 
failed here and the fulfillment stage
-        // will later be retried for the potentially empty set of partitions 
with non-MAX_TIMESTAMP specs.
-        if (maxTimestampPartitions.isEmpty()) {
+        // Otherwise, just the partitions with support specs should be failed 
here and the fulfillment stage
+        // will later be retried for the potentially empty set of partitions 
with non-support specs.
+        if (unsupportedVersionRetry == maxUnsupportedVersionRetry && 
accUnsupportedTimestampPartition.isEmpty()) {
             return keys.stream().collect(Collectors.toMap(k -> k, k -> 
exception));
         } else {
-            return maxTimestampPartitions;
+            return supportedTimestampPartitions;
+        }
+    }
+
+    public void downgradeOffsetTimestampVersion() {
+        if (unsupportedVersionRetry >= maxUnsupportedVersionRetry) {
+            throw new IllegalStateException("Index should not larger than " + 
maxUnsupportedVersionRetry);
         }
+
+        unsupportedVersionRetry += 1;
+        currentUnsupportedVersion = 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(unsupportedVersionRetry);
+    }
+
+    public void reset() {
+        currentUnsupportedVersion = 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(0);
+        unsupportedVersionRetry = 0;
+        accUnsupportedTimestampPartition.clear();
+    }
+
+    public boolean isOldestTimstamp() {
+        return currentUnsupportedVersion == 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(
+                ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.size() - 1);
+    }
+
+    // Visible for test
+    public long currentUnsupportedVersion() {
+        return currentUnsupportedVersion;
+    }
+
+    // Visible for test
+    public int unsupportedVersionRetry() {

Review Comment:
   Same as for `currentUnsupportedVersion()`: the scope could be package-private.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
     public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
         int brokerId, UnsupportedVersionException exception, 
Set<TopicPartition> keys
     ) {
-        log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", 
brokerId);
-        Map<TopicPartition, Throwable> maxTimestampPartitions = new 
HashMap<>();
+        log.warn("Broker {} does not support {} offset specs", brokerId, 
timestampToString(currentUnsupportedVersion));
+        Map<TopicPartition, Throwable> supportedTimestampPartitions = new 
HashMap<>();
+
         for (TopicPartition topicPartition : keys) {
             Long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
-            if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
-                maxTimestampPartitions.put(topicPartition, exception);
+            if (offsetTimestamp != null && offsetTimestamp == 
currentUnsupportedVersion && 
!accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+                supportedTimestampPartitions.put(topicPartition, exception);
+                accUnsupportedTimestampPartition.put(topicPartition, 
exception);

Review Comment:
   It's quite confusing that both "supported" and "unsupported" partitions are 
accumulated here.  Could the logic be clarified?



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
     public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
         int brokerId, UnsupportedVersionException exception, 
Set<TopicPartition> keys
     ) {
-        log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", 
brokerId);
-        Map<TopicPartition, Throwable> maxTimestampPartitions = new 
HashMap<>();
+        log.warn("Broker {} does not support {} offset specs", brokerId, 
timestampToString(currentUnsupportedVersion));
+        Map<TopicPartition, Throwable> supportedTimestampPartitions = new 
HashMap<>();
+
         for (TopicPartition topicPartition : keys) {
             Long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
-            if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
-                maxTimestampPartitions.put(topicPartition, exception);
+            if (offsetTimestamp != null && offsetTimestamp == 
currentUnsupportedVersion && 
!accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+                supportedTimestampPartitions.put(topicPartition, exception);
+                accUnsupportedTimestampPartition.put(topicPartition, 
exception);
             }
         }
-        // If there are no partitions with MAX_TIMESTAMP specs the 
UnsupportedVersionException cannot be handled
+
+        // If there are no partitions with support specs the 
UnsupportedVersionException cannot be handled
         // and all partitions should be failed here.
-        // Otherwise, just the partitions with MAX_TIMESTAMP specs should be 
failed here and the fulfillment stage
-        // will later be retried for the potentially empty set of partitions 
with non-MAX_TIMESTAMP specs.
-        if (maxTimestampPartitions.isEmpty()) {
+        // Otherwise, just the partitions with support specs should be failed 
here and the fulfillment stage
+        // will later be retried for the potentially empty set of partitions 
with non-support specs.
+        if (unsupportedVersionRetry == maxUnsupportedVersionRetry && 
accUnsupportedTimestampPartition.isEmpty()) {
             return keys.stream().collect(Collectors.toMap(k -> k, k -> 
exception));
         } else {
-            return maxTimestampPartitions;
+            return supportedTimestampPartitions;
+        }
+    }
+
+    public void downgradeOffsetTimestampVersion() {
+        if (unsupportedVersionRetry >= maxUnsupportedVersionRetry) {
+            throw new IllegalStateException("Index should not larger than " + 
maxUnsupportedVersionRetry);
         }
+
+        unsupportedVersionRetry += 1;
+        currentUnsupportedVersion = 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(unsupportedVersionRetry);
+    }
+
+    public void reset() {
+        currentUnsupportedVersion = 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(0);
+        unsupportedVersionRetry = 0;
+        accUnsupportedTimestampPartition.clear();
+    }
+
+    public boolean isOldestTimstamp() {
+        return currentUnsupportedVersion == 
ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.get(
+                ListOffsetsRequest.LEAST_TO_OLDEST_TIMESTAMPS.size() - 1);
+    }
+
+    // Visible for test
+    public long currentUnsupportedVersion() {

Review Comment:
   The scope could be package-private, as only `ListOffsetsHandlerTest` 
calls it.



##########
clients/src/main/java/org/apache/kafka/clients/admin/internals/ListOffsetsHandler.java:
##########
@@ -202,23 +207,61 @@ private void handlePartitionError(
     public Map<TopicPartition, Throwable> handleUnsupportedVersionException(
         int brokerId, UnsupportedVersionException exception, 
Set<TopicPartition> keys
     ) {
-        log.warn("Broker {} does not support MAX_TIMESTAMP offset specs", 
brokerId);
-        Map<TopicPartition, Throwable> maxTimestampPartitions = new 
HashMap<>();
+        log.warn("Broker {} does not support {} offset specs", brokerId, 
timestampToString(currentUnsupportedVersion));
+        Map<TopicPartition, Throwable> supportedTimestampPartitions = new 
HashMap<>();
+
         for (TopicPartition topicPartition : keys) {
             Long offsetTimestamp = 
offsetTimestampsByPartition.get(topicPartition);
-            if (offsetTimestamp == ListOffsetsRequest.MAX_TIMESTAMP) {
-                maxTimestampPartitions.put(topicPartition, exception);
+            if (offsetTimestamp != null && offsetTimestamp == 
currentUnsupportedVersion && 
!accUnsupportedTimestampPartition.containsKey(topicPartition)) {
+                supportedTimestampPartitions.put(topicPartition, exception);
+                accUnsupportedTimestampPartition.put(topicPartition, 
exception);
             }
         }
-        // If there are no partitions with MAX_TIMESTAMP specs the 
UnsupportedVersionException cannot be handled
+
+        // If there are no partitions with support specs the 
UnsupportedVersionException cannot be handled
         // and all partitions should be failed here.
-        // Otherwise, just the partitions with MAX_TIMESTAMP specs should be 
failed here and the fulfillment stage
-        // will later be retried for the potentially empty set of partitions 
with non-MAX_TIMESTAMP specs.
-        if (maxTimestampPartitions.isEmpty()) {
+        // Otherwise, just the partitions with support specs should be failed 
here and the fulfillment stage
+        // will later be retried for the potentially empty set of partitions 
with non-support specs.
+        if (unsupportedVersionRetry == maxUnsupportedVersionRetry && 
accUnsupportedTimestampPartition.isEmpty()) {

Review Comment:
   This condition means that we fail everything only when all retries are 
exhausted _and_ no unsupported partitions have been accumulated.  If 
unsupported partitions were accumulated during previous retries, the check 
never triggers.  Why not check `supportedTimestampPartitions.isEmpty()` instead?
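
The difference between the two conditions can be sketched in isolation. This is a simplified model, not the handler's code: partitions are plain strings, `FailAllConditionSketch` and its method names are hypothetical, and the sketch assumes one reading of the suggestion, in which only the second half of the condition changes.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;

// Simplified model of the fail-everything decision; names are hypothetical.
final class FailAllConditionSketch {
    // Condition as written in the diff: inspects the map accumulated across retries.
    static Map<String, Throwable> withAccumulatedCheck(
            Set<String> keys,
            Map<String, Throwable> failedThisCall,
            Map<String, Throwable> accumulated,
            boolean retriesExhausted,
            Throwable error) {
        if (retriesExhausted && accumulated.isEmpty()) {
            return failAll(keys, error);
        }
        return failedThisCall;
    }

    // Suggested alternative: inspects only the partitions matched in this call.
    static Map<String, Throwable> withPerCallCheck(
            Set<String> keys,
            Map<String, Throwable> failedThisCall,
            boolean retriesExhausted,
            Throwable error) {
        if (retriesExhausted && failedThisCall.isEmpty()) {
            return failAll(keys, error);
        }
        return failedThisCall;
    }

    // Fails every requested key with the same error.
    private static Map<String, Throwable> failAll(Set<String> keys, Throwable error) {
        return keys.stream().collect(Collectors.toMap(Function.identity(), k -> error));
    }
}
```

Suppose retries are exhausted, an earlier retry already accumulated some partition, and the current call matches nothing. In this model, `withAccumulatedCheck` returns an empty map, so no key is failed, while `withPerCallCheck` fails every key.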



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
