mjsax commented on code in PR #15482:
URL: https://github.com/apache/kafka/pull/15482#discussion_r1697731647


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1071,14 +1071,14 @@ public <VR, KO, VO> KTable<K, VR> leftJoin(final KTable<KO, VO> other,
         return doJoinOnForeignKey(other, foreignKeyExtractor, joiner, TableJoined.with(null, null), materialized, true);
     }
 
-    private final Function<Optional<Set<Integer>>, Integer> getPartition = maybeMulticastPartitions -> {
-        if (!maybeMulticastPartitions.isPresent()) {
-            return null;
+    private final Function<Optional<Set<Integer>>, Optional<Set<Integer>>> getPartition = maybeMulticastPartitions -> {
+        if (maybeMulticastPartitions == null || !maybeMulticastPartitions.isPresent()) {

Review Comment:
   Do we need this `null` check? It seems returning `null` is not actually
   allowed here.
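
   A minimal sketch of the alternative, assuming the partitioner contract does
   forbid `null`: fail fast on a `null` return instead of treating it like an
   empty `Optional`. The names `NullCheckSketch`, `MulticastPartitioner`, and
   `resolve` are hypothetical stand-ins, not the Kafka Streams API.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in types for illustration only; not the Kafka Streams API.
public class NullCheckSketch {

    interface MulticastPartitioner<K, V> {
        // Assumed contract: never returns null; an empty Optional means
        // "fall back to the default partitioning".
        Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);
    }

    // Resolves the target partitions. A null return is treated as a contract
    // violation; an empty Optional falls back to the given default partition.
    static <K, V> Set<Integer> resolve(final MulticastPartitioner<K, V> partitioner,
                                       final String topic,
                                       final K key,
                                       final V value,
                                       final int numPartitions,
                                       final int defaultPartition) {
        final Optional<Set<Integer>> maybePartitions = Objects.requireNonNull(
            partitioner.partitions(topic, key, value, numPartitions),
            "partitioner must not return null");
        return maybePartitions.orElse(Collections.singleton(defaultPartition));
    }
}
```

   With this shape the `== null` branch disappears from the call sites, and a
   misbehaving partitioner surfaces as an exception rather than being silently
   mapped to the default.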



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1229,7 +1229,7 @@ private <VR, KO, VO> KTable<K, VR> doJoinOnForeignKey(final KTable<KO, VO> forei
 
         final StreamPartitioner<K, SubscriptionResponseWrapper<VO>> foreignResponseSinkPartitioner =
                 tableJoinedInternal.partitioner() == null
-                        ? (topic, key, subscriptionResponseWrapper, numPartitions) -> subscriptionResponseWrapper.getPrimaryPartition()
+                        ? (topic, key, subscriptionResponseWrapper, numPartitions) -> Optional.of(Collections.singleton(subscriptionResponseWrapper.getPrimaryPartition()))

Review Comment:
   Should we change `getPrimaryPartition` to return an `Optional` instead?
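
   A rough sketch of what that could look like, assuming the primary partition
   may be absent. `PrimaryPartitionSketch`, `ResponseWrapper`, and
   `partitionsFor` are hypothetical names used only for illustration; the real
   `SubscriptionResponseWrapper` is not modeled here.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

public class PrimaryPartitionSketch {

    // Hypothetical stand-in for the response wrapper; only the partition is modeled.
    static final class ResponseWrapper {
        private final Integer primaryPartition; // assumed to be nullable

        ResponseWrapper(final Integer primaryPartition) {
            this.primaryPartition = primaryPartition;
        }

        // Returns an Optional instead of a possibly-null Integer.
        Optional<Integer> primaryPartition() {
            return Optional.ofNullable(primaryPartition);
        }
    }

    // Maps a present partition to a singleton set and keeps an absent one
    // as an empty Optional.
    static Optional<Set<Integer>> partitionsFor(final ResponseWrapper wrapper) {
        return wrapper.primaryPartition().map(Collections::singleton);
    }
}
```

   One reason this may matter: `Collections.singleton(null)` is legal Java, so
   if the getter can return `null`, wrapping it in
   `Optional.of(Collections.singleton(...))` would yield a present `Optional`
   holding a set with a `null` element rather than an empty `Optional`.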



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java:
##########
@@ -1569,7 +1569,7 @@ private void assertNextOutputRecord(final TestRecord<String, String> record,
     }
 
     private StreamPartitioner<Object, Object> constantPartitioner(final Integer partition) {

Review Comment:
   Can we improve the generic types?
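
   One possible reading of this, sketched with a hypothetical `Partitioner`
   interface rather than the real `StreamPartitioner`: make the helper generic
   in the key and value types so callers get a partitioner typed to their
   records instead of `<Object, Object>`.

```java
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

public class ConstantPartitionerSketch {

    // Hypothetical stand-in for the partitioner interface; not the Kafka Streams type.
    interface Partitioner<K, V> {
        Optional<Set<Integer>> partitions(String topic, K key, V value, int numPartitions);
    }

    // Generic in K and V, so the result can be assigned to e.g.
    // Partitioner<String, String> without casts.
    static <K, V> Partitioner<K, V> constantPartitioner(final Integer partition) {
        return (topic, key, value, numPartitions) -> Optional.of(Collections.singleton(partition));
    }
}
```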



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -153,7 +153,7 @@ public <K, V> void send(final String topic,
             }
             if (partitions.size() > 0) {
                 final Optional<Set<Integer>> maybeMulticastPartitions = partitioner.partitions(topic, key, value, partitions.size());
-                if (!maybeMulticastPartitions.isPresent()) {
+                if (maybeMulticastPartitions == null || !maybeMulticastPartitions.isPresent()) {

Review Comment:
   As above: do we need this `null` check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
