lucasbru commented on code in PR #19908:
URL: https://github.com/apache/kafka/pull/19908#discussion_r2131868905
##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1047,6 +1051,21 @@ private KafkaStreams(final TopologyMetadata
topologyMetadata,
rocksDBMetricsRecordingService =
maybeCreateRocksDBMetricsRecordingService(clientId, applicationConfigs);
}
+ private void throwIfUnsupportedFeatureIsUsedWithStreamsRebalanceProtocol()
{
+ if
(applicationConfigs.getString(StreamsConfig.GROUP_PROTOCOL_CONFIG).equalsIgnoreCase(GroupProtocol.STREAMS.name))
{
Review Comment:
Done
##########
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##########
@@ -1525,10 +1525,27 @@ protected StreamsConfig(final Map<?, ?> props,
}
verifyTopologyOptimizationConfigs(getString(TOPOLOGY_OPTIMIZATION_CONFIG));
verifyClientTelemetryConfigs();
+ verifyStreamsProtocolCompatibility(doLog);
+ }
+ private void verifyStreamsProtocolCompatibility(final boolean doLog) {
if (doLog &&
getString(GROUP_PROTOCOL_CONFIG).equals(GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT)))
{
Review Comment:
Good point. I actually added a small protected method that checks whether the
streams protocol is enabled, to eliminate this source of bugs.
##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -1616,6 +1616,67 @@ public void shouldSetGroupProtocolToStreams() {
assertTrue(GroupProtocol.STREAMS.name().equalsIgnoreCase(streamsConfig.getString(GROUP_PROTOCOL_CONFIG)));
}
+ @Test
+ public void shouldLogWarningWhenStreamsProtocolIsUsed() {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLogger(StreamsConfig.class, Level.WARN);
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+
+ new StreamsConfig(props);
+
+ assertTrue(appender.getMessages().stream()
+ .anyMatch(msg -> msg.contains("The streams rebalance protocol
is still in development and should " +
+ "not be used in production. Please set
group.protocol=classic (default) in all production use cases.")));
+ }
+ }
+
+ @Test
+ public void shouldLogWarningWhenWarmupReplicasSetWithStreamsProtocol() {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLogger(StreamsConfig.class, Level.WARN);
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+ props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, 1);
+
+ new StreamsConfig(props);
+
+ assertTrue(appender.getMessages().stream()
+ .anyMatch(msg -> msg.contains("Warmup replicas are not
supported yet with the streams protocol and " +
+ "will be ignored. If you want to use warmup replicas,
please set group.protocol=classic.")));
+ }
+ }
+
+ @Test
+ public void shouldLogWarningWhenStandbyReplicasSetWithStreamsProtocol() {
+ try (final LogCaptureAppender appender =
LogCaptureAppender.createAndRegister(StreamsConfig.class)) {
+ appender.setClassLogger(StreamsConfig.class, Level.WARN);
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+ props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+
+ new StreamsConfig(props);
+
+ assertTrue(appender.getMessages().stream()
+ .anyMatch(msg -> msg.contains("Standby replicas are configured
broker-side in the streams group " +
+ "protocol and will be ignored. Please use the admin client
or kafka-configs.sh to set the streams " +
+ "groups's standby replicas.")));
+ }
+ }
+
+ @Test
+ public void
shouldThrowConfigExceptionWhenStreamsProtocolUsedWithStaticMembership() {
+ final Properties props = new Properties();
+ props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test-app");
+ props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:9092");
+ props.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, "streams");
+ props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, "static-member-1");
Review Comment:
Good point. Updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]