Jackie-Jiang commented on code in PR #10234: URL: https://github.com/apache/pinot/pull/10234#discussion_r1117748110
########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java: ########## @@ -154,10 +135,22 @@ public void setDefaultPartialUpsertStrategy(Strategy defaultPartialUpsertStrateg * same primary key, the record with the larger value of the time column is picked as the * latest update. * However, there are cases when users need to use another column to determine the order. - * In such case, you can use option comparisonColumn to override the column used for comparison. + * In such case, you can use option comparisonColumn to override the column used for comparison. When using + * multiple comparison columns, typically in the case of partial upserts, it is expected that input documents will + * each only have a singular non-null comparisonColumn. Multiple non-null values in an input document _will_ result + * in undefined behaviour. Typically, one comparisonColumn is allocated per distinct producer application of data + * in the case where there are multiple producers sinking to the same table. 
*/ + public void setComparisonColumns(List<String> comparisonColumns) { Review Comment: Consider adding a check here to prevent users from setting an empty list ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -558,15 +560,39 @@ private RecordInfo getRecordInfo(GenericRow row, int docId) { PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); if (isUpsertEnabled()) { - Object upsertComparisonValue = row.getValue(_upsertComparisonColumn); - Preconditions.checkState(upsertComparisonValue instanceof Comparable, - "Upsert comparison column: %s must be comparable", _upsertComparisonColumn); - return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue); + if (_upsertComparisonColumns.size() > 1) { + return multiComparisonRecordInfo(primaryKey, docId, row); + } + Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0)); + return new RecordInfo(primaryKey, docId, comparisonValue); } return new RecordInfo(primaryKey, docId, null); } + private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) { + Comparable[] comparisonColumns = new Comparable[_upsertComparisonColumns.size()]; + + int i = -1; + for (String columnName : _upsertComparisonColumns) { + i++; Review Comment: (minor) This would be more readable: ```suggestion int numComparisonColumns = _upsertComparisonColumns.size(); Comparable[] comparisonValues = new Comparable[numComparisonColumns]; for (int i = 0; i < numComparisonColumns; i++) { String columnName = _upsertComparisonColumns.get(i); ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -558,15 +560,39 @@ private RecordInfo getRecordInfo(GenericRow row, int docId) { PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); if (isUpsertEnabled()) { - Object upsertComparisonValue = 
row.getValue(_upsertComparisonColumn); - Preconditions.checkState(upsertComparisonValue instanceof Comparable, - "Upsert comparison column: %s must be comparable", _upsertComparisonColumn); - return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue); + if (_upsertComparisonColumns.size() > 1) { + return multiComparisonRecordInfo(primaryKey, docId, row); + } + Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0)); + return new RecordInfo(primaryKey, docId, comparisonValue); } return new RecordInfo(primaryKey, docId, null); } + private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) { + Comparable[] comparisonColumns = new Comparable[_upsertComparisonColumns.size()]; + + int i = -1; + for (String columnName : _upsertComparisonColumns) { + i++; + Object comparisonValue = row.getValue(columnName); + + Preconditions.checkState(comparisonValue instanceof Comparable, + "Upsert comparison column: %s must be comparable", columnName); + + if (row.isNullValue(columnName)) { + // Inbound records may only have exactly 1 non-null value in one of the comparison column i.e. comparison + // columns are mutually exclusive + comparisonColumns[i] = null; + continue; + } + + comparisonColumns[i] = (Comparable) comparisonValue; Review Comment: Check `null` first, then read value only if it is not `null` ```suggestion if (!row.isNullValue(columnName)) { Object comparisonValue = row.getValue(columnName); ... comparisonColumns[i] = (Comparable) comparisonValue; } ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable<ComparisonColumns> { + private Comparable[] _comparisonColumns; + + public ComparisonColumns(Comparable[] comparisonColumns) { + _comparisonColumns = comparisonColumns; + } + + public Comparable[] getComparisonColumns() { + return _comparisonColumns; + } + + @Override + public int compareTo(ComparisonColumns other) { + // _comparisonColumns should only at most one non-null comparison value. If not, it is the user's responsibility. + // There is no attempt to guarantee behavior in the case where there are multiple non-null values + int comparisonResult; + int comparableIndex = getComparableIndex(); + + if (comparableIndex < 0) { + // All comparison values were null. 
This record is only ok to keep if all prior values were also null + comparisonResult = 1; + for (int i = 0; i < other.getComparisonColumns().length; i++) { + if (other.getComparisonColumns()[i] != null) { + comparisonResult = -1; + break; + } + } + } else { + Comparable comparisonValue = _comparisonColumns[comparableIndex]; + Comparable otherComparisonValue = other.getComparisonColumns()[comparableIndex]; + + if (otherComparisonValue == null) { + // Keep this record because the existing record has no value for the same comparison column, therefore the + // (lack of) existing value could not possibly cause the new value to be rejected. + comparisonResult = 1; + } else { + comparisonResult = comparisonValue.compareTo(otherComparisonValue); + } Review Comment: Let's also add a TODO here to mention the modification within `compareTo()` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -558,15 +560,39 @@ private RecordInfo getRecordInfo(GenericRow row, int docId) { PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); if (isUpsertEnabled()) { - Object upsertComparisonValue = row.getValue(_upsertComparisonColumn); - Preconditions.checkState(upsertComparisonValue instanceof Comparable, - "Upsert comparison column: %s must be comparable", _upsertComparisonColumn); - return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue); + if (_upsertComparisonColumns.size() > 1) { + return multiComparisonRecordInfo(primaryKey, docId, row); + } + Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0)); + return new RecordInfo(primaryKey, docId, comparisonValue); } return new RecordInfo(primaryKey, docId, null); } + private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) { + Comparable[] comparisonColumns = new Comparable[_upsertComparisonColumns.size()]; + + int i = -1; + for (String 
columnName : _upsertComparisonColumns) { + i++; + Object comparisonValue = row.getValue(columnName); + + Preconditions.checkState(comparisonValue instanceof Comparable, + "Upsert comparison column: %s must be comparable", columnName); + + if (row.isNullValue(columnName)) { + // Inbound records may only have exactly 1 non-null value in one of the comparison column i.e. comparison Review Comment: Do we want to enforce this? If so, we should probably add a check ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable<ComparisonColumns> { + private Comparable[] _comparisonColumns; Review Comment: (minor) Suggest renaming it to `_values` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable<ComparisonColumns> { + private Comparable[] _comparisonColumns; + + public ComparisonColumns(Comparable[] comparisonColumns) { + _comparisonColumns = comparisonColumns; + } + + public Comparable[] getComparisonColumns() { + return _comparisonColumns; + } + + @Override + public int compareTo(ComparisonColumns other) { + // _comparisonColumns should only at most one non-null comparison value. If not, it is the user's responsibility. + // There is no attempt to guarantee behavior in the case where there are multiple non-null values + int comparisonResult; + int comparableIndex = getComparableIndex(); + + if (comparableIndex < 0) { + // All comparison values were null. 
This record is only ok to keep if all prior values were also null + comparisonResult = 1; + for (int i = 0; i < other.getComparisonColumns().length; i++) { + if (other.getComparisonColumns()[i] != null) { + comparisonResult = -1; + break; + } + } + } else { + Comparable comparisonValue = _comparisonColumns[comparableIndex]; + Comparable otherComparisonValue = other.getComparisonColumns()[comparableIndex]; + + if (otherComparisonValue == null) { + // Keep this record because the existing record has no value for the same comparison column, therefore the + // (lack of) existing value could not possibly cause the new value to be rejected. + comparisonResult = 1; + } else { + comparisonResult = comparisonValue.compareTo(otherComparisonValue); + } Review Comment: Since we already know the `comparableIndex`, we may simplify this part to just copy other values to the current value array ``` } if (comparisonResult >= 0) { for (int i = 0; i < _values.length; i++) { if (i != comparableIndex) { _values[i] = other._values[i]; } } } ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/MultiComparisonColumnReader.java: ########## @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.upsert; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; +import org.apache.pinot.segment.spi.IndexSegment; + + +@SuppressWarnings("rawtypes") +public class MultiComparisonColumnReader implements UpsertUtils.ComparisonColumnReader { + private final Map<String, PinotSegmentColumnReader> _comparisonColumnReaders; + + public MultiComparisonColumnReader(IndexSegment segment, List<String> comparisonColumns) { + _comparisonColumnReaders = new TreeMap<>(); Review Comment: It would be more performant to keep a list of `PinotSegmentColumnReader`s instead of a map ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java: ########## @@ -75,19 +75,32 @@ public RecordInfo next() { }; } + public static RecordInfoReader makeRecordReader(IndexSegment segment, List<String> primaryKeyColumns, + List<String> comparisonColumns) { + + if (comparisonColumns.size() > 1) { + return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns); + } + return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns.get(0)); + } + public static class RecordInfoReader implements Closeable { public final PrimaryKeyReader _primaryKeyReader; - public final PinotSegmentColumnReader _comparisonColumnReader; + public final ComparisonColumnReader _comparisonColumnReader; Review Comment: Let's also move `SingleComparisonColumnReader` and `MultiComparisonColumnReader` into this class as inner classes ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable<ComparisonColumns> { + private Comparable[] _comparisonColumns; + + public ComparisonColumns(Comparable[] comparisonColumns) { + _comparisonColumns = comparisonColumns; + } + + public Comparable[] getComparisonColumns() { + return _comparisonColumns; + } + + @Override + public int compareTo(ComparisonColumns other) { + // _comparisonColumns should only at most one non-null comparison value. If not, it is the user's responsibility. + // There is no attempt to guarantee behavior in the case where there are multiple non-null values + int comparisonResult; + int comparableIndex = getComparableIndex(); + + if (comparableIndex < 0) { + // All comparison values were null. This record is only ok to keep if all prior values were also null Review Comment: Is this case allowed? Should we just ignore this record (i.e. return -1)? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable<ComparisonColumns> { + private Comparable[] _comparisonColumns; + + public ComparisonColumns(Comparable[] comparisonColumns) { + _comparisonColumns = comparisonColumns; + } + + public Comparable[] getComparisonColumns() { + return _comparisonColumns; + } + + @Override + public int compareTo(ComparisonColumns other) { + // _comparisonColumns should only at most one non-null comparison value. If not, it is the user's responsibility. + // There is no attempt to guarantee behavior in the case where there are multiple non-null values + int comparisonResult; + int comparableIndex = getComparableIndex(); + + if (comparableIndex < 0) { + // All comparison values were null. 
This record is only ok to keep if all prior values were also null + comparisonResult = 1; + for (int i = 0; i < other.getComparisonColumns().length; i++) { + if (other.getComparisonColumns()[i] != null) { + comparisonResult = -1; + break; + } + } + } else { + Comparable comparisonValue = _comparisonColumns[comparableIndex]; + Comparable otherComparisonValue = other.getComparisonColumns()[comparableIndex]; + + if (otherComparisonValue == null) { + // Keep this record because the existing record has no value for the same comparison column, therefore the + // (lack of) existing value could not possibly cause the new value to be rejected. + comparisonResult = 1; + } else { + comparisonResult = comparisonValue.compareTo(otherComparisonValue); + } + } + + if (comparisonResult >= 0) { + _comparisonColumns = merge(other.getComparisonColumns(), _comparisonColumns); + } + + return comparisonResult; + } + + private int getComparableIndex() { + for (int i = 0; i < _comparisonColumns.length; i++) { + if (_comparisonColumns[i] == null) { + continue; + } + return i; + } + return -1; + } + + public static Comparable[] merge(Comparable[] current, Comparable[] next) { + // Create a shallow copy so {@param current} is unmodified Review Comment: No need to create a copy here since the merge is happening in-place anyway. We can actually change the return to `void` to make it explicit that the merge is happening in-place -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org