This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2a0b96f0d24 MINOR: improve StreamsConfig client-tags verification
(#21724)
2a0b96f0d24 is described below
commit 2a0b96f0d24bd2c7d1688b36d43399b583a7d4de
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 16 10:11:34 2026 -0700
MINOR: improve StreamsConfig client-tags verification (#21724)
Reviewers: Lucas Brutschy <[email protected]>
---
.../org/apache/kafka/streams/StreamsConfig.java | 56 ++++++++++++++----
.../apache/kafka/streams/StreamsConfigTest.java | 69 ++++++++++++++++++++--
2 files changed, 108 insertions(+), 17 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 7ad19d3562c..d28818e47e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1664,29 +1664,61 @@ public class StreamsConfig extends AbstractConfig {
final Map<String, String> clientTags = getClientTags();
if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
- throw new ConfigException("At most " +
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
- "can be specified using " +
CLIENT_TAG_PREFIX + " prefix.");
+ throw new ConfigException(
+ String.format(
+ "At most %s client tags can be specified using %s prefix.",
+ MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+ CLIENT_TAG_PREFIX
+ )
+ );
}
for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+ // no need to call `trim()` because for LIST type `AbstractConfig`
already takes care of this
+ if (rackAwareAssignmentTag.isEmpty()) {
+ throw new ConfigException(
+ RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+ rackAwareAssignmentTags,
+ "Contains invalid value []. Tag key cannot be empty."
+ );
+ }
if (!clientTags.containsKey(rackAwareAssignmentTag)) {
- throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
- rackAwareAssignmentTags,
- "Contains invalid value [" +
rackAwareAssignmentTag + "] " +
- "which doesn't have corresponding
tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
+ throw new ConfigException(
+ RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+ rackAwareAssignmentTags,
+ String.format(
+ "Contains invalid value [%s] which doesn't have
corresponding tag set via [%s] prefix.",
+ rackAwareAssignmentTag,
+ CLIENT_TAG_PREFIX
+ )
+ );
}
}
clientTags.forEach((tagKey, tagValue) -> {
+ if (tagKey.trim().isEmpty()) {
+ throw new ConfigException("Invalid config `client.tag.`
(missing client tag key).");
+ }
+ if (tagValue.trim().isEmpty()) {
+ throw new ConfigException(
+ CLIENT_TAG_PREFIX + tagKey,
+ "[]",
+ "Tag value cannot be empty."
+ );
+ }
if (tagKey.length() > MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH) {
- throw new ConfigException(CLIENT_TAG_PREFIX,
- tagKey,
- "Tag key exceeds maximum length of "
+ MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH + ".");
+ throw new ConfigException(
+ CLIENT_TAG_PREFIX + tagKey,
+ tagKey,
+ "Tag key exceeds maximum length of " +
MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH + "."
+ );
}
if (tagValue.length() >
MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH) {
- throw new ConfigException(CLIENT_TAG_PREFIX,
- tagValue,
- "Tag value exceeds maximum length of
" + MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH + ".");
+ throw new ConfigException(
+ CLIENT_TAG_PREFIX + tagKey,
+ tagValue,
+ "Tag value exceeds maximum length of " +
MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH + "."
+ );
}
});
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f33aa9bd767..34db1ccc253 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1182,7 +1182,36 @@ public class StreamsConfigTest {
@Test
public void
shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithUnknownTags() {
props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "cluster");
- assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+ final ConfigException exception = assertThrows(ConfigException.class,
() -> new StreamsConfig(props));
+ assertEquals(
+ "Invalid value [cluster] for configuration
rack.aware.assignment.tags: Contains invalid value [cluster] which doesn't have
corresponding tag set via [client.tag.] prefix.",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void shouldAllowWhitespacesInRackAwareAssignmentTagsList() {
+ // AbstractConfig is supposed to take care of WS handling for LIST type
+ props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, " zone ,
cluster ");
+ props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
+ props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1");
+ final StreamsConfig config = new StreamsConfig(props);
+ final Map<String, String> clientTags = config.getClientTags();
+ assertEquals(2, clientTags.size());
+ assertEquals("eu-central-1a", clientTags.get("zone"));
+ assertEquals("cluster-1", clientTags.get("cluster"));
+ }
+
+ @Test
+ public void
shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithEmptyTag() {
+ // AbstractConfig is supposed to take care of WS handling for LIST type
+ props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "zone, ");
+ props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
+ final ConfigException exception = assertThrows(ConfigException.class,
() -> new StreamsConfig(props));
+ assertEquals(
+ "Invalid value [zone, ] for configuration
rack.aware.assignment.tags: Contains invalid value []. Tag key cannot be
empty.",
+ exception.getMessage()
+ );
}
@Test
@@ -1191,8 +1220,12 @@ public class StreamsConfigTest {
props.put(StreamsConfig.clientTagPrefix(key), "eu-central-1a");
final ConfigException exception = assertThrows(ConfigException.class,
() -> new StreamsConfig(props));
assertEquals(
- String.format("Invalid value %s for configuration %s: Tag key
exceeds maximum length of %s.",
- key, StreamsConfig.CLIENT_TAG_PREFIX,
StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH),
+ String.format(
+ "Invalid value %s for configuration client.tag.%s: Tag key
exceeds maximum length of %s.",
+ key,
+ key,
+ StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH
+ ),
exception.getMessage()
);
}
@@ -1203,8 +1236,34 @@ public class StreamsConfigTest {
props.put(StreamsConfig.clientTagPrefix("x"), value);
final ConfigException exception = assertThrows(ConfigException.class,
() -> new StreamsConfig(props));
assertEquals(
- String.format("Invalid value %s for configuration %s: Tag value
exceeds maximum length of %s.",
- value, StreamsConfig.CLIENT_TAG_PREFIX,
StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH),
+ String.format(
+ "Invalid value %s for configuration client.tag.x: Tag value
exceeds maximum length of %s.",
+ value,
+ StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH
+ ),
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenClientTagKeyIsEmpty() {
+ props.put(StreamsConfig.clientTagPrefix(" "), "tagValue");
+ final ConfigException exception = assertThrows(ConfigException.class,
() -> new StreamsConfig(props));
+ assertEquals(
+ "Invalid config `client.tag.` (missing client tag key).",
+ exception.getMessage()
+ );
+ }
+
+ @Test
+ public void shouldThrowExceptionWhenClientTagValueIsEmpty() {
+ final Map<String, Object> config = new HashMap<>();
+ config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app.id");
+ config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ config.put(StreamsConfig.clientTagPrefix("tagKey"), " ");
+ final ConfigException exception = assertThrows(ConfigException.class,
() -> new StreamsConfig(config));
+ assertEquals(
+ "Invalid value [] for configuration client.tag.tagKey: Tag value
cannot be empty.",
exception.getMessage()
);
}