Jackie-Jiang commented on code in PR #11983: URL: https://github.com/apache/pinot/pull/11983#discussion_r1550949947
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java: ########## @@ -32,76 +31,79 @@ /** * Handler for partial-upsert. + * + * This class is responsible for merging the new record with the previous record. + * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column, + * it uses the default merge strategy. + * + * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}. + * If a merger for row is defined then it takes precedence and ignores column mergers. */ public class PartialUpsertHandler { - // _column2Mergers maintains the mapping of merge strategies per columns. - private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>(); - private final PartialUpsertMerger _defaultPartialUpsertMerger; private final List<String> _comparisonColumns; private final List<String> _primaryKeyColumns; + private final PartialUpsertMerger _partialUpsertMerger; - public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies, - UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) { - _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy); + public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) { _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); - for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) { - _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); - } + _partialUpsertMerger = + PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig); } - /** - * Merges records and returns the merged record. - * We used a map to indicate all configured fields for partial upsert. 
For these fields - * (1) If the prev value is null, return the new value - * (2) If the prev record is not null, the new value is null, return the prev value. - * (3) If neither values are not null, then merge the value and return. - * For un-configured fields, they are using default override behavior, regardless null values. - * - * For example, overwrite merger will only override the prev value if the new value is not null. - * Null values will override existing values if not configured. They can be ignored by using ignoreMerger. - * - * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for - * re-reads. - * @param newRecord the new consumed record. - */ - public void merge(LazyRow prevRecord, GenericRow newRecord) { - for (String column : prevRecord.getColumnNames()) { - if (!_primaryKeyColumns.contains(column)) { - PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); - // Non-overwrite mergers - // (1) If the value of the previous is null value, skip merging and use the new value - // (2) Else If the value of new value is null, use the previous value (even for comparison columns). - // (3) Else If the column is not a comparison column, we applied the merged value to it. - if (!(merger instanceof OverwriteMerger)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - if (newRecord.isNullValue(column)) { - // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of - // using - // multiple comparison columns. We never apply a merge function to it, rather we just take any/all - // non-null comparison column values from the previous record, and the sole non-null comparison column - // value from the new record. 
- newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } else if (!_comparisonColumns.contains(column)) { - newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column))); - } - } - } else { - // Overwrite mergers. - // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value - // (2) Otherwise, if previous is not null, init columnReader and use the previous value. - if (newRecord.isNullValue(column)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } - } + public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) { + reuseMergerResult.clear(); Review Comment: (minor) Clear the result on the caller side ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java: ########## @@ -61,11 +62,12 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD PartialUpsertHandler partialUpsertHandler = null; if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); - Preconditions.checkArgument(partialUpsertStrategies != null, + String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass(); + Preconditions.checkArgument( + StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null, "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); partialUpsertHandler = - new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), - comparisonColumns); + new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); Review Comment: (format) Please auto-reformat all the changes with [Pinot 
Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide) ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java: ########## @@ -32,76 +31,79 @@ /** * Handler for partial-upsert. + * + * This class is responsible for merging the new record with the previous record. + * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column, + * it uses the default merge strategy. + * + * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}. + * If a merger for row is defined then it takes precedence and ignores column mergers. */ public class PartialUpsertHandler { - // _column2Mergers maintains the mapping of merge strategies per columns. - private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>(); - private final PartialUpsertMerger _defaultPartialUpsertMerger; private final List<String> _comparisonColumns; private final List<String> _primaryKeyColumns; + private final PartialUpsertMerger _partialUpsertMerger; - public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies, - UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) { - _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy); + public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) { _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); - for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) { - _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); - } + _partialUpsertMerger = + PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig); } - /** - * Merges records and returns the merged 
record. - * We used a map to indicate all configured fields for partial upsert. For these fields - * (1) If the prev value is null, return the new value - * (2) If the prev record is not null, the new value is null, return the prev value. - * (3) If neither values are not null, then merge the value and return. - * For un-configured fields, they are using default override behavior, regardless null values. - * - * For example, overwrite merger will only override the prev value if the new value is not null. - * Null values will override existing values if not configured. They can be ignored by using ignoreMerger. - * - * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for - * re-reads. - * @param newRecord the new consumed record. - */ - public void merge(LazyRow prevRecord, GenericRow newRecord) { - for (String column : prevRecord.getColumnNames()) { - if (!_primaryKeyColumns.contains(column)) { - PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); - // Non-overwrite mergers - // (1) If the value of the previous is null value, skip merging and use the new value - // (2) Else If the value of new value is null, use the previous value (even for comparison columns). - // (3) Else If the column is not a comparison column, we applied the merged value to it. - if (!(merger instanceof OverwriteMerger)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - if (newRecord.isNullValue(column)) { - // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of - // using - // multiple comparison columns. We never apply a merge function to it, rather we just take any/all - // non-null comparison column values from the previous record, and the sole non-null comparison column - // value from the new record. 
- newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } else if (!_comparisonColumns.contains(column)) { - newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column))); - } - } - } else { - // Overwrite mergers. - // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value - // (2) Otherwise, if previous is not null, init columnReader and use the previous value. - if (newRecord.isNullValue(column)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } - } + public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) { + reuseMergerResult.clear(); + + // merger current row with previously indexed row + _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult); + + if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) { + // iterate over all columns in prevRecord and update newRecord with merged values + for (String column : prevRecord.getColumnNames()) { + // no merger to apply on primary key columns + if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { + continue; + } + + // use merged column value from result map + if (reuseMergerResult.containsKey(column)) { + Object mergedValue = reuseMergerResult.get(column); + setMergedValue(newRecord, column, mergedValue); + } + } + } else { + // iterate over only merger results and update newRecord with merged values + for (Map.Entry<String, Object> entry : reuseMergerResult.entrySet()) { + // skip if primary key column + String column = entry.getKey(); + if (_primaryKeyColumns.contains(column) || _comparisonColumns.contains(column)) { + continue; } + + Object mergedValue = entry.getValue(); + setMergedValue(newRecord, column, mergedValue); + } + } + + // handle comparison columns + for (String column: _comparisonColumns) { + if 
(newRecord.isNullValue(column) && !prevRecord.isNullValue(column)) { + newRecord.putValue(column, prevRecord.getValue(column)); + newRecord.removeNullValueField(column); } } } + + private void setMergedValue(GenericRow newRecord, String column, Object mergedValue) { + if (mergedValue != null) { + // remove null value field if it was set + newRecord.removeNullValueField(column); + newRecord.putValue(column, mergedValue); + } else { + // if column exists but mapped to a null value then merger result was a null value + newRecord.addNullValueField(column); + newRecord.putValue(column, null); Review Comment: (MAJOR) This won't work. In order to set a value to null, you'll need to call `putDefaultNullValue()` with a default null value. Do you see a scenario where you want to explicitly set a value to `null`? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java: ########## @@ -32,76 +31,79 @@ /** * Handler for partial-upsert. + * + * This class is responsible for merging the new record with the previous record. + * It uses the configured merge strategies to merge the columns. If no merge strategy is configured for a column, + * it uses the default merge strategy. + * + * It is also possible to define a custom logic for merging rows by implementing {@link PartialUpsertMerger}. + * If a merger for row is defined then it takes precedence and ignores column mergers. */ public class PartialUpsertHandler { - // _column2Mergers maintains the mapping of merge strategies per columns. 
- private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>(); - private final PartialUpsertMerger _defaultPartialUpsertMerger; private final List<String> _comparisonColumns; private final List<String> _primaryKeyColumns; + private final PartialUpsertMerger _partialUpsertMerger; - public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies, - UpsertConfig.Strategy defaultPartialUpsertStrategy, List<String> comparisonColumns) { - _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy); + public PartialUpsertHandler(Schema schema, List<String> comparisonColumns, UpsertConfig upsertConfig) { _comparisonColumns = comparisonColumns; _primaryKeyColumns = schema.getPrimaryKeyColumns(); - for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) { - _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); - } + _partialUpsertMerger = + PartialUpsertMergerFactory.getPartialUpsertMerger(_primaryKeyColumns, comparisonColumns, upsertConfig); } - /** - * Merges records and returns the merged record. - * We used a map to indicate all configured fields for partial upsert. For these fields - * (1) If the prev value is null, return the new value - * (2) If the prev record is not null, the new value is null, return the prev value. - * (3) If neither values are not null, then merge the value and return. - * For un-configured fields, they are using default override behavior, regardless null values. - * - * For example, overwrite merger will only override the prev value if the new value is not null. - * Null values will override existing values if not configured. They can be ignored by using ignoreMerger. - * - * @param prevRecord wrapper for previous record, which lazily reads column values of previous row and caches for - * re-reads. - * @param newRecord the new consumed record. 
- */ - public void merge(LazyRow prevRecord, GenericRow newRecord) { - for (String column : prevRecord.getColumnNames()) { - if (!_primaryKeyColumns.contains(column)) { - PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); - // Non-overwrite mergers - // (1) If the value of the previous is null value, skip merging and use the new value - // (2) Else If the value of new value is null, use the previous value (even for comparison columns). - // (3) Else If the column is not a comparison column, we applied the merged value to it. - if (!(merger instanceof OverwriteMerger)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - if (newRecord.isNullValue(column)) { - // Note that we intentionally want to overwrite any previous _comparisonColumn value in the case of - // using - // multiple comparison columns. We never apply a merge function to it, rather we just take any/all - // non-null comparison column values from the previous record, and the sole non-null comparison column - // value from the new record. - newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } else if (!_comparisonColumns.contains(column)) { - newRecord.putValue(column, merger.merge(prevValue, newRecord.getValue(column))); - } - } - } else { - // Overwrite mergers. - // (1) If the merge strategy is Overwrite merger and newValue is not null, skip and use the new value - // (2) Otherwise, if previous is not null, init columnReader and use the previous value. 
- if (newRecord.isNullValue(column)) { - Object prevValue = prevRecord.getValue(column); - if (prevValue != null) { - newRecord.putValue(column, prevValue); - newRecord.removeNullValueField(column); - } - } + public void merge(LazyRow prevRecord, GenericRow newRecord, Map<String, Object> reuseMergerResult) { + reuseMergerResult.clear(); + + // merger current row with previously indexed row + _partialUpsertMerger.merge(prevRecord, newRecord, reuseMergerResult); + + if (_partialUpsertMerger instanceof PartialUpsertColumnarMerger) { Review Comment: Do we need to differentiate between `PartialUpsertColumnarMerger` and a custom merger? Shouldn't the logic be the same? ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java: ########## @@ -61,11 +62,12 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD PartialUpsertHandler partialUpsertHandler = null; if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); - Preconditions.checkArgument(partialUpsertStrategies != null, + String rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass(); + Preconditions.checkArgument( + StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null, Review Comment: (minor) These checks can be pushed down to the constructor of `PartialUpsertHandler` ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java: ########## @@ -61,11 +62,12 @@ public void init(TableConfig tableConfig, Schema schema, TableDataManager tableD PartialUpsertHandler partialUpsertHandler = null; if (upsertConfig.getMode() == UpsertConfig.Mode.PARTIAL) { Map<String, UpsertConfig.Strategy> partialUpsertStrategies = upsertConfig.getPartialUpsertStrategies(); - Preconditions.checkArgument(partialUpsertStrategies != null, + String
rowMergerCustomImplementation = upsertConfig.getPartialUpsertMergerClass(); + Preconditions.checkArgument( + StringUtils.isNotBlank(rowMergerCustomImplementation) || partialUpsertStrategies != null, "Partial-upsert strategies must be configured for partial-upsert enabled table: %s", _tableNameWithType); partialUpsertHandler = - new PartialUpsertHandler(schema, partialUpsertStrategies, upsertConfig.getDefaultPartialUpsertStrategy(), - comparisonColumns); + new PartialUpsertHandler(schema, comparisonColumns, upsertConfig); Review Comment: (format) Please auto reformat all the changes with [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#set-up-ide) ########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java: ########## @@ -18,39 +18,50 @@ */ package org.apache.pinot.segment.local.upsert.merger; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.UpsertConfig; public class PartialUpsertMergerFactory { + private PartialUpsertMergerFactory() { } - private static final AppendMerger APPEND_MERGER = new AppendMerger(); - private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger(); - private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger(); - private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger(); - private static final MaxMerger MAX_MERGER = new MaxMerger(); - private static final MinMerger MIN_MERGER = new MinMerger(); - private static final UnionMerger UNION_MERGER = new UnionMerger(); - - public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) { - switch (strategy) { - case APPEND: - return APPEND_MERGER; - case INCREMENT: - return INCREMENT_MERGER; - case IGNORE: - return IGNORE_MERGER; - case MAX: - return MAX_MERGER; - case MIN: - return MIN_MERGER; - case OVERWRITE: - return 
OVERWRITE_MERGER; - case UNION: - return UNION_MERGER; - default: - throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy); + /** + * Initialise the default partial upsert merger or initialise a custom implementation from a given class name in + * config + * @param primaryKeyColumns + * @param comparisonColumns + * @param upsertConfig + * @return + */ + public static PartialUpsertMerger getPartialUpsertMerger(List<String> primaryKeyColumns, + List<String> comparisonColumns, UpsertConfig upsertConfig) { + PartialUpsertMerger partialUpsertMerger; + String customRowMerger = upsertConfig.getPartialUpsertMergerClass(); + // If a custom implementation is provided in config, initialize an implementation and return. + if (StringUtils.isNotBlank(customRowMerger)) { + try { + Class<?> partialUpsertMergerClass = Class.forName(customRowMerger); + if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) { Review Comment: Why does it have to extend `BasePartialUpsertMerger`? Implementing `PartialUpsertMerger` should be good right? 
########## pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/merger/PartialUpsertMergerFactory.java: ########## @@ -18,39 +18,50 @@ */ package org.apache.pinot.segment.local.upsert.merger; +import java.lang.reflect.InvocationTargetException; +import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.spi.config.table.UpsertConfig; public class PartialUpsertMergerFactory { + private PartialUpsertMergerFactory() { } - private static final AppendMerger APPEND_MERGER = new AppendMerger(); - private static final IncrementMerger INCREMENT_MERGER = new IncrementMerger(); - private static final IgnoreMerger IGNORE_MERGER = new IgnoreMerger(); - private static final OverwriteMerger OVERWRITE_MERGER = new OverwriteMerger(); - private static final MaxMerger MAX_MERGER = new MaxMerger(); - private static final MinMerger MIN_MERGER = new MinMerger(); - private static final UnionMerger UNION_MERGER = new UnionMerger(); - - public static PartialUpsertMerger getMerger(UpsertConfig.Strategy strategy) { - switch (strategy) { - case APPEND: - return APPEND_MERGER; - case INCREMENT: - return INCREMENT_MERGER; - case IGNORE: - return IGNORE_MERGER; - case MAX: - return MAX_MERGER; - case MIN: - return MIN_MERGER; - case OVERWRITE: - return OVERWRITE_MERGER; - case UNION: - return UNION_MERGER; - default: - throw new IllegalStateException("Unsupported partial upsert strategy: " + strategy); + /** + * Initialise the default partial upsert merger or initialise a custom implementation from a given class name in + * config + * @param primaryKeyColumns + * @param comparisonColumns + * @param upsertConfig + * @return + */ + public static PartialUpsertMerger getPartialUpsertMerger(List<String> primaryKeyColumns, + List<String> comparisonColumns, UpsertConfig upsertConfig) { + PartialUpsertMerger partialUpsertMerger; + String customRowMerger = upsertConfig.getPartialUpsertMergerClass(); + // If a custom implementation is provided in 
config, initialize an implementation and return. + if (StringUtils.isNotBlank(customRowMerger)) { + try { + Class<?> partialUpsertMergerClass = Class.forName(customRowMerger); + if (!BasePartialUpsertMerger.class.isAssignableFrom(partialUpsertMergerClass)) { + throw new RuntimeException( + "Provided partialUpsertMergerClass is not an implementation of BasePartialUpsertMerger.class"); + } + partialUpsertMerger = + (PartialUpsertMerger) partialUpsertMergerClass.getConstructor(List.class, List.class, UpsertConfig.class) + .newInstance(primaryKeyColumns, comparisonColumns, upsertConfig); + } catch (ClassNotFoundException + | NoSuchMethodException | InstantiationException | IllegalAccessException + | InvocationTargetException e) { Review Comment: (minor) These exception types can be collapsed into a single `catch (Exception e)` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org