mjsax commented on code in PR #18778:
URL: https://github.com/apache/kafka/pull/18778#discussion_r1940563422
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -507,46 +510,23 @@ public final <K, V> void addSink(final String name,
final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ?
super V> partitioner,
final String... predecessorNames) {
- Objects.requireNonNull(name, "name must not be null");
- Objects.requireNonNull(topic, "topic must not be null");
- Objects.requireNonNull(predecessorNames, "predecessor names must not
be null");
- if (predecessorNames.length == 0) {
- throw new TopologyException("Sink " + name + " must have at least
one parent");
- }
+ verifyName(name);
+ Objects.requireNonNull(topic, "topic cannot be null");
+ verifyParents(name, predecessorNames);
addSink(name, new StaticTopicNameExtractor<>(topic), keySerializer,
valSerializer, partitioner, predecessorNames);
nodeToSinkTopic.put(name, topic);
- nodeGroups = null;
}
public final <K, V> void addSink(final String name,
- final TopicNameExtractor<K, V>
topicExtractor,
+ final TopicNameExtractor<? super K, ?
super V> topicExtractor,
final Serializer<K> keySerializer,
final Serializer<V> valSerializer,
final StreamPartitioner<? super K, ?
super V> partitioner,
final String... predecessorNames) {
- Objects.requireNonNull(name, "name must not be null");
- Objects.requireNonNull(topicExtractor, "topic extractor must not be
null");
- Objects.requireNonNull(predecessorNames, "predecessor names must not
be null");
- if (nodeFactories.containsKey(name)) {
- throw new TopologyException("Processor " + name + " is already
added.");
- }
- if (predecessorNames.length == 0) {
- throw new TopologyException("Sink " + name + " must have at least
one parent");
- }
-
- for (final String predecessor : predecessorNames) {
- Objects.requireNonNull(predecessor, "predecessor name can't be
null");
- if (predecessor.equals(name)) {
- throw new TopologyException("Processor " + name + " cannot be
a predecessor of itself.");
- }
- if (!nodeFactories.containsKey(predecessor)) {
- throw new TopologyException("Predecessor processor " +
predecessor + " is not added yet.");
- }
- if (nodeToSinkTopic.containsKey(predecessor)) {
- throw new TopologyException("Sink " + predecessor + " cannot
be used a parent.");
Review Comment:
This "sink" check was actually no don't everywhere. Pulling it into the
`verifyParent` method closes these gaps and fixes it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]