[ 
https://issues.apache.org/jira/browse/KAFKA-17429?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17877082#comment-17877082
 ] 

Matthias J. Sax commented on KAFKA-17429:
-----------------------------------------

Thanks. After digging into this a bit, it seems the problem is that the 
task has two stores, and we don't flush them correctly. After each 
`pipeInput` call, there shouldn't be anything left to flush, but there is. 
Thus, when `close()` is called, we still have pending data to flush, which 
should not be the case.

Btw: Calling `pipeInput` after `close()` does not make sense; it will be a 
no-op (I guess we should change the code to throw an exception for this case 
to make this clearer).
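
To illustrate the idea of failing loudly instead of silently ignoring input 
after `close()`, here is a minimal sketch. It is not the actual 
`TopologyTestDriver` code: `GuardedDriver`, its fields, and the exception 
message are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a test driver; NOT the real TopologyTestDriver. */
final class GuardedDriver implements AutoCloseable {
    private final List<String> processed = new ArrayList<>();
    private boolean closed = false;

    /** Rejects input after close() instead of treating it as a silent no-op. */
    void pipeInput(String record) {
        if (closed) {
            throw new IllegalStateException("Driver is already closed; cannot pipe input");
        }
        processed.add(record);
    }

    /** Returns a copy of the records accepted so far. */
    List<String> processed() {
        return new ArrayList<>(processed);
    }

    @Override
    public void close() {
        closed = true; // idempotent: closing twice is harmless
    }

    public static void main(String[] args) {
        GuardedDriver driver = new GuardedDriver();
        driver.pipeInput("a");
        driver.close();
        try {
            driver.pipeInput("b"); // misuse: input after close()
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With a guard like this, a test that keeps piping records after closing the 
driver would fail at the offending call rather than passing with missing 
output.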

> TopologyTestDriver.close() : The processor is already closed when cache is 
> activated on state store
> ---------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-17429
>                 URL: https://issues.apache.org/jira/browse/KAFKA-17429
>             Project: Kafka
>          Issue Type: Bug
>          Components: unit tests
>    Affects Versions: 3.4.0
>            Reporter: Olivier Sergeant
>            Priority: Minor
>         Attachments: KafkaStreamsApp.java, KafkaStreamsAppTest.java
>
>
> Hello,
> There seems to be a problem with the call to TopologyTestDriver.close(): the 
> processors are all closed when it subsequently tries to flush the state 
> store's record cache.
> The error is :
>  
> {code:java}
> org.apache.kafka.streams.errors.ProcessorStateException: topology-test-driver Failed to flush state store my-aggregate-store
>         at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:486)
> ....
> Caused by: java.lang.IllegalStateException: The processor is already closed
>         at org.apache.kafka.streams.processor.internals.ProcessorNode.throwIfClosed(ProcessorNode.java:147)
> ....
> {code}
> Attached are the class and the test class reproducing the problem.
> When I add .withCachingDisabled() on the state store the problem is gone.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
