This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c1bb02dec Misc fixes for upsert metadata manager (#12319)
9c1bb02dec is described below

commit 9c1bb02decc32f5e685c69667a87e1bf7621fb2e
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Jan 24 15:59:12 2024 -0800

    Misc fixes for upsert metadata manager (#12319)
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 52 +++++++++++-----------
 ...oncurrentMapPartitionUpsertMetadataManager.java | 43 ++++++++----------
 2 files changed, 45 insertions(+), 50 deletions(-)

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index b63f58e013..aca199659d 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.upsert;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.AtomicDouble;
 import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
@@ -91,7 +92,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
 
   // Used to maintain the largestSeenComparisonValue to avoid handling 
out-of-ttl segments/records.
   // If upsertTTL enabled, we will keep track of largestSeenComparisonValue to 
compute expired segments.
-  protected volatile double _largestSeenComparisonValue;
+  protected final AtomicDouble _largestSeenComparisonValue;
 
   // The following variables are always accessed within synchronized block
   private boolean _stopped;
@@ -116,9 +117,9 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     _serverMetrics = ServerMetrics.get();
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + 
"-" + getClass().getSimpleName());
     if (_metadataTTL > 0) {
-      _largestSeenComparisonValue = loadWatermark();
+      _largestSeenComparisonValue = new AtomicDouble(loadWatermark());
     } else {
-      _largestSeenComparisonValue = Double.MIN_VALUE;
+      _largestSeenComparisonValue = new AtomicDouble(Double.MIN_VALUE);
       deleteWatermark();
     }
   }
@@ -166,17 +167,17 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       double maxComparisonValue =
           ((Number) 
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
               .getMaxValue()).doubleValue();
-      _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, 
maxComparisonValue);
+      _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
maxComparisonValue));
     }
 
     // Skip adding segment that has max comparison value smaller than 
(largestSeenComparisonValue - TTL)
-    if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
+    if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
       Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot 
enabled");
       Preconditions.checkState(_comparisonColumns.size() == 1,
           "Upsert TTL does not work with multiple comparison columns");
       Number maxComparisonValue =
           (Number) 
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
-      if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - 
_metadataTTL) {
+      if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() 
- _metadataTTL) {
         _logger.info("Skip adding segment: {} because it's out of TTL", 
segmentName);
         MutableRoaringBitmap validDocIdsSnapshot = 
immutableSegment.loadValidDocIdsFromSnapshot();
         if (validDocIdsSnapshot != null) {
@@ -245,11 +246,20 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
 
     // Update metrics
     long numPrimaryKeys = getNumPrimaryKeys();
+    updatePrimaryKeyGauge(numPrimaryKeys);
+    _logger.info("Finished adding segment: {} in {}ms, current primary key 
count: {}", segmentName,
+        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
+  }
+
+  protected abstract long getNumPrimaryKeys();
+
+  protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
         numPrimaryKeys);
+  }
 
-    _logger.info("Finished adding segment: {} in {}ms, current primary key 
count: {}", segmentName,
-        System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
+  protected void updatePrimaryKeyGauge() {
+    updatePrimaryKeyGauge(getNumPrimaryKeys());
   }
 
   @Override
@@ -275,7 +285,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
   }
 
-  private void doPreloadSegment(ImmutableSegmentImpl segment) {
+  protected void doPreloadSegment(ImmutableSegmentImpl segment) {
     String segmentName = segment.getSegmentName();
     _logger.info("Preloading segment: {}, current primary key count: {}", 
segmentName, getNumPrimaryKeys());
     long startTimeMs = System.currentTimeMillis();
@@ -301,8 +311,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
 
     // Update metrics
     long numPrimaryKeys = getNumPrimaryKeys();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
+    updatePrimaryKeyGauge(numPrimaryKeys);
     _logger.info("Finished preloading segment: {} in {}ms, current primary key 
count: {}", segmentName,
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
@@ -347,8 +356,6 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
   }
 
-  protected abstract long getNumPrimaryKeys();
-
   protected abstract void addOrReplaceSegment(ImmutableSegmentImpl segment, 
ThreadSafeMutableRoaringBitmap validDocIds,
       @Nullable ThreadSafeMutableRoaringBitmap queryableDocIds, 
Iterator<RecordInfo> recordInfoIterator,
       @Nullable IndexSegment oldSegment, @Nullable MutableRoaringBitmap 
validDocIdsForOldSegment);
@@ -378,8 +385,8 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   }
 
   /**
-   Returns {@code true} when the record is added to the upsert metadata 
manager,
-   {@code false} when the record is out-of-order thus not added.
+   * Returns {@code true} when the record is added to the upsert metadata 
manager, {@code false} when the record is
+   * out-of-order thus not added.
    */
   protected abstract boolean doAddRecord(MutableSegment segment, RecordInfo 
recordInfo);
 
@@ -433,9 +440,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
 
     // Update metrics
     long numPrimaryKeys = getNumPrimaryKeys();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
-
+    updatePrimaryKeyGauge(numPrimaryKeys);
     _logger.info("Finished replacing segment: {} in {}ms, current primary key 
count: {}", segmentName,
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
@@ -506,10 +511,10 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       return;
     }
     // Skip removing segment that has max comparison value smaller than 
(largestSeenComparisonValue - TTL)
-    if (_metadataTTL > 0 && _largestSeenComparisonValue > 0) {
+    if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
       Number maxComparisonValue =
           (Number) 
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
-      if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue - 
_metadataTTL) {
+      if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() 
- _metadataTTL) {
         _logger.info("Skip removing segment: {} because it's out of TTL", 
segmentName);
         return;
       }
@@ -556,9 +561,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
 
     // Update metrics
     long numPrimaryKeys = getNumPrimaryKeys();
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        numPrimaryKeys);
-
+    updatePrimaryKeyGauge(numPrimaryKeys);
     _logger.info("Finished removing segment: {} in {}ms, current primary key 
count: {}", segmentName,
         System.currentTimeMillis() - startTimeMs, numPrimaryKeys);
   }
