This is an automated email from the ASF dual-hosted git repository.

kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 93adaea5999 KAFKA-19523: Gracefully handle error while building 
remoteLogAuxState (#20201)
93adaea5999 is described below

commit 93adaea59990da36730b6f07675cb5ca9a54ff43
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Wed Jul 23 19:29:31 2025 +0530

    KAFKA-19523: Gracefully handle error while building remoteLogAuxState 
(#20201)
    
    Improve the error handling while building the remote-log-auxiliary state
    when a follower node with an empty disk begins to synchronise with the
    leader. If the topic has remote storage enabled, then the
    ReplicaFetcherThread attempts to build the remote-log-auxiliary state.
    Note that the remote-log-auxiliary state is built only when the
    leader-log-start-offset is non-zero and is not equal to the
    leader-local-log-start-offset.
    
    When the LeaderAndISR request is received,
    ReplicaManager#becomeLeaderOrFollower invokes 'makeFollowers' first,
    followed by the RemoteLogManager#onLeadershipChange call. As a result,
    when ReplicaFetcherThread initiates the
    RemoteLogManager#fetchRemoteLogSegmentMetadata call, the partition may
    not have been initialized yet, and the call throws a retriable exception.
    
    Introduced RetriableRemoteStorageException to gracefully handle the
    error.
    
    After the patch:
    ```
    [2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=1,
    fetcherId=0] Could not build remote log auxiliary state for orange-1 due
    to error: RemoteLogManager is not ready for partition: orange-1
    (kafka.server.ReplicaFetcherThread)
    [2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=2,
    fetcherId=0] Could not build remote log auxiliary state for orange-0 due
    to error: RemoteLogManager is not ready for partition: orange-0
    (kafka.server.ReplicaFetcherThread)
    ```
    
    Reviewers: Luke Chen <[email protected]>, Satish Duggana 
<[email protected]>
---
 .../main/java/kafka/server/TierStateMachine.java   |  5 +++
 .../scala/kafka/server/AbstractFetcherThread.scala |  4 ++-
 .../unit/kafka/server/ReplicaManagerTest.scala     |  3 +-
 .../storage/RemoteStorageNotReadyException.java    | 40 ++++++++++++++++++++++
 .../storage/RetriableRemoteStorageException.java   | 39 +++++++++++++++++++++
 .../log/remote/storage/RemoteLogManager.java       |  9 +++++
 .../log/remote/storage/RemoteLogManagerTest.java   | 12 +++++++
 7 files changed, 110 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/kafka/server/TierStateMachine.java 
b/core/src/main/java/kafka/server/TierStateMachine.java
index ede941907f2..9d8dcafd203 100644
--- a/core/src/main/java/kafka/server/TierStateMachine.java
+++ b/core/src/main/java/kafka/server/TierStateMachine.java
@@ -35,6 +35,7 @@ import 
org.apache.kafka.server.log.remote.storage.RemoteLogManager;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import 
org.apache.kafka.server.log.remote.storage.RemoteStorageNotReadyException;
 import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
 import org.apache.kafka.storage.internals.log.EpochEntry;
 import org.apache.kafka.storage.internals.log.LogFileUtils;
@@ -230,6 +231,10 @@ public class TierStateMachine {
             }
         }
 
+        if (!rlm.isPartitionReady(topicPartition)) {
+            throw new RemoteStorageNotReadyException("RemoteLogManager is not 
ready for partition: " + topicPartition);
+        }
+
         RemoteLogSegmentMetadata remoteLogSegmentMetadata = 
rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch, 
previousOffsetToLeaderLocalLogStartOffset)
                 .orElseThrow(() -> buildRemoteStorageException(topicPartition, 
targetEpoch, currentLeaderEpoch,
                         leaderLocalLogStartOffset, leaderLogStartOffset));
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala 
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 03a46f4fba8..8dd621d1950 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.server.LeaderEndPoint
 import org.apache.kafka.server.ResultWithPartitions
 import org.apache.kafka.server.ReplicaState
 import org.apache.kafka.server.PartitionFetchState
+import 
org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
 import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -796,7 +797,8 @@ abstract class AbstractFetcherThread(name: String,
         onPartitionFenced(topicPartition, leaderEpochInRequest)
       case e@(_: UnknownTopicOrPartitionException |
               _: UnknownLeaderEpochException |
-              _: NotLeaderOrFollowerException) =>
+              _: NotLeaderOrFollowerException |
+              _: RetriableRemoteStorageException) =>
         info(s"Could not build remote log auxiliary state for $topicPartition 
due to error: ${e.getMessage}")
         false
       case e: Throwable =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 623c282185b..c5393117bdc 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3717,6 +3717,7 @@ class ReplicaManagerTest {
     val storageManager = mock(classOf[RemoteStorageManager])
     when(storageManager.fetchIndex(any(), any())).thenReturn(new 
ByteArrayInputStream("0".getBytes()))
     when(remoteLogManager.storageManager()).thenReturn(storageManager)
+    when(remoteLogManager.isPartitionReady(any())).thenReturn(true)
 
     val replicaManager = setupReplicaManagerWithMockedPurgatories(new 
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true, 
shouldMockLog = true, remoteLogManager = Some(remoteLogManager), 
buildRemoteLogAuxState = true)
     try {
@@ -3775,7 +3776,7 @@ class ReplicaManagerTest {
       replicaManager.applyDelta(delta, leaderMetadataImage)
 
       // Replicas fetch from the leader periodically, therefore we check that 
the metric value is increasing
-      // We expect failedBuildRemoteLogAuxStateRate to increase because there 
is no remoteLogSegmentMetadata
+      // We expect failedBuildRemoteLogAuxStateRate to increase because the 
RemoteLogManager is not ready for the tp0
       // when attempting to build log aux state
       TestUtils.waitUntilTrue(() => 
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count
 > 0,
         "Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" + 
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java
new file mode 100644
index 00000000000..e36fd307a51
--- /dev/null
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+/**
+ * This exception is thrown when a remote storage operation cannot proceed 
because the remote storage is not ready.
+ * This may occur in situations where the remote storage (or) metadata layer 
is initializing, unreachable,
+ * or temporarily unavailable.
+ * <p>
+ * Instances of this exception indicate that the error is retriable, and the 
operation might
+ * succeed if attempted again when the remote storage (or) metadata layer 
becomes operational.
+ */
+public class RemoteStorageNotReadyException extends 
RetriableRemoteStorageException {
+
+    public RemoteStorageNotReadyException(String message) {
+        super(message);
+    }
+
+    public RemoteStorageNotReadyException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RemoteStorageNotReadyException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java
 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java
new file mode 100644
index 00000000000..de180ebbaa8
--- /dev/null
+++ 
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+/**
+ * Represents an exception that indicates a retriable error occurred during 
remote storage operations.
+ * This exception is thrown when an operation against a remote storage system 
has failed due to transient
+ * or temporary issues, and the operation has a reasonable chance of 
succeeding if retried.
+ */
+public class RetriableRemoteStorageException extends RemoteStorageException {
+
+    private static final long serialVersionUID = 1L;
+
+    public RetriableRemoteStorageException(String message) {
+        super(message);
+    }
+
+    public RetriableRemoteStorageException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public RetriableRemoteStorageException(Throwable cause) {
+        super(cause);
+    }
+}
diff --git 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 9031de010c8..5ca596ec7b7 100644
--- 
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++ 
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -788,6 +788,15 @@ public class RemoteLogManager implements Closeable, 
AsyncOffsetReader {
         return null;
     }
 
+    public boolean isPartitionReady(TopicPartition partition) {
+        Uuid uuid = topicIdByPartitionMap.get(partition);
+        if (uuid == null) {
+            return false;
+        }
+        TopicIdPartition topicIdPartition = new TopicIdPartition(uuid, 
partition);
+        return remoteLogMetadataManagerPlugin.get().isReady(topicIdPartition);
+    }
+
     abstract class RLMTask extends CancellableRunnable {
 
         protected final TopicIdPartition topicIdPartition;
diff --git 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 182fda9abb9..99d9d43c9a1 100644
--- 
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++ 
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -3726,6 +3726,18 @@ public class RemoteLogManagerTest {
         verifyNoMoreInteractions(remoteStorageManager);
     }
 
+    @Test
+    public void testIsPartitionReady() throws InterruptedException {
+        
assertFalse(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition()));
+        remoteLogManager.onLeadershipChange(
+                Set.of(mockPartition(leaderTopicIdPartition)),
+                Set.of(mockPartition(followerTopicIdPartition)),
+                topicIds
+        );
+        
assertTrue(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition()));
+        
assertTrue(remoteLogManager.isPartitionReady(followerTopicIdPartition.topicPartition()));
+    }
+
     @Test
     public void testMonitorableRemoteLogStorageManager() throws IOException {
         Properties props = new Properties();

Reply via email to