[ https://issues.apache.org/jira/browse/KAFKA-19523?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Kamal Chandraprakash updated KAFKA-19523:
-----------------------------------------
Description:
Improve the error handling while building the remote-log-auxiliary state when a
follower node with an empty disk begins to synchronise with the leader. If the
topic has remote storage enabled, the ReplicaFetcherThread attempts to build
the remote-log-auxiliary state. Note that the remote-log-auxiliary state is
built only when the leader-log-start-offset is non-zero and the
leader-log-start-offset is not equal to the leader-local-log-start-offset.
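The offset condition above can be expressed as a small predicate. This is a
minimal Java sketch; the class and method names (RemoteLogAuxStateCondition,
shouldBuildRemoteLogAuxState) are hypothetical and do not come from the Kafka
code base.

```java
// Hypothetical helper illustrating the condition described in this issue;
// the names are assumptions, not the actual TierStateMachine code.
public final class RemoteLogAuxStateCondition {

    private RemoteLogAuxStateCondition() {}

    /**
     * Returns true when the follower must build the remote-log-auxiliary state:
     * the leader-log-start-offset is non-zero and differs from the
     * leader-local-log-start-offset.
     */
    public static boolean shouldBuildRemoteLogAuxState(long leaderLogStartOffset,
                                                       long leaderLocalLogStartOffset) {
        return leaderLogStartOffset != 0 && leaderLogStartOffset != leaderLocalLogStartOffset;
    }

    public static void main(String[] args) {
        // Log start offset is zero: no auxiliary state is needed.
        System.out.println(shouldBuildRemoteLogAuxState(0L, 0L));      // false
        // Log start and local log start coincide: nothing to fetch from remote.
        System.out.println(shouldBuildRemoteLogAuxState(500L, 500L));  // false
        // Offsets differ: the older segments are only available remotely.
        System.out.println(shouldBuildRemoteLogAuxState(100L, 500L));  // true
    }
}
```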
When a LeaderAndISR request is received, ReplicaManager#becomeLeaderOrFollower
first invokes 'makeFollowers' and only then calls
RemoteLogManager#onLeadershipChange. As a result, when the ReplicaFetcherThread
calls RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not yet
have been initialized.
Introduce a RetriableRemoteStorageException so that this error is handled
gracefully.
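A minimal sketch of how a retriable exception could replace the
IllegalStateException, assuming the metadata manager checks readiness per
partition before serving segment metadata. RemoteStorageException here is a
local stand-in for Kafka's class; RetriableRemoteStorageException mirrors the
name proposed in this issue, but its shape, and the simplified MetadataManager
type with onPartitionInitialized, isReady and remoteLogSegmentMetadata, are
assumptions for illustration only.

```java
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public final class RetriableRemoteStorageSketch {

    // Local stand-in for Kafka's checked RemoteStorageException, so the sketch compiles alone.
    static class RemoteStorageException extends Exception {
        RemoteStorageException(String message) { super(message); }
    }

    // Exception type proposed in this issue; this exact shape is an assumption.
    static class RetriableRemoteStorageException extends RemoteStorageException {
        RetriableRemoteStorageException(String message) { super(message); }
    }

    // Hypothetical, simplified metadata manager: a partition becomes "ready"
    // only after it has been registered (e.g. by onLeadershipChange).
    static class MetadataManager {
        private final Set<String> readyPartitions = ConcurrentHashMap.newKeySet();

        void onPartitionInitialized(String topicIdPartition) {
            readyPartitions.add(topicIdPartition);
        }

        boolean isReady(String topicIdPartition) {
            return readyPartitions.contains(topicIdPartition);
        }

        Optional<String> remoteLogSegmentMetadata(String topicIdPartition, long offset)
                throws RemoteStorageException {
            if (!isReady(topicIdPartition)) {
                // Signal a transient condition the caller may retry, instead of IllegalStateException.
                throw new RetriableRemoteStorageException(
                        "Remote log metadata for " + topicIdPartition + " is not ready yet");
            }
            return Optional.of("segment-metadata@" + offset);
        }
    }

    public static void main(String[] args) throws Exception {
        MetadataManager rlmm = new MetadataManager();
        try {
            rlmm.remoteLogSegmentMetadata("orange-0", 100L);
        } catch (RetriableRemoteStorageException e) {
            System.out.println("retriable: " + e.getMessage());
        }
        rlmm.onPartitionInitialized("orange-0");
        System.out.println(rlmm.remoteLogSegmentMetadata("orange-0", 100L).orElse("none"));
    }
}
```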
stacktrace:
{code}
[2025-07-19 19:15:47,915] ERROR [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Error building remote log auxiliary state for orange-0 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: This instance is in invalid state, initialized: false close: false
	at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:569) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
	at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:221) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
	at org.apache.kafka.server.log.remote.storage.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.java:606) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.TierStateMachine.buildRemoteLogAuxState(TierStateMachine.java:233) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.TierStateMachine.start(TierStateMachine.java:114) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:785) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:434) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at scala.Option.foreach(Option.scala:437) ~[scala-library-2.13.16.jar:?]
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:342) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:430) ~[scala-library-2.13.16.jar:?]
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:426) ~[scala-library-2.13.16.jar:?]
	at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:344) ~[scala-library-2.13.16.jar:?]
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at java.base/java.util.Optional.ifPresent(Optional.java:178) [?:?]
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:96) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.2.0-SNAPSHOT.jar:?]
{code}
The exception is thrown repeatedly until
RemoteLogMetadataManager#isReady(topicIdPartition) returns true. Since this is a
retriable error, it should be handled gracefully.
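On the fetcher side, handling the error gracefully could mean delaying only the
affected partition and retrying on a later iteration, rather than logging an
ERROR with a full stack trace each time. The sketch below assumes a simple
per-partition backoff; FetcherRetrySketch, tryBuild and the isReady predicate
are hypothetical stand-ins, not the real ReplicaFetcherThread or
RemoteLogMetadataManager APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public final class FetcherRetrySketch {

    // Hypothetical stand-in for the proposed retriable exception.
    static class RetriableRemoteStorageException extends Exception {
        RetriableRemoteStorageException(String message) { super(message); }
    }

    // Earliest time (ms) at which each delayed partition may be retried.
    private final Map<String, Long> retryNotBeforeMs = new HashMap<>();
    // Stands in for RemoteLogMetadataManager#isReady(topicIdPartition).
    private final Predicate<String> isReady;
    private final long backoffMs;

    FetcherRetrySketch(Predicate<String> isReady, long backoffMs) {
        this.isReady = isReady;
        this.backoffMs = backoffMs;
    }

    // Hypothetical aux-state build step: fails with a retriable error until metadata is ready.
    private void buildRemoteLogAuxState(String tp) throws RetriableRemoteStorageException {
        if (!isReady.test(tp)) {
            throw new RetriableRemoteStorageException(tp + " remote log metadata is not ready");
        }
    }

    /** One fetcher iteration for tp at time nowMs; returns true once the aux state is built. */
    boolean tryBuild(String tp, long nowMs) {
        Long notBefore = retryNotBeforeMs.get(tp);
        if (notBefore != null && nowMs < notBefore) {
            return false; // still backing off; other partitions keep fetching
        }
        try {
            buildRemoteLogAuxState(tp);
            retryNotBeforeMs.remove(tp);
            return true;
        } catch (RetriableRemoteStorageException e) {
            // Transient condition: delay only this partition, no ERROR log with a stack trace.
            retryNotBeforeMs.put(tp, nowMs + backoffMs);
            return false;
        }
    }

    public static void main(String[] args) {
        boolean[] ready = {false};
        FetcherRetrySketch fetcher = new FetcherRetrySketch(tp -> ready[0], 1000L);
        System.out.println(fetcher.tryBuild("orange-0", 0L));    // false: not ready, back off until 1000
        ready[0] = true; // the partition has now been initialized
        System.out.println(fetcher.tryBuild("orange-0", 500L));  // false: still backing off
        System.out.println(fetcher.tryBuild("orange-0", 1000L)); // true
    }
}
```

A non-blocking backoff is used here because one fetcher thread serves many
partitions; sleeping inside the thread would stall all of them.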
was:
Improve the error handling while building the remote-log-auxiliary state when a
follower node with an empty disk begins to synchronise with the leader. If the
topic has remote storage enabled, the ReplicaFetcherThread attempts to build
the remote-log-auxiliary state. Note that the remote-log-auxiliary state is
built only when the leader-log-start-offset is non-zero and the
leader-log-start-offset is not equal to the leader-local-log-start-offset.
When a LeaderAndISR request is received, ReplicaManager#becomeLeaderOrFollower
first invokes 'makeFollowers' and only then calls
RemoteLogManager#onLeadershipChange. As a result, when the ReplicaFetcherThread
calls RemoteLogManager#fetchRemoteLogSegmentMetadata, the partition may not yet
have been initialized.
Introducing a new RetriableRemoteStorageException requires a KIP, as it is a
public API change; instead, wrap the IllegalStateException in a
RemoteStorageException to handle the error gracefully.
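The earlier proposal, wrapping the IllegalStateException in the existing
RemoteStorageException, can be sketched as follows. RemoteStorageException is
a local stand-in for Kafka's class and callMetadataManager is a hypothetical
helper; this illustrates the wrapping pattern, not the actual patch.

```java
import java.util.function.Supplier;

public final class WrapIllegalStateSketch {

    // Local stand-in for Kafka's existing checked RemoteStorageException.
    static class RemoteStorageException extends Exception {
        RemoteStorageException(String message, Throwable cause) { super(message, cause); }
    }

    // Hypothetical wrapper around a call into a metadata manager that may not be initialized yet.
    static <T> T callMetadataManager(Supplier<T> call) throws RemoteStorageException {
        try {
            return call.get();
        } catch (IllegalStateException e) {
            // Reuses an existing public exception type, so no new API (and no KIP) is needed.
            throw new RemoteStorageException("Remote log metadata manager is not ready", e);
        }
    }

    public static void main(String[] args) {
        try {
            callMetadataManager(() -> {
                throw new IllegalStateException(
                        "This instance is in invalid state, initialized: false close: false");
            });
        } catch (RemoteStorageException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause().getClass().getSimpleName() + ")");
        }
    }
}
```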
stacktrace:
{code}
[2025-07-19 19:15:47,915] ERROR [ReplicaFetcher replicaId=3, leaderId=2, fetcherId=0] Error building remote log auxiliary state for orange-0 (kafka.server.ReplicaFetcherThread)
java.lang.IllegalStateException: This instance is in invalid state, initialized: false close: false
	at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.ensureInitializedAndNotClosed(TopicBasedRemoteLogMetadataManager.java:569) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
	at org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager.remoteLogSegmentMetadata(TopicBasedRemoteLogMetadataManager.java:221) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
	at org.apache.kafka.server.log.remote.storage.RemoteLogManager.fetchRemoteLogSegmentMetadata(RemoteLogManager.java:606) ~[kafka-storage-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.TierStateMachine.buildRemoteLogAuxState(TierStateMachine.java:233) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.TierStateMachine.start(TierStateMachine.java:114) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.handleOffsetsMovedToTieredStorage(AbstractFetcherThread.scala:785) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$7(AbstractFetcherThread.scala:434) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at scala.Option.foreach(Option.scala:437) ~[scala-library-2.13.16.jar:?]
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6(AbstractFetcherThread.scala:342) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.$anonfun$processFetchRequest$6$adapted(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry(JavaCollectionWrappers.scala:430) ~[scala-library-2.13.16.jar:?]
	at scala.collection.convert.JavaCollectionWrappers$JMapWrapperLike.foreachEntry$(JavaCollectionWrappers.scala:426) ~[scala-library-2.13.16.jar:?]
	at scala.collection.convert.JavaCollectionWrappers$AbstractJMapWrapper.foreachEntry(JavaCollectionWrappers.scala:344) ~[scala-library-2.13.16.jar:?]
	at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:341) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:137) ~[kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at java.base/java.util.Optional.ifPresent(Optional.java:178) [?:?]
	at kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:136) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:117) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at kafka.server.ReplicaFetcherThread.doWork(ReplicaFetcherThread.scala:96) [kafka_2.13-4.2.0-SNAPSHOT.jar:?]
	at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:136) [kafka-server-common-4.2.0-SNAPSHOT.jar:?]
{code}
The exception is thrown repeatedly until
RemoteLogMetadataManager#isReady(topicIdPartition) returns true. Since this is a
retriable error, it should be handled gracefully.
> Gracefully handle error while building remoteLogAuxState
> --------------------------------------------------------
>
> Key: KAFKA-19523
> URL: https://issues.apache.org/jira/browse/KAFKA-19523
> Project: Kafka
> Issue Type: Task
> Reporter: Kamal Chandraprakash
> Assignee: Kamal Chandraprakash
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)