This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e8841316fa Simplify the handling for partial-upsert record update (#10970)
e8841316fa is described below

commit e8841316fa63bf5406d03f3459ae0c40985fbf10
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Jun 23 15:52:01 2023 -0700

    Simplify the handling for partial-upsert record update (#10970)
---
 ...oncurrentMapPartitionUpsertMetadataManager.java | 47 ++++++++--------------
 1 file changed, 16 insertions(+), 31 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index e7ad9c5919..a65ce670bd 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
@@ -256,40 +255,26 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp
   protected GenericRow doUpdateRecord(GenericRow record, RecordInfo 
recordInfo) {
     assert _partialUpsertHandler != null;
     AtomicReference<GenericRow> previousRecordReference = new 
AtomicReference<>();
-    AtomicBoolean outOfOrder = new AtomicBoolean();
-    RecordLocation currentRecordLocation = 
_primaryKeyToRecordLocationMap.computeIfPresent(
-        HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), 
(pk, recordLocation) -> {
-          if 
(recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) 
>= 0) {
-            if (!recordInfo.isDeleteRecord()) {
-              IndexSegment currentSegment = recordLocation.getSegment();
-              int currentDocId = recordLocation.getDocId();
-              ThreadSafeMutableRoaringBitmap currentQueryableDocIds = 
currentSegment.getQueryableDocIds();
-              if (currentQueryableDocIds == null || 
currentQueryableDocIds.contains(currentDocId)) {
-                // if delete is not enabled or previous record not marked as 
deleted
-                _reuse.clear();
-                
previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
-              }
+    
_primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
+        (pk, recordLocation) -> {
+          // Read the previous record if the following conditions are met:
+          // - New record is not a DELETE record
+          // - New record is not out-of-order
+          // - Previous record is not deleted
+          if (!recordInfo.isDeleteRecord()
+              && 
recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) 
>= 0) {
+            IndexSegment currentSegment = recordLocation.getSegment();
+            ThreadSafeMutableRoaringBitmap currentQueryableDocIds = 
currentSegment.getQueryableDocIds();
+            int currentDocId = recordLocation.getDocId();
+            if (currentQueryableDocIds == null || 
currentQueryableDocIds.contains(currentDocId)) {
+              _reuse.clear();
+              
previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse));
             }
-          } else {
-            outOfOrder.set(true);
           }
           return recordLocation;
         });
-    if (currentRecordLocation != null) {
-      // Existing primary key
-      if (!outOfOrder.get()) {
-        GenericRow previousRecord = previousRecordReference.get();
-        if (previousRecord == null) {
-          return record;
-        }
-        return _partialUpsertHandler.merge(previousRecord, record);
-      } else {
-        return record;
-      }
-    } else {
-      // New primary key
-      return record;
-    }
+    GenericRow previousRecord = previousRecordReference.get();
+    return previousRecord != null ? 
_partialUpsertHandler.merge(previousRecord, record) : record;
   }
 
   @VisibleForTesting


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to