Jackie-Jiang commented on code in PR #10234:
URL: https://github.com/apache/pinot/pull/10234#discussion_r1122447601


##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java:
##########
@@ -75,19 +75,32 @@ public RecordInfo next() {
     };
   }
 
+  public static RecordInfoReader makeRecordReader(IndexSegment segment, List<String> primaryKeyColumns,
+      List<String> comparisonColumns) {
+

Review Comment:
   (nit) extra empty line



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -154,10 +136,26 @@ public void setDefaultPartialUpsertStrategy(Strategy defaultPartialUpsertStrateg
    * same primary key, the record with the larger value of the time column is picked as the
    * latest update.
    * However, there are cases when users need to use another column to determine the order.
-   * In such case, you can use option comparisonColumn to override the column used for comparison.
+   * In such case, you can use option comparisonColumn to override the column used for comparison. When using
+   * multiple comparison columns, typically in the case of partial upserts, it is expected that input documents will
+   * each only have a singular non-null comparisonColumn. Multiple non-null values in an input document _will_ result
+   * in undefined behaviour. Typically, one comparisonColumn is allocated per distinct producer application of data
+   * in the case where there are multiple producers sinking to the same table.
    */
+  public void setComparisonColumns(List<String> comparisonColumns) {
+    Preconditions.checkArgument(_comparisonColumns == null || !comparisonColumns.isEmpty(),
+        "Possible duplicated entries: comparisonColumn, comparisonColumns");
+    _comparisonColumns = comparisonColumns;
+  }
+
+  public void setComparisonColumn(List<String> comparisonColumns) {
+    Preconditions.checkArgument(comparisonColumns == null || !comparisonColumns.isEmpty(),
+        "Possible duplicated entries: comparisonColumn, comparisonColumns");
+    _comparisonColumns = comparisonColumns;
+  }
+

Review Comment:
   Remove this function. I think there was a misunderstanding about what I
   meant: we want to prevent users from setting an empty list as the comparison
   columns, because that is invalid.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java:
##########
@@ -129,14 +129,25 @@ public String estimateHeapUsage(String tableSchemaConfigStr,
     // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead.
     // Here we only calculate the map content size. TODO: Add the map entry size and the array size within the map.
     int bytesPerValue = 60;
-    String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn();
-    if (comparisonColumn != null) {
-      FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType();
-      if (!dt.isFixedWidth()) {
-        String msg = "Not support data types for the comparison column";
-        throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
-      } else {
-        bytesPerValue = 52 + dt.size();
+    List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns();
+    if (comparisonColumns != null) {
+      int bytesPerArrayElem = 8;  // object ref
+      bytesPerValue = 52;
+      for (String columnName : comparisonColumns) {
+        FieldSpec.DataType dt = schema.getFieldSpecFor(columnName).getDataType();
+        if (!dt.isFixedWidth()) {
+          String msg = "Not support data types for the comparison column";
+          throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST);
+        } else {
+          if (comparisonColumns.size() == 1) {
+            bytesPerKey += dt.size();

Review Comment:
   Seems like a typo, should be `bytesPerValue`?



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java:
##########
@@ -558,15 +560,40 @@ private RecordInfo getRecordInfo(GenericRow row, int docId) {
     PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns());
 
     if (isUpsertEnabled()) {
-      Object upsertComparisonValue = row.getValue(_upsertComparisonColumn);
-      Preconditions.checkState(upsertComparisonValue instanceof Comparable,
-          "Upsert comparison column: %s must be comparable", _upsertComparisonColumn);
-      return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue);
+      if (_upsertComparisonColumns.size() > 1) {
+        return multiComparisonRecordInfo(primaryKey, docId, row);
+      }
+      Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0));
+      return new RecordInfo(primaryKey, docId, comparisonValue);
     }
 
     return new RecordInfo(primaryKey, docId, null);
   }
 
+  private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) {
+    int numComparisonColumns = _upsertComparisonColumns.size();
+    Comparable[] comparisonValues = new Comparable[numComparisonColumns];
+
+    int numNonNull = 0;
+    for (int i = 0; i < numComparisonColumns; i++) {
+      String columnName = _upsertComparisonColumns.get(i);
+
+      if (!row.isNullValue(columnName)) {
+        // Inbound records may only have exactly 1 non-null value in one of the comparison column i.e. comparison
+        // columns are mutually exclusive
+        numNonNull++;

Review Comment:
   Since we are looping over all the columns here, we can store the column 
index here:
   ```
     int comparableIndex = -1;
     for (...) {
       if (!row.isNullValue(columnName)) {
         Preconditions.checkState(comparableIndex == -1, "Must have exactly 1 non-null comparison column");
         comparableIndex = i;
       }
     }
     Preconditions.checkState(comparableIndex != -1, ...);
   ```
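
The suggestion above can be sketched as a self-contained helper. This is only an illustration under stated assumptions: the class `SingleNonNullColumnFinder` and the method `findComparableIndex` are hypothetical names, a plain `Map<String, Object>` stands in for `GenericRow`, and plain exceptions replace Guava's `Preconditions.checkState` so the snippet compiles without dependencies.

```java
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of the PR: finds the single non-null comparison column.
final class SingleNonNullColumnFinder {
  private SingleNonNullColumnFinder() {
  }

  /**
   * Returns the index of the only comparison column that has a non-null value in the row.
   * Throws IllegalStateException when zero or more than one column is non-null.
   * The Map is a stand-in for GenericRow (assumption made for this sketch).
   */
  static int findComparableIndex(List<String> comparisonColumns, Map<String, Object> row) {
    int comparableIndex = -1;
    for (int i = 0; i < comparisonColumns.size(); i++) {
      if (row.get(comparisonColumns.get(i)) != null) {
        // A second non-null column violates the "mutually exclusive" rule.
        if (comparableIndex != -1) {
          throw new IllegalStateException("Must have exactly 1 non-null comparison column");
        }
        comparableIndex = i;
      }
    }
    // No non-null column at all is equally invalid.
    if (comparableIndex == -1) {
      throw new IllegalStateException("Must have exactly 1 non-null comparison column");
    }
    return comparableIndex;
  }
}
```

Tracking the index instead of a counter validates the row and records which column carries the value in a single pass.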



##########
pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java:
##########
@@ -154,10 +136,26 @@ public void setDefaultPartialUpsertStrategy(Strategy defaultPartialUpsertStrateg
    * same primary key, the record with the larger value of the time column is picked as the
    * latest update.
    * However, there are cases when users need to use another column to determine the order.
-   * In such case, you can use option comparisonColumn to override the column used for comparison.
+   * In such case, you can use option comparisonColumn to override the column used for comparison. When using
+   * multiple comparison columns, typically in the case of partial upserts, it is expected that input documents will
+   * each only have a singular non-null comparisonColumn. Multiple non-null values in an input document _will_ result
+   * in undefined behaviour. Typically, one comparisonColumn is allocated per distinct producer application of data
+   * in the case where there are multiple producers sinking to the same table.
    */
+  public void setComparisonColumns(List<String> comparisonColumns) {
+    Preconditions.checkArgument(_comparisonColumns == null || !comparisonColumns.isEmpty(),
+        "Possible duplicated entries: comparisonColumn, comparisonColumns");

Review Comment:
   ```suggestion
       Preconditions.checkArgument(comparisonColumns == null || 
!comparisonColumns.isEmpty(),
           "Comparison columns cannot be empty");
   ```
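
For context, a dependency-free sketch of the setter behaviour this suggestion asks for: `null` (option not set) is accepted, an empty list is rejected. `UpsertConfigSketch` is a hypothetical stand-in for `UpsertConfig`, and the explicit `IllegalArgumentException` mirrors what Guava's `Preconditions.checkArgument` throws on failure.

```java
import java.util.List;

// Hypothetical stand-in for UpsertConfig, reduced to the comparison-columns setter.
final class UpsertConfigSketch {
  private List<String> _comparisonColumns;

  /** Accepts null (option not configured) but rejects an empty list, which is invalid. */
  void setComparisonColumns(List<String> comparisonColumns) {
    // Validate the argument itself, not the current value of the field.
    if (comparisonColumns != null && comparisonColumns.isEmpty()) {
      throw new IllegalArgumentException("Comparison columns cannot be empty");
    }
    _comparisonColumns = comparisonColumns;
  }

  List<String> getComparisonColumns() {
    return _comparisonColumns;
  }
}
```

Note that the check reads the parameter `comparisonColumns`, whereas the line under review reads the field `_comparisonColumns`.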



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java:
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ComparisonColumns implements Comparable<ComparisonColumns> {
+  private final Comparable[] _values;
+  private int _comparableIndex;

Review Comment:
   We can make `_comparableIndex` final and pass it in from outside (e.g. via
   the constructor)
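
One possible shape for this, as a sketch only: both fields are final and the caller supplies the index of the non-null value. The class name `ComparisonColumnsSketch`, the constructor signature, and in particular the `compareTo` ordering are assumptions made for illustration; the PR's actual comparison logic is not shown in this review.

```java
// Hypothetical sketch of ComparisonColumns with an immutable comparable index.
@SuppressWarnings({"rawtypes", "unchecked"})
final class ComparisonColumnsSketch implements Comparable<ComparisonColumnsSketch> {
  private final Comparable[] _values;
  private final int _comparableIndex;

  /** The caller has already determined which column holds the single non-null value. */
  ComparisonColumnsSketch(Comparable[] values, int comparableIndex) {
    _values = values;
    _comparableIndex = comparableIndex;
  }

  Comparable[] getValues() {
    return _values;
  }

  int getComparableIndex() {
    return _comparableIndex;
  }

  @Override
  public int compareTo(ComparisonColumnsSketch other) {
    // Assumed ordering (not confirmed by the PR): compare this record's non-null value
    // with the other record's value in the same column; a missing value counts as older.
    Comparable thisValue = _values[_comparableIndex];
    Comparable otherValue = other._values[_comparableIndex];
    if (otherValue == null) {
      return 1;
    }
    return thisValue.compareTo(otherValue);
  }
}
```

With the index fixed at construction time, an instance cannot drift out of sync with its `_values` array after it has been created.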



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