@@ -793,8 +796,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     // We don't remove the segment from the metadata manager when
     // it's closed. This was done to make table deletion faster. Since we 
don't remove the segment, we never decrease
     // the primary key count. So, we set the primary key count to 0 here.
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        0L);
+    updatePrimaryKeyGauge(0);
     _logger.info("Closed the metadata manager");
   }
 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index 576d679368..887582538c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.pinot.common.metrics.ServerGauge;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
@@ -223,22 +222,22 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
 
   @Override
   public void doRemoveExpiredPrimaryKeys() {
-    AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
     AtomicInteger numMetadataTTLKeysRemoved = new AtomicInteger();
+    AtomicInteger numDeletedTTLKeysRemoved = new AtomicInteger();
+    double largestSeenComparisonValue = _largestSeenComparisonValue.get();
     double metadataTTLKeysThreshold;
     if (_metadataTTL > 0) {
-      metadataTTLKeysThreshold = _largestSeenComparisonValue - _metadataTTL;
+      metadataTTLKeysThreshold = largestSeenComparisonValue - _metadataTTL;
     } else {
       metadataTTLKeysThreshold = Double.MIN_VALUE;
     }
-
     double deletedKeysThreshold;
-
     if (_deletedKeysTTL > 0) {
-      deletedKeysThreshold = _largestSeenComparisonValue - _deletedKeysTTL;
+      deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL;
     } else {
       deletedKeysThreshold = Double.MIN_VALUE;
     }
+
     _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
       double comparisonValue = ((Number) 
recordLocation.getComparisonValue()).doubleValue();
       if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) {
@@ -255,29 +254,25 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
       }
     });
     if (_metadataTTL > 0) {
-      persistWatermark(_largestSeenComparisonValue);
+      persistWatermark(largestSeenComparisonValue);
     }
 
-    int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
-    if (numDeletedTTLKeys > 0) {
-      _logger.info("Deleted {} primary keys based on deletedKeysTTL in the 
table {}", numDeletedTTLKeys,
-          _tableNameWithType);
-      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
-          numDeletedTTLKeys);
-    }
+    // Update metrics
+    updatePrimaryKeyGauge();
     int numMetadataTTLKeys = numMetadataTTLKeysRemoved.get();
     if (numMetadataTTLKeys > 0) {
-      _logger.info("Deleted {} primary keys based on metadataTTL in the table 
{}", numMetadataTTLKeys,
-          _tableNameWithType);
+      _logger.info("Deleted {} primary keys based on metadataTTL", 
numMetadataTTLKeys);
       _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.METADATA_TTL_PRIMARY_KEYS_REMOVED,
           numMetadataTTLKeys);
     }
+    int numDeletedTTLKeys = numDeletedTTLKeysRemoved.get();
+    if (numDeletedTTLKeys > 0) {
+      _logger.info("Deleted {} primary keys based on deletedKeysTTL", 
numDeletedTTLKeys);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.DELETED_KEYS_TTL_PRIMARY_KEYS_REMOVED,
+          numDeletedTTLKeys);
+    }
   }
 
-  /**
-   Returns {@code true} when the record is added to the upsert metadata 
manager,
-   {@code false} when the record is out-of-order thus not added.
-   */
   @Override
   protected boolean doAddRecord(MutableSegment segment, RecordInfo recordInfo) 
{
     AtomicBoolean isOutOfOrderRecord = new AtomicBoolean(false);
@@ -289,7 +284,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
     // When TTL is enabled, update largestSeenComparisonValue when adding new 
record
     if (_metadataTTL > 0 || _deletedKeysTTL > 0) {
       double comparisonValue = ((Number) newComparisonValue).doubleValue();
-      _largestSeenComparisonValue = Math.max(_largestSeenComparisonValue, 
comparisonValue);
+      _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
comparisonValue));
     }
 
     
_primaryKeyToRecordLocationMap.compute(HashUtils.hashPrimaryKey(recordInfo.getPrimaryKey(),
 _hashFunction),
@@ -310,8 +305,8 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
               }
               return new RecordLocation(segment, newDocId, newComparisonValue);
             } else {
+              // Out-of-order record
               
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), 
recordInfo.getComparisonValue());
-              // this is a out-of-order record then set value to true - this 
indicates whether out-of-order or not
               isOutOfOrderRecord.set(true);
               return currentRecordLocation;
             }
@@ -322,9 +317,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
           }
         });
 
-    // Update metrics
-    _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, 
ServerGauge.UPSERT_PRIMARY_KEYS_COUNT,
-        _primaryKeyToRecordLocationMap.size());
+    updatePrimaryKeyGauge();
     return !isOutOfOrderRecord.get();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to