This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 22c1e445f17 MINOR: Include numeric bound in group config validation
errors (#22185)
22c1e445f17 is described below
commit 22c1e445f17e82ac66800d8150ab46b5547e4035
Author: David Jacot <[email protected]>
AuthorDate: Thu Apr 30 16:53:38 2026 +0200
MINOR: Include numeric bound in group config validation errors (#22185)
When a dynamic group config value is out of range, the error message
referenced the broker-level config name that defines the bound (e.g.
`group.consumer.min.heartbeat.interval.ms`). Operators had to look up
the broker config to know which value would be accepted.
This patch updates the `validateIntRange`, `validateIntMin`, and
`validateIntMax` helpers in `GroupConfig` to include the numeric bound
directly. For example, setting `consumer.heartbeat.interval.ms` below
the minimum now reports `consumer.heartbeat.interval.ms must be greater
than or equal to 5`.
A new parameterized test `testValidationErrorMessageIncludesBound`
covers both directions for each range-bounded config and the max-only
and min-only checks.
Reviewers: Andrew Schofield <[email protected]>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../kafka/api/PlaintextAdminIntegrationTest.scala | 2 +-
.../kafka/coordinator/group/GroupConfig.java | 77 +++++++---------------
.../kafka/coordinator/group/GroupConfigTest.java | 71 ++++++++++++++++++++
.../kafka/tools/ConfigCommandIntegrationTest.java | 14 ++--
4 files changed, 101 insertions(+), 63 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
index 97adc98cee4..b118a4ec02d 100644
---
a/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
@@ -1154,7 +1154,7 @@ class PlaintextAdminIntegrationTest extends
BaseAdminIntegrationTest {
assertFutureThrows(classOf[InvalidConfigurationException],
alterResult.values.get(groupResource),
- "consumer.session.timeout.ms must be greater than or equal to
group.consumer.min.session.timeout.ms")
+ "consumer.session.timeout.ms must be in the range 45000 to 60000
inclusive.")
}
@Test
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
index 5ee40332369..cc6508148cd 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java
@@ -446,25 +446,19 @@ public final class GroupConfig extends AbstractConfig {
parsed,
CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinHeartbeatIntervalMs(),
-
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
- groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs(),
-
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
+ groupCoordinatorConfig.consumerGroupMaxHeartbeatIntervalMs()
);
validateIntRange(
parsed,
CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinSessionTimeoutMs(),
-
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
- groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs(),
- GroupCoordinatorConfig.CONSUMER_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
+ groupCoordinatorConfig.consumerGroupMaxSessionTimeoutMs()
);
validateIntRange(
parsed,
CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.consumerGroupMinAssignmentIntervalMs(),
-
GroupCoordinatorConfig.CONSUMER_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
- groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs(),
-
GroupCoordinatorConfig.CONSUMER_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
+ groupCoordinatorConfig.consumerGroupMaxAssignmentIntervalMs()
);
// Share group configs.
@@ -472,49 +466,37 @@ public final class GroupConfig extends AbstractConfig {
parsed,
SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupMinHeartbeatIntervalMs(),
-
GroupCoordinatorConfig.SHARE_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
- groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs(),
- GroupCoordinatorConfig.SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
+ groupCoordinatorConfig.shareGroupMaxHeartbeatIntervalMs()
);
validateIntRange(
parsed,
SHARE_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.shareGroupMinSessionTimeoutMs(),
- GroupCoordinatorConfig.SHARE_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
- groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs(),
- GroupCoordinatorConfig.SHARE_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
+ groupCoordinatorConfig.shareGroupMaxSessionTimeoutMs()
);
validateIntRange(
parsed,
SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
shareGroupConfig.shareGroupMinRecordLockDurationMs(),
- ShareGroupConfig.SHARE_GROUP_MIN_RECORD_LOCK_DURATION_MS_CONFIG,
- shareGroupConfig.shareGroupMaxRecordLockDurationMs(),
- ShareGroupConfig.SHARE_GROUP_MAX_RECORD_LOCK_DURATION_MS_CONFIG
+ shareGroupConfig.shareGroupMaxRecordLockDurationMs()
);
validateIntRange(
parsed,
SHARE_DELIVERY_COUNT_LIMIT_CONFIG,
shareGroupConfig.shareGroupMinDeliveryCountLimit(),
- ShareGroupConfig.SHARE_GROUP_MIN_DELIVERY_COUNT_LIMIT_CONFIG,
- shareGroupConfig.shareGroupMaxDeliveryCountLimit(),
- ShareGroupConfig.SHARE_GROUP_MAX_DELIVERY_COUNT_LIMIT_CONFIG
+ shareGroupConfig.shareGroupMaxDeliveryCountLimit()
);
validateIntRange(
parsed,
SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
shareGroupConfig.shareGroupMinPartitionMaxRecordLocks(),
- ShareGroupConfig.SHARE_GROUP_MIN_PARTITION_MAX_RECORD_LOCKS_CONFIG,
- shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks(),
- ShareGroupConfig.SHARE_GROUP_MAX_PARTITION_MAX_RECORD_LOCKS_CONFIG
+ shareGroupConfig.shareGroupMaxPartitionMaxRecordLocks()
);
validateIntRange(
parsed,
SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.shareGroupMinAssignmentIntervalMs(),
-
GroupCoordinatorConfig.SHARE_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
- groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs(),
-
GroupCoordinatorConfig.SHARE_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
+ groupCoordinatorConfig.shareGroupMaxAssignmentIntervalMs()
);
// Streams group configs.
@@ -522,43 +504,34 @@ public final class GroupConfig extends AbstractConfig {
parsed,
STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinHeartbeatIntervalMs(),
-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG,
- groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs(),
-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG
+ groupCoordinatorConfig.streamsGroupMaxHeartbeatIntervalMs()
);
validateIntRange(
parsed,
STREAMS_SESSION_TIMEOUT_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinSessionTimeoutMs(),
- GroupCoordinatorConfig.STREAMS_GROUP_MIN_SESSION_TIMEOUT_MS_CONFIG,
- groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs(),
- GroupCoordinatorConfig.STREAMS_GROUP_MAX_SESSION_TIMEOUT_MS_CONFIG
+ groupCoordinatorConfig.streamsGroupMaxSessionTimeoutMs()
);
validateIntMax(
parsed,
STREAMS_NUM_STANDBY_REPLICAS_CONFIG,
- groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas(),
- GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG
+ groupCoordinatorConfig.streamsGroupMaxNumStandbyReplicas()
);
validateIntRange(
parsed,
STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
groupCoordinatorConfig.streamsGroupMinAssignmentIntervalMs(),
-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_ASSIGNMENT_INTERVAL_MS_CONFIG,
- groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs(),
-
GroupCoordinatorConfig.STREAMS_GROUP_MAX_ASSIGNMENT_INTERVAL_MS_CONFIG
+ groupCoordinatorConfig.streamsGroupMaxAssignmentIntervalMs()
);
validateIntMin(
parsed,
STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
- groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs(),
-
GroupCoordinatorConfig.STREAMS_GROUP_MIN_TASK_OFFSET_INTERVAL_MS_CONFIG
+ groupCoordinatorConfig.streamsGroupMinTaskOffsetIntervalMs()
);
validateIntMax(
parsed,
STREAMS_NUM_WARMUP_REPLICAS_CONFIG,
- groupCoordinatorConfig.streamsGroupMaxWarmupReplicas(),
- GroupCoordinatorConfig.STREAMS_GROUP_MAX_WARMUP_REPLICAS_CONFIG
+ groupCoordinatorConfig.streamsGroupMaxWarmupReplicas()
);
// Cross-field validations: session timeout must be greater than
heartbeat interval.
@@ -601,16 +574,12 @@ public final class GroupConfig extends AbstractConfig {
Map<String, Object> parsed,
String key,
int min,
- String minConfigName,
- int max,
- String maxConfigName
+ int max
) {
if (!parsed.containsKey(key)) return;
int value = (Integer) parsed.get(key);
- if (value < min)
- throw new InvalidConfigurationException(key + " must be greater
than or equal to " + minConfigName);
- if (value > max)
- throw new InvalidConfigurationException(key + " must be less than
or equal to " + maxConfigName);
+ if (value < min || value > max)
+ throw new InvalidConfigurationException(key + " must be in the
range " + min + " to " + max + " inclusive.");
}
/**
@@ -620,13 +589,12 @@ public final class GroupConfig extends AbstractConfig {
private static void validateIntMax(
Map<String, Object> parsed,
String key,
- int max,
- String maxConfigName
+ int max
) {
if (!parsed.containsKey(key)) return;
int value = (Integer) parsed.get(key);
if (value > max)
- throw new InvalidConfigurationException(key + " must be less than
or equal to " + maxConfigName);
+ throw new InvalidConfigurationException(key + " must be less than
or equal to " + max);
}
/**
@@ -636,13 +604,12 @@ public final class GroupConfig extends AbstractConfig {
private static void validateIntMin(
Map<String, Object> parsed,
String key,
- int min,
- String minConfigName
+ int min
) {
if (!parsed.containsKey(key)) return;
int value = (Integer) parsed.get(key);
if (value < min)
- throw new InvalidConfigurationException(key + " must be greater
than or equal to " + minConfigName);
+ throw new InvalidConfigurationException(key + " must be greater
than or equal to " + min);
}
/**
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
index 83ef07ac496..e5c69e71adb 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java
@@ -353,6 +353,77 @@ public class GroupConfigTest {
assertDoesNotThrow(() -> GroupConfig.validate(props,
createGroupCoordinatorConfig(), createShareGroupConfig()));
}
+ private static Stream<Arguments> outOfRangeValuesAndExpectedMessages() {
+ return Stream.of(
+ // Consumer group configs.
+ Arguments.of(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
"1",
+ "consumer.heartbeat.interval.ms must be in the range 5 to
15000 inclusive."),
+ Arguments.of(GroupConfig.CONSUMER_HEARTBEAT_INTERVAL_MS_CONFIG,
"20000",
+ "consumer.heartbeat.interval.ms must be in the range 5 to
15000 inclusive."),
+ Arguments.of(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "1",
+ "consumer.session.timeout.ms must be in the range 45 to 60000
inclusive."),
+ Arguments.of(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG,
"70000",
+ "consumer.session.timeout.ms must be in the range 45 to 60000
inclusive."),
+ Arguments.of(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
"500",
+ "consumer.assignment.interval.ms must be in the range 1000 to
15000 inclusive."),
+ Arguments.of(GroupConfig.CONSUMER_ASSIGNMENT_INTERVAL_MS_CONFIG,
"20000",
+ "consumer.assignment.interval.ms must be in the range 1000 to
15000 inclusive."),
+
+ // Share group configs.
+ Arguments.of(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG, "1",
+ "share.heartbeat.interval.ms must be in the range 5 to 15000
inclusive."),
+ Arguments.of(GroupConfig.SHARE_HEARTBEAT_INTERVAL_MS_CONFIG,
"20000",
+ "share.heartbeat.interval.ms must be in the range 5 to 15000
inclusive."),
+ Arguments.of(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "1",
+ "share.session.timeout.ms must be in the range 45 to 60000
inclusive."),
+ Arguments.of(GroupConfig.SHARE_SESSION_TIMEOUT_MS_CONFIG, "70000",
+ "share.session.timeout.ms must be in the range 45 to 60000
inclusive."),
+ Arguments.of(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
"10000",
+ "share.record.lock.duration.ms must be in the range 15000 to
60000 inclusive."),
+ Arguments.of(GroupConfig.SHARE_RECORD_LOCK_DURATION_MS_CONFIG,
"70000",
+ "share.record.lock.duration.ms must be in the range 15000 to
60000 inclusive."),
+ Arguments.of(GroupConfig.SHARE_DELIVERY_COUNT_LIMIT_CONFIG, "11",
+ "share.delivery.count.limit must be in the range 2 to 10
inclusive."),
+ Arguments.of(GroupConfig.SHARE_PARTITION_MAX_RECORD_LOCKS_CONFIG,
"11000",
+ "share.partition.max.record.locks must be in the range 100 to
10000 inclusive."),
+ Arguments.of(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
"500",
+ "share.assignment.interval.ms must be in the range 1000 to
15000 inclusive."),
+ Arguments.of(GroupConfig.SHARE_ASSIGNMENT_INTERVAL_MS_CONFIG,
"20000",
+ "share.assignment.interval.ms must be in the range 1000 to
15000 inclusive."),
+
+ // Streams group configs.
+ Arguments.of(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
"1000",
+ "streams.heartbeat.interval.ms must be in the range 5000 to
15000 inclusive."),
+ Arguments.of(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG,
"20000",
+ "streams.heartbeat.interval.ms must be in the range 5000 to
15000 inclusive."),
+ Arguments.of(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "1",
+ "streams.session.timeout.ms must be in the range 45000 to
60000 inclusive."),
+ Arguments.of(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG,
"70000",
+ "streams.session.timeout.ms must be in the range 45000 to
60000 inclusive."),
+ Arguments.of(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "5",
+ "streams.num.standby.replicas must be less than or equal to
2"),
+ Arguments.of(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
"500",
+ "streams.assignment.interval.ms must be in the range 1000 to
15000 inclusive."),
+ Arguments.of(GroupConfig.STREAMS_ASSIGNMENT_INTERVAL_MS_CONFIG,
"20000",
+ "streams.assignment.interval.ms must be in the range 1000 to
15000 inclusive."),
+ Arguments.of(GroupConfig.STREAMS_TASK_OFFSET_INTERVAL_MS_CONFIG,
"1000",
+ "streams.task.offset.interval.ms must be greater than or equal
to 15000"),
+ Arguments.of(GroupConfig.STREAMS_NUM_WARMUP_REPLICAS_CONFIG, "50",
+ "streams.num.warmup.replicas must be less than or equal to 20")
+ );
+ }
+
+ @ParameterizedTest(name =
"testValidationErrorMessageIncludesBound[{0}={1}]")
+ @MethodSource("outOfRangeValuesAndExpectedMessages")
+ public void testValidationErrorMessageIncludesBound(String key, String
value, String expectedMessage) {
+ var props = Map.of(key, value);
+ var exception = assertThrows(
+ InvalidConfigurationException.class,
+ () -> GroupConfig.validate(props, createGroupCoordinatorConfig(),
createShareGroupConfig())
+ );
+ assertEquals(expectedMessage, exception.getMessage());
+ }
+
@Test
public void testFromPropsWithDefaultValue() {
Map<String, String> defaultValue = new HashMap<>();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
index c4368076690..3ff74fc3d2e 100644
---
a/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/ConfigCommandIntegrationTest.java
@@ -316,7 +316,7 @@ public class ConfigCommandIntegrationTest {
"--entity-name", "group",
"--alter", "--add-config", "streams.session.timeout.ms=1"));
message = captureStandardErr(run(command));
-
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.session.timeout.ms must be greater than or equal to
group.streams.min.session.timeout.ms"));
+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.session.timeout.ms must be in the range 45000 to 60000 inclusive."));
// Should fail to set above max
command = Stream.concat(quorumArgs(), Stream.of(
@@ -324,7 +324,7 @@ public class ConfigCommandIntegrationTest {
"--entity-name", "group",
"--alter", "--add-config", "streams.session.timeout.ms=100000"));
message = captureStandardErr(run(command));
-
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.session.timeout.ms must be less than or equal to
group.streams.max.session.timeout.ms"));
+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.session.timeout.ms must be in the range 45000 to 60000 inclusive."));
}
@ClusterTest(serverProperties = {
@@ -347,7 +347,7 @@ public class ConfigCommandIntegrationTest {
"--entity-name", "group",
"--alter", "--add-config", "streams.heartbeat.interval.ms=1"));
message = captureStandardErr(run(command));
-
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.heartbeat.interval.ms must be greater than or equal to
group.streams.min.heartbeat.interval.ms"));
+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.heartbeat.interval.ms must be in the range 5000 to 55000 inclusive."));
// Should fail to set above max
command = Stream.concat(quorumArgs(), Stream.of(
@@ -355,7 +355,7 @@ public class ConfigCommandIntegrationTest {
"--entity-name", "group",
"--alter", "--add-config",
"streams.heartbeat.interval.ms=100000"));
message = captureStandardErr(run(command));
-
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.heartbeat.interval.ms must be less than or equal to
group.streams.max.heartbeat.interval.ms"));
+
assertTrue(message.contains("org.apache.kafka.common.errors.InvalidConfigurationException:
streams.heartbeat.interval.ms must be in the range 5000 to 55000 inclusive."));
// Should fail to set above session timeout
command = Stream.concat(quorumArgs(), Stream.of(
@@ -398,7 +398,7 @@ public class ConfigCommandIntegrationTest {
"--entity-name", "group",
"--alter", "--add-config", "streams.num.standby.replicas=3"));
message = captureStandardErr(run(command));
- assertTrue(message.contains("streams.num.standby.replicas must be less
than or equal to group.streams.max.standby.replicas"));
+ assertTrue(message.contains("streams.num.standby.replicas must be less
than or equal to 2"));
}
@ClusterTest
@@ -432,7 +432,7 @@ public class ConfigCommandIntegrationTest {
"--entity-name", "group",
"--alter", "--add-config", "streams.task.offset.interval.ms=1"));
message = captureStandardErr(run(command));
- assertTrue(message.contains("streams.task.offset.interval.ms must be
greater than or equal to group.streams.min.task.offset.interval.ms"));
+ assertTrue(message.contains("streams.task.offset.interval.ms must be
greater than or equal to 15000"));
}
@@ -467,7 +467,7 @@ public class ConfigCommandIntegrationTest {
"--entity-name", "group",
"--alter", "--add-config", "streams.num.warmup.replicas=25"));
message = captureStandardErr(run(command));
- assertTrue(message.contains("streams.num.warmup.replicas must be less
than or equal to group.streams.max.warmup.replicas"));
+ assertTrue(message.contains("streams.num.warmup.replicas must be less
than or equal to 20"));
}
private void verifyGroupConfigUpdate(List<String> alterOpts) throws
Exception {