[
https://issues.apache.org/jira/browse/KAFKA-17429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877082#comment-17877082
]
Matthias J. Sax commented on KAFKA-17429:
-----------------------------------------
Thanks. After digging into this a little bit, it seems the problem is that the
task has two stores, and we don't flush them correctly... After each
`pipeInput` call, there shouldn't be anything left to be flushed, but there is.
Thus, when `close()` is called, we have pending stuff which we should not
have...
Btw: Calling `pipeInput` after `close()` does not make sense – it will be a
no-op (guess we should change the code to through an exception for this case to
make this more clear...)
> TopologyTestDriver.close() : The processor is already closed when cache is
> activated on state store
> ---------------------------------------------------------------------------------------------------
>
> Key: KAFKA-17429
> URL: https://issues.apache.org/jira/browse/KAFKA-17429
> Project: Kafka
> Issue Type: Bug
> Components: unit tests
> Affects Versions: 3.4.0
> Reporter: Olivier Sergeant
> Priority: Minor
> Attachments: KafkaStreamsApp.java, KafkaStreamsAppTest.java
>
>
> Hello,
> There seems to be a problem with the call to TopologyTestDriver.close() : the
> processors are all closed when it subsequently try to flush the state store's
> record cache.
> The error is :
>
> {code:java}
> org.apache.kafka.streams.errors.ProcessorStateException: topology-test-driver
> Failed to flush state store my-aggregate-store
> at
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:486)
> ....
> Caused by: java.lang.IllegalStateException: The processor is already closed
> at
> org.apache.kafka.streams.processor.internals.ProcessorNode.throwIfClosed(ProcessorNode.java:147)
> ....
> {code}
> Attached are the class and the test class reproducing the problem.
> When I add .withCachingDisabled() on the state store the problem is gone.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)