mjsax commented on code in PR #15482:
URL: https://github.com/apache/kafka/pull/15482#discussion_r1697731647
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1071,14 +1071,14 @@ public <VR, KO, VO> KTable<K, VR> leftJoin(final
KTable<KO, VO> other,
return doJoinOnForeignKey(other, foreignKeyExtractor, joiner,
TableJoined.with(null, null), materialized, true);
}
- private final Function<Optional<Set<Integer>>, Integer> getPartition =
maybeMulticastPartitions -> {
- if (!maybeMulticastPartitions.isPresent()) {
- return null;
+ private final Function<Optional<Set<Integer>>, Optional<Set<Integer>>>
getPartition = maybeMulticastPartitions -> {
+ if (maybeMulticastPartitions == null ||
!maybeMulticastPartitions.isPresent()) {
Review Comment:
Do we need this `null` check? Seems it's actually not allowed to return
`null`?
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java:
##########
@@ -1229,7 +1229,7 @@ private <VR, KO, VO> KTable<K, VR>
doJoinOnForeignKey(final KTable<KO, VO> forei
final StreamPartitioner<K, SubscriptionResponseWrapper<VO>>
foreignResponseSinkPartitioner =
tableJoinedInternal.partitioner() == null
- ? (topic, key, subscriptionResponseWrapper,
numPartitions) -> subscriptionResponseWrapper.getPrimaryPartition()
+ ? (topic, key, subscriptionResponseWrapper,
numPartitions) ->
Optional.of(Collections.singleton(subscriptionResponseWrapper.getPrimaryPartition()))
Review Comment:
Should we change `getPrimaryPartition` to return an `Optional` instead ?
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java:
##########
@@ -1569,7 +1569,7 @@ private void assertNextOutputRecord(final
TestRecord<String, String> record,
}
private StreamPartitioner<Object, Object> constantPartitioner(final
Integer partition) {
Review Comment:
Can we improve the generic types?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -153,7 +153,7 @@ public <K, V> void send(final String topic,
}
if (partitions.size() > 0) {
final Optional<Set<Integer>> maybeMulticastPartitions =
partitioner.partitions(topic, key, value, partitions.size());
- if (!maybeMulticastPartitions.isPresent()) {
+ if (maybeMulticastPartitions == null ||
!maybeMulticastPartitions.isPresent()) {
Review Comment:
As above.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]