José Armando García Sancio created KAFKA-18723:
--------------------------------------------------
Summary: KRaft must handle corrupted records in the fetch response
Key: KAFKA-18723
URL: https://issues.apache.org/jira/browse/KAFKA-18723
Project: Kafka
Issue Type: Bug
Components: kraft
Reporter: José Armando García Sancio
Assignee: José Armando García Sancio
Fix For: 3.9.1, 3.8.2, 3.7.3
It is possible for a KRaft replica to send corrupted records to the fetching
replicas in the FETCH response. This is because there is a race between when
the FETCH response is generated by the KRaft IO thread and when the network
thread, or Linux kernel, reads the bytes at that position in the log segment.

This race can generate corrupted records if the KRaft replica performed a
truncation after the FETCH response was created but before the network thread
read the bytes from the log segment.
I have seen the following errors:
{code:java}
[ERROR] 2025-01-07 15:04:18,273 [kafka-0-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
org.apache.kafka.common.KafkaException: Append failed unexpectedly
	at kafka.raft.KafkaMetadataLog.handleAndConvertLogAppendInfo(KafkaMetadataLog.scala:117)
	at kafka.raft.KafkaMetadataLog.appendAsFollower(KafkaMetadataLog.scala:110)
	at org.apache.kafka.raft.KafkaRaftClient.appendAsFollower(KafkaRaftClient.java:1227)
	at org.apache.kafka.raft.KafkaRaftClient.handleFetchResponse(KafkaRaftClient.java:1209)
	at org.apache.kafka.raft.KafkaRaftClient.handleResponse(KafkaRaftClient.java:1644)
	at org.apache.kafka.raft.KafkaRaftClient.handleInboundMessage(KafkaRaftClient.java:1770)
	at org.apache.kafka.raft.KafkaRaftClient.poll(KafkaRaftClient.java:2355)
	at kafka.raft.KafkaRaftManager$RaftIoThread.doWork(RaftManager.scala:71)
	at org.apache.kafka.server.util.ShutdownableThread.run(ShutdownableThread.java:138){code}
and
{code:java}
[ERROR] 2025-01-07 18:06:20,121 [kafka-1-raft-io-thread] org.apache.kafka.server.fault.ProcessTerminatingFaultHandler handleFault - Encountered fatal fault: Unexpected error in raft IO thread
org.apache.kafka.common.errors.CorruptRecordException: Record size 0 is less than the minimum record overhead (14){code}
This issue can also happen in Kafka's ISR-based topic partitions. The replica
fetcher handles this case by catching CorruptRecordException and
InvalidRecordException.
{code:java}
} catch {
  case ime@(_: CorruptRecordException | _: InvalidRecordException) =>
    // we log the error and continue. This ensures two things
    // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread
    //    down and cause other topic partition to also lag
    // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes
    //    can cause this), we simply continue and should get fixed in the subsequent fetches
    error(s"Found invalid messages during fetch for partition $topicPartition " +
      s"offset ${currentFetchState.fetchOffset}", ime)
    partitionsWithError += topicPartition
{code}
The KRaft implementation doesn't handle this case:
{code:java}
} else {
    Records records = FetchResponse.recordsOrFail(partitionResponse);
    if (records.sizeInBytes() > 0) {
        appendAsFollower(records);
    }

    OptionalLong highWatermark = partitionResponse.highWatermark() < 0 ?
        OptionalLong.empty() :
        OptionalLong.of(partitionResponse.highWatermark());
    updateFollowerHighWatermark(state, highWatermark);
}
{code}
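A possible direction, mirroring the replica fetcher's catch-and-continue handling above, is to treat corruption around appendAsFollower as transient and simply re-fetch. The following is a minimal, self-contained sketch only: the exception type is stood in by a local class, and tryAppendAsFollower plus the 14-byte overhead check are illustrative, not the actual KafkaRaftClient API.

```java
// Sketch: catch-and-continue handling for transient corruption on the
// follower append path. Stand-in types keep the example self-contained.
public class FetchCorruptionSketch {

    // Stand-in for org.apache.kafka.common.errors.CorruptRecordException.
    static class CorruptRecordException extends RuntimeException {
        CorruptRecordException(String msg) { super(msg); }
    }

    // Returns true if the records were appended, false if they were corrupt
    // and the follower should re-fetch from the same offset instead of
    // treating the corruption as a fatal fault.
    static boolean tryAppendAsFollower(byte[] records, long fetchOffset) {
        try {
            appendAsFollower(records);
            return true;
        } catch (CorruptRecordException e) {
            // Corruption here may be transient: the leader can truncate the
            // segment between building the FETCH response and the network
            // thread reading the bytes. Log and retry on the next fetch.
            System.err.printf(
                "Found invalid records during fetch at offset %d, will re-fetch: %s%n",
                fetchOffset, e.getMessage());
            return false;
        }
    }

    // Illustrative append that enforces the minimum record overhead (14
    // bytes), the same check that produced the error in the report.
    static void appendAsFollower(byte[] records) {
        if (records.length < 14) {
            throw new CorruptRecordException(
                "Record size " + records.length
                + " is less than the minimum record overhead (14)");
        }
        // a real implementation would append to the local log here
    }

    public static void main(String[] args) {
        System.out.println(tryAppendAsFollower(new byte[32], 100L));
        System.out.println(tryAppendAsFollower(new byte[0], 100L));
    }
}
```

The key design point is that the corrupt batch is dropped rather than appended, so the follower's fetch offset does not advance and the subsequent FETCH re-reads the same position, which the leader will by then serve consistently.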
--
This message was sent by Atlassian Jira
(v8.20.10#820010)