mjsax commented on code in PR #18778:
URL: https://github.com/apache/kafka/pull/18778#discussion_r1940561921


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -587,31 +550,34 @@ public final void addProcessor(final String name,
     public final <KIn, VIn, VOut> void addProcessor(final String name,
                                                     final FixedKeyProcessorSupplier<KIn, VIn, VOut> supplier,
                                                     final String... predecessorNames) {
-        Objects.requireNonNull(name, "name must not be null");
-        Objects.requireNonNull(supplier, "supplier must not be null");
-        Objects.requireNonNull(predecessorNames, "predecessor names must not be null");
+        verifyName(name);
         ApiUtils.checkSupplier(supplier);
-        if (nodeFactories.containsKey(name)) {
-            throw new TopologyException("Processor " + name + " is already added.");
-        }
+        verifyParents(name, predecessorNames);
+
+        nodeFactories.put(name, new FixedKeyProcessorNodeFactory<>(name, predecessorNames, supplier));
+        nodeGrouper.add(name);
+        nodeGrouper.unite(name, predecessorNames);
+        nodeGroups = null;
+    }
+
+    private void verifyParents(final String processorName, final String... predecessorNames) {
+        Objects.requireNonNull(predecessorNames, "predecessorNames must not be null");
         if (predecessorNames.length == 0) {
-            throw new TopologyException("Processor " + name + " must have at least one parent");
+            throw new TopologyException("predecessorNames cannot be empty");
         }
 
         for (final String predecessor : predecessorNames) {
-            Objects.requireNonNull(predecessor, "predecessor name must not be null");
-            if (predecessor.equals(name)) {
-                throw new TopologyException("Processor " + name + " cannot be a predecessor of itself.");

Review Comment:
   To align with the new JavaDocs, I moved this check further below and changed its error message.
   
   We could keep it this way, but then we would need to update the JavaDocs. They are already quite long for `TopologyException`, so I thought it would be simpler to change the internal code rather than complicate the JavaDocs even more.


