chia7712 commented on code in PR #19908:
URL: https://github.com/apache/kafka/pull/19908#discussion_r2129236431


##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1525,10 +1525,27 @@ protected StreamsConfig(final Map<?, ?> props,
         }
         
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
         verifyClientTelemetryConfigs();
+        verifyStreamsProtocolCompatibility(doLog);
+    }
 
+    private void verifyStreamsProtocolCompatibility(final boolean doLog) {
         if (doLog && 
getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT)))
 {

Review Comment:
   Should `equals` be replaced by `equalsIgnoreCase`, since the other checks, 
such as `throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol` and 
`checkUnsupportedConfigsPostProcess`, use `equalsIgnoreCase`?



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1616,6 +1616,67 @@ public void shouldSetGroupProtocolToStreams() {
         
assertTrue(GroupProtocol.STREAMS.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG)));
     }
 
+    @Test
+    public void shouldLogWarningWhenStreamsProtocolIsUsed() {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLogger(StreamsConfig.class, Level.WARN);
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+
+            new StreamsConfig(props);
+
+            assertTrue(appender.getMessages().stream()
+                .anyMatch(msg -> msg.contains("The streams rebalance protocol 
is still in development and should " +
+                    "not be used in production. Please set 
group.protocol=classic (default) in all production use cases.")));
+        }
+    }
+
+    @Test
+    public void shouldLogWarningWhenWarmupReplicasSetWithStreamsProtocol() {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLogger(StreamsConfig.class, Level.WARN);
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+            props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1);
+
+            new StreamsConfig(props);
+
+            assertTrue(appender.getMessages().stream()
+                .anyMatch(msg -> msg.contains("Warmup replicas are not 
supported yet with the streams protocol and " +
+                    "will be ignored. If you want to use warmup replicas, 
please set group.protocol=classic.")));
+        }
+    }
+
+    @Test
+    public void shouldLogWarningWhenStandbyReplicasSetWithStreamsProtocol() {
+        try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+            appender.setClassLogger(StreamsConfig.class, Level.WARN);
+            props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+            props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+
+            new StreamsConfig(props);
+
+            assertTrue(appender.getMessages().stream()
+                .anyMatch(msg -> msg.contains("Standby replicas are configured 
broker-side in the streams group " +
+                    "protocol and will be ignored. Please use the admin client 
or kafka-configs.sh to set the streams " +
+                    "groups's standby replicas.")));
+        }
+    }
+
+    @Test
+    public void 
shouldThrowConfigExceptionWhenStreamsProtocolUsedWithStaticMembership() {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
+        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
+        props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+        props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1");

Review Comment:
   should we test the static id with consumer prefix? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to