chia7712 commented on code in PR #19908:
URL: https://github.com/apache/kafka/pull/19908#discussion_r2129236431
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1525,10 +1525,27 @@ protected StreamsConfig(final Map<?, ?> props,
}
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
verifyClientTelemetryConfigs();
+ verifyStreamsProtocolCompatibility(doLog);
+ }
+ private void verifyStreamsProtocolCompatibility(final boolean doLog) {
if (doLog &&
getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT)))
{
Review Comment:
Should `equals` be replaced by `equalsIgnoreCase`, since the other checks,
such as `throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol` and
`checkUnsupportedConfigsPostProcess`, use `equalsIgnoreCase`?
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1616,6 +1616,67 @@ public void shouldSetGroupProtocolToStreams() {
assertTrue(GroupProtocol.STREAMS.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG)));
}
+ @Test
+ public void shouldLogWarningWhenStreamsProtocolIsUsed() {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLogger(StreamsConfig.class, Level.WARN);
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+
+ new StreamsConfig(props);
+
+ assertTrue(appender.getMessages().stream()
+ .anyMatch(msg -> msg.contains("The streams rebalance protocol
is still in development and should " +
+ "not be used in production. Please set
group.protocol=classic (default) in all production use cases.")));
+ }
+ }
+
+ @Test
+ public void shouldLogWarningWhenWarmupReplicasSetWithStreamsProtocol() {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLogger(StreamsConfig.class, Level.WARN);
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+ props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1);
+
+ new StreamsConfig(props);
+
+ assertTrue(appender.getMessages().stream()
+ .anyMatch(msg -> msg.contains("Warmup replicas are not
supported yet with the streams protocol and " +
+ "will be ignored. If you want to use warmup replicas,
please set group.protocol=classic.")));
+ }
+ }
+
+ @Test
+ public void shouldLogWarningWhenStandbyReplicasSetWithStreamsProtocol() {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLogger(StreamsConfig.class, Level.WARN);
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+ props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+
+ new StreamsConfig(props);
+
+ assertTrue(appender.getMessages().stream()
+ .anyMatch(msg -> msg.contains("Standby replicas are configured
broker-side in the streams group " +
+ "protocol and will be ignored. Please use the admin client
or kafka-configs.sh to set the streams " +
+ "groups's standby replicas.")));
+ }
+ }
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenStreamsProtocolUsedWithStaticMembership() {
+ final Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+ props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1");
Review Comment:
should we test the static id with consumer prefix?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]