lucasbru commented on code in PR #19700:
URL: https://github.com/apache/kafka/pull/19700#discussion_r2086136979
##########
tests/kafkatest/tests/streams/streams_eos_test.py:
##########
@@ -149,15 +160,26 @@ def stop_streams2(self, keep_alive_processor,
processor_to_be_stopped):
def stop_streams3(self, keep_alive_processor1, keep_alive_processor2,
processor_to_be_stopped):
with
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
as monitor:
- self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
- self.wait_for_startup(monitor, keep_alive_processor1)
+ with
keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE)
as monitor:
+ self.stop_streams(processor_to_be_stopped)
+ self.wait_for_startup_in_classic(monitor,
keep_alive_processor2)
+ self.wait_for_startup_in_classic(monitor, keep_alive_processor1)
def abort_streams(self, keep_alive_processor1, keep_alive_processor2,
processor_to_be_aborted):
with
keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE)
as monitor1:
with
keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE)
as monitor2:
processor_to_be_aborted.stop_nodes(False)
- self.wait_for_startup(monitor2, keep_alive_processor2)
- self.wait_for_startup(monitor1, keep_alive_processor1)
+ self.wait_for_startup_in_classic(monitor2, keep_alive_processor2)
+ self.wait_for_startup_in_classic(monitor1, keep_alive_processor1)
+
+ def wait_for_startup_in_classic(self, monitor, processor):
+ if self.group_protocol == "classic":
+ self.wait_for(monitor, processor, "StateChange: REBALANCING ->
RUNNING")
+ else:
+ # Above check is not valid for streams group protocol, since not
all nodes take part in rebalances
Review Comment:
Some of the validation inside the test does not make sense for KIP-1071.
This is because in KIP-1071, if a member leaves or joins the group, not
all members may enter a REBALANCING state. For streams groups, we only
include a minor sleep. However, this simplification should not break
the test - it can just lead to going too quickly through the various
test states.
Once [KAFKA-19271](https://issues.apache.org/jira/browse/KAFKA-19271) is
implemented, we may change the test app to print a log line whenever
the member epoch is bumped, which is the only way a member can
"indirectly" observe that other members are rebalancing.
This only affects the operations above where two members are being kept
alive, one is being started / shut down. One member may not see a rebalance in
that case. I checked, but it is not easily possible to express something like
"one of the two should see a rebalance".
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]