This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new ad1eda52ea fix checks on largest comparison value for upsert ttl and 
allow to add segments out of ttl (#14094)
ad1eda52ea is described below

commit ad1eda52ea3e9a040f87d0e0cabff1fba933f7bf
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Fri Sep 27 19:37:03 2024 -0700

    fix checks on largest comparison value for upsert ttl and allow to add 
segments out of ttl (#14094)
    
    * fix checks on largest comparison value for ttl and allow subclass to add 
segments even out of ttl
    
    * fix the special value for TTL comparison and use Double.NEGATIVE_INFINITY
    
    * unify TTL enable check and persist watermark after taking snapshot
    
    * update ttl watermark and skip segment out of ttl during preloading
    
    * refine and add tests
---
 .../upsert/BasePartitionUpsertMetadataManager.java | 137 +++++++++------
 ...oncurrentMapPartitionUpsertMetadataManager.java |  22 +--
 ...nUpsertMetadataManagerForConsistentDeletes.java |  12 +-
 ...rrentMapPartitionUpsertMetadataManagerTest.java | 183 ++++++++++++++++-----
 4 files changed, 239 insertions(+), 115 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 048213e1b1..18142e2981 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -88,6 +88,8 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public abstract class BasePartitionUpsertMetadataManager implements 
PartitionUpsertMetadataManager {
   protected static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = 
TimeUnit.MINUTES.toNanos(1);
+  // The special value to indicate the largest comparison value is not set 
yet, and allow negative comparison values.
+  protected static final double TTL_WATERMARK_NOT_SET = 
Double.NEGATIVE_INFINITY;
 
   protected final String _tableNameWithType;
   protected final int _partitionId;
@@ -178,10 +180,13 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
     _serverMetrics = ServerMetrics.get();
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + 
"-" + getClass().getSimpleName());
-    if (_metadataTTL > 0) {
+    if (isTTLEnabled()) {
+      Preconditions.checkState(_comparisonColumns.size() == 1,
+          "Upsert TTL does not work with multiple comparison columns");
+      Preconditions.checkState(_metadataTTL <= 0 || _enableSnapshot, "Upsert 
metadata TTL must have snapshot enabled");
       _largestSeenComparisonValue = new AtomicDouble(loadWatermark());
     } else {
-      _largestSeenComparisonValue = new AtomicDouble(Double.MIN_VALUE);
+      _largestSeenComparisonValue = new AtomicDouble(TTL_WATERMARK_NOT_SET);
       deleteWatermark();
     }
   }
@@ -383,37 +388,9 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
         "Got unsupported segment implementation: %s for segment: %s, table: 
%s", segment.getClass(), segmentName,
         _tableNameWithType);
-    ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment;
-
-    if (_deletedKeysTTL > 0) {
-      double maxComparisonValue =
-          ((Number) 
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
-              .getMaxValue()).doubleValue();
-      _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
maxComparisonValue));
+    if (isTTLEnabled()) {
+      updateWatermark(segment);
     }
-
-    // Skip adding segment that has max comparison value smaller than 
(largestSeenComparisonValue - TTL)
-    if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
-      Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot 
enabled");
-      Preconditions.checkState(_comparisonColumns.size() == 1,
-          "Upsert TTL does not work with multiple comparison columns");
-      Number maxComparisonValue =
-          (Number) 
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
-      if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() 
- _metadataTTL) {
-        _logger.info("Skip adding segment: {} because it's out of TTL", 
segmentName);
-        MutableRoaringBitmap validDocIdsSnapshot = 
immutableSegment.loadValidDocIdsFromSnapshot();
-        if (validDocIdsSnapshot != null) {
-          MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, 
validDocIdsSnapshot);
-          immutableSegment.enableUpsert(this, new 
ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot),
-              queryableDocIds != null ? new 
ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
-        } else {
-          _logger.warn("Failed to find snapshot from segment: {} which is out 
of TTL, treating all documents as valid",
-              segmentName);
-        }
-        return;
-      }
-    }
-
     if (!startOperation()) {
       _logger.info("Skip adding segment: {} because metadata manager is 
already stopped", segment.getSegmentName());
       return;
@@ -422,7 +399,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       _snapshotLock.readLock().lock();
     }
     try {
-      doAddSegment(immutableSegment);
+      doAddSegment((ImmutableSegmentImpl) segment);
       _trackedSegments.add(segment);
       if (_enableSnapshot) {
         _updatedSegmentsSinceLastSnapshot.add(segment);
@@ -435,9 +412,51 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     }
   }
 
+  protected boolean isTTLEnabled() {
+    return _metadataTTL > 0 || _deletedKeysTTL > 0;
+  }
+
+  protected boolean isOutOfMetadataTTL(IndexSegment segment) {
+    if (_metadataTTL > 0 && _largestSeenComparisonValue.get() != 
TTL_WATERMARK_NOT_SET) {
+      Number maxComparisonValue =
+          (Number) 
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
+      return maxComparisonValue.doubleValue() < 
_largestSeenComparisonValue.get() - _metadataTTL;
+    }
+    return false;
+  }
+
+  protected boolean skipAddSegmentOutOfTTL(ImmutableSegmentImpl segment) {
+    String segmentName = segment.getSegmentName();
+    _logger.info("Skip adding segment: {} because it's out of TTL", 
segmentName);
+    MutableRoaringBitmap validDocIdsSnapshot = 
segment.loadValidDocIdsFromSnapshot();
+    if (validDocIdsSnapshot != null) {
+      MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, 
validDocIdsSnapshot);
+      segment.enableUpsert(this, new 
ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot),
+          queryableDocIds != null ? new 
ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
+    } else {
+      _logger.warn("Failed to find validDocIds snapshot to add segment: {} out 
of TTL, treating all docs as valid",
+          segmentName);
+    }
+    // Return true if segment is skipped. This boolean value allows subclass 
to decide whether to skip.
+    return true;
+  }
+
+  protected boolean skipPreloadSegmentOutOfTTL(ImmutableSegmentImpl segment, 
MutableRoaringBitmap validDocIdsSnapshot) {
+    String segmentName = segment.getSegmentName();
+    _logger.info("Skip preloading segment: {} because it's out of TTL", 
segmentName);
+    MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, 
validDocIdsSnapshot);
+    segment.enableUpsert(this, new 
ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot),
+        queryableDocIds != null ? new 
ThreadSafeMutableRoaringBitmap(queryableDocIds) : null);
+    // Return true if segment is skipped. This boolean value allows subclass 
to decide whether to skip.
+    return true;
+  }
+
   protected void doAddSegment(ImmutableSegmentImpl segment) {
     String segmentName = segment.getSegmentName();
     _logger.info("Adding segment: {}, current primary key count: {}", 
segmentName, getNumPrimaryKeys());
+    if (isOutOfMetadataTTL(segment) && skipAddSegmentOutOfTTL(segment)) {
+      return;
+    }
     long startTimeMs = System.currentTimeMillis();
     if (!_enableSnapshot) {
       segment.deleteValidDocIdsSnapshot();
@@ -479,6 +498,9 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
         "Got unsupported segment implementation: %s for segment: %s, table: 
%s", segment.getClass(), segmentName,
         _tableNameWithType);
+    if (isTTLEnabled()) {
+      updateWatermark(segment);
+    }
     if (!startOperation()) {
       _logger.info("Skip preloading segment: {} because metadata manager is 
already stopped", segmentName);
       return;
@@ -508,7 +530,9 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null);
       return;
     }
-
+    if (isOutOfMetadataTTL(segment) && skipPreloadSegmentOutOfTTL(segment, 
validDocIds)) {
+      return;
+    }
     try (UpsertUtils.RecordInfoReader recordInfoReader = new 
UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns,
         _comparisonColumns, _deleteRecordColumn)) {
       doPreloadSegment(segment, null, null, 
UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds));
@@ -780,21 +804,17 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       _logger.info("Skip removing untracked (replaced or empty) segment: {}", 
segmentName);
       return;
     }
+    if (!startOperation()) {
+      _logger.info("Skip removing segment: {} because metadata manager is 
already stopped", segmentName);
+      return;
+    }
     // Skip removing the upsert metadata of segment that has max comparison 
value smaller than
     // (largestSeenComparisonValue - TTL), i.e. out of metadata TTL. The 
expired metadata is removed while creating
     // new consuming segment in batches.
     boolean skipRemoveMetadata = false;
-    if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) {
-      Number maxComparisonValue =
-          (Number) 
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue();
-      if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() 
- _metadataTTL) {
-        _logger.info("Skip removing segment: {} because it's out of TTL", 
segmentName);
-        skipRemoveMetadata = true;
-      }
-    }
-    if (!startOperation()) {
-      _logger.info("Skip removing segment: {} because metadata manager is 
already stopped", segmentName);
-      return;
+    if (isOutOfMetadataTTL(segment)) {
+      _logger.info("Skip removing segment: {} because it's out of TTL", 
segmentName);
+      skipRemoveMetadata = true;
     }
     if (_enableSnapshot) {
       _snapshotLock.readLock().lock();
@@ -989,6 +1009,12 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
       }
     }
     _updatedSegmentsSinceLastSnapshot.clear();
+    // Persist TTL watermark after taking snapshots if TTL is enabled, so that 
segments out of TTL can be loaded with
+    // updated validDocIds bitmaps. If the TTL watermark is persisted first, 
segments out of TTL may get loaded with
+    // stale bitmaps or even no bitmap snapshots to use.
+    if (isTTLEnabled()) {
+      persistWatermark(_largestSeenComparisonValue.get());
+    }
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
         ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments);
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
@@ -1020,7 +1046,7 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
         _logger.warn("Caught exception while loading watermark file: {}, 
skipping", watermarkFile);
       }
     }
-    return Double.MIN_VALUE;
+    return TTL_WATERMARK_NOT_SET;
   }
 
   /**
@@ -1061,9 +1087,26 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
     return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION 
+ _partitionId);
   }
 
+  protected void updateWatermark(ImmutableSegment segment) {
+    double maxComparisonValue =
+        ((Number) 
segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0))
+            .getMaxValue()).doubleValue();
+    _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
maxComparisonValue));
+  }
+
+  @VisibleForTesting
+  double getWatermark() {
+    return _largestSeenComparisonValue.get();
+  }
+
+  @VisibleForTesting
+  void setWatermark(double watermark) {
+    _largestSeenComparisonValue.set(watermark);
+  }
+
   @Override
   public void removeExpiredPrimaryKeys() {
-    if (_metadataTTL <= 0 && _deletedKeysTTL <= 0) {
+    if (!isTTLEnabled()) {
       return;
     }
     if (!startOperation()) {
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index b1fc92a2bf..5552a6c65c 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -192,19 +192,10 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
     AtomicInteger numTotalKeysMarkForDeletion = new AtomicInteger();
     AtomicInteger numDeletedKeysWithinTTLWindow = new AtomicInteger();
     double largestSeenComparisonValue = _largestSeenComparisonValue.get();
-    double metadataTTLKeysThreshold;
-    if (_metadataTTL > 0) {
-      metadataTTLKeysThreshold = largestSeenComparisonValue - _metadataTTL;
-    } else {
-      metadataTTLKeysThreshold = Double.MIN_VALUE;
-    }
-    double deletedKeysThreshold;
-    if (_deletedKeysTTL > 0) {
-      deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL;
-    } else {
-      deletedKeysThreshold = Double.MIN_VALUE;
-    }
-
+    double metadataTTLKeysThreshold =
+        _metadataTTL > 0 ? largestSeenComparisonValue - _metadataTTL : 
Double.NEGATIVE_INFINITY;
+    double deletedKeysThreshold =
+        _deletedKeysTTL > 0 ? largestSeenComparisonValue - _deletedKeysTTL : 
Double.NEGATIVE_INFINITY;
     _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
       double comparisonValue = ((Number) 
recordLocation.getComparisonValue()).doubleValue();
       if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) {
@@ -227,9 +218,6 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
         }
       }
     });
-    if (_metadataTTL > 0) {
-      persistWatermark(largestSeenComparisonValue);
-    }
 
     // Update metrics
     updatePrimaryKeyGauge();
@@ -266,7 +254,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
     Comparable newComparisonValue = recordInfo.getComparisonValue();
 
     // When TTL is enabled, update largestSeenComparisonValue when adding new 
record
-    if (_metadataTTL > 0 || _deletedKeysTTL > 0) {
+    if (isTTLEnabled()) {
       double comparisonValue = ((Number) newComparisonValue).doubleValue();
       _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, 
comparisonValue));
     }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
index 8507d00cc9..a179a05fa7 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java
@@ -202,8 +202,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
         segment instanceof ImmutableSegment ? "immutable" : "mutable", 
segmentName, getNumPrimaryKeys());
     long startTimeMs = System.currentTimeMillis();
 
-    try (
-        PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, 
_primaryKeyColumns)) {
+    try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, 
_primaryKeyColumns)) {
       removeSegment(segment,
           UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, 
segment.getSegmentMetadata().getTotalDocs()));
     } catch (Exception e) {
@@ -292,13 +291,8 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes
     AtomicInteger numDeletedKeysWithinTTLWindow = new AtomicInteger();
     AtomicInteger numDeletedTTLKeysInMultipleSegments = new AtomicInteger();
     double largestSeenComparisonValue = _largestSeenComparisonValue.get();
-    double deletedKeysThreshold;
-    if (_deletedKeysTTL > 0) {
-      deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL;
-    } else {
-      deletedKeysThreshold = Double.MIN_VALUE;
-    }
-
+    double deletedKeysThreshold =
+        _deletedKeysTTL > 0 ? largestSeenComparisonValue - _deletedKeysTTL : 
Double.NEGATIVE_INFINITY;
     _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> {
       double comparisonValue = ((Number) 
recordLocation.getComparisonValue()).doubleValue();
       // We need to verify that the record belongs to only one segment. If a 
record is part of multiple segments,
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 4270e9547d..278f0f5ef5 100644
--- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -163,20 +163,88 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
   }
 
   @Test
-  public void testUpsertMetadataCleanupWithTTLConfig()
+  public void testRemoveExpiredPrimaryKeys()
       throws IOException {
     _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30);
     verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120));
     verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120));
     verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120));
     verifyRemoveExpiredPrimaryKeys(new Long(80), new Long(120));
-    verifyPersistAndLoadWatermark();
-    verifyAddSegmentForTTL(new Integer(80));
-    verifyAddSegmentForTTL(new Float(80));
-    verifyAddSegmentForTTL(new Double(80));
-    verifyAddSegmentForTTL(new Long(80));
-    verifyAddOutOfTTLSegment();
-    verifyAddOutOfTTLSegmentWithRecordDelete();
+  }
+
+  @Test
+  public void testAddSegmentOutOfTTL()
+      throws IOException {
+    _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30);
+    verifyAddSegmentOutOfTTL(new Integer(80));
+    verifyAddSegmentOutOfTTL(new Float(80));
+    verifyAddSegmentOutOfTTL(new Double(80));
+    verifyAddSegmentOutOfTTL(new Long(80));
+    verifyAddMultipleSegmentsWithOneOutOfTTL();
+    verifyAddSegmentOutOfTTLWithRecordDelete();
+  }
+
+  @Test
+  public void testTTLWithNegativeComparisonValues()
+      throws IOException {
+    _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30);
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, _contextBuilder.build());
+    Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> 
recordLocationMap =
+        upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    // Add record to update largestSeenTimestamp, largest seen timestamp: 
comparisonValue
+    ThreadSafeMutableRoaringBitmap validDocIds0 = new 
ThreadSafeMutableRoaringBitmap();
+    MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
+    upsertMetadataManager.addRecord(segment0, new 
RecordInfo(makePrimaryKey(10), 1, -80, false));
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, -80, 
HashFunction.NONE);
+    assertEquals(upsertMetadataManager.getWatermark(), -80);
+
+    // add a segment with segmentEndTime = -200 so it will be skipped since it 
out-of-TTL
+    int numRecords = 4;
+    int[] primaryKeys = new int[]{0, 1, 2, 3};
+    ThreadSafeMutableRoaringBitmap validDocIds1 = new 
ThreadSafeMutableRoaringBitmap();
+    List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, 
COMPARISON_COLUMNS, -200, null);
+
+    // load segment1.
+    upsertMetadataManager.addSegment(segment1);
+    assertEquals(recordLocationMap.size(), 1);
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, -80, 
HashFunction.NONE);
+    assertEquals(upsertMetadataManager.getWatermark(), -80);
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
+  }
+
+  @Test
+  public void testManageWatermark()
+      throws IOException {
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, _contextBuilder.build());
+
+    double currentTimeMs = System.currentTimeMillis();
+    upsertMetadataManager.persistWatermark(currentTimeMs);
+    assertTrue(new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION + 
0).exists());
+
+    double watermark = upsertMetadataManager.loadWatermark();
+    assertEquals(watermark, currentTimeMs);
+
+    ImmutableSegmentImpl segment =
+        mockImmutableSegmentWithEndTime(1, new 
ThreadSafeMutableRoaringBitmap(), null, new ArrayList<>(),
+            COMPARISON_COLUMNS, new Double(currentTimeMs + 1024), new 
MutableRoaringBitmap());
+    upsertMetadataManager.updateWatermark(segment);
+    assertEquals(upsertMetadataManager.getWatermark(), currentTimeMs + 1024);
+
+    // Stop the metadata manager
+    upsertMetadataManager.stop();
+
+    // Close the metadata manager
+    upsertMetadataManager.close();
   }
 
   @Test
@@ -1049,6 +1117,52 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2});
   }
 
+  @Test
+  public void testPreloadSegmentOutOfTTL() {
+    _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30);
+    verifyPreloadSegmentOutOfTTL(HashFunction.NONE);
+    verifyPreloadSegmentOutOfTTL(HashFunction.MD5);
+    verifyPreloadSegmentOutOfTTL(HashFunction.MURMUR3);
+  }
+
+  private void verifyPreloadSegmentOutOfTTL(HashFunction hashFunction) {
+    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
+        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0,
+            _contextBuilder.setHashFunction(hashFunction).build());
+    Map<Object, RecordLocation> recordLocationMap = 
upsertMetadataManager._primaryKeyToRecordLocationMap;
+
+    int numRecords = 3;
+    int[] primaryKeys = new int[]{0, 1, 2};
+    int[] docIds = new int[]{0, 1, 2};
+    ThreadSafeMutableRoaringBitmap validDocIds = new 
ThreadSafeMutableRoaringBitmap();
+    MutableRoaringBitmap snapshot = new MutableRoaringBitmap();
+    snapshot.add(docIds);
+    ImmutableSegmentImpl segment1 =
+        mockImmutableSegmentWithEndTime(1, validDocIds, null, 
getPrimaryKeyList(numRecords, primaryKeys),
+            COMPARISON_COLUMNS, 80, snapshot);
+
+    upsertMetadataManager.setWatermark(60);
+    upsertMetadataManager.doPreloadSegment(segment1);
+    assertEquals(recordLocationMap.keySet().size(), 3);
+    for (int key : primaryKeys) {
+      
assertTrue(recordLocationMap.containsKey(HashUtils.hashPrimaryKey(makePrimaryKey(key),
 hashFunction)),
+          String.valueOf(key));
+    }
+
+    // Bump up the watermark, so that segment2 gets out of TTL and is skipped.
+    upsertMetadataManager.setWatermark(120);
+    primaryKeys = new int[]{10, 11, 12};
+    ImmutableSegmentImpl segment2 =
+        mockImmutableSegmentWithEndTime(1, validDocIds, null, 
getPrimaryKeyList(numRecords, primaryKeys),
+            COMPARISON_COLUMNS, 80, snapshot);
+    upsertMetadataManager.doPreloadSegment(segment2);
+    assertEquals(recordLocationMap.keySet().size(), 3);
+    for (int key : primaryKeys) {
+      
assertFalse(recordLocationMap.containsKey(HashUtils.hashPrimaryKey(makePrimaryKey(key),
 hashFunction)),
+          String.valueOf(key));
+    }
+  }
+
   @Test
   public void testAddRecordWithDeleteColumn()
       throws IOException {
@@ -1260,6 +1374,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
     upsertMetadataManager.addRecord(segment0, new 
RecordInfo(makePrimaryKey(10), 1, earlierComparisonValue, false));
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, 
HashFunction.NONE);
+    assertEquals(upsertMetadataManager.getWatermark(), 80);
 
     // Add a segment with segmentEndTime = earlierComparisonValue, so it will 
not be skipped
     int numRecords = 4;
@@ -1271,10 +1386,6 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
         mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, 
COMPARISON_COLUMNS, earlierComparisonValue,
             null);
 
-    int[] docIds1 = new int[]{0, 1, 2, 3};
-    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
-    validDocIdsSnapshot1.add(docIds1);
-
     // load segment1.
     upsertMetadataManager.addSegment(segment1, validDocIds1, null,
         getRecordInfoListForTTL(numRecords, primaryKeys, 
timestamps).iterator());
@@ -1284,6 +1395,9 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, 
HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, 
HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, 
HashFunction.NONE);
+    // The watermark is updated by segment's max comparison value, although 
segment1 has a few docs with larger
+    // comparison values. This segment is created to simplify the tests here 
and it shouldn't exist in real world env.
+    assertEquals(upsertMetadataManager.getWatermark(), 80);
 
     // Add record to update largestSeenTimestamp, largest seen timestamp: 
largerComparisonValue
     upsertMetadataManager.addRecord(segment0, new 
RecordInfo(makePrimaryKey(10), 0, largerComparisonValue, false));
@@ -1293,6 +1407,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, 
HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, 
HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120, 
HashFunction.NONE);
+    assertEquals(upsertMetadataManager.getWatermark(), 120);
 
     // records before (largest seen timestamp - TTL) are expired and removed 
from upsertMetadata.
     upsertMetadataManager.removeExpiredPrimaryKeys();
@@ -1301,6 +1416,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, 
HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, 
HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120, 
HashFunction.NONE);
+    assertEquals(upsertMetadataManager.getWatermark(), 120);
 
     // ValidDocIds for out-of-ttl records should not be removed.
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2, 3});
@@ -1312,7 +1428,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
-  private void verifyAddOutOfTTLSegment()
+  private void verifyAddMultipleSegmentsWithOneOutOfTTL()
       throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, _contextBuilder.build());
@@ -1334,10 +1450,6 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl segment1 =
         mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, 
COMPARISON_COLUMNS, new Double(80), null);
 
-    int[] docIds1 = new int[]{0, 1, 2, 3};
-    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
-    validDocIdsSnapshot1.add(docIds1);
-
     // load segment1 with segmentEndTime: 80, largest seen timestamp: 80. the 
segment will be loaded.
     upsertMetadataManager.addSegment(segment1, validDocIds1, null,
         getRecordInfoListForTTL(numRecords, primaryKeys, 
timestamps).iterator());
@@ -1348,6 +1460,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, 
HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, 
HashFunction.NONE);
     assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new 
int[]{0, 1, 2, 3});
+    assertEquals(upsertMetadataManager.getWatermark(), 80);
 
     // Add record to update largestSeenTimestamp, largest seen timestamp: 120
     upsertMetadataManager.addRecord(segment0, new 
RecordInfo(makePrimaryKey(0), 0, new Double(120), false));
@@ -1358,6 +1471,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, 
HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, 
HashFunction.NONE);
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, 
HashFunction.NONE);
+    assertEquals(upsertMetadataManager.getWatermark(), 120);
 
     // Add an out-of-ttl segment, verify all the invalid docs should not show 
up again.
     // Add a segment with segmentEndTime: 80, largest seen timestamp: 120. the 
segment will be skipped.
@@ -1372,6 +1486,12 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.addSegment(segment2);
     // out of ttl segment should not be added to recordLocationMap
     assertEquals(recordLocationMap.size(), 5);
+    checkRecordLocationForTTL(recordLocationMap, 0, segment0, 0, 120, 
HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, 
HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, 
HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, 
HashFunction.NONE);
+    checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, 
HashFunction.NONE);
+    assertEquals(upsertMetadataManager.getWatermark(), 120);
 
     // Stop the metadata manager
     upsertMetadataManager.stop();
@@ -1380,7 +1500,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     upsertMetadataManager.close();
   }
 
-  private void verifyAddOutOfTTLSegmentWithRecordDelete()
+  private void verifyAddSegmentOutOfTTLWithRecordDelete()
       throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, _contextBuilder.build());
@@ -1489,7 +1609,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     }
   }
 
-  private void verifyAddSegmentForTTL(Comparable comparisonValue)
+  private void verifyAddSegmentOutOfTTL(Comparable comparisonValue)
       throws IOException {
     ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, _contextBuilder.build());
@@ -1501,6 +1621,7 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null);
     upsertMetadataManager.addRecord(segment0, new 
RecordInfo(makePrimaryKey(10), 1, comparisonValue, false));
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, 
HashFunction.NONE);
+    assertEquals(upsertMetadataManager.getWatermark(), 80);
 
     // add a segment with segmentEndTime = -1 so it will be skipped since it 
out-of-TTL
     int numRecords = 4;
@@ -1510,14 +1631,11 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     ImmutableSegmentImpl segment1 =
         mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, 
COMPARISON_COLUMNS, -1, null);
 
-    int[] docIds1 = new int[]{0, 1, 2, 3};
-    MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap();
-    validDocIdsSnapshot1.add(docIds1);
-
     // load segment1.
     upsertMetadataManager.addSegment(segment1);
     assertEquals(recordLocationMap.size(), 1);
     checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, 
HashFunction.NONE);
+    assertEquals(upsertMetadataManager.getWatermark(), 80);
 
     // Stop the metadata manager
     upsertMetadataManager.stop();
@@ -1547,25 +1665,6 @@ public class 
ConcurrentMapPartitionUpsertMetadataManagerTest {
     assertEquals(((Number) recordLocation.getComparisonValue()).doubleValue(), 
comparisonValue.doubleValue());
   }
 
-  private void verifyPersistAndLoadWatermark()
-      throws IOException {
-    ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager =
-        new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 
0, _contextBuilder.build());
-
-    double currentTimeMs = System.currentTimeMillis();
-    upsertMetadataManager.persistWatermark(currentTimeMs);
-    assertTrue(new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION + 
0).exists());
-
-    double watermark = upsertMetadataManager.loadWatermark();
-    assertEquals(watermark, currentTimeMs);
-
-    // Stop the metadata manager
-    upsertMetadataManager.stop();
-
-    // Close the metadata manager
-    upsertMetadataManager.close();
-  }
-
   @Test
   public void testHashPrimaryKey() {
     PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"});


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to