mjsax commented on code in PR #15740:
URL: https://github.com/apache/kafka/pull/15740#discussion_r1671666407


##########
streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java:
##########
@@ -439,6 +439,87 @@ public void 
shouldPerformSelectKeyWithRepartitionOperation(final String topology
         assertEquals(1, countOccurrencesInTopology(topology, "Sink: 
.*-repartition.*"));
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {StreamsConfig.OPTIMIZE, 
StreamsConfig.NO_OPTIMIZATION})
+    public void 
shouldNotRepartitionWithMarkAsPartitionedFollowingSelectKey(final String 
topologyOptimization) throws Exception {
+        final long timestamp = System.currentTimeMillis();
+
+        sendEvents(
+                timestamp,
+                Arrays.asList(
+                        new KeyValue<>(1, "10"),
+                        new KeyValue<>(2, "20")
+                )
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+                .selectKey((key, value) -> Integer.valueOf(value))
+                .markAsPartitioned(Named.as("partition-preserved"))
+                .groupByKey()
+                .count()
+                .toStream()
+                .to(outputTopic);
+
+
+        startStreams(builder, createStreamsConfig(topologyOptimization));
+
+        validateReceivedMessages(
+                new IntegerDeserializer(),
+                new LongDeserializer(),
+                Arrays.asList(
+                        new KeyValue<>(10, 1L),
+                        new KeyValue<>(20, 1L)
+                )
+        );
+
+        final String topology = builder.build().describe().toString();
+
+        assertEquals(0, countOccurrencesInTopology(topology, "Sink: 
.*-repartition.*"));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {StreamsConfig.OPTIMIZE, 
StreamsConfig.NO_OPTIMIZATION})
+    public void shouldNotRepartitionWithMarkAsPartitionedFollowingMap(final 
String topologyOptimization) throws Exception {
+        final String topicBMapperName = "topic-b-mapper";
+        final long timestamp = System.currentTimeMillis();
+
+        sendEvents(
+                timestamp,
+                Arrays.asList(
+                        new KeyValue<>(1, "10"),
+                        new KeyValue<>(2, "20")
+                )
+        );
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        builder.stream(inputTopic, Consumed.with(Serdes.Integer(), 
Serdes.String()))
+                .map(KeyValue::new, Named.as(topicBMapperName))

Review Comment:
   This seems to be pretty much the same as the test above? Do we gain much by adding it with regard to "useful" test coverage?
   
   It might be more interesting to add a "branching" case, as well as a 
"downstream" key-changing operation:
   ```
   // branching
   KStream s = builder.stream(...).map(...)
   
   s.markAsPartitioned(Named.as("partition-preserved"))
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);
   
   // re-using `s` here "branches / fans-out" the topology, and because we call `groupByKey()` directly on `s` we should get a repartition topic here (we don't see the `markAsPartitioned()` operation from above, and it should not impact `s` itself).
   s.groupByKey()
    .count()
    .toStream()
    .to(outputTopic2);
    
    
   // downstream map
   KStream s = builder.stream(...)
       .map(...)
       .markAsPartitioned(Named.as("partition-preserved"))
       .map(...) // insert a second map -- this should make the upstream 
`markAsPartitioned()` void and we should get a repartition topic again
       .groupByKey()
       .count()
       .toStream()
       .to(outputTopic);
       
   // maybe also: branch after `markAsPartitioned()`
   KStream s = builder.stream(...)
       .map(...)
       .markAsPartitioned(Named.as("partition-preserved"));
   
   // map() after `markAsPartitioned()` in this branch -- should trigger repartitioning
   s.map(...)
    .groupByKey()
    .count()
    .toStream()
    .to(outputTopic);
   
   // no repartition topic in this branch
   s.groupByKey()
    .count()
    .toStream()
    .to(outputTopic2);
   ```



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1624,4 +1624,28 @@ public <VOut> KStream<K, VOut> processValues(
             processNode,
             builder);
     }
+
+    @Override
+    public KStream<K, V> markAsPartitioned(final Named named) {
+
+        final String name = new 
NamedInternal(named).orElseGenerateWithPrefix(builder, PARTITION_PRESERVE_NAME);
+
+        final ProcessorParameters<? super K, ? super V, ?, ?> 
processorParameters =
+                new ProcessorParameters<>(new PassThrough<>(), 
PARTITION_PRESERVE_NAME + name);
+
+        final PartitionPreservingNode<? super K, ? super V> 
partitionPreservingNode = new PartitionPreservingNode<>(
+                processorParameters,
+                PARTITION_PRESERVE_NAME + name);

Review Comment:
   As above. Passing just `name` should be sufficient to avoid double prefixing 
with `PARTITION_PRESERVE_NAME` 



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1616,4 +1623,25 @@ public <VOut> KStream<K, VOut> processValues(
             processNode,
             builder);
     }
+
+    @Override
+    public KStream<K, V> markAsPartitioned() {
+        final ProcessorParameters<? super K, ? super V, ?, ?> 
processorParameters =
+                new ProcessorParameters<>(new PassThrough<>(), 
PARTITION_PRESERVE_NAME + name);

Review Comment:
   I think we should only pass in `name` but not `PARTITION_PRESERVE_NAME + 
name` -- if a name is set by the user, we would just use the user's specified 
name, and if not set, we generate a name via `orElseGenerateWithPrefix` already 
which will use `PARTITION_PRESERVE_NAME` as prefix for the generated name 
already.
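   The rule described here (use the user's name as-is, only generate a prefixed name when none was given) can be sketched in plain Java. This is a hypothetical stand-in for `NamedInternal.orElseGenerateWithPrefix()`, and the prefix value is assumed, not the actual constant:
   ```java
   // Hypothetical sketch of the naming rule discussed above -- not the
   // actual NamedInternal implementation from Kafka Streams.
   public class NameResolutionSketch {
       // assumed prefix value, for illustration only
       static final String PARTITION_PRESERVE_NAME = "KSTREAM-MARK-AS-PARTITIONED-";

       // If the user supplied a name, use it unchanged; otherwise generate
       // one that already carries the prefix. Callers must therefore not
       // prepend the prefix a second time.
       static String resolveName(String userSuppliedName, int index) {
           if (userSuppliedName != null) {
               return userSuppliedName;
           }
           return String.format("%s%010d", PARTITION_PRESERVE_NAME, index);
       }

       public static void main(String[] args) {
           System.out.println(resolveName("partition-preserved", 7)); // user name kept as-is
           System.out.println(resolveName(null, 7));                  // generated, prefixed name
       }
   }
   ```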



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/PartitionPreservingNode.java:
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals.graph;
+
+/**
+ * Inform downstream operations to preserve the partition, i.e. does not 
trigger repartition.
+ */
+public class PartitionPreservingNode<K, V> extends ProcessorGraphNode<K, V> {
+
+    private String nodeName;
+
+    public PartitionPreservingNode(final ProcessorParameters<K, V, ?, ?> 
processorParameters, final String nodeName) {
+        super(processorParameters);
+        this.nodeName = nodeName;
+    }
+
+    @Override
+    public boolean isKeyChangingOperation() {
+        return false;
+    }
+
+    @Override
+    public void keyChangingOperation(final boolean keyChangingOperation) {
+        // can not change this to a key changing operation
+        if (keyChangingOperation) {
+            throw new IllegalArgumentException("Cannot set a 
PartitionPreservingNode to key changing");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "PartitionPreservingNode{" +
+                "nodeName='" + nodeName + '}';

Review Comment:
   We should also include `super.toString()` somehow? Other classes just add ` + super.toString();` at the end.
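   The convention can be shown with a minimal plain-Java sketch (hypothetical classes, not the actual graph-node hierarchy): the subclass appends `super.toString()` so base-class state still appears in the output.
   ```java
   // Minimal sketch of the toString() convention suggested above.
   public class ToStringSketch {
       static class BaseNode {
           @Override
           public String toString() {
               return "BaseNode{}";
           }
       }

       static class PartitionPreservingNodeSketch extends BaseNode {
           private final String nodeName;

           PartitionPreservingNodeSketch(final String nodeName) {
               this.nodeName = nodeName;
           }

           @Override
           public String toString() {
               // append super.toString() as the other node classes do
               return "PartitionPreservingNode{nodeName='" + nodeName + "'} " + super.toString();
           }
       }

       public static void main(String[] args) {
           // prints: PartitionPreservingNode{nodeName='partition-preserved'} BaseNode{}
           System.out.println(new PartitionPreservingNodeSketch("partition-preserved"));
       }
   }
   ```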



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -796,7 +797,6 @@ public <KR> KGroupedStream<KR, V> groupBy(final 
KeyValueMapper<? super K, ? supe
 
         final GroupedInternal<KR, V> groupedInternal = new 
GroupedInternal<>(grouped);
         final ProcessorGraphNode<K, V> selectKeyMapNode = 
internalSelectKey(keySelector, new NamedInternal(groupedInternal.name()));
-        selectKeyMapNode.keyChangingOperation(true);

Review Comment:
   Why do we remove this? `groupBy()` does set a new key (in contrast to 
`groupByKey()`) and thus it is a key-changing operation.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -685,6 +685,41 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapper<? 
super V, ? extends Iterabl
     <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? 
super V, ? extends Iterable<? extends VR>> mapper,
                                       final Named named);
 
+    /**
+     * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+     * and does not require further repartitioning by downstream key changing 
operations.
+     * <p>
+     * Note that {@link KStream#markAsPartitioned()} SHOULD NOT be used with 
interactive query(IQ) or {@link KStream#join}.
+     * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+     * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+     * assumes and uses the composite key instead of the original key.

Review Comment:
   Thanks. I think the composite key thing was more or less an example only. It might not be ideal to put into the JavaDocs?
   
   I would not necessarily put a full explanation into the JavaDocs -- if we think we want to cover more details, let's add a detailed section into the docs instead.
   
   How about this:
   ```
       /**
        * Marks the {@code KStream} as partitioned, signaling that this {@code 
KStream} is partitioned as intended,
        * and that it does not require further repartitioning by downstream key 
dependent operations.
        * Note that this method SHOULD NOT be used if downstream state stores 
are accessed via interactive queries (IQ) or downstream joins.
        * Both IQ and joins are expected to not work as intended if 
repartitioning is canceled.
    * This method may be used with care for downstream aggregations (i.e., {@link #groupByKey()}) only.
        * <p>
        * This method will overwrite a default behavior as described below.
    * By default, Kafka Streams automatically repartitions the records to prepare for a stateful operation if necessary;
    * however, this is not always required when the input stream is partitioned as intended. As an example,
        * if an input stream is partitioned by a String key1, calling the below 
function will trigger a repartition:
        *
        * <pre>{@code
        *     KStream<String, String> inputStream = builder.stream("topic");
        *     stream
        *       .selectKey( ... => (key1, metric))
        *       .groupByKey()
        *       .aggregate()
        * }</pre>
        *
        * You can then overwrite the default behavior by calling this method:
        * <pre>{@code
        *     stream
        *       .selectKey( ... => (key1, metric))
        *       .markAsPartitioned(Named.as(...))
        *       .groupByKey()
        *       .aggregate()
        * }</pre>
        *
        * @return a new {@code KStream} instance that will not repartition in 
subsequent operations.
        */
   ```



##########
streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java:
##########
@@ -439,6 +439,87 @@ public void 
shouldPerformSelectKeyWithRepartitionOperation(final String topology
         assertEquals(1, countOccurrencesInTopology(topology, "Sink: 
.*-repartition.*"));
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {StreamsConfig.OPTIMIZE, 
StreamsConfig.NO_OPTIMIZATION})
+    public void 
shouldNotRepartitionWithMarkAsPartitionedFollowingSelectKey(final String 
topologyOptimization) throws Exception {

Review Comment:
   Just wondering what test coverage we need? Are (slow) integration tests necessary? Or would (faster) unit tests be sufficient?
   
   In the end, it might be sufficient to just verify the "structure" of the generated topology, and whether it contains a repartition topic or not? (Cf `TopologyTest.java` -- would adding some unit tests there be sufficient, just comparing `TopologyDescription`?)
   
   Thoughts?
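   The structural check the test already relies on (`countOccurrencesInTopology(topology, "Sink: .*-repartition.*")`) boils down to counting regex matches in the `TopologyDescription` string, which a plain unit test can do without brokers. A self-contained sketch, using simplified, hypothetical description text rather than real Kafka Streams output:
   ```java
   import java.util.regex.Matcher;
   import java.util.regex.Pattern;

   // Sketch of a topology-structure check: count repartition sinks in a
   // TopologyDescription string. The description strings are simplified
   // examples, not actual Kafka Streams output.
   public class TopologyDescriptionCheck {
       static int countOccurrences(final String topology, final String regex) {
           final Matcher m = Pattern.compile(regex).matcher(topology);
           int count = 0;
           while (m.find()) {
               count++;
           }
           return count;
       }

       public static void main(String[] args) {
           final String withRepartition =
               "Sink: count-repartition-sink (topic: count-repartition)\n"
             + "Source: count-repartition-source (topics: [count-repartition])";
           final String withoutRepartition =
               "Sink: KSTREAM-SINK-0000000004 (topic: output)";

           System.out.println(countOccurrences(withRepartition, "Sink: .*-repartition.*"));    // 1
           System.out.println(countOccurrences(withoutRepartition, "Sink: .*-repartition.*")); // 0
       }
   }
   ```
   Since `.` does not match newlines by default, each `Sink:` line is matched at most once, so the count reflects the number of repartition sinks in the description.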



##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -685,6 +685,39 @@ <VR> KStream<K, VR> flatMapValues(final ValueMapper<? 
super V, ? extends Iterabl
     <VR> KStream<K, VR> flatMapValues(final ValueMapperWithKey<? super K, ? 
super V, ? extends Iterable<? extends VR>> mapper,
                                       final Named named);
 
+    /**
+     * Marking the {@code KStream} as partitioned signals the stream is 
partitioned as intended,
+     * and does not require further repartitioning by downstream key dependent 
operations.
+     * Note that this method SHOULD NOT be used with interactive query(IQ) or 
{@link KStream#join}.
+     * For reasons that when repartitions happen, records are physically 
shuffled by a composite key defined in the stateful operation.
+     * However, if the repartitions were cancelled, records stayed in their 
original partition by its original key. IQ or joins
+     * assumes and uses the composite key instead of the original key.
+     * This method will overwrite a default behavior as described below.
+     * By default, Kafka Streams always automatically repartition the records 
to prepare for a stateful operation,
+     * however, it is not always required when input stream is partitioned as 
intended. As an example,
+     * if an input stream is partitioned by a String key1, calling the below 
function will trigger a repartition:
+     *
+     * <pre>{@code
+     *     KStream<String, String> inputStream = builder.stream("topic");
+     *     stream
+     *       .selectKey( ... => (key1, metric))
+     *       .groupByKey()
+     *       .aggregate()
+     * }</pre>
+     *
+     * You can then overwrite the default behavior by calling this method:
+     * <pre>{@code
+     *     stream
+     *       .selectKey( ... => (key1, metric))
+     *       .markAsPartitioned(Named.as(...))
+     *       .groupByKey()
+     *       .aggregate()
+     * }</pre>
+     *
+     * @return a new {@code KStream} instance that will not repartition in 
subsequent operations.
+     */
+    KStream<K, V> markAsPartitioned(Named named);

Review Comment:
   In the KIP itself, we only proposed to add `markAsPartitioned()`, w/o the `Named` parameter -- I believe we should have both overloads, w/ and w/o the `Named` parameter.
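   The usual shape for such an overload pair is for the no-arg variant to delegate to the `Named` variant with an "empty" default, so both share one code path. A plain-Java sketch with hypothetical stand-in types (not the actual Kafka Streams classes):
   ```java
   // Sketch of the overload pairing suggested above: markAsPartitioned()
   // delegates to markAsPartitioned(Named). Hypothetical types only.
   public class OverloadSketch {
       static final class Named {
           final String name; // null means "generate a name later"
           private Named(final String name) { this.name = name; }
           static Named as(final String name) { return new Named(name); }
           static Named empty() { return new Named(null); }
       }

       static String markAsPartitioned() {
           // delegate so both overloads share a single implementation
           return markAsPartitioned(Named.empty());
       }

       static String markAsPartitioned(final Named named) {
           return named.name != null ? named.name : "<generated>";
       }

       public static void main(String[] args) {
           System.out.println(markAsPartitioned(Named.as("partition-preserved")));
           System.out.println(markAsPartitioned());
       }
   }
   ```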



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to