gharris1727 commented on code in PR #15601:
URL: https://github.com/apache/kafka/pull/15601#discussion_r1540138631
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/LeftOrRightValue.java:
##########
@@ -39,45 +39,6 @@ private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
this.rightValue = rightValue;
}
- /**
- * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and
- * V2 value as null.
- *
- * @param leftValue the left V1 value
- * @param <V1> the type of the value
- * @return a new {@link LeftOrRightValue} instance
- */
- public static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
Review Comment:
I think you should keep the `makeLeftValue` and `makeRightValue` functions
as entrypoints to this class. Only the impossible-to-type `make` function
should be removed.
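For illustration, a minimal sketch of the typed entrypoints, using a simplified stand-in for `LeftOrRightValue` (the null checks and accessors here are assumptions for the example, not necessarily what the real class does). Each factory takes its argument at the correct type parameter, so no unchecked cast is needed:

```java
import java.util.Objects;

// Simplified stand-in for the real class; only the shape of the factories matters.
final class LeftOrRightValue<V1, V2> {
    private final V1 leftValue;
    private final V2 rightValue;

    private LeftOrRightValue(final V1 leftValue, final V2 rightValue) {
        this.leftValue = leftValue;
        this.rightValue = rightValue;
    }

    // The argument is typed as V1, so the compiler checks the left side.
    static <V1, V2> LeftOrRightValue<V1, V2> makeLeftValue(final V1 leftValue) {
        Objects.requireNonNull(leftValue, "The left join value is null");
        return new LeftOrRightValue<>(leftValue, null);
    }

    // The argument is typed as V2, so the compiler checks the right side.
    static <V1, V2> LeftOrRightValue<V1, V2> makeRightValue(final V2 rightValue) {
        Objects.requireNonNull(rightValue, "The right join value is null");
        return new LeftOrRightValue<>(null, rightValue);
    }

    V1 getLeftValue() {
        return leftValue;
    }

    V2 getRightValue() {
        return rightValue;
    }
}
```

By contrast, a single `make(final V value)` cannot relate `V` to `V1` or `V2`, which is why it needs the cast and the `@SuppressWarnings("unchecked")`.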
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -250,7 +252,7 @@ private void emitNonJoinedOuterRecords(
}
final VOut nullJoinedValue;
- if (isLeftSide) {
+ if (joinSide.isLeftSide()) {
Review Comment:
This is another unsafe place that should be part of the enum.
Try to eliminate all of the `unchecked` warnings in this class.
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoin.java:
##########
@@ -155,7 +156,8 @@ public void process(final Record<K, V1> record) {
// we may delete some values with the same key early but since we are going
// range over all values of the same key even after failure, since the other window-store
// is only cleaned up by stream time, so this is okay for at-least-once.
- store.putIfAbsent(TimestampedKeyAndJoinSide.make(!isLeftSide, record.key(), otherRecordTimestamp), null);
+ final JoinSide otherJoinSide = joinSide.isLeftSide() ? JoinSide.RIGHT : JoinSide.LEFT;
Review Comment:
This could be a `JoinSide#complement` or `JoinSide#opposite` function, in
case there are other situations where we need to know what the other side of
the join is doing.
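A minimal sketch of what that could look like, assuming a method named `opposite` (hypothetical; the PR as quoted has no such method) and leaving out the `make` overrides and the name field for brevity:

```java
// Hypothetical sketch: opposite() is an assumed name, not part of the quoted PR.
enum JoinSide {
    LEFT,
    RIGHT;

    /** Returns the other side of the join. */
    JoinSide opposite() {
        return this == LEFT ? RIGHT : LEFT;
    }
}
```

The call site could then read `joinSide.opposite()` instead of repeating the ternary on `isLeftSide()`.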
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java:
##########
@@ -173,7 +174,7 @@ public <K, V1, V2, VOut> KStream<K, VOut> join(final KStream<K, V1> lhs,
final JoinWindowsInternal internalWindows = new JoinWindowsInternal(windows);
final KStreamKStreamJoin<K, V1, V2, VOut> joinThis = new KStreamKStreamJoin<>(
- true,
+ JoinSide.LEFT,
Review Comment:
I think these call-sites are much clearer with the explicit "left" and
"right". It's interesting that the variables here are `joinThis` and `joinOther`
instead of `joinLeft` and `joinRight`.
##########
streams/src/main/java/org/apache/kafka/streams/state/internals/JoinSide.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import java.util.Objects;
+
+/**
+ * An enum representing the side of a join operation.
+ * It provides methods to create instances of {@link LeftOrRightValue} based on the side specified.
+ */
+@SuppressWarnings("unchecked")
+public enum JoinSide {
+ LEFT("left") {
+ /**
+ * Create a new {@link LeftOrRightValue} instance with the V1 value as {@code leftValue} and V2 value as null.
+ *
+ * @param leftValue the left V1 value
+ * @param <V1> the type of the value
+ * @return a new {@link LeftOrRightValue} instance
+ */
+ @Override
+ public <V, V1, V2> LeftOrRightValue<V1, V2> make(final V leftValue) {
+ Objects.requireNonNull(leftValue, "The left join value is null");
+ return (LeftOrRightValue<V1, V2>) new LeftOrRightValue<>(leftValue, null);
+ }
+ },
+
+ RIGHT("right") {
+ /**
+ * Create a new {@link LeftOrRightValue} instance with the V2 value as {@code rightValue} and V1 value as null.
+ *
+ * @param rightValue the right V2 value
+ * @param <V2> the type of the value
+ * @return a new {@link LeftOrRightValue} instance
+ */
+ @Override
+ public <V, V1, V2> LeftOrRightValue<V1, V2> make(final V rightValue) {
+ Objects.requireNonNull(rightValue, "The left join value is null");
+ return (LeftOrRightValue<V1, V2>) new LeftOrRightValue<>(null, rightValue);
+ }
+
+ };
+
+ private final String joinSideName;
+
+ JoinSide(final String joinSideName) {
+ this.joinSideName = joinSideName;
+ }
+
+ public abstract <V, V1, V2> LeftOrRightValue<V1, V2> make(final V value);
+
+ /**
+ * Returns true if this JoinSide represents the left side.
+ *
+ * @return true if this JoinSide represents the left side, otherwise false
+ */
+ public boolean isLeftSide() {
Review Comment:
This is a very permissive "escape hatch" from this enum, and could be hiding
other abstractions that could be pulled into the enum.