[
https://issues.apache.org/jira/browse/KAFKA-19479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18010746#comment-18010746
]
Matthias J. Sax commented on KAFKA-19479:
-----------------------------------------
Thanks for the test case. I believe it's actually a bug in the producer:
https://github.com/apache/kafka/pull/20254#issuecomment-3134139853
> at_least_once mode in Kafka Streams silently drops messages when the producer
> fails with MESSAGE_TOO_LARGE, violating delivery guarantees
> -----------------------------------------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-19479
> URL: https://issues.apache.org/jira/browse/KAFKA-19479
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 4.0.0
> Environment: Micronaut 4.5.4
> Java 21
> Kotlin 1.9.3
> Kafka clients/streams:
> Apache Kafka 3.7.0, 4.0.0
> Confluent: 7.9.2-ccs, 8.0.0-ccs
> Kafka running in Docker (local test environment)
> Reporter: Mihai Lucian
> Assignee: Shashank
> Priority: Critical
> Attachments: poc-kafka-streams-al-least-once-proj.zip,
> stream-configs.txt
>
>
> *Description*
> It appears there is a scenario where Kafka Streams running with
> {{processing.guarantee=at_least_once}} does {*}not uphold its delivery
> guarantees{*}, resulting in *message loss.*
>
> *Reproduction Details*
> We run a simple Kafka Streams topology like the following:
> {code:java}
> props[StreamsConfig.APPLICATION_ID_CONFIG] = "poc-at-least-once"
> props[StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG] =
> Serdes.String().javaClass.name
> props[StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG] =
> Serdes.String().javaClass.name
> props[StreamsConfig.PROCESSING_GUARANTEE_CONFIG] = StreamsConfig.AT_LEAST_ONCE
> // Large producer batch size to induce MESSAGE_TOO_LARGE
> props[ProducerConfig.LINGER_MS_CONFIG] = "300000"
> props[ProducerConfig.BATCH_SIZE_CONFIG] = "33554432"
> /**
>  * A custom ProductionExceptionHandler is registered to demonstrate that it
>  * is not triggered in this scenario. In fact, neither the
>  * ProductionExceptionHandler nor the StreamsUncaughtExceptionHandler is
>  * invoked during this failure.
>  */
> props[StreamsConfig.PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG] =
> "poc.MyProductionExceptionHandler"
> val stream = streamsBuilder.stream<String, String>("input.topic")
> stream.peek { key, value -> println("$key:$value") }
> .to("output.topic")
> {code}
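> For context (not stated in the original report): with these settings the
> producer can accumulate a batch far larger than the broker's default limit
> before flushing, which is presumably what triggers {{MESSAGE_TOO_LARGE}}.
> Assuming an unmodified broker:
> {code}
> # broker default (server.properties), assuming no override
> message.max.bytes=1048588    # ~1 MiB per produce batch
> # producer settings from the repro above
> batch.size=33554432          # 32 MiB, well above the broker limit
> linger.ms=300000             # 5 minutes, so the batch fills up before flushing
> {code}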
>
> *What we observe:*
> * Records from {{input.topic}} are consumed and buffered on the producer side
> * After some time (likely driven by {{commit.interval.ms}}), the
> *consumer offset is committed*
> * A producer record *flush* is triggered
> * Sending the records to the Kafka broker fails with
> {{MESSAGE_TOO_LARGE}}
> * As a result, the application {*}commits offsets without actually producing
> the records{*}, which leads to *silent message loss*
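> For reference, a minimal sketch of what the registered handler might look
> like. Only the class name {{poc.MyProductionExceptionHandler}} comes from the
> config above; the body is hypothetical and uses the classic 3.x
> {{ProductionExceptionHandler}} callback (4.0 deprecates it in favor of a
> variant taking an {{ErrorHandlerContext}}):
> {code:java}
> import org.apache.kafka.clients.producer.ProducerRecord
> import org.apache.kafka.streams.errors.ProductionExceptionHandler
> import org.apache.kafka.streams.errors.ProductionExceptionHandler.ProductionExceptionHandlerResponse
>
> class MyProductionExceptionHandler : ProductionExceptionHandler {
>     override fun configure(configs: Map<String, *>) {}
>
>     override fun handle(
>         record: ProducerRecord<ByteArray, ByteArray>,
>         exception: Exception
>     ): ProductionExceptionHandlerResponse {
>         // Never reached in this scenario: the producer splits and retries
>         // internally instead of surfacing the error to Streams
>         println("Production failure for topic ${record.topic()}: $exception")
>         return ProductionExceptionHandlerResponse.FAIL
>     }
> }
> {code}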
>
> *Steps to Reproduce*
> # Generate ~50,000 records (sized similarly to the sample project) in
> {{input.topic}} to induce {{MESSAGE_TOO_LARGE}}
> # Start the topology with the configuration above
> # Wait for all messages to be consumed
> # Observe:
> ** Offsets are committed
> ** Output topic receives no messages
> ** Log shows repeated {{MESSAGE_TOO_LARGE}} errors:
> {code:java}
> 11:50:30.695 [kafka-producer-network-thread |
> kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer]
> WARN o.a.k.c.producer.internals.Sender - [Producer
> clientId=kstreams-poc-v1-37858c2e-7584-4489-8081-0111f710c431-StreamThread-1-producer]
> Got error produce response in correlation id 255 on topic-partition
> output.topic-0, splitting and retrying (2147483647 attempts left). Error:
> MESSAGE_TOO_LARGE {code}
>
> *Reproduced* with:
> * kafka-client-3.7.0, kafka-streams-3.7.0
> * kafka-client-4.0.0, kafka-streams-4.0.0
> * kafka-client-7.9.2-ccs, kafka-streams-7.9.2-ccs
> * kafka-client-8.0.0-ccs, kafka-streams-8.0.0-ccs
>
> *Expected Behavior*
> In {{at_least_once}} mode, Kafka Streams should *not commit offsets* unless
> records are {*}successfully produced{*}.
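> Until the underlying producer bug is fixed, one possible workaround (an
> assumption on my part, not verified against the attached project) is to keep
> each producer batch below the broker's {{message.max.bytes}} so the
> split-and-retry path is never entered:
> {code}
> # keep each batch well under the broker's ~1 MiB default limit
> batch.size=16384    # producer default
> linger.ms=100       # flush promptly instead of accumulating for 5 minutes
> {code}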
>
> *Attached*
> * configs for stream, producer, consumer
> * sample project used to replicate the issue
--
This message was sent by Atlassian Jira
(v8.20.10#820010)