[
https://issues.apache.org/jira/browse/KAFKA-16077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Lucas Brutschy reassigned KAFKA-16077:
--------------------------------------
Assignee: Lucas Brutschy (was: Lucas Brutschy)
> Streams fails to close task after restoration when input partitions are
> updated
> -------------------------------------------------------------------------------
>
> Key: KAFKA-16077
> URL: https://issues.apache.org/jira/browse/KAFKA-16077
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 3.7.0
> Reporter: Lucas Brutschy
> Assignee: Lucas Brutschy
> Priority: Blocker
> Labels: streams
>
> There is a race condition in the state updater that can cause the following:
> # We have an active task in the state updater
> # We get fenced. We recreate the producer, so its transactions are now
> uninitialized. We ask the state updater to hand back the task and add a
> pending action to close the task clean once it’s handed back
> # We get a new assignment with updated input partitions. The task is still
> owned by the state updater, so we ask the state updater again to hand it back
> and add a pending action to update its input partitions
> # The task is handed back by the state updater. We update its input
> partitions but never close it clean, because the second pending action
> overwrote the first
> # Now the task is in an initialized state, but the underlying producer does
> not have transactions initialized
> This can lead to an exception like this:
> {code:java}
> streams-soak-3-7-eos-v2_20240103131321/18.237.32.3/streams.log-org.apache.kafka.streams.errors.StreamsException:
> Exception caught in process. taskId=1_0,
> processor=KSTREAM-SOURCE-0000000005, topic=node-name-repartition,
> partition=0, offset=618798, stacktrace=java.lang.IllegalStateException:
> TransactionalId stream-soak-test-d647640a-12e5-4e74-a0af-e105d0d0cb67-2:
> Invalid transition attempted from state UNINITIALIZED to state IN_TRANSACTION
>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:999)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.transitionTo(TransactionManager.java:985)
>     at org.apache.kafka.clients.producer.internals.TransactionManager.beginTransaction(TransactionManager.java:311)
>     at org.apache.kafka.clients.producer.KafkaProducer.beginTransaction(KafkaProducer.java:660)
>     at org.apache.kafka.streams.processor.internals.StreamsProducer.maybeBeginTransaction(StreamsProducer.java:240)
>     at org.apache.kafka.streams.processor.internals.StreamsProducer.send(StreamsProducer.java:258)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:253)
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:175)
>     at org.apache.kafka.streams.processor.internals.SinkNode.process(SinkNode.java:85)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
>     at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.doJoin(KStreamKTableJoinProcessor.java:130)
>     at org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.process(KStreamKTableJoinProcessor.java:99)
>     at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:152)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:291)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:270)
>     at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:229)
>     at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:84)
>     at org.apache.kafka.streams.processor.internals.StreamTask.lambda$doProcess$1(StreamTask.java:847)
>     at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:867)
>     at org.apache.kafka.streams.processor.internals.StreamTask.doProcess(StreamTask.java:847)
>     at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:778)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.processTask(TaskExecutor.java:97)
>     at org.apache.kafka.streams.processor.internals.TaskExecutor.process(TaskExecutor.java:78)
>     at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1919)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:953)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645)
> {code}
> This affects EOSv2 only.
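
The sequence described in the issue can be replayed in a small, self-contained model. This is only a sketch under stated assumptions, not the Kafka Streams implementation: the class `PendingActionRace`, its enums, and its methods are hypothetical names invented for illustration. It assumes that at most one pending action is stored per task id, and that closing a task clean leads to the producer's transactions being initialized again before the task processes records.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of the race; none of these names are Kafka Streams APIs.
class PendingActionRace {
    enum Action { CLOSE_CLEAN, UPDATE_INPUT_PARTITIONS }
    enum TxnState { UNINITIALIZED, READY, IN_TRANSACTION }

    // Assumption: one pending action per task id, so a second put() overwrites the first.
    private final Map<String, Action> pendingActions = new HashMap<>();
    private TxnState producerState = TxnState.READY;

    // Step 2: fencing recreates the producer and schedules a clean close.
    void onFenced(String taskId) {
        producerState = TxnState.UNINITIALIZED;
        pendingActions.put(taskId, Action.CLOSE_CLEAN);
    }

    // Step 3: a new assignment schedules an input-partition update for the same task.
    void onNewAssignment(String taskId) {
        pendingActions.put(taskId, Action.UPDATE_INPUT_PARTITIONS); // replaces CLOSE_CLEAN
    }

    // Step 4: the state updater hands the task back; returns the action that was applied.
    Action onTaskHandedBack(String taskId) {
        Action action = pendingActions.remove(taskId);
        if (action == Action.CLOSE_CLEAN) {
            // Assumption of this model: a clean close ends with transactions initialized again.
            producerState = TxnState.READY;
        }
        return action;
    }

    // Step 5: processing begins a transaction, which is only valid from READY.
    void process() {
        if (producerState != TxnState.READY) {
            throw new IllegalStateException("Invalid transition attempted from state "
                    + producerState + " to state IN_TRANSACTION");
        }
        producerState = TxnState.IN_TRANSACTION;
    }

    public static void main(String[] args) {
        PendingActionRace model = new PendingActionRace();
        model.onFenced("1_0");
        model.onNewAssignment("1_0");
        System.out.println("applied: " + model.onTaskHandedBack("1_0"));
        try {
            model.process();
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this model the clean close is lost because the second `put` replaces the first entry for the same task id, so the task goes on to process records while the producer is still `UNINITIALIZED`.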