cadonna commented on code in PR #15894:
URL: https://github.com/apache/kafka/pull/15894#discussion_r1593990443
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -787,9 +796,12 @@ public DefaultStateUpdater(final String name,
this.log = logContext.logger(DefaultStateUpdater.class);
}
- @Override
public void start() {
if (stateUpdaterThread == null) {
+ if (!restoredActiveTasks.isEmpty() || !exceptionsAndFailedTasks.isEmpty()) {
Review Comment:
Since we allow the state updater to be restarted, I thought we should verify
that it starts with clean queues, so that no invalid state carries over from
a previous run. At the moment we never restart the state updater, but I
thought having clear invariants would be good.
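
To illustrate the invariant, here is a minimal, self-contained sketch. It is not the actual `DefaultStateUpdater` code: the class `RestartableUpdaterSketch`, its `String` task ids, the helper methods `simulateLeftover` and `drain`, and the choice of `IllegalStateException` with this message are all assumptions made for the example. Only the two queue names and the shape of the check come from the diff above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a restartable updater; NOT the real DefaultStateUpdater.
final class RestartableUpdaterSketch {
    // Output queues that a previous run could have left populated.
    private final BlockingQueue<String> restoredActiveTasks = new LinkedBlockingQueue<>();
    private final BlockingQueue<String> exceptionsAndFailedTasks = new LinkedBlockingQueue<>();
    private Thread updaterThread = null;

    void start() {
        if (updaterThread == null) {
            // Invariant: a fresh run must not observe results of an earlier run.
            if (!restoredActiveTasks.isEmpty() || !exceptionsAndFailedTasks.isEmpty()) {
                // Exception type and message are assumptions for this sketch.
                throw new IllegalStateException("Updater started with non-empty output queues.");
            }
            updaterThread = new Thread(() -> { }, "updater-sketch");
            updaterThread.start();
        }
    }

    void shutdown() {
        if (updaterThread != null) {
            try {
                updaterThread.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            updaterThread = null; // allows a later restart
        }
    }

    // Hypothetical helper: pretend a previous run left a restored task behind.
    void simulateLeftover(String taskId) {
        restoredActiveTasks.add(taskId);
    }

    // Hypothetical helper: the caller drains the queues before restarting.
    void drain() {
        restoredActiveTasks.clear();
        exceptionsAndFailedTasks.clear();
    }
}
```

With this check in place, a restart that follows an undrained run fails fast in `start()` instead of silently handing stale tasks or exceptions to the caller, while a restart after `drain()` succeeds.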