This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2a0b96f0d24 MINOR: improve StreamsConfig client-tags verification 
(#21724)
2a0b96f0d24 is described below

commit 2a0b96f0d24bd2c7d1688b36d43399b583a7d4de
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Mar 16 10:11:34 2026 -0700

    MINOR: improve StreamsConfig client-tags verification (#21724)
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 .../org/apache/kafka/streams/StreamsConfig.java    | 56 ++++++++++++++----
 .../apache/kafka/streams/StreamsConfigTest.java    | 69 ++++++++++++++++++++--
 2 files changed, 108 insertions(+), 17 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 7ad19d3562c..d28818e47e3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -1664,29 +1664,61 @@ public class StreamsConfig extends AbstractConfig {
         final Map<String, String> clientTags = getClientTags();
 
         if (clientTags.size() > MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE) {
-            throw new ConfigException("At most " + 
MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE + " client tags " +
-                                      "can be specified using " + 
CLIENT_TAG_PREFIX + " prefix.");
+            throw new ConfigException(
+                String.format(
+                    "At most %s client tags can be specified using %s prefix.",
+                    MAX_RACK_AWARE_ASSIGNMENT_TAG_LIST_SIZE,
+                    CLIENT_TAG_PREFIX
+                )
+            );
         }
 
         for (final String rackAwareAssignmentTag : rackAwareAssignmentTags) {
+            // no need to call `trim()` because for LIST type `AbstractConfig` 
takes already care of this
+            if (rackAwareAssignmentTag.isEmpty()) {
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    "Contains invalid value []. Tag key cannot be empty."
+                );
+            }
             if (!clientTags.containsKey(rackAwareAssignmentTag)) {
-                throw new ConfigException(RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
-                                          rackAwareAssignmentTags,
-                                          "Contains invalid value [" + 
rackAwareAssignmentTag + "] " +
-                                          "which doesn't have corresponding 
tag set via [" + CLIENT_TAG_PREFIX + "] prefix.");
+                throw new ConfigException(
+                    RACK_AWARE_ASSIGNMENT_TAGS_CONFIG,
+                    rackAwareAssignmentTags,
+                    String.format(
+                        "Contains invalid value [%s] which doesn't have 
corresponding tag set via [%s] prefix.",
+                        rackAwareAssignmentTag,
+                        CLIENT_TAG_PREFIX
+                    )
+                );
             }
         }
 
         clientTags.forEach((tagKey, tagValue) -> {
+            if (tagKey.trim().isEmpty()) {
+                throw new ConfigException("Invalid config `client.tag.` 
(missing client tag key).");
+            }
+            if (tagValue.trim().isEmpty()) {
+                throw new ConfigException(
+                    CLIENT_TAG_PREFIX + tagKey,
+                    "[]",
+                    "Tag value cannot be empty."
+                );
+            }
             if (tagKey.length() > MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH) {
-                throw new ConfigException(CLIENT_TAG_PREFIX,
-                                          tagKey,
-                                          "Tag key exceeds maximum length of " 
+ MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH + ".");
+                throw new ConfigException(
+                    CLIENT_TAG_PREFIX + tagKey,
+                    tagKey,
+                    "Tag key exceeds maximum length of " + 
MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH + "."
+                );
             }
             if (tagValue.length() > 
MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH) {
-                throw new ConfigException(CLIENT_TAG_PREFIX,
-                                          tagValue,
-                                          "Tag value exceeds maximum length of 
" + MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH + ".");
+                throw new ConfigException(
+                    CLIENT_TAG_PREFIX + tagKey,
+                    tagValue,
+                    "Tag value exceeds maximum length of " + 
MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH + "."
+                );
             }
         });
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f33aa9bd767..34db1ccc253 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -1182,7 +1182,36 @@ public class StreamsConfigTest {
     @Test
     public void 
shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithUnknownTags() {
         props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "cluster");
-        assertThrows(ConfigException.class, () -> new StreamsConfig(props));
+        final ConfigException exception = assertThrows(ConfigException.class, 
() -> new StreamsConfig(props));
+        assertEquals(
+            "Invalid value [cluster] for configuration 
rack.aware.assignment.tags: Contains invalid value [cluster] which doesn't have 
corresponding tag set via [client.tag.] prefix.",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldAllowWhitespacesInRackAwareAssignmentTagsList() {
+        // AbstractConfig is supposed to take care of WS handling for LIST type
+        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, " zone , 
cluster ");
+        props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
+        props.put(StreamsConfig.clientTagPrefix("cluster"), "cluster-1");
+        final StreamsConfig config = new StreamsConfig(props);
+        final Map<String, String> clientTags = config.getClientTags();
+        assertEquals(2, clientTags.size());
+        assertEquals("eu-central-1a", clientTags.get("zone"));
+        assertEquals("cluster-1", clientTags.get("cluster"));
+    }
+
+    @Test
+    public void 
shouldThrowExceptionWhenClientTagRackAwarenessIsConfiguredWithEmptyTag() {
+        // AbstractConfig is supposed to take care of WS handling for LIST type
+        props.put(StreamsConfig.RACK_AWARE_ASSIGNMENT_TAGS_CONFIG, "zone, ");
+        props.put(StreamsConfig.clientTagPrefix("zone"), "eu-central-1a");
+        final ConfigException exception = assertThrows(ConfigException.class, 
() -> new StreamsConfig(props));
+        assertEquals(
+            "Invalid value [zone, ] for configuration 
rack.aware.assignment.tags: Contains invalid value []. Tag key cannot be 
empty.",
+            exception.getMessage()
+        );
     }
 
     @Test
@@ -1191,8 +1220,12 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.clientTagPrefix(key), "eu-central-1a");
         final ConfigException exception = assertThrows(ConfigException.class, 
() -> new StreamsConfig(props));
         assertEquals(
-            String.format("Invalid value %s for configuration %s: Tag key 
exceeds maximum length of %s.",
-                          key, StreamsConfig.CLIENT_TAG_PREFIX, 
StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH),
+            String.format(
+                "Invalid value %s for configuration client.tag.%s: Tag key 
exceeds maximum length of %s.",
+                key,
+                key,
+                StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_KEY_LENGTH
+            ),
             exception.getMessage()
         );
     }
@@ -1203,8 +1236,34 @@ public class StreamsConfigTest {
         props.put(StreamsConfig.clientTagPrefix("x"), value);
         final ConfigException exception = assertThrows(ConfigException.class, 
() -> new StreamsConfig(props));
         assertEquals(
-            String.format("Invalid value %s for configuration %s: Tag value 
exceeds maximum length of %s.",
-                          value, StreamsConfig.CLIENT_TAG_PREFIX, 
StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH),
+            String.format(
+                "Invalid value %s for configuration client.tag.x: Tag value 
exceeds maximum length of %s.",
+                value,
+                StreamsConfig.MAX_RACK_AWARE_ASSIGNMENT_TAG_VALUE_LENGTH
+            ),
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenClientTagKeyIsEmpty() {
+        props.put(StreamsConfig.clientTagPrefix(" "), "tagValue");
+        final ConfigException exception = assertThrows(ConfigException.class, 
() -> new StreamsConfig(props));
+        assertEquals(
+            "Invalid config `client.tag.` (missing client tag key).",
+            exception.getMessage()
+        );
+    }
+
+    @Test
+    public void shouldThrowExceptionWhenClientTagValueIsEmpty() {
+        final Map<String, Object> config = new HashMap<>();
+        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "app.id");
+        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+        config.put(StreamsConfig.clientTagPrefix("tagKey"), " ");
+        final ConfigException exception = assertThrows(ConfigException.class, 
() -> new StreamsConfig(config));
+        assertEquals(
+            "Invalid value [] for configuration client.tag.tagKey: Tag value 
cannot be empty.",
             exception.getMessage()
         );
     }

Reply via email to