bbejeck commented on code in PR #18778:
URL: https://github.com/apache/kafka/pull/18778#discussion_r1943825510
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -701,6 +679,8 @@ public final <KIn, VIn> void addGlobalStore(final String
sourceName,
nodeGrouper.add(processorName);
nodeGrouper.unite(processorName, predecessors);
globalStateBuilders.put(storeFactory.storeName(), storeFactory);
+ // connect source topic as (read-only) changelog topic for
fault-tolerance
Review Comment:
```suggestion
// connect the source topic as (read-only) changelog topic for
fault-tolerance
```
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -1016,82 +775,86 @@ public synchronized <KIn, VIn> Topology
addReadOnlyStateStore(final StoreBuilder
);
internalTopologyBuilder.addProcessor(processorName,
stateUpdateSupplier, sourceName);
internalTopologyBuilder.addStateStore(storeBuilder, processorName);
+
+ // connect source topic as (read-only) changelog topic for
fault-tolerance
Review Comment:
```suggestion
// connect the source topic as (read-only) changelog topic for
fault-tolerance
```
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -119,10 +119,14 @@ private static AutoOffsetResetInternal
convertOldToNew(final Topology.AutoOffset
* @return itself
*
* @throws TopologyException
- * if the provided source name is not unique, or
- * if topics have already been registered by another source,
+ * if the provided source name is not unique,
+ * no topics are specified, or
+ * a topic has already been registered by another source,
* {@link #addReadOnlyStateStore(StoreBuilder, String,
Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state
store}, or
* {@link #addGlobalStore(StoreBuilder, String, Deserializer,
Deserializer, String, String, ProcessorSupplier) global state store}
+ * @throws NullPointerException
+ * if {@code name} or {@code topics} is {@code null}, or
+ * {@code topics} contains a {@code null} topic
Review Comment:
I'm ok with it
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -677,10 +662,10 @@ public synchronized <KIn, VIn, KOut, VOut> Topology
addProcessor(final String na
*
* @throws TopologyException
* if the {@link StoreBuilder#name() state store} was already
added, or
- * if a processor name is unknown
+ * if a processor name is unknown or specifies a source or sink
Review Comment:
Good call
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]