This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e8841316fa Simplify the handling for partial-upsert record update (#10970) e8841316fa is described below commit e8841316fa63bf5406d03f3459ae0c40985fbf10 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Jun 23 15:52:01 2023 -0700 Simplify the handling for partial-upsert record update (#10970) --- ...oncurrentMapPartitionUpsertMetadataManager.java | 47 ++++++++-------------- 1 file changed, 16 insertions(+), 31 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index e7ad9c5919..a65ce670bd 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -23,7 +23,6 @@ import java.util.Iterator; import java.util.List; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -256,40 +255,26 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp protected GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo) { assert _partialUpsertHandler != null; AtomicReference<GenericRow> previousRecordReference = new AtomicReference<>(); - AtomicBoolean outOfOrder = new AtomicBoolean(); - RecordLocation currentRecordLocation = _primaryKeyToRecordLocationMap.computeIfPresent( - HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), (pk, recordLocation) -> { - if (recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) { - if (!recordInfo.isDeleteRecord()) { - IndexSegment currentSegment = recordLocation.getSegment(); - int currentDocId = recordLocation.getDocId(); - ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds(); - if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) { - // if delete is not enabled or previous record not marked as deleted - _reuse.clear(); - previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse)); - } + _primaryKeyToRecordLocationMap.computeIfPresent(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(), _hashFunction), + (pk, recordLocation) -> { + // Read the previous record if the following conditions are met: + // - New record is not a DELETE record + // - New record is not out-of-order + // - Previous record is not deleted + if (!recordInfo.isDeleteRecord() + && recordInfo.getComparisonValue().compareTo(recordLocation.getComparisonValue()) >= 0) { + IndexSegment currentSegment = recordLocation.getSegment(); + ThreadSafeMutableRoaringBitmap currentQueryableDocIds = currentSegment.getQueryableDocIds(); + int currentDocId = recordLocation.getDocId(); + if (currentQueryableDocIds == null || currentQueryableDocIds.contains(currentDocId)) { + _reuse.clear(); + previousRecordReference.set(currentSegment.getRecord(currentDocId, _reuse)); } - } else { - outOfOrder.set(true); } return recordLocation; }); - if (currentRecordLocation != null) { - // Existing primary key - if (!outOfOrder.get()) { - GenericRow previousRecord = previousRecordReference.get(); - if (previousRecord == null) { - return record; - } - return _partialUpsertHandler.merge(previousRecord, record); - } else { - return record; - } - } else { - // New primary key - return record; - } + GenericRow previousRecord = previousRecordReference.get(); + return previousRecord != null ? _partialUpsertHandler.merge(previousRecord, record) : record; } @VisibleForTesting --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org