mjsax commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1671666407
##########
streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java:
##########
@@ -439,6 +439,87 @@ public void
shouldPerformSelectKeyWithRepartitionOperation(final String topology
assertEquals(1, countOccurrencesInTopology(topology, "Sink:
.*-repartition.*"));
}
+ @ParameterizedTest
+ @ValueSource(strings = {StreamsConfig.OPTIMIZE,
StreamsConfig.NO_OPTIMIZATION})
+ public void
shouldNotRepartitionWithMarkAsPartitionedFollowingSelectKey(final String
topologyOptimization) throws Exception {
+ final long timestamp = System.currentTimeMillis();
+
+ sendEvents(
+ timestamp,
+ Arrays.asList(
+ new KeyValue<>(1, "10"),
+ new KeyValue<>(2, "20")
+ )
+ );
+
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ builder.stream(inputTopic, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .selectKey((key, value) -> Integer.valueOf(value))
+ .markAsPartitioned(Named.as("partition-preserved"))
+ .groupByKey()
+ .count()
+ .toStream()
+ .to(outputTopic);
+
+
+ startStreams(builder, createStreamsConfig(topologyOptimization));
+
+ validateReceivedMessages(
+ new IntegerDeserializer(),
+ new LongDeserializer(),
+ Arrays.asList(
+ new KeyValue<>(10, 1L),
+ new KeyValue<>(20, 1L)
+ )
+ );
+
+ final String topology = builder.build().describe().toString();
+
+ assertEquals(0, countOccurrencesInTopology(topology, "Sink:
.*-repartition.*"));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {StreamsConfig.OPTIMIZE,
StreamsConfig.NO_OPTIMIZATION})
+ public void shouldNotRepartitionWithMarkAsPartitionedFollowingMap(final
String topologyOptimization) throws Exception {
+ final String topicBMapperName = "topic-b-mapper";
+ final long timestamp = System.currentTimeMillis();
+
+ sendEvents(
+ timestamp,
+ Arrays.asList(
+ new KeyValue<>(1, "10"),
+ new KeyValue<>(2, "20")
+ )
+ );
+
+ final StreamsBuilder builder = new StreamsBuilder();
+
+ builder.stream(inputTopic, Consumed.with(Serdes.Integer(),
Serdes.String()))
+ .map(KeyValue::new, Named.as(topicBMapperName))
Review Comment:
This seems to be pretty much the same as the test above. Do we gain much
"useful" test coverage by adding it?
It might be more interesting to add a "branching" case, as well as a
"downstream" key-changing operation:
```
// branching
KStream s = builder.stream(...).map(...);
s.markAsPartitioned(Named.as("partition-preserved"))
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);
// Re-using `s` here branches (fans out) the topology. Because we call
// `groupByKey()` directly on `s`, we should get a repartition topic here:
// this branch does not see the `markAsPartitioned()` operation from above,
// which should not impact `s` itself.
s.groupByKey()
    .count()
    .toStream()
    .to(outputTopic2);

// downstream map
builder.stream(...)
    .map(...)
    .markAsPartitioned(Named.as("partition-preserved"))
    // insert a second map -- this should void the upstream
    // `markAsPartitioned()`, and we should get a repartition topic again
    .map(...)
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);

// maybe also: branch after `markAsPartitioned()`
KStream s2 = builder.stream(...)
    .map(...)
    .markAsPartitioned(Named.as("partition-preserved"));
// map() after `markAsPartitioned()` in this branch -- should trigger
// repartitioning
s2.map(...)
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);
// no repartition topic in this branch
s2.groupByKey()
    .count()
    .toStream()
    .to(outputTopic2);
```
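The expected outcomes of these cases can be sketched with a toy model. This is not Kafka Streams code: `ToyStream` and all of its methods are hypothetical names, and the model assumes each DSL call returns a new stream object carrying its own "repartition required" flag, so marking one branch never affects a sibling branch.

```java
// Toy model only: ToyStream is a hypothetical stand-in, not a Kafka Streams class.
final class ToyStream {
    // True when a key-changing operation happened upstream and was not cancelled.
    private final boolean repartitionRequired;

    private ToyStream(final boolean repartitionRequired) {
        this.repartitionRequired = repartitionRequired;
    }

    // A freshly read stream is assumed to be partitioned by its key already.
    static ToyStream source() {
        return new ToyStream(false);
    }

    // map() may change the key, so the returned stream requires repartitioning.
    ToyStream map() {
        return new ToyStream(true);
    }

    // Returns a NEW stream with the flag cleared; `this` is left untouched.
    ToyStream markAsPartitioned() {
        return new ToyStream(false);
    }

    // A key-dependent operation needs a repartition topic only if the flag is set.
    boolean groupByKeyNeedsRepartition() {
        return repartitionRequired;
    }
}
```

Under these assumptions, `source().map().markAsPartitioned()` reports that no repartition is needed, while grouping directly on the unmarked `source().map()` branch, or adding another `map()` after the mark, reports that one is needed again.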
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1624,4 +1624,28 @@ public <VOut> KStream<K, VOut> processValues(
processNode,
builder);
}
+
+ @Override
+ public KStream<K, V> markAsPartitioned(final Named named) {
+
+ final String name = new
NamedInternal(named).orElseGenerateWithPrefix(builder, PARTITION_PRESERVE_NAME);
+
+ final ProcessorParameters<? super K, ? super V, ?, ?>
processorParameters =
+ new ProcessorParameters<>(new PassThrough<>(),
PARTITION_PRESERVE_NAME + name);
+
+ final PartitionPreservingNode<? super K, ? super V>
partitionPreservingNode = new PartitionPreservingNode<>(
+ processorParameters,
+ PARTITION_PRESERVE_NAME + name);
Review Comment:
As above. Passing just `name` should be sufficient to avoid double prefixing
with `PARTITION_PRESERVE_NAME`.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1616,4 +1623,25 @@ public <VOut> KStream<K, VOut> processValues(
processNode,
builder);
}
+
+ @Override
+ public KStream<K, V> markAsPartitioned() {
+ final ProcessorParameters<? super K, ? super V, ?, ?>
processorParameters =
+ new ProcessorParameters<>(new PassThrough<>(),
PARTITION_PRESERVE_NAME + name);
Review Comment:
I think we should only pass in `name`, not `PARTITION_PRESERVE_NAME +
name`. If the user sets a name, we just use the user-specified name; if not,
`orElseGenerateWithPrefix` already generates a name that uses
`PARTITION_PRESERVE_NAME` as its prefix.
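To illustrate the double prefix, here is a minimal sketch. `ToyNaming` is a hypothetical helper that only imitates the "use the user's name, else generate one from the prefix" behavior described above; the constant's value and the counter format are assumptions made for the example, not taken from the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Toy model only: ToyNaming is hypothetical and not part of Kafka Streams.
final class ToyNaming {
    // Assumed value, for illustration; the real constant's value is not shown in the diff.
    static final String PARTITION_PRESERVE_NAME = "KSTREAM-PARTITION-PRESERVE-";

    private static final AtomicInteger INDEX = new AtomicInteger(0);

    // Returns userName if it is set; otherwise generates prefix + zero-padded counter.
    static String orElseGenerateWithPrefix(final String userName, final String prefix) {
        if (userName != null) {
            return userName;
        }
        return prefix + String.format("%010d", INDEX.getAndIncrement());
    }
}
```

In this sketch a generated name already starts with `PARTITION_PRESERVE_NAME`, so prepending the constant again yields a name with the prefix twice, and a user-supplied name such as `"partition-preserved"` would get a prefix the user never asked for.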
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/PartitionPreservingNode.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+/**
+ * Inform downstream operations to preserve the partition, i.e. does not
trigger repartition.
+ */
+public class PartitionPreservingNode<K, V> extends ProcessorGraphNode<K, V> {
+
+ private String nodeName;
+
+ public PartitionPreservingNode(final ProcessorParameters<K, V, ?, ?>
processorParameters, final String nodeName) {
+ super(processorParameters);
+ this.nodeName = nodeName;
+ }
+
+ @Override
+ public boolean isKeyChangingOperation() {
+ return false;
+ }
+
+ @Override
+ public void keyChangingOperation(final boolean keyChangingOperation) {
+ // can not change this to a key changing operation
+ if (keyChangingOperation) {
+ throw new IllegalArgumentException("Cannot set a
PartitionPreservingNode to key changing");
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionPreservingNode{" +
+ "nodeName='" + nodeName + '}';
Review Comment:
Should we also include `super.toString()` somehow? Other classes just add ` +
super.toString();` to the end.
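A minimal sketch of that pattern, using hypothetical toy classes (`BaseNode` and `PreservingNode` are stand-ins for a graph node and its subclass, not the actual Kafka Streams types):

```java
// Toy classes only: BaseNode and PreservingNode are hypothetical stand-ins.
class BaseNode {
    private final String nodeName;

    BaseNode(final String nodeName) {
        this.nodeName = nodeName;
    }

    @Override
    public String toString() {
        return "BaseNode{nodeName='" + nodeName + "'}";
    }
}

final class PreservingNode extends BaseNode {
    PreservingNode(final String nodeName) {
        super(nodeName);
    }

    @Override
    public String toString() {
        // Appending super.toString() keeps the parent's fields in the output.
        return "PreservingNode{} " + super.toString();
    }
}
```

With this shape the subclass does not need its own copy of the node name, because the parent's representation already prints it.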
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -796,7 +797,6 @@ public <KR> KGroupedStream<KR, V> groupBy(final
KeyValueMapper<? super K, ? supe
final GroupedInternal<KR, V> groupedInternal = new
GroupedInternal<>(grouped);
final ProcessorGraphNode<K, V> selectKeyMapNode =
internalSelectKey(keySelector, new NamedInternal(groupedInternal.name()));
- selectKeyMapNode.keyChangingOperation(true);
Review Comment:
Why do we remove this? `groupBy()` does set a new key (in contrast to
`groupByKey()`) and thus it is a key-changing operation.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -685,6 +685,41 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapper<?
super V, ? extends Iterabl
<VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ?
super V, ? extends Iterable<? extends VR>> mapper,
final Named named);
+ /**
+ * Marking the {@code KStream} as partitioned signals the stream is
partitioned as intended,
+ * and does not require further repartitioning by downstream key changing
operations.
+ * <p>
+ * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with
interactive query(IQ) or {@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their
original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
Review Comment:
Thanks. I think the composite key thing was more or less an example only. It
might not be ideal to put it into the JavaDocs.
I would not necessarily put a full explanation into the JavaDocs -- if we
think we want to cover more details, let's add a detailed section to the docs
instead.
How about this:
```
/**
 * Marks the {@code KStream} as partitioned, signaling that this {@code KStream}
 * is partitioned as intended, and that it does not require further
 * repartitioning by downstream key-dependent operations.
 * Note that this method SHOULD NOT be used if downstream state stores are
 * accessed via interactive queries (IQ) or if there are downstream joins.
 * Both IQ and joins are expected to not work as intended if repartitioning
 * is canceled.
 * This method may be used with care for downstream aggregations (i.e.,
 * {@link #groupByKey()}) only.
 * <p>
 * This method overrides the default behavior described below.
 * By default, Kafka Streams automatically repartitions the records to
 * prepare for a stateful operation if necessary; however, this is not
 * always required when the input stream is partitioned as intended.
 * As an example, if an input stream is partitioned by a String key1,
 * the code below will trigger a repartition:
 *
 * <pre>{@code
 * KStream<String, String> stream = builder.stream("topic");
 * stream
 *     .selectKey( ... => (key1, metric))
 *     .groupByKey()
 *     .aggregate()
 * }</pre>
 *
 * You can then override the default behavior by calling this method:
 * <pre>{@code
 * stream
 *     .selectKey( ... => (key1, metric))
 *     .markAsPartitioned(Named.as(...))
 *     .groupByKey()
 *     .aggregate()
 * }</pre>
 *
 * @return a new {@code KStream} instance that will not repartition in subsequent operations.
 */
```
##########
streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java:
##########
@@ -439,6 +439,87 @@ public void
shouldPerformSelectKeyWithRepartitionOperation(final String topology
assertEquals(1, countOccurrencesInTopology(topology, "Sink:
.*-repartition.*"));
}
+ @ParameterizedTest
+ @ValueSource(strings = {StreamsConfig.OPTIMIZE,
StreamsConfig.NO_OPTIMIZATION})
+ public void
shouldNotRepartitionWithMarkAsPartitionedFollowingSelectKey(final String
topologyOptimization) throws Exception {
Review Comment:
Just wondering what test coverage we need. Are (slow) integration tests
necessary, or would (faster) unit tests be sufficient?
In the end, it might be sufficient to just verify the "structure" of the
generated topology, i.e., whether it contains a repartition topic or not. (Cf.
`TopologyTest.java` -- would adding some unit tests there and just comparing
the `TopologyDescription` be sufficient?)
Thoughts?
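As a rough sketch of what such a structural check boils down to, the helper below counts regex matches in the text of a topology description. `TopologyText` is a hypothetical name, and this is only an assumption about how a helper like `countOccurrencesInTopology` could work, not its actual implementation.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Sketch only: TopologyText is a hypothetical helper, not part of the PR.
final class TopologyText {
    // Counts non-overlapping matches of `regex` in the topology description text.
    // By default `.` does not match line breaks, so each match stays on one line.
    static int countOccurrences(final String topologyDescription, final String regex) {
        final Matcher matcher = Pattern.compile(regex).matcher(topologyDescription);
        int count = 0;
        while (matcher.find()) {
            count++;
        }
        return count;
    }
}
```

A unit test could feed it the string returned by `builder.build().describe().toString()`, as the integration test in the diff does, and assert on the count for `"Sink: .*-repartition.*"` without starting a Kafka cluster.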
##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -685,6 +685,39 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapper<?
super V, ? extends Iterabl
<VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ?
super V, ? extends Iterable<? extends VR>> mapper,
final Named named);
+ /**
+ * Marking the {@code KStream} as partitioned signals the stream is
partitioned as intended,
+ * and does not require further repartitioning by downstream key dependent
operations.
+ * Note that this method SHOULD NOT be used with interactive query(IQ) or
{@link KStream#join}.
+ * For reasons that when repartitions happen, records are physically
shuffled by a composite key defined in the stateful operation.
+ * However, if the repartitions were cancelled, records stayed in their
original partition by its original key. IQ or joins
+ * assumes and uses the composite key instead of the original key.
+ * This method will overwrite a default behavior as described below.
+ * By default, Kafka Streams always automatically repartition the records
to prepare for a stateful operation,
+ * however, it is not always required when input stream is partitioned as
intended. As an example,
+ * if an input stream is partitioned by a String key1, calling the below
function will trigger a repartition:
+ *
+ * <pre>{@code
+ * KStream<String, String> inputStream = builder.stream("topic");
+ * stream
+ * .selectKey( ... => (key1, metric))
+ * .groupByKey()
+ * .aggregate()
+ * }</pre>
+ *
+ * You can then overwrite the default behavior by calling this method:
+ * <pre>{@code
+ * stream
+ * .selectKey( ... => (key1, metric))
+ * .markAsPartitioned(Named.as(...))
+ * .groupByKey()
+ * .aggregate()
+ * }</pre>
+ *
+ * @return a new {@code KStream} instance that will not repartition in
subsequent operations.
+ */
+ KStream<K, V> markAsPartitioned(Named named);
Review Comment:
In the KIP itself, we only proposed to add `markAsPartitioned()`, w/o the
`Named` parameter -- I believe we should have both overloads, w/ and w/o the
`Named` parameter.