mjsax commented on code in PR #18778:
URL: https://github.com/apache/kafka/pull/18778#discussion_r1940565584


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -729,13 +707,21 @@ public boolean isStoreVersioned(final String storeName) {
 
     public final void connectProcessorAndStateStores(final String 
processorName,
                                                      final String... 
stateStoreNames) {
-        Objects.requireNonNull(processorName, "processorName can't be null");
-        Objects.requireNonNull(stateStoreNames, "state store list must not be 
null");
+        Objects.requireNonNull(processorName, "processorName cannot be null");
+        Objects.requireNonNull(stateStoreNames, "stateStoreNames cannot  
null");
         if (stateStoreNames.length == 0) {
-            throw new TopologyException("Must provide at least one state store 
name.");
+            throw new TopologyException("stateStoreNames cannot be empty");
+        }
+
+        if (nodeToSourceTopics.containsKey(processorName)
+            || nodeToSourcePatterns.containsKey(processorName)
+            || nodeToSinkTopic.containsKey(processorName)) {
+            throw new TopologyException("State stores cannot be connect to 
sources or sinks.");

Review Comment:
   Adding missing check as called out above



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to