Jackie-Jiang commented on code in PR #10234: URL: https://github.com/apache/pinot/pull/10234#discussion_r1114863525
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotUpsertRestletResource.java: ########## @@ -129,14 +129,16 @@ public String estimateHeapUsage(String tableSchemaConfigStr, // Estimated value space, it contains <segmentName, DocId, ComparisonValue(timestamp)> and overhead. // Here we only calculate the map content size. TODO: Add the map entry size and the array size within the map. int bytesPerValue = 60; - String comparisonColumn = tableConfig.getUpsertConfig().getComparisonColumn(); - if (comparisonColumn != null) { - FieldSpec.DataType dt = schema.getFieldSpecFor(comparisonColumn).getDataType(); - if (!dt.isFixedWidth()) { - String msg = "Not support data types for the comparison column"; - throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST); - } else { - bytesPerValue = 52 + dt.size(); + List<String> comparisonColumns = tableConfig.getUpsertConfig().getComparisonColumns(); + if (comparisonColumns != null) { + for (String columnName : comparisonColumns) { + FieldSpec.DataType dt = schema.getFieldSpecFor(columnName).getDataType(); + if (!dt.isFixedWidth()) { + String msg = "Not support data types for the comparison column"; + throw new ControllerApplicationException(LOGGER, msg, Response.Status.BAD_REQUEST); + } else { + bytesPerValue = 52 + dt.size(); Review Comment: This is incorrect as it only counts the last column's size: `bytesPerValue` is overwritten on every loop iteration instead of accumulated. When we use multiple comparison columns, there will be a wrapper over multiple values, so the size estimate should also take that wrapper into account. 
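For the estimate above, a minimal sketch of accumulating the column sizes instead of overwriting them. It is deliberately free of Pinot types: the class name, method name, and `ASSUMED_WRAPPER_OVERHEAD_BYTES` are hypothetical, and the wrapper overhead is a placeholder rather than a measured value. The base of 52 bytes is the one already used in the diff.

```java
import java.util.List;

public class UpsertHeapEstimateSketch {
  // Per-entry bytes excluding the comparison value, as used in the diff above.
  static final int BASE_BYTES_PER_VALUE = 52;
  // ASSUMPTION: placeholder for the multi-column wrapper overhead, not a measured number.
  static final int ASSUMED_WRAPPER_OVERHEAD_BYTES = 16;

  // Takes the fixed-width size (in bytes) of each comparison column.
  static int estimateBytesPerValue(List<Integer> comparisonColumnSizes) {
    int total = BASE_BYTES_PER_VALUE;
    // Accumulate every column's size instead of keeping only the last one.
    for (int size : comparisonColumnSizes) {
      total += size;
    }
    // The wrapper only exists when more than one comparison column is configured.
    if (comparisonColumnSizes.size() > 1) {
      total += ASSUMED_WRAPPER_OVERHEAD_BYTES;
    }
    return total;
  }

  public static void main(String[] args) {
    System.out.println(estimateBytesPerValue(List.of(8)));     // one 8-byte column
    System.out.println(estimateBytesPerValue(List.of(8, 4)));  // 8-byte plus 4-byte column
  }
}
```

With a single 8-byte column this reduces to the existing default of 60, so the single-column behavior would stay unchanged under these assumptions.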
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -558,15 +561,31 @@ private RecordInfo getRecordInfo(GenericRow row, int docId) { PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); if (isUpsertEnabled()) { - Object upsertComparisonValue = row.getValue(_upsertComparisonColumn); - Preconditions.checkState(upsertComparisonValue instanceof Comparable, - "Upsert comparison column: %s must be comparable", _upsertComparisonColumn); - return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue); + if (_upsertComparisonColumns.size() > 1) { + return multiComparisonRecordInfo(primaryKey, docId, row); + } + Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0)); + return new RecordInfo(primaryKey, docId, comparisonValue); } return new RecordInfo(primaryKey, docId, null); } + private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) { + Map<String, ComparisonValue> comparisonColumns = new HashMap<>(); Review Comment: I feel the `ComparisonValue` wrapper is not needed. When the value is `null`, we can simply ignore the column, or put `null` in the map. This should reduce the memory footprint ########## pinot-spi/src/main/java/org/apache/pinot/spi/config/table/UpsertConfig.java: ########## @@ -67,7 +69,7 @@ public enum Strategy { public UpsertConfig(@JsonProperty(value = "mode", required = true) Mode mode, Review Comment: Let's remove this constructor completely since it is already deprecated, and there is no usage ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable { Review Comment: ```suggestion public class ComparisonColumns implements Comparable<ComparisonColumns> { ``` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.upsert; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable { + private Map<String, ComparisonValue> _comparisonColumns; + + public ComparisonColumns(Map<String, ComparisonValue> comparisonColumns) { + _comparisonColumns = comparisonColumns; + } + + public Map<String, ComparisonValue> getComparisonColumns() { + return _comparisonColumns; + } + + @Override + public int compareTo(@Nonnull Object other) { + if (other instanceof ComparisonColumns) { + return compareToComparisonColumns((ComparisonColumns) other); + } + + // If other is not an instance of ComparisonColumns, it must be the case that the upsert config has been updated + // since last server restart. + // + // In the case where the upsert config is edited between restarts, the first comparison for an existing row will + // end up comparing a value from the _new_ column against the previously stored Comparable from the _previous_ + // column. The same functionality can be used here, where the previously stored Comparable will be compared + // against the non-null ComparisonColumn value. + return compareToComparable(other); Review Comment: This shouldn't happen because the upsert config is applied when the table is created, and we don't allow modifying the config on-the-fly ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable { + private Map<String, ComparisonValue> _comparisonColumns; + + public ComparisonColumns(Map<String, ComparisonValue> comparisonColumns) { + _comparisonColumns = comparisonColumns; + } + + public Map<String, ComparisonValue> getComparisonColumns() { + return _comparisonColumns; + } + + @Override + public int compareTo(@Nonnull Object other) { + if (other instanceof ComparisonColumns) { + return compareToComparisonColumns((ComparisonColumns) other); + } + + // If other is not an instance of ComparisonColumns, it must be the case that the upsert config has been updated + // since last server restart. + // + // In the case where the upsert config is edited between restarts, the first comparison for an existing row will + // end up comparing a value from the _new_ column against the previously stored Comparable from the _previous_ + // column. The same functionality can be used here, where the previously stored Comparable will be compared + // against the non-null ComparisonColumn value. 
+ return compareToComparable(other); + } + + private int compareToComparable(@Nonnull Object other) { + + for (Map.Entry<String, ComparisonValue> columnEntry : _comparisonColumns.entrySet()) { + ComparisonValue comparisonValue = columnEntry.getValue(); + if (!comparisonValue.isNull()) { + return comparisonValue.compareTo(other); + } + } + return -1; + } + + private int compareToComparisonColumns(@Nonnull ComparisonColumns other) { + for (Map.Entry<String, ComparisonValue> columnEntry : _comparisonColumns.entrySet()) { + ComparisonValue comparisonValue = columnEntry.getValue(); + // Inbound records may have at most 1 non-null value. _other may have all non-null values, however. + if (comparisonValue.isNull()) { + continue; + } + + ComparisonValue otherComparisonValue = other.getComparisonColumns().get(columnEntry.getKey()); + + if (otherComparisonValue == null) { + // This can happen if a new column is added to the list of comparisonColumns. We want to support that without Review Comment: I don't think this is possible without server restart right now ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable { + private Map<String, ComparisonValue> _comparisonColumns; + + public ComparisonColumns(Map<String, ComparisonValue> comparisonColumns) { + _comparisonColumns = comparisonColumns; + } + + public Map<String, ComparisonValue> getComparisonColumns() { + return _comparisonColumns; + } + + @Override + public int compareTo(@Nonnull Object other) { Review Comment: (minor, convention) We don't usually annotate `Nonnull` because it can be easily mixed with `Nullable`. We treat everything as non-null if not annotated. Same for other places ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.segment.local.upsert; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable { + private Map<String, ComparisonValue> _comparisonColumns; + + public ComparisonColumns(Map<String, ComparisonValue> comparisonColumns) { + _comparisonColumns = comparisonColumns; + } + + public Map<String, ComparisonValue> getComparisonColumns() { + return _comparisonColumns; + } + + @Override + public int compareTo(@Nonnull Object other) { + if (other instanceof ComparisonColumns) { + return compareToComparisonColumns((ComparisonColumns) other); + } + + // If other is not an instance of ComparisonColumns, it must be the case that the upsert config has been updated + // since last server restart. + // + // In the case where the upsert config is edited between restarts, the first comparison for an existing row will + // end up comparing a value from the _new_ column against the previously stored Comparable from the _previous_ + // column. The same functionality can be used here, where the previously stored Comparable will be compared + // against the non-null ComparisonColumn value. + return compareToComparable(other); + } + + private int compareToComparable(@Nonnull Object other) { + + for (Map.Entry<String, ComparisonValue> columnEntry : _comparisonColumns.entrySet()) { + ComparisonValue comparisonValue = columnEntry.getValue(); + if (!comparisonValue.isNull()) { + return comparisonValue.compareTo(other); + } + } + return -1; + } + + private int compareToComparisonColumns(@Nonnull ComparisonColumns other) { + for (Map.Entry<String, ComparisonValue> columnEntry : _comparisonColumns.entrySet()) { + ComparisonValue comparisonValue = columnEntry.getValue(); + // Inbound records may have at most 1 non-null value. _other may have all non-null values, however. 
Review Comment: If we only keep the non-null entries in the inbound record map, this step should be very straightforward ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -558,15 +561,31 @@ private RecordInfo getRecordInfo(GenericRow row, int docId) { PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); if (isUpsertEnabled()) { - Object upsertComparisonValue = row.getValue(_upsertComparisonColumn); - Preconditions.checkState(upsertComparisonValue instanceof Comparable, - "Upsert comparison column: %s must be comparable", _upsertComparisonColumn); - return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue); + if (_upsertComparisonColumns.size() > 1) { + return multiComparisonRecordInfo(primaryKey, docId, row); + } + Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0)); + return new RecordInfo(primaryKey, docId, comparisonValue); } return new RecordInfo(primaryKey, docId, null); } + private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) { + Map<String, ComparisonValue> comparisonColumns = new HashMap<>(); Review Comment: Since there are usually very few comparison columns, `TreeMap` might have better performance and a lower memory footprint ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java: ########## @@ -558,15 +561,31 @@ private RecordInfo getRecordInfo(GenericRow row, int docId) { PrimaryKey primaryKey = row.getPrimaryKey(_schema.getPrimaryKeyColumns()); if (isUpsertEnabled()) { - Object upsertComparisonValue = row.getValue(_upsertComparisonColumn); - Preconditions.checkState(upsertComparisonValue instanceof Comparable, - "Upsert comparison column: %s must be comparable", _upsertComparisonColumn); - return new RecordInfo(primaryKey, docId, (Comparable) upsertComparisonValue); + if 
(_upsertComparisonColumns.size() > 1) { + return multiComparisonRecordInfo(primaryKey, docId, row); + } + Comparable comparisonValue = (Comparable) row.getValue(_upsertComparisonColumns.get(0)); + return new RecordInfo(primaryKey, docId, comparisonValue); } return new RecordInfo(primaryKey, docId, null); } + private RecordInfo multiComparisonRecordInfo(PrimaryKey primaryKey, int docId, GenericRow row) { + Map<String, ComparisonValue> comparisonColumns = new HashMap<>(); Review Comment: These comments also apply to the `MultiComparisonColumnReader` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/UpsertUtils.java: ########## @@ -75,19 +75,32 @@ public RecordInfo next() { }; } + public static RecordInfoReader makeRecordReader(IndexSegment segment, List<String> primaryKeyColumns, + List<String> comparisonColumns) { + + if (comparisonColumns.size() > 1) { + return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns); + } + return new RecordInfoReader(segment, primaryKeyColumns, comparisonColumns.get(0)); + } + public static class RecordInfoReader implements Closeable { public final PrimaryKeyReader _primaryKeyReader; - public final PinotSegmentColumnReader _comparisonColumnReader; + public final ComparisonColumnReader _comparisonColumnReader; Review Comment: We may move the `ComparisonColumnReader` as inner class of the `UpsertUtils` as it is part of the `RecordInfoReader` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java: ########## @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.upsert; + +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ComparisonColumns implements Comparable { + private Map<String, ComparisonValue> _comparisonColumns; + + public ComparisonColumns(Map<String, ComparisonValue> comparisonColumns) { + _comparisonColumns = comparisonColumns; + } + + public Map<String, ComparisonValue> getComparisonColumns() { + return _comparisonColumns; + } + + @Override + public int compareTo(@Nonnull Object other) { + if (other instanceof ComparisonColumns) { + return compareToComparisonColumns((ComparisonColumns) other); + } + + // If other is not an instance of ComparisonColumns, it must be the case that the upsert config has been updated + // since last server restart. + // + // In the case where the upsert config is edited between restarts, the first comparison for an existing row will + // end up comparing a value from the _new_ column against the previously stored Comparable from the _previous_ + // column. The same functionality can be used here, where the previously stored Comparable will be compared + // against the non-null ComparisonColumn value. 
+ return compareToComparable(other); + } + + private int compareToComparable(@Nonnull Object other) { + + for (Map.Entry<String, ComparisonValue> columnEntry : _comparisonColumns.entrySet()) { + ComparisonValue comparisonValue = columnEntry.getValue(); + if (!comparisonValue.isNull()) { + return comparisonValue.compareTo(other); + } + } + return -1; + } + + private int compareToComparisonColumns(@Nonnull ComparisonColumns other) { + for (Map.Entry<String, ComparisonValue> columnEntry : _comparisonColumns.entrySet()) { + ComparisonValue comparisonValue = columnEntry.getValue(); + // Inbound records may have at most 1 non-null value. _other may have all non-null values, however. + if (comparisonValue.isNull()) { + continue; + } + + ComparisonValue otherComparisonValue = other.getComparisonColumns().get(columnEntry.getKey()); + + if (otherComparisonValue == null) { + // This can happen if a new column is added to the list of comparisonColumns. We want to support that without + // requiring a server restart, so handle the null here. + _comparisonColumns = merge(other.getComparisonColumns(), _comparisonColumns); + return 1; + } + + int comparisonResult = comparisonValue.compareTo(otherComparisonValue); + if (comparisonResult >= 0) { + _comparisonColumns = merge(other.getComparisonColumns(), _comparisonColumns); + return comparisonResult; + } + } + + // note that we will reach here if all comparison values are null + return -1; + } + + private static Map<String, ComparisonValue> merge(@Nullable Map<String, ComparisonValue> current, Review Comment: Why is `current` nullable? Currently `merge()` happens inside `compareTo()`, which is something of an anti-pattern because `compareTo()` shouldn't modify the value. Ideally we should call `merge()` separately when setting the comparison value into the `RecordLocation`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
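On the last comment about `merge()` inside `compareTo()`, a rough sketch of how the two could be separated. Everything here is hypothetical and only illustrates the shape: the class name is made up, values are assumed to be `Long`, and null columns are simply left out of the map (as suggested in the earlier comments) so no `ComparisonValue` wrapper is needed. The comparison rule mirrors the loop in the PR and is not meant as a general-purpose ordering.

```java
import java.util.Map;
import java.util.TreeMap;

public final class ComparisonColumnsSketch implements Comparable<ComparisonColumnsSketch> {
  // Only non-null values are stored; a missing key means the column was null.
  private final Map<String, Long> _values;

  public ComparisonColumnsSketch(Map<String, Long> values) {
    _values = new TreeMap<>(values);
  }

  public Map<String, Long> getValues() {
    return _values;
  }

  // Read-only comparison: neither this nor other is modified.
  @Override
  public int compareTo(ComparisonColumnsSketch other) {
    for (Map.Entry<String, Long> entry : _values.entrySet()) {
      Long otherValue = other._values.get(entry.getKey());
      if (otherValue == null) {
        // The stored record has no value for this column, so the inbound one wins.
        return 1;
      }
      int result = entry.getValue().compareTo(otherValue);
      if (result >= 0) {
        return result;
      }
    }
    // Reached when this record has no values or all of them are older.
    return -1;
  }

  // Called explicitly by the caller once compareTo() says the inbound record wins,
  // e.g. right before the value is written into the record location.
  public static ComparisonColumnsSketch merge(ComparisonColumnsSketch stored, ComparisonColumnsSketch inbound) {
    Map<String, Long> merged = new TreeMap<>(stored._values);
    merged.putAll(inbound._values);
    return new ComparisonColumnsSketch(merged);
  }
}
```

The caller would then do something like `if (inbound.compareTo(stored) >= 0) { stored = ComparisonColumnsSketch.merge(stored, inbound); }`, which keeps the comparison free of side effects.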