This is an automated email from the ASF dual-hosted git repository.
kamalcph pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 93adaea5999 KAFKA-19523: Gracefully handle error while building
remoteLogAuxState (#20201)
93adaea5999 is described below
commit 93adaea59990da36730b6f07675cb5ca9a54ff43
Author: Kamal Chandraprakash <[email protected]>
AuthorDate: Wed Jul 23 19:29:31 2025 +0530
KAFKA-19523: Gracefully handle error while building remoteLogAuxState
(#20201)
Improve the error handling while building the remote-log-auxiliary state
when a follower node with an empty disk begins to synchronise with the
leader. If the topic has remote storage enabled, then the
ReplicaFetcherThread attempts to build the remote-log-auxiliary state.
Note that the remote-log-auxiliary state is built only when the
leader-log-start-offset is non-zero and is not equal to the
leader-local-log-start-offset.
When a LeaderAndISR request is received,
ReplicaManager#becomeLeaderOrFollower first invokes 'makeFollowers' and
only afterwards calls RemoteLogManager#onLeadershipChange. As a result,
when the ReplicaFetcherThread calls
RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not
have been initialized yet, and a retriable exception is thrown.
Introduced RetriableRemoteStorageException to gracefully handle the
error.
After the patch:
```
[2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=1,
fetcherId=0] Could not build remote log auxiliary state for orange-1 due
to error: RemoteLogManager is not ready for partition: orange-1
(kafka.server.ReplicaFetcherThread)
[2025-07-19 19:28:20,934] INFO [ReplicaFetcher replicaId=3, leaderId=2,
fetcherId=0] Could not build remote log auxiliary state for orange-0 due
to error: RemoteLogManager is not ready for partition: orange-0
(kafka.server.ReplicaFetcherThread)
```
Reviewers: Luke Chen <[email protected]>, Satish Duggana
<[email protected]>
---
.../main/java/kafka/server/TierStateMachine.java | 5 +++
.../scala/kafka/server/AbstractFetcherThread.scala | 4 ++-
.../unit/kafka/server/ReplicaManagerTest.scala | 3 +-
.../storage/RemoteStorageNotReadyException.java | 40 ++++++++++++++++++++++
.../storage/RetriableRemoteStorageException.java | 39 +++++++++++++++++++++
.../log/remote/storage/RemoteLogManager.java | 9 +++++
.../log/remote/storage/RemoteLogManagerTest.java | 12 +++++++
7 files changed, 110 insertions(+), 2 deletions(-)
diff --git a/core/src/main/java/kafka/server/TierStateMachine.java
b/core/src/main/java/kafka/server/TierStateMachine.java
index ede941907f2..9d8dcafd203 100644
--- a/core/src/main/java/kafka/server/TierStateMachine.java
+++ b/core/src/main/java/kafka/server/TierStateMachine.java
@@ -35,6 +35,7 @@ import
org.apache.kafka.server.log.remote.storage.RemoteLogManager;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
+import
org.apache.kafka.server.log.remote.storage.RemoteStorageNotReadyException;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.log.EpochEntry;
import org.apache.kafka.storage.internals.log.LogFileUtils;
@@ -230,6 +231,10 @@ public class TierStateMachine {
}
}
+ if (!rlm.isPartitionReady(topicPartition)) {
+ throw new RemoteStorageNotReadyException("RemoteLogManager is not
ready for partition: " + topicPartition);
+ }
+
RemoteLogSegmentMetadata remoteLogSegmentMetadata =
rlm.fetchRemoteLogSegmentMetadata(topicPartition, targetEpoch,
previousOffsetToLeaderLocalLogStartOffset)
.orElseThrow(() -> buildRemoteStorageException(topicPartition,
targetEpoch, currentLeaderEpoch,
leaderLocalLogStartOffset, leaderLogStartOffset));
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 03a46f4fba8..8dd621d1950 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.server.LeaderEndPoint
import org.apache.kafka.server.ResultWithPartitions
import org.apache.kafka.server.ReplicaState
import org.apache.kafka.server.PartitionFetchState
+import
org.apache.kafka.server.log.remote.storage.RetriableRemoteStorageException
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.LogAppendInfo
@@ -796,7 +797,8 @@ abstract class AbstractFetcherThread(name: String,
onPartitionFenced(topicPartition, leaderEpochInRequest)
case e@(_: UnknownTopicOrPartitionException |
_: UnknownLeaderEpochException |
- _: NotLeaderOrFollowerException) =>
+ _: NotLeaderOrFollowerException |
+ _: RetriableRemoteStorageException) =>
info(s"Could not build remote log auxiliary state for $topicPartition
due to error: ${e.getMessage}")
false
case e: Throwable =>
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 623c282185b..c5393117bdc 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -3717,6 +3717,7 @@ class ReplicaManagerTest {
val storageManager = mock(classOf[RemoteStorageManager])
when(storageManager.fetchIndex(any(), any())).thenReturn(new
ByteArrayInputStream("0".getBytes()))
when(remoteLogManager.storageManager()).thenReturn(storageManager)
+ when(remoteLogManager.isPartitionReady(any())).thenReturn(true)
val replicaManager = setupReplicaManagerWithMockedPurgatories(new
MockTimer(time), aliveBrokerIds = Seq(0, 1, 2), enableRemoteStorage = true,
shouldMockLog = true, remoteLogManager = Some(remoteLogManager),
buildRemoteLogAuxState = true)
try {
@@ -3775,7 +3776,7 @@ class ReplicaManagerTest {
replicaManager.applyDelta(delta, leaderMetadataImage)
// Replicas fetch from the leader periodically, therefore we check that
the metric value is increasing
- // We expect failedBuildRemoteLogAuxStateRate to increase because there
is no remoteLogSegmentMetadata
+ // We expect failedBuildRemoteLogAuxStateRate to increase because the
RemoteLogManager is not ready for the tp0
// when attempting to build log aux state
TestUtils.waitUntilTrue(() =>
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count
> 0,
"Should have buildRemoteLogAuxStateRequestRate count > 0, but got:" +
brokerTopicStats.topicStats(tp0.topic()).buildRemoteLogAuxStateRequestRate.count)
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java
new file mode 100644
index 00000000000..e36fd307a51
--- /dev/null
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteStorageNotReadyException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+/**
+ * This exception is thrown when a remote storage operation cannot proceed
because the remote storage is not ready.
+ * This may occur in situations where the remote storage (or) metadata layer
is initializing, unreachable,
+ * or temporarily unavailable.
+ * <p>
+ * Instances of this exception indicate that the error is retriable, and the
operation might
+ * succeed if attempted again when the remote storage (or) metadata layer
becomes operational.
+ */
+public class RemoteStorageNotReadyException extends
RetriableRemoteStorageException {
+
+ public RemoteStorageNotReadyException(String message) {
+ super(message);
+ }
+
+ public RemoteStorageNotReadyException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RemoteStorageNotReadyException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java
new file mode 100644
index 00000000000..de180ebbaa8
--- /dev/null
+++
b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RetriableRemoteStorageException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+/**
+ * Represents an exception that indicates a retriable error occurred during
remote storage operations.
+ * This exception is thrown when an operation against a remote storage system
has failed due to transient
+ * or temporary issues, and the operation has a reasonable chance of
succeeding if retried.
+ */
+public class RetriableRemoteStorageException extends RemoteStorageException {
+
+ private static final long serialVersionUID = 1L;
+
+ public RetriableRemoteStorageException(String message) {
+ super(message);
+ }
+
+ public RetriableRemoteStorageException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public RetriableRemoteStorageException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
index 9031de010c8..5ca596ec7b7 100644
---
a/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
+++
b/storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java
@@ -788,6 +788,15 @@ public class RemoteLogManager implements Closeable,
AsyncOffsetReader {
return null;
}
+ public boolean isPartitionReady(TopicPartition partition) {
+ Uuid uuid = topicIdByPartitionMap.get(partition);
+ if (uuid == null) {
+ return false;
+ }
+ TopicIdPartition topicIdPartition = new TopicIdPartition(uuid,
partition);
+ return remoteLogMetadataManagerPlugin.get().isReady(topicIdPartition);
+ }
+
abstract class RLMTask extends CancellableRunnable {
protected final TopicIdPartition topicIdPartition;
diff --git
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
index 182fda9abb9..99d9d43c9a1 100644
---
a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
+++
b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogManagerTest.java
@@ -3726,6 +3726,18 @@ public class RemoteLogManagerTest {
verifyNoMoreInteractions(remoteStorageManager);
}
+ @Test
+ public void testIsPartitionReady() throws InterruptedException {
+
assertFalse(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition()));
+ remoteLogManager.onLeadershipChange(
+ Set.of(mockPartition(leaderTopicIdPartition)),
+ Set.of(mockPartition(followerTopicIdPartition)),
+ topicIds
+ );
+
assertTrue(remoteLogManager.isPartitionReady(leaderTopicIdPartition.topicPartition()));
+
assertTrue(remoteLogManager.isPartitionReady(followerTopicIdPartition.topicPartition()));
+ }
+
@Test
public void testMonitorableRemoteLogStorageManager() throws IOException {
Properties props = new Properties();