This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
    new 74e13b1aa0 Merge new columns in existing record with default merge strategy (#9851)
74e13b1aa0 is described below

commit 74e13b1aa064e13aebba060ec1d292689721ff91
Author: Navina Ramesh <nav...@apache.org>
AuthorDate: Tue Nov 29 10:02:55 2022 -0800

    Merge new columns in existing record with default merge strategy (#9851)
---
 .../segment/local/upsert/PartialUpsertHandler.java | 36 ++++++++++++----------
 .../local/upsert/PartialUpsertHandlerTest.java     |  5 +--
 2 files changed, 22 insertions(+), 19 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
index 4a1cfad39f..3444a5ac54 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandler.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.upsert;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMerger;
 import org.apache.pinot.segment.local.upsert.merger.PartialUpsertMergerFactory;
@@ -33,20 +34,19 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 public class PartialUpsertHandler {
   // _column2Mergers maintains the mapping of merge strategies per columns.
   private final Map<String, PartialUpsertMerger> _column2Mergers = new 
HashMap<>();
+  private final PartialUpsertMerger _defaultPartialUpsertMerger;
+  private final String _comparisonColumn;
+  private final List<String> _primaryKeyColumns;
 
   public PartialUpsertHandler(Schema schema, Map<String, 
UpsertConfig.Strategy> partialUpsertStrategies,
       UpsertConfig.Strategy defaultPartialUpsertStrategy, String 
comparisonColumn) {
+    _defaultPartialUpsertMerger = 
PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy);
+    _comparisonColumn = comparisonColumn;
+    _primaryKeyColumns = schema.getPrimaryKeyColumns();
+
     for (Map.Entry<String, UpsertConfig.Strategy> entry : 
partialUpsertStrategies.entrySet()) {
       _column2Mergers.put(entry.getKey(), 
PartialUpsertMergerFactory.getMerger(entry.getValue()));
     }
-    // For all physical columns (including date time columns) except for 
primary key columns and comparison column.
-    // If no comparison column is configured, use main time column as the 
comparison time.
-    for (String columnName : schema.getPhysicalColumnNames()) {
-      if (!schema.getPrimaryKeyColumns().contains(columnName) && 
!_column2Mergers.containsKey(columnName)
-          && !comparisonColumn.equals(columnName)) {
-        _column2Mergers.put(columnName, 
PartialUpsertMergerFactory.getMerger(defaultPartialUpsertStrategy));
-      }
-    }
   }
 
   /**
@@ -65,15 +65,17 @@ public class PartialUpsertHandler {
    * @return a new row after merge
    */
   public GenericRow merge(GenericRow previousRecord, GenericRow newRecord) {
-    for (Map.Entry<String, PartialUpsertMerger> entry : 
_column2Mergers.entrySet()) {
-      String column = entry.getKey();
-      if (!previousRecord.isNullValue(column)) {
-        if (newRecord.isNullValue(column)) {
-          newRecord.putValue(column, previousRecord.getValue(column));
-          newRecord.removeNullValueField(column);
-        } else {
-          newRecord.putValue(column,
-              entry.getValue().merge(previousRecord.getValue(column), 
newRecord.getValue(column)));
+    for (String column : previousRecord.getFieldToValueMap().keySet()) {
+      if (!_primaryKeyColumns.contains(column) && 
!_comparisonColumn.equals(column)) {
+        if (!previousRecord.isNullValue(column)) {
+          if (newRecord.isNullValue(column)) {
+            newRecord.putValue(column, previousRecord.getValue(column));
+            newRecord.removeNullValueField(column);
+          } else {
+            PartialUpsertMerger merger = _column2Mergers.getOrDefault(column, 
_defaultPartialUpsertMerger);
+            newRecord.putValue(column,
+                merger.merge(previousRecord.getValue(column), 
newRecord.getValue(column)));
+          }
         }
       }
     }
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
index 31a508e988..913215c85d 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/PartialUpsertHandlerTest.java
@@ -66,7 +66,7 @@ public class PartialUpsertHandlerTest {
 
     // newRecord is default null value, while previousRecord is not.
     // field1 should not be incremented since the newRecord is null.
-    // special case: field2 should be overrided by null value because we 
didn't enabled default partial upsert strategy.
+    // special case: field2 should be merged based on default partial upsert 
strategy.
     previousRecord.clear();
     incomingRecord.clear();
     previousRecord.putValue("field1", 1);
@@ -76,7 +76,8 @@ public class PartialUpsertHandlerTest {
     newRecord = handler.merge(previousRecord, incomingRecord);
     assertFalse(newRecord.isNullValue("field1"));
     assertEquals(newRecord.getValue("field1"), 1);
-    assertTrue(newRecord.isNullValue("field2"));
+    assertFalse(newRecord.isNullValue("field2"));
+    assertEquals(newRecord.getValue("field2"), 2);
 
     // neither of records is null.
     previousRecord.clear();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to