mjsax commented on code in PR #18800:
URL: https://github.com/apache/kafka/pull/18800#discussion_r2162780239
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -100,14 +100,14 @@ public <K, V> KStream<K, V> stream(final Collection<String> topics,
final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
final StreamSourceNode<K, V> streamSourceNode = new StreamSourceNode<>(name, topics, consumed);
+ streamSourceNode.requireRepartitionByKey();
Review Comment:
Not sure what this means? I.e., the name and semantics are not clear to me.
When we add a new `KStream` from a topic, the assumption is that the `KStream`
is partitioned by key. So this operator does not "require" any repartitioning
(it just reads from a topic), and it also does not change the key (so
downstream repartitioning is not required either, as it's not a key-changing
operation).
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -27,12 +27,17 @@
import java.util.Optional;
public abstract class GraphNode {
-
+ private enum Repartition {
+ NOT_REQUIRED,
+ BY_KEY_ONLY,
Review Comment:
Not sure what this means?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -93,6 +98,14 @@ public String nodeName() {
return nodeName;
}
+ public boolean canResolveRepartition() {
+ return keyChangingOperation || repartition != Repartition.NOT_REQUIRED;
Review Comment:
The name of this method and its logic are not clear to me. What does
"resolve" actually mean?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -105,6 +118,14 @@ public boolean isMergeNode() {
return mergeNode;
}
+ public void requireRepartitionByKey() {
+ this.repartition = Repartition.BY_KEY_ONLY;
+ }
+
+ public void requireRepartitionAlways() {
+ this.repartition = Repartition.ALWAYS_REQUIRED;
Review Comment:
nit: avoid unnecessary `this.`
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -105,6 +118,14 @@ public boolean isMergeNode() {
return mergeNode;
}
+ public void requireRepartitionByKey() {
+ this.repartition = Repartition.BY_KEY_ONLY;
Review Comment:
nit: avoid unnecessary `this.`
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -127,19 +127,19 @@ public class KStreamImpl<K, V> extends AbstractStream<K, V> implements KStream<K
private static final String REPARTITION_NAME = "KSTREAM-REPARTITION-";
- private final boolean repartitionRequired;
-
private OptimizableRepartitionNode<K, V> repartitionNode;
KStreamImpl(final String name,
final Serde<K> keySerde,
final Serde<V> valueSerde,
final Set<String> subTopologySourceNodes,
- final boolean repartitionRequired,
final GraphNode graphNode,
final InternalStreamsBuilder builder) {
super(name, keySerde, valueSerde, subTopologySourceNodes, graphNode, builder);
- this.repartitionRequired = repartitionRequired;
+ }
+
+ private boolean isRepartitionRequired() {
+ return this.builder.isRepartitionRequired(this.graphNode);
Review Comment:
nit: avoid unnecessary `this.` prefix
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -27,12 +27,17 @@
import java.util.Optional;
public abstract class GraphNode {
-
+ private enum Repartition {
+ NOT_REQUIRED,
Review Comment:
Just to make sure I understand correctly: does `NOT_REQUIRED` mean that this
node is not a key-dependent operation, and that it will never trigger
repartitioning?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java:
##########
@@ -45,28 +45,28 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K, V> implements KGroupedS
static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
- final boolean repartitionRequired;
final String userProvidedRepartitionTopicName;
KGroupedStreamImpl(final String name,
final Set<String> subTopologySourceNodes,
final GroupedInternal<K, V> groupedInternal,
- final boolean repartitionRequired,
final GraphNode graphNode,
final InternalStreamsBuilder builder) {
super(name, groupedInternal.keySerde(), groupedInternal.valueSerde(), subTopologySourceNodes, graphNode, builder);
- this.repartitionRequired = repartitionRequired;
this.userProvidedRepartitionTopicName = groupedInternal.name();
this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(
builder,
groupedInternal,
- repartitionRequired,
subTopologySourceNodes,
name,
graphNode
);
}
+ public boolean repartitionRequired() {
+ return builder.isRepartitionRequired(graphNode);
Review Comment:
Cf. my comment above. This could be `graphNode.repartitionRequired()` if we
move the method. Or am I missing something?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -116,14 +116,14 @@ public <K, V> KStream<K, V> stream(final Pattern topicPattern,
final ConsumedInternal<K, V> consumed) {
final String name = new NamedInternal(consumed.name()).orElseGenerateWithPrefix(this, KStreamImpl.SOURCE_NAME);
final StreamSourceNode<K, V> streamPatternSourceNode = new StreamSourceNode<>(name, topicPattern, consumed);
+ streamPatternSourceNode.requireRepartitionByKey();
Review Comment:
Same question as above.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -531,6 +522,7 @@ private KStream<K, V> doRepartition(final Repartitioned<K, V> repartitioned) {
);
final UnoptimizableRepartitionNode<K, V> unoptimizableRepartitionNode = unoptimizableRepartitionNodeBuilder.build();
+ unoptimizableRepartitionNode.requireRepartitionByKey();
Review Comment:
Not sure about the naming and semantics of this method. We have a
repartition node at hand, which, well, always repartitions. So it does not care
about upstream repartition requirements, and it returns a partitioned `KStream`
(so downstream, it's not necessary to repartition again).
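To make the point concrete, here is a minimal sketch of the intended semantics on a linear pipeline. Everything in it (`RepartitionNodeSketch`, the `Op` enum, `autoRepartitionPoints`) is hypothetical and unrelated to the real graph-node classes; it assumes a source topic is partitioned by key and that the output of a key-dependent operator is partitioned by key again. A `REPARTITION` step simply clears any pending requirement, so nothing extra has to be recorded on the repartition node itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical linear-pipeline model; names are illustrative only and are
// not part of the Kafka Streams graph API.
final class RepartitionNodeSketch {

    enum Op { KEY_PRESERVING, KEY_CHANGING, REPARTITION, KEY_DEPENDENT }

    // Returns the positions of key-dependent operators that would need an
    // automatically inserted repartition step in a source-rooted pipeline.
    static List<Integer> autoRepartitionPoints(final List<Op> pipeline) {
        final List<Integer> points = new ArrayList<>();
        boolean keyChanged = false; // assumption: the source topic is partitioned by key
        for (int i = 0; i < pipeline.size(); i++) {
            switch (pipeline.get(i)) {
                case KEY_CHANGING:
                    keyChanged = true;
                    break;
                case REPARTITION:
                    keyChanged = false; // repartition() repartitions by itself
                    break;
                case KEY_DEPENDENT:
                    if (keyChanged) {
                        points.add(i);
                        keyChanged = false; // assumption: output is partitioned by key again
                    }
                    break;
                default:
                    break; // key-preserving operators change nothing
            }
        }
        return points;
    }

    public static void main(final String[] args) {
        // selectKey -> repartition() -> groupByKey: no automatic repartition needed
        System.out.println(autoRepartitionPoints(
            List.of(Op.KEY_CHANGING, Op.REPARTITION, Op.KEY_DEPENDENT))); // []
        // selectKey -> groupByKey: automatic repartition before position 1
        System.out.println(autoRepartitionPoints(
            List.of(Op.KEY_CHANGING, Op.KEY_DEPENDENT))); // [1]
    }
}
```

In this model the repartition step only needs to be recognized as "output partitioned by key"; it never carries a requirement of its own.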
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1307,6 +1295,7 @@ public <KOut, VOut> KStream<KOut, VOut> process(
name,
new ProcessorParameters<>(processorSupplier, name),
stateStoreNames);
+ processNode.requireRepartitionAlways();
Review Comment:
Why "always"? A `process()` step is a key-changing operation, so
repartitioning might be required downstream, but only if a key-dependent
operator like `groupByKey()` or `join()` follows.
So should we just call `processNode.setKeyChangingOperation(true)` instead?
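A minimal sketch of that alternative, using a hypothetical `Node` class whose `keyChangingOperation` and `keyDependent` flags stand in for whatever `GraphNode` actually exposes (this is not the real API, and the model has no repartition nodes): marking `process()` as key-changing is enough, and the decision to auto-repartition is made only when a key-dependent consumer shows up.

```java
// Hypothetical toy model; field and method names mirror the review
// discussion but are not the real GraphNode API.
final class ProcessKeyChangeSketch {

    static final class Node {
        final Node parent;                  // null for a source node
        final boolean keyChangingOperation; // e.g. process(), selectKey()
        final boolean keyDependent;         // e.g. groupByKey(), join()

        Node(final Node parent, final boolean keyChangingOperation, final boolean keyDependent) {
            this.parent = parent;
            this.keyChangingOperation = keyChangingOperation;
            this.keyDependent = keyDependent;
        }
    }

    // True if `node` or any of its ancestors changed the key.
    static boolean keyChangedUpstream(final Node node) {
        for (Node n = node; n != null; n = n.parent) {
            if (n.keyChangingOperation) {
                return true;
            }
        }
        return false;
    }

    // An automatic repartition is needed only when a key-dependent operator
    // consumes a stream whose key may have changed upstream.
    static boolean needsAutoRepartition(final Node consumer) {
        return consumer.keyDependent
            && consumer.parent != null
            && keyChangedUpstream(consumer.parent);
    }

    public static void main(final String[] args) {
        final Node source = new Node(null, false, false);
        final Node process = new Node(source, true, false);     // process(): key-changing
        final Node filter = new Node(process, false, false);    // filter(): not key-dependent
        final Node groupByKey = new Node(process, false, true); // groupByKey(): key-dependent
        System.out.println(needsAutoRepartition(filter));     // false
        System.out.println(needsAutoRepartition(groupByKey)); // true
    }
}
```

With only the key-changing flag on the `process()` node, a downstream `filter()` stays local while a downstream `groupByKey()` triggers repartitioning, which is the behavior "always" was presumably meant to capture.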
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -614,6 +614,14 @@ private GraphNode getKeyChangingParentNode(final GraphNode repartitionNode) {
return null;
}
+ protected boolean isRepartitionRequired(final GraphNode node) {
Review Comment:
If we want to know whether `node` requires repartitioning, why do we add this
method here? It seems to belong in the `GraphNode` class.
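A rough sketch of what moving the check into the node could look like. The classes below (`Node`, `SourceNode`, `RepartitionNode`) are hypothetical stand-ins, not the real `GraphNode` hierarchy, and the method uses the name `requiresRepartitioning()`; "requires repartitioning" here means that a key-dependent operator placed directly downstream of the node would need an automatic repartition step.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: illustrative stand-ins, not the real
// org.apache.kafka.streams.kstream.internals.graph.GraphNode classes.
final class GraphNodePlacementSketch {

    static class Node {
        private final List<Node> parents = new ArrayList<>();
        private boolean keyChangingOperation;

        Node(final Node... upstream) {
            for (final Node parent : upstream) {
                parents.add(parent);
            }
        }

        Node keyChanging() {
            keyChangingOperation = true;
            return this;
        }

        // Overridden by nodes whose output is partitioned by key regardless of upstream.
        boolean partitionedByKey() {
            return false;
        }

        // The node answers the question itself, so callers can write
        // node.requiresRepartitioning() instead of builder.isRepartitionRequired(node).
        boolean requiresRepartitioning() {
            if (partitionedByKey()) {
                return false;
            }
            if (keyChangingOperation) {
                return true;
            }
            for (final Node parent : parents) {
                if (parent.requiresRepartitioning()) {
                    return true;
                }
            }
            return false;
        }
    }

    static final class SourceNode extends Node {
        @Override
        boolean partitionedByKey() {
            return true; // assumption: an input topic is partitioned by key
        }
    }

    static final class RepartitionNode extends Node {
        RepartitionNode(final Node upstream) {
            super(upstream);
        }

        @Override
        boolean partitionedByKey() {
            return true; // repartitions by itself
        }
    }

    public static void main(final String[] args) {
        final Node source = new SourceNode();
        final Node mapValues = new Node(source);
        final Node selectKey = new Node(source).keyChanging();
        final Node repartitioned = new RepartitionNode(selectKey);
        System.out.println(mapValues.requiresRepartitioning());     // false
        System.out.println(selectKey.requiresRepartitioning());     // true
        System.out.println(repartitioned.requiresRepartitioning()); // false
    }
}
```

Keeping the walk on the node means the builder is not needed to answer the question, and source or repartition nodes only have to declare that their output is partitioned by key.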
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -27,12 +27,17 @@
import java.util.Optional;
public abstract class GraphNode {
-
+ private enum Repartition {
+ NOT_REQUIRED,
+ BY_KEY_ONLY,
+ ALWAYS_REQUIRED,
Review Comment:
What does "always" mean? We have key-dependent operators that require
repartitioning (if the key was changed upstream), but there is no operator
that would require auto-repartitioning if the key was not changed.
There is of course the `repartition()` operator, but it does not require an
"auto-repartition step", as it repartitions by itself.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -93,6 +98,14 @@ public String nodeName() {
return nodeName;
}
+ public boolean canResolveRepartition() {
+ return keyChangingOperation || repartition != Repartition.NOT_REQUIRED;
+ }
+
+ public boolean isRepartitionRequired() {
+ return keyChangingOperation || repartition == Repartition.ALWAYS_REQUIRED;
Review Comment:
A key-changing operation by itself does not require repartitioning, so the
"or" condition is not clear to me. Can you elaborate?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -516,11 +516,12 @@ public KStream<K, V> toStream(final Named named) {
name,
processorParameters
);
+ toStreamNode.requireRepartitionByKey();
Review Comment:
If we convert a `KTable` into a `KStream`, we know that the `KStream` is
partitioned by key (this is true at least for non-windowed tables; the
windowed case is a separate issue we might not want to address in this PR).
So what information are we tracking here?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -93,6 +98,14 @@ public String nodeName() {
return nodeName;
}
+ public boolean canResolveRepartition() {
+ return keyChangingOperation || repartition != Repartition.NOT_REQUIRED;
+ }
+
+ public boolean isRepartitionRequired() {
Review Comment:
nit: rename to `repartitioningRequired()` or `requiresRepartitioning()`
--
This is an automated message from the Apache Git Service.