Jackie-Jiang commented on code in PR #10234:
URL: https://github.com/apache/pinot/pull/10234#discussion_r1120935766


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -558,15 +560,40 @@ private RecordInfo getRecordInfo(GenericRow row, int docId) {
     PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
 
     if (isUpsertEnabled()) {
-      Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
-      Preconditions.checkState(upsertComparisonValue instanceof Comparable,
-          "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
-      return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
+      if (_upsertComparisonColumns.size() > 1) {
+        return multiComparisonRecordInfo(primaryKey, docId, row);
+      }
+      Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0));
+      return new RecordInfo(primaryKey, docId, comparisonValue);
     }
 
     return new RecordInfo(primaryKey, docId, null);
   }
 
+  private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) {
+    int numComparisonColumns = _upsertComparisonColumns.size();
+    Comparable[] comparisonValues = new Comparable[numComparisonColumns];
+
+    int numNonNull = 0;
+    for (int i = 0; i < numComparisonColumns; i++) {
+      String columnName = _upsertComparisonColumns.get(i);
+
+      if (!row.isNullValue(columnName)) {
+        // Inbound records may only have exactly 1 non-null value in one of the comparison columns, i.e. comparison
+        // columns are mutually exclusive
+        numNonNull++;
+
+        Object comparisonValue = row.getValue(columnName);
+        Preconditions.checkState(comparisonValue instanceof Comparable,
+            "Upsert comparison column: %s must be comparable", columnName);
+        comparisonValues[i] = (Comparable) comparisonValue;
+      }
+    }
+    Preconditions.checkState(numNonNull == 1 || numNonNull == 0,

Review Comment:
   (nit)
   ```suggestion
       Preconditions.checkState(numNonNull <= 1,
   ```
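A minimal, self-contained sketch of the extraction loop with the suggested `numNonNull <= 1` condition. It assumes a plain `Map<String, Object>` in place of Pinot's `GenericRow` (a missing or null entry counts as a null value); `MultiComparisonExtractor` and `extract` are hypothetical names. Guava's `Preconditions.checkState` throws `IllegalStateException`, so the sketch throws the same type.

```java
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for multiComparisonRecordInfo: a Map<String, Object> replaces
// Pinot's GenericRow, and a missing or null entry is treated as a null value.
final class MultiComparisonExtractor {
  private MultiComparisonExtractor() {
  }

  @SuppressWarnings("rawtypes")
  static Comparable[] extract(List<String> comparisonColumns, Map<String, Object> row) {
    Comparable[] values = new Comparable[comparisonColumns.size()];
    int numNonNull = 0;
    for (int i = 0; i < comparisonColumns.size(); i++) {
      String column = comparisonColumns.get(i);
      Object value = row.get(column);
      if (value == null) {
        continue; // Null slots stay null in the result array.
      }
      if (!(value instanceof Comparable)) {
        throw new IllegalStateException("Upsert comparison column: " + column + " must be comparable");
      }
      values[i] = (Comparable) value;
      numNonNull++;
    }
    // Equivalent to the suggested `numNonNull <= 1`: zero or one non-null value is allowed.
    if (numNonNull > 1) {
      throw new IllegalStateException("Found " + numNonNull + " non-null comparison values, expected at most 1");
    }
    return values;
  }
}
```

With columns `["a", "b"]`, a row that only sets `b` yields `[null, b's value]`, while a row that sets both is rejected.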



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java:
##########
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ComparisonColumns implements Comparable<ComparisonColumns> {
+  private Comparable[] _comparisonColumns;
+
+  public ComparisonColumns(Comparable[] comparisonColumns) {
+    _comparisonColumns = comparisonColumns;
+  }
+
+  public Comparable[] getComparisonColumns() {
+    return _comparisonColumns;
+  }
+
+  @Override
+  public int compareTo(ComparisonColumns other) {
+    // _comparisonColumns should have at most one non-null comparison value. If not, it is the user's responsibility.
+    // There is no attempt to guarantee behavior in the case where there are multiple non-null values
+    int comparisonResult;
+    int comparableIndex = getComparableIndex();
+
+    if (comparableIndex < 0) {
+      // All comparison values were null.  This record is only ok to keep if all prior values were also null

Review Comment:
   For the single comparison column case, the comparison value should not be null; a null value has undefined behavior because we just use the default value as the comparison value.
   For the multiple comparison columns case, at least one comparison column should be non-null. If an event does not have any non-null value, we can just drop it (unless no event is associated with the key yet), since we don't know which value to compare.
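A sketch of the policy described in this comment, assuming comparison values arrive as a `Comparable[]` with one slot per comparison column and modeling "no event associated with the key" as `existingValues == null`. `AllNullPolicy` and `shouldKeep` are hypothetical names, and treating a tie as "keep the new event" is an assumption of the sketch.

```java
// Hypothetical sketch of the suggested policy for multiple comparison columns: an event with
// no non-null comparison value is dropped unless no record exists yet for its primary key.
final class AllNullPolicy {
  private AllNullPolicy() {
  }

  @SuppressWarnings({"rawtypes", "unchecked"})
  static boolean shouldKeep(Comparable[] newValues, Comparable[] existingValues) {
    int index = firstNonNullIndex(newValues);
    if (index < 0) {
      // Nothing to compare: keep only if this is the first event for the key.
      return existingValues == null;
    }
    if (existingValues == null || existingValues[index] == null) {
      // No prior value in the same column, so nothing can reject the new value.
      return true;
    }
    // Assumption: ties keep the newer event.
    return newValues[index].compareTo(existingValues[index]) >= 0;
  }

  @SuppressWarnings("rawtypes")
  private static int firstNonNullIndex(Comparable[] values) {
    for (int i = 0; i < values.length; i++) {
      if (values[i] != null) {
        return i;
      }
    }
    return -1;
  }
}
```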



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -154,10 +135,22 @@ public void setDefaultPartialUpsertStrategy(Strategy defaultPartialUpsertStrateg
    * same primary key, the record with the larger value of the time column is picked as the
    * latest update.
    * However, there are cases when users need to use another column to determine the order.
-   * In such case, you can use option comparisonColumn to override the column used for comparison.
+   * In such case, you can use option comparisonColumn to override the column used for comparison. When using
+   * multiple comparison columns, typically in the case of partial upserts, it is expected that input documents will
+   * each have only a single non-null comparisonColumn. Multiple non-null values in an input document _will_ result
+   * in undefined behaviour. Typically, one comparisonColumn is allocated per distinct producer application of data
+   * in the case where there are multiple producers sinking to the same table.
    */
+  public void setComparisonColumns(List<String> comparisonColumns) {

Review Comment:
   Here I'm suggesting a non-empty check instead of a non-null check. `_comparisonColumns` can be null but cannot be empty:
   `Preconditions.checkArgument(comparisonColumns == null || !comparisonColumns.isEmpty(), ...)`
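A dependency-free sketch of the suggested validation. `UpsertConfigSketch` is a hypothetical stand-in for `UpsertConfig`; the check is written out by hand rather than through Guava, whose `Preconditions.checkArgument` also throws `IllegalArgumentException`.

```java
import java.util.List;

// Hypothetical stand-in for the UpsertConfig setter under review: null is accepted,
// an empty list is rejected with the same exception type Guava's checkArgument uses.
final class UpsertConfigSketch {
  private List<String> _comparisonColumns;

  void setComparisonColumns(List<String> comparisonColumns) {
    if (comparisonColumns != null && comparisonColumns.isEmpty()) {
      throw new IllegalArgumentException("comparisonColumns cannot be empty");
    }
    _comparisonColumns = comparisonColumns;
  }

  List<String> getComparisonColumns() {
    return _comparisonColumns;
  }
}
```

For example, `setComparisonColumns(null)` and `setComparisonColumns(Arrays.asList("producerATs", "producerBTs"))` (made-up column names, one per producer as in the Javadoc) are accepted, while an empty list throws.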



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pinot.segment.local.upsert;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ComparisonColumns implements Comparable<ComparisonColumns> {
+  private Comparable[] _values;
+
+  public ComparisonColumns(Comparable[] values) {
+    _values = values;
+  }
+
+  public Comparable[] getValues() {
+    return _values;
+  }
+
+  @Override
+  public int compareTo(ComparisonColumns other) {
+    // _values should have at most one non-null comparison value. If not, it is the user's responsibility.
+    // There is no attempt to guarantee behavior in the case where there are multiple non-null values
+    int comparisonResult;
+    int comparableIndex = getComparableIndex();
+
+    if (comparableIndex < 0) {
+      // All comparison values were null.  This record is only ok to keep if all prior values were also null
+      comparisonResult = 1;
+      for (int i = 0; i < other.getValues().length; i++) {
+        if (other.getValues()[i] != null) {
+          comparisonResult = -1;
+          break;
+        }
+      }
+    } else {
+      Comparable comparisonValue = _values[comparableIndex];
+      Comparable otherComparisonValue = other.getValues()[comparableIndex];
+
+      if (otherComparisonValue == null) {
+        // Keep this record because the existing record has no value for the same comparison column, therefore the
+        // (lack of) existing value could not possibly cause the new value to be rejected.
+        comparisonResult = 1;
+      } else {
+        comparisonResult = comparisonValue.compareTo(otherComparisonValue);
+      }
+    }
+
+    if (comparisonResult >= 0) {
+      // TODO(egalpin):  This method currently may have side-effects on _values. Depending on the comparison result,
+      //  entities from {@param other} may be merged into _values. This really should not be done implicitly as part
+      //  of compareTo, but has been implemented this way to minimize the changes required within all subclasses of
+      //  {@link BasePartitionUpsertMetadataManager}. Ideally, this merge should only be triggered explicitly by
+      //  implementations of {@link BasePartitionUpsertMetadataManager}.
+      for (int i = 0; i < _values.length; i++) {
+        // N.B. null check _must_ be here to prevent overwriting _values[i] with null from other._values[i], such
+        // as in the case where this is the first time that a non-null value has been received for a given
+        // comparableIndex after previously receiving a non-null value for a different comparableIndex
+        if (i != comparableIndex && other._values[i] != null) {
+          _values[i] = other._values[i];
+        }
+      }
+    }
+
+    return comparisonResult;
+  }
+
+  private int getComparableIndex() {
+    for (int i = 0; i < _values.length; i++) {
+      if (_values[i] == null) {
+        continue;
+      }
+      return i;

Review Comment:
   (nit)
   ```suggestion
         if (_values[i] != null) {
           return i;
         }
   ```
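A side-effect-free sketch of the comparison in `compareTo`, with `getComparableIndex` written as the nit suggests. `ComparisonColumnsSketch` is a hypothetical name; the merge of `other`'s values (the TODO in the diff) is deliberately left out, and the all-null branch reuses `getComparableIndex` on `other` instead of the explicit loop, which gives the same result.

```java
// Hypothetical, side-effect-free version of the compareTo logic in the excerpt above.
@SuppressWarnings({"rawtypes", "unchecked"})
final class ComparisonColumnsSketch implements Comparable<ComparisonColumnsSketch> {
  private final Comparable[] _values;

  ComparisonColumnsSketch(Comparable[] values) {
    _values = values;
  }

  @Override
  public int compareTo(ComparisonColumnsSketch other) {
    int comparableIndex = getComparableIndex();
    if (comparableIndex < 0) {
      // All values are null: keep (1) only if the other side is also all null.
      return other.getComparableIndex() < 0 ? 1 : -1;
    }
    Comparable otherValue = other._values[comparableIndex];
    if (otherValue == null) {
      // No existing value in this column, so nothing can reject the new value.
      return 1;
    }
    return _values[comparableIndex].compareTo(otherValue);
  }

  // Index of the first non-null value, or -1 if every value is null (the nit's form).
  private int getComparableIndex() {
    for (int i = 0; i < _values.length; i++) {
      if (_values[i] != null) {
        return i;
      }
    }
    return -1;
  }
}
```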



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java:
##########
@@ -0,0 +1,91 @@
+package org.apache.pinot.segment.local.upsert;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ComparisonColumns implements Comparable<ComparisonColumns> {
+  private Comparable[] _values;
+
+  public ComparisonColumns(Comparable[] values) {
+    _values = values;
+  }
+
+  public Comparable[] getValues() {
+    return _values;
+  }
+
+  @Override
+  public int compareTo(ComparisonColumns other) {
+    // _values should have at most one non-null comparison value. If not, it is the user's responsibility.
+    // There is no attempt to guarantee behavior in the case where there are multiple non-null values
+    int comparisonResult;
+    int comparableIndex = getComparableIndex();
+
+    if (comparableIndex < 0) {
+      // All comparison values were null.  This record is only ok to keep if all prior values were also null
+      comparisonResult = 1;
+      for (int i = 0; i < other.getValues().length; i++) {
+        if (other.getValues()[i] != null) {
+          comparisonResult = -1;
+          break;
+        }
+      }
+    } else {
+      Comparable comparisonValue = _values[comparableIndex];
+      Comparable otherComparisonValue = other.getValues()[comparableIndex];
+
+      if (otherComparisonValue == null) {
+        // Keep this record because the existing record has no value for the same comparison column, therefore the
+        // (lack of) existing value could not possibly cause the new value to be rejected.
+        comparisonResult = 1;
+      } else {
+        comparisonResult = comparisonValue.compareTo(otherComparisonValue);
+      }
+    }
+
+    if (comparisonResult >= 0) {
+      // TODO(egalpin):  This method currently may have side-effects on _values. Depending on the comparison result,
+      //  entities from {@param other} may be merged into _values. This really should not be done implicitly as part
+      //  of compareTo, but has been implemented this way to minimize the changes required within all subclasses of
+      //  {@link BasePartitionUpsertMetadataManager}. Ideally, this merge should only be triggered explicitly by
+      //  implementations of {@link BasePartitionUpsertMetadataManager}.
+      for (int i = 0; i < _values.length; i++) {
+        // N.B. null check _must_ be here to prevent overwriting _values[i] with null from other._values[i], such
+        // as in the case where this is the first time that a non-null value has been received for a given
+        // comparableIndex after previously receiving a non-null value for a different comparableIndex

Review Comment:
   I don't think this case is possible, since at most one value can be set in an event.
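A small illustration of the reasoning above, assuming the incoming array has at most one non-null value: every slot other than `comparableIndex` is then already null, so copying `other`'s slot with or without the null check yields the same merged array. `MergeSketch` and both method names are hypothetical.

```java
// Hypothetical comparison of the merge loop with and without the null check.
// Each method copies the incoming array and fills non-comparable slots from existing.
final class MergeSketch {
  private MergeSketch() {
  }

  @SuppressWarnings("rawtypes")
  static Comparable[] mergeWithNullCheck(Comparable[] incoming, Comparable[] existing, int comparableIndex) {
    Comparable[] merged = incoming.clone();
    for (int i = 0; i < merged.length; i++) {
      if (i != comparableIndex && existing[i] != null) {
        merged[i] = existing[i];
      }
    }
    return merged;
  }

  @SuppressWarnings("rawtypes")
  static Comparable[] mergeWithoutNullCheck(Comparable[] incoming, Comparable[] existing, int comparableIndex) {
    Comparable[] merged = incoming.clone();
    for (int i = 0; i < merged.length; i++) {
      if (i != comparableIndex) {
        merged[i] = existing[i];
      }
    }
    return merged;
  }
}
```

The two versions only diverge when the incoming event carries more than one non-null value, which the `numNonNull` precondition in `MutableSegmentImpl` is meant to rule out.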



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

