bbejeck commented on code in PR #18722:
URL: https://github.com/apache/kafka/pull/18722#discussion_r1933030988
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractStream.java:
##########
@@ -91,20 +91,20 @@ Set<String> ensureCopartitionWith(final Collection<? extends AbstractStream<K, ?
return allSourceNodes;
}
- static <T2, T1, R> ValueJoiner<T2, T1, R> reverseJoiner(final ValueJoiner<T1, T2, R> joiner) {
+ static <VRight, VLeft, VOut> ValueJoiner<VRight, VLeft, VOut> reverseJoiner(final ValueJoiner<VLeft, VRight, VOut> joiner) {
return (value2, value1) -> joiner.apply(value1, value2);
}
- static <K, T2, T1, R> ValueJoinerWithKey<K, T2, T1, R> reverseJoinerWithKey(final ValueJoinerWithKey<K, T1, T2, R> joiner) {
+ static <K, VRight, VLeft, VOut> ValueJoinerWithKey<K, VRight, VLeft, VOut> reverseJoinerWithKey(final ValueJoinerWithKey<K, VLeft, VRight, VOut> joiner) {
return (key, value2, value1) -> joiner.apply(key, value1, value2);
}
- static <K, V, VR> ValueMapperWithKey<K, V, VR> withKey(final ValueMapper<V, VR> valueMapper) {
+ static <K, V, VOut> ValueMapperWithKey<K, V, VOut> withKey(final ValueMapper<V, VOut> valueMapper) {
Objects.requireNonNull(valueMapper, "valueMapper can't be null");
return (readOnlyKey, value) -> valueMapper.apply(value);
}
- static <K, V1, V2, VR> ValueJoinerWithKey<K, V1, V2, VR> toValueJoinerWithKey(final ValueJoiner<V1, V2, VR> valueJoiner) {
+ static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(final ValueJoiner<VLeft, VRight, VOut> valueJoiner) {
Review Comment:
All the other methods use `? super` or `? extends` wildcards in their
generics; why not here? I'm asking mostly for my own education.
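For context, here is a minimal sketch of what a wildcard variant of this helper could look like. The two interfaces are simplified stand-ins for the Kafka Streams types, and both the wildcard signature and the `WildcardJoinerSketch` class are hypothetical; this is not what the PR does.

```java
import java.util.Objects;

class WildcardJoinerSketch {
    // Simplified stand-ins for the Kafka Streams interfaces (assumption, not the real types).
    @FunctionalInterface
    interface ValueJoiner<V1, V2, VR> {
        VR apply(V1 value1, V2 value2);
    }

    @FunctionalInterface
    interface ValueJoinerWithKey<K, V1, V2, VR> {
        VR apply(K readOnlyKey, V1 value1, V2 value2);
    }

    // Hypothetical wildcard variant: accepts any joiner that consumes supertypes
    // of VLeft/VRight and produces a subtype of VOut.
    static <K, VLeft, VRight, VOut> ValueJoinerWithKey<K, VLeft, VRight, VOut> toValueJoinerWithKey(
            final ValueJoiner<? super VLeft, ? super VRight, ? extends VOut> valueJoiner) {
        Objects.requireNonNull(valueJoiner, "joiner can't be null");
        // The key is ignored; only the two values are forwarded.
        return (readOnlyKey, value1, value2) -> valueJoiner.apply(value1, value2);
    }

    static String demo() {
        // A joiner declared for Object inputs and a String result.
        final ValueJoiner<Object, Object, String> general = (left, right) -> left + "|" + right;
        // With explicit type arguments, this call is accepted only because of the wildcards.
        final ValueJoinerWithKey<String, Integer, Long, CharSequence> withKey =
            WildcardJoinerSketch.<String, Integer, Long, CharSequence>toValueJoinerWithKey(general);
        return withKey.apply("key", 1, 2L).toString();
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

When the caller passes a lambda, or when the type parameters are inferred from an argument that is already wildcard-typed, the non-wildcard signature can be enough, so the sketch only shows where the two forms differ.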
##########
streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java:
##########
@@ -3062,7 +3062,7 @@ <GK, GV, RV> KStream<K, RV> leftJoin(final GlobalKTable<GK, GV> globalTable,
* @see #map(KeyValueMapper)
*/
<KOut, VOut> KStream<KOut, VOut> process(
- final ProcessorSupplier<? super K, ? super V, KOut, VOut> processorSupplier,
+ final ProcessorSupplier<? super K, ? super V, ? extends KOut, ? extends VOut> processorSupplier,
Review Comment:
Given that the code base already uses `? super X` and `? extends X`, I
don't think we need a KIP.
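To illustrate what `? extends` on an output type allows, here is a small sketch. It uses `java.util.function.Function` as a stand-in for a supplier-like parameter; `mapAll` and `OutputWildcardSketch` are hypothetical names and not part of Kafka Streams.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class OutputWildcardSketch {
    // Hypothetical helper, not a Kafka Streams API: the mapper may consume a
    // supertype of V and produce a subtype of VOut.
    static <V, VOut> List<VOut> mapAll(final List<V> input,
                                       final Function<? super V, ? extends VOut> mapper) {
        final List<VOut> output = new ArrayList<>();
        for (final V value : input) {
            output.add(mapper.apply(value));
        }
        return output;
    }

    static List<Number> demo() {
        // Declared for Object input and Integer output.
        final Function<Object, Integer> hashOf = Object::hashCode;
        // With V = String and VOut = Number fixed explicitly, this call is
        // accepted only because both parameters of the mapper are wildcards.
        return OutputWildcardSketch.<String, Number>mapAll(Arrays.asList("a", "b"), hashOf);
    }

    public static void main(final String[] args) {
        System.out.println(demo());
    }
}
```

Existing callers that pass an exactly typed argument keep compiling after such a change, since the wildcard form only widens the set of accepted arguments.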
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1103,26 +1108,28 @@ public <KG, VG, VR> KStream<K, VR> leftJoin(final GlobalKTable<KG, VG> globalTab
return globalTableJoin(globalTable, keySelector, joiner, true, named);
}
- private <KG, VG, VR> KStream<K, VR> globalTableJoin(final GlobalKTable<KG, VG> globalTable,
- final KeyValueMapper<? super K, ? super V, ? extends KG> keySelector,
- final ValueJoinerWithKey<? super K, ? super V, ? super VG, ? extends VR> joiner,
- final boolean leftJoin,
- final Named named) {
+ private <KGlobalTable, VGlobalTable, VOut> KStream<K, VOut> globalTableJoin(
Review Comment:
nit: would `GlobalTableK` be more descriptive than `KGlobalTable`? Same for
`VGlobalTable`.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -153,9 +153,9 @@ public KStream<K, V> filter(final Predicate<? super K, ? super V> predicate,
Objects.requireNonNull(named, "named can't be null");
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, FILTER_NAME);
- final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
+ final ProcessorParameters<K, V, K, V> processorParameters =
Review Comment:
Why the changes to the generic types? For example, from `? super K` to `K`.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -248,9 +248,9 @@ public <KR, VR> KStream<KR, VR> map(final KeyValueMapper<? super K, ? super V, ?
Objects.requireNonNull(named, "named can't be null");
final String name = new NamedInternal(named).orElseGenerateWithPrefix(builder, MAP_NAME);
- final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
+ final ProcessorParameters<K, V, KR, VR> processorParameters =
Review Comment:
And here, from `?` to `KR`.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -906,13 +910,14 @@ private KStreamImpl<K, V> repartitionForJoin(final String repartitionName,
builder);
}
- static <K1, V1, RN extends BaseRepartitionNode<K1, V1>> String createRepartitionedSource(final InternalStreamsBuilder builder,
- final Serde<K1> keySerde,
- final Serde<V1> valueSerde,
- final String repartitionTopicNamePrefix,
- final StreamPartitioner<K1, V1> streamPartitioner,
- final BaseRepartitionNodeBuilder<K1, V1, RN> baseRepartitionNodeBuilder) {
-
+ static <KStream, VStream, RepartitionNode extends BaseRepartitionNode<KStream, VStream>> String createRepartitionedSource(
+ final InternalStreamsBuilder builder,
Review Comment:
Why `KStream` and `VStream`? Are there two streams involved here? It's been
a while since I've looked at this part of the code.