Nikita-Shupletsov commented on code in PR #21365:
URL: https://github.com/apache/kafka/pull/21365#discussion_r2744762670


##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KafkaStreamsWrapper.java:
##########
@@ -48,7 +48,11 @@ public List<StreamThread> streamThreads() {
     public void setStreamThreadStateListener(final StreamThread.StateListener listener) {
         if (state == State.CREATED) {
             for (final StreamThread thread : threads) {
-                thread.setStateListener(listener);
+                StreamThread.StateListener originalListener = thread.getStateListener();
+                thread.setStateListener((t, newState, oldState) -> {
+                    originalListener.onChange(t, newState, oldState);
+                    listener.onChange(t, newState, oldState);
+                });

Review Comment:
   Because the old code overrides the default listener, and then KafkaStreams#state doesn't work.
   With this change, if we add a special state listener, we keep the old one as well.
   If for some reason we want to add two additional state listeners, this implementation works too: we just need to call the method twice. Each call wraps the listener that is currently set, so all three listeners are called instead of one.

   The reason I changed the method was pretty much that I wanted to have a listener and also be able to call KafkaStreams#state and get the actual state.
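
   To illustrate the wrapping behavior described above, here is a minimal standalone sketch. It does not use the Kafka classes: `StateListener`, `FakeThread`, and `addListener` are hypothetical stand-ins with simplified string arguments, assumed only for the example. It shows that calling the wrapping method twice leaves the default listener plus both added listeners in the chain.

```java
import java.util.ArrayList;
import java.util.List;

public class ListenerChainingSketch {

    // Hypothetical, simplified stand-in for StreamThread.StateListener.
    interface StateListener {
        void onChange(String thread, String newState, String oldState);
    }

    // Hypothetical stand-in for a thread that holds exactly one listener.
    static class FakeThread {
        private StateListener listener;

        FakeThread(final StateListener initial) {
            this.listener = initial;
        }

        StateListener getStateListener() {
            return listener;
        }

        void setStateListener(final StateListener newListener) {
            this.listener = newListener;
        }

        // Simulates a state transition by notifying the installed listener.
        void transition(final String newState, final String oldState) {
            listener.onChange("thread-1", newState, oldState);
        }
    }

    // Wraps the currently installed listener so it keeps firing before the extra one.
    static void addListener(final FakeThread thread, final StateListener extra) {
        final StateListener original = thread.getStateListener();
        thread.setStateListener((t, newState, oldState) -> {
            original.onChange(t, newState, oldState);
            extra.onChange(t, newState, oldState);
        });
    }

    // Installs a default listener, adds two more, and records the call order.
    static List<String> demo() {
        final List<String> calls = new ArrayList<>();
        final FakeThread thread = new FakeThread((t, n, o) -> calls.add("default:" + n));
        addListener(thread, (t, n, o) -> calls.add("first:" + n));
        addListener(thread, (t, n, o) -> calls.add("second:" + n));
        thread.transition("RUNNING", "STARTING");
        return calls;
    }

    public static void main(final String[] args) {
        // prints [default:RUNNING, first:RUNNING, second:RUNNING]
        System.out.println(demo());
    }
}
```

   Each wrapper captures whatever listener was installed at the time of the call, so the default listener always runs first and the state tracking it performs is preserved.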



