This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 74e13b1aa0 Merge new columns in existing record with default merge strategy (#9851) 74e13b1aa0 is described below commit 74e13b1aa064e13aebba060ec1d292689721ff91 Author: Navina Ramesh <nav...@apache.org> AuthorDate: Tue Nov 29 10:02:55 2022 -0800 Merge new columns in existing record with default merge strategy (#9851) --- .../segment/local/upsert/PartialUpsertHandler.java | 36 ++++++++++++---------- .../local/upsert/PartialUpsertHandlerTest.java | 5 +-- 2 files changed, 22 insertions(+), 19 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java index 4a1cfad39f..3444a5ac54 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java @@ -19,6 +19,7 @@ package org.apache.pinot.segment.local.upsert; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger; import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory; @@ -33,20 +34,19 @@ import org.apache.pinot.spi.data.readers.GenericRow; public class PartialUpsertHandler { // _column2Mergers maintains the mapping of merge strategies per columns. 
private final Map<String, PartialUpsertMerger> _column2Mergers = new HashMap<>(); + private final PartialUpsertMerger _defaultPartialUpsertMerger; + private final String _comparisonColumn; + private final List<String> _primaryKeyColumns; public PartialUpsertHandler(Schema schema, Map<String, UpsertConfig.Strategy> partialUpsertStrategies, UpsertConfig.Strategy defaultPartialUpsertStrategy, String comparisonColumn) { + _defaultPartialUpsertMerger = PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy); + _comparisonColumn = comparisonColumn; + _primaryKeyColumns = schema.getPrimaryKeyColumns(); + for (Map.Entry<String, UpsertConfig.Strategy> entry : partialUpsertStrategies.entrySet()) { _column2Mergers.put(entry.getKey(), PartialUpsertMergerFactory.getMerger(entry.getValue())); } - // For all physical columns (including date time columns) except for primary key columns and comparison column. - // If no comparison column is configured, use main time column as the comparison time. 
- for (String columnName : schema.getPhysicalColumnNames()) { - if (!schema.getPrimaryKeyColumns().contains(columnName) && !_column2Mergers.containsKey(columnName) - && !comparisonColumn.equals(columnName)) { - _column2Mergers.put(columnName, PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy)); - } - } } /** @@ -65,15 +65,17 @@ public class PartialUpsertHandler { * @return a new row after merge */ public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) { - for (Map.Entry<String, PartialUpsertMerger> entry : _column2Mergers.entrySet()) { - String column = entry.getKey(); - if (!previousRecord.isNullValue(column)) { - if (newRecord.isNullValue(column)) { - newRecord.putValue(column, previousRecord.getValue(column)); - newRecord.removeNullValueField(column); - } else { - newRecord.putValue(column, - entry.getValue().merge(previousRecord.getValue(column), newRecord.getValue(column))); + for (String column : previousRecord.getFieldToValueMap().keySet()) { + if (!_primaryKeyColumns.contains(column) && !_comparisonColumn.equals(column)) { + if (!previousRecord.isNullValue(column)) { + if (newRecord.isNullValue(column)) { + newRecord.putValue(column, previousRecord.getValue(column)); + newRecord.removeNullValueField(column); + } else { + PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, _defaultPartialUpsertMerger); + newRecord.putValue(column, + merger.merge(previousRecord.getValue(column), newRecord.getValue(column))); + } } } } diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java index 31a508e988..913215c85d 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java @@ -66,7 +66,7 @@ public class 
PartialUpsertHandlerTest { // newRecord is default null value, while previousRecord is not. // field1 should not be incremented since the newRecord is null. - // special case: field2 should be overrided by null value because we didn't enabled default partial upsert strategy. + // special case: field2 should be merged based on default partial upsert strategy. previousRecord.clear(); incomingRecord.clear(); previousRecord.putValue("field1", 1); @@ -76,7 +76,8 @@ public class PartialUpsertHandlerTest { newRecord = handler.merge(previousRecord, incomingRecord); assertFalse(newRecord.isNullValue("field1")); assertEquals(newRecord.getValue("field1"), 1); - assertTrue(newRecord.isNullValue("field2")); + assertFalse(newRecord.isNullValue("field2")); + assertEquals(newRecord.getValue("field2"), 2); // neither of records is null. previousRecord.clear(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org