egalpin commented on code in PR #10234:
URL: https://github.com/apache/pinot/pull/10234#discussion_r1116097731

##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ComparisonColumns.java:
##########
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.upsert;
+
+import java.util.HashMap;
+import java.util.Map;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class ComparisonColumns implements Comparable {
+  private Map<String, ComparisonValue> _comparisonColumns;
+
+  public ComparisonColumns(Map<String, ComparisonValue> comparisonColumns) {
+    _comparisonColumns = comparisonColumns;
+  }
+
+  public Map<String, ComparisonValue> getComparisonColumns() {
+    return _comparisonColumns;
+  }
+
+  @Override
+  public int compareTo(@Nonnull Object other) {
+    if (other instanceof ComparisonColumns) {
+      return compareToComparisonColumns((ComparisonColumns) other);
+    }
+
+    // If other is not an instance of ComparisonColumns, it must be the case that the upsert config has been updated
+    // since last server restart.
+    //
+    // In the case where the upsert config is edited between restarts, the first comparison for an existing row will
+    // end up comparing a value from the _new_ column against the previously stored Comparable from the _previous_
+    // column. The same functionality can be used here, where the previously stored Comparable will be compared
+    // against the non-null ComparisonColumn value.
+    return compareToComparable(other);
+  }
+
+  private int compareToComparable(@Nonnull Object other) {
+
+    for (Map.Entry<String, ComparisonValue> columnEntry : _comparisonColumns.entrySet()) {
+      ComparisonValue comparisonValue = columnEntry.getValue();
+      if (!comparisonValue.isNull()) {
+        return comparisonValue.compareTo(other);
+      }
+    }
+    return -1;
+  }
+
+  private int compareToComparisonColumns(@Nonnull ComparisonColumns other) {
+    for (Map.Entry<String, ComparisonValue> columnEntry : _comparisonColumns.entrySet()) {
+      ComparisonValue comparisonValue = columnEntry.getValue();
+      // Inbound records may have at most 1 non-null value. _other may have all non-null values, however.
+      if (comparisonValue.isNull()) {
+        continue;
+      }
+
+      ComparisonValue otherComparisonValue = other.getComparisonColumns().get(columnEntry.getKey());
+
+      if (otherComparisonValue == null) {
+        // This can happen if a new column is added to the list of comparisonColumns. We want to support that without
+        // requiring a server restart, so handle the null here.
+        _comparisonColumns = merge(other.getComparisonColumns(), _comparisonColumns);
+        return 1;
+      }
+
+      int comparisonResult = comparisonValue.compareTo(otherComparisonValue);
+      if (comparisonResult >= 0) {
+        _comparisonColumns = merge(other.getComparisonColumns(), _comparisonColumns);
+        return comparisonResult;
+      }
+    }
+
+    // note that we will reach here if all comparison values are null
+    return -1;
+  }
+
+  private static Map<String, ComparisonValue> merge(@Nullable Map<String, ComparisonValue> current,

Review Comment:
   Ya I agree, I've never been happy with the implicit merging. I implemented it this way for now so that any implementation of `BasePartitionUpsertMetadataManager` would be able to support multiple comparison columns without needing to re-implement any logic. Ex. `RecordLocation` is an inner class of `ConcurrentMapPartitionUpsertMetadataManager`. If we make `merge` an explicit step, each implementation of `BasePartitionUpsertMetadataManager` would need to be updated separately to ensure it calls `merge` in the correct way.

   Thoughts on the best way to proceed? Is `RecordLocation` or a similar abstraction something that could be made part of `BasePartitionUpsertMetadataManager`, such that a factory method could be implemented once and shared across extensions of the base class? Something like:

   ```java
   public RecordLocation updateRecordLocation(IndexSegment indexSegment, int docId, RecordInfo inbound,
       RecordLocation existing) {
     if (_comparisonColumns.size() > 1) {
       ComparisonColumns inboundComp = (ComparisonColumns) inbound.getComparisonValue();
       ComparisonColumns existingComp = (ComparisonColumns) existing.getComparisonValue();
       // merge is private static in the diff above, so it would need to be made accessible for this to work
       Map<String, ComparisonValue> merged =
           ComparisonColumns.merge(existingComp.getComparisonColumns(), inboundComp.getComparisonColumns());
       return new RecordLocation(indexSegment, docId, new ComparisonColumns(merged));
     }
     return new RecordLocation(indexSegment, docId, inbound.getComparisonValue());
   }
   ```

   And have sub classes all use this method?
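
   To make the shared-factory idea a bit more concrete, here is a minimal, self-contained sketch of an explicit merge living in a base class. Everything in it is a hypothetical stand-in rather than Pinot code: `Location`, `BaseManager`, `MapBackedManager`, and the use of plain `Long` values in place of `ComparisonValue` are all assumptions made for illustration. It also skips the comparison step entirely and only shows where the merge would happen.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical, simplified stand-ins; none of these are Pinot's real classes.
   class ExplicitMergeSketch {

     /** Stand-in for a stored record location: a docId plus per-column comparison values (null = no value). */
     static final class Location {
       final int docId;
       final Map<String, Long> comparisonValues;

       Location(int docId, Map<String, Long> comparisonValues) {
         this.docId = docId;
         this.comparisonValues = comparisonValues;
       }
     }

     /** Explicit merge: start from the existing values and overlay the inbound record's non-null values. */
     static Map<String, Long> merge(Map<String, Long> existing, Map<String, Long> inbound) {
       Map<String, Long> merged = new HashMap<>(existing);
       for (Map.Entry<String, Long> entry : inbound.entrySet()) {
         if (entry.getValue() != null) {
           merged.put(entry.getKey(), entry.getValue());
         }
       }
       return merged;
     }

     /** Stand-in for the shared base class: the merge decision is made here, once. */
     abstract static class BaseManager {
       private final int numComparisonColumns;

       BaseManager(int numComparisonColumns) {
         this.numComparisonColumns = numComparisonColumns;
       }

       /** Shared factory method; subclasses call this instead of merging on their own. */
       final Location updateLocation(int docId, Map<String, Long> inbound, Location existing) {
         if (numComparisonColumns > 1 && existing != null) {
           return new Location(docId, merge(existing.comparisonValues, inbound));
         }
         return new Location(docId, inbound);
       }
     }

     /** Stand-in for one concrete manager that keeps locations in a map keyed by primary key. */
     static final class MapBackedManager extends BaseManager {
       private final Map<String, Location> locations = new HashMap<>();

       MapBackedManager(int numComparisonColumns) {
         super(numComparisonColumns);
       }

       void upsert(String primaryKey, int docId, Map<String, Long> inbound) {
         locations.put(primaryKey, updateLocation(docId, inbound, locations.get(primaryKey)));
       }

       Location get(String primaryKey) {
         return locations.get(primaryKey);
       }
     }

     public static void main(String[] args) {
       MapBackedManager manager = new MapBackedManager(2);

       Map<String, Long> first = new HashMap<>();
       first.put("colA", 10L);
       first.put("colB", null);
       manager.upsert("pk1", 0, first);

       Map<String, Long> second = new HashMap<>();
       second.put("colA", null);
       second.put("colB", 7L);
       manager.upsert("pk1", 1, second);

       // After the second upsert the stored location carries values for both columns.
       System.out.println(manager.get("pk1").comparisonValues);
     }
   }
   ```

   The point of the sketch is only that `compareTo` would no longer need to mutate anything: the merge becomes a visible step in one shared method, and each concrete manager just calls it.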
   It may not be feasible, as there might be considerable differences between sub-classes.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org