This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new ad1eda52ea fix checks on largest comparison value for upsert ttl and allow to add segments out of ttl (#14094) ad1eda52ea is described below commit ad1eda52ea3e9a040f87d0e0cabff1fba933f7bf Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Fri Sep 27 19:37:03 2024 -0700 fix checks on largest comparison value for upsert ttl and allow to add segments out of ttl (#14094) * fix checks on largest comparison value for ttl and allow subclass to add segments even out of ttl * fix the special value for TTL comparison and use Double.Negtive_Infinity * unify TTL enable check and persist watermark after taking snapshot * update ttl watermark and skip segment out of ttl during preloading * refine and add tests --- .../upsert/BasePartitionUpsertMetadataManager.java | 137 +++++++++------ ...oncurrentMapPartitionUpsertMetadataManager.java | 22 +-- ...nUpsertMetadataManagerForConsistentDeletes.java | 12 +- ...rrentMapPartitionUpsertMetadataManagerTest.java | 183 ++++++++++++++++----- 4 files changed, 239 insertions(+), 115 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 048213e1b1..18142e2981 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -88,6 +88,8 @@ import org.slf4j.LoggerFactory; @ThreadSafe public abstract class BasePartitionUpsertMetadataManager implements PartitionUpsertMetadataManager { protected static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1); + // The special value to indicate the largest comparison value is not set yet, and allow negative comparison values. + protected static final double TTL_WATERMARK_NOT_SET = Double.NEGATIVE_INFINITY; protected final String _tableNameWithType; protected final int _partitionId; @@ -178,10 +180,13 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } _serverMetrics = ServerMetrics.get(); _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); - if (_metadataTTL > 0) { + if (isTTLEnabled()) { + Preconditions.checkState(_comparisonColumns.size() == 1, + "Upsert TTL does not work with multiple comparison columns"); + Preconditions.checkState(_metadataTTL <= 0 || _enableSnapshot, "Upsert metadata TTL must have snapshot enabled"); _largestSeenComparisonValue = new AtomicDouble(loadWatermark()); } else { - _largestSeenComparisonValue = new AtomicDouble(Double.MIN_VALUE); + _largestSeenComparisonValue = new AtomicDouble(TTL_WATERMARK_NOT_SET); deleteWatermark(); } } @@ -383,37 +388,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName, _tableNameWithType); - ImmutableSegmentImpl immutableSegment = (ImmutableSegmentImpl) segment; - - if (_deletedKeysTTL > 0) { - double maxComparisonValue = - ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)) - .getMaxValue()).doubleValue(); - _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue)); + if (isTTLEnabled()) { + updateWatermark(segment); } - - // Skip adding segment that has max comparison value smaller than (largestSeenComparisonValue - TTL) - if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) { - Preconditions.checkState(_enableSnapshot, "Upsert TTL must have snapshot enabled"); - Preconditions.checkState(_comparisonColumns.size() == 1, - "Upsert TTL does not work with multiple comparison columns"); - Number maxComparisonValue = - (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue(); - if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL) { - _logger.info("Skip adding segment: {} because it's out of TTL", segmentName); - MutableRoaringBitmap validDocIdsSnapshot = immutableSegment.loadValidDocIdsFromSnapshot(); - if (validDocIdsSnapshot != null) { - MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, validDocIdsSnapshot); - immutableSegment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), - queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null); - } else { - _logger.warn("Failed to find snapshot from segment: {} which is out of TTL, treating all documents as valid", - segmentName); - } - return; - } - } - if (!startOperation()) { _logger.info("Skip adding segment: {} because metadata manager is already stopped", segment.getSegmentName()); return; @@ -422,7 +399,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps _snapshotLock.readLock().lock(); } try { - doAddSegment(immutableSegment); + doAddSegment((ImmutableSegmentImpl) segment); _trackedSegments.add(segment); if (_enableSnapshot) { _updatedSegmentsSinceLastSnapshot.add(segment); @@ -435,9 +412,51 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } } + protected boolean isTTLEnabled() { + return _metadataTTL > 0 || _deletedKeysTTL > 0; + } + + protected boolean isOutOfMetadataTTL(IndexSegment segment) { + if (_metadataTTL > 0 && _largestSeenComparisonValue.get() != TTL_WATERMARK_NOT_SET) { + Number maxComparisonValue = + (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue(); + return maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL; + } + return false; + } + + protected boolean skipAddSegmentOutOfTTL(ImmutableSegmentImpl segment) { + String segmentName = segment.getSegmentName(); + _logger.info("Skip adding segment: {} because it's out of TTL", segmentName); + MutableRoaringBitmap validDocIdsSnapshot = segment.loadValidDocIdsFromSnapshot(); + if (validDocIdsSnapshot != null) { + MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, validDocIdsSnapshot); + segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), + queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null); + } else { + _logger.warn("Failed to find validDocIds snapshot to add segment: {} out of TTL, treating all docs as valid", + segmentName); + } + // Return true if segment is skipped. This boolean value allows subclass to decide whether to skip. + return true; + } + + protected boolean skipPreloadSegmentOutOfTTL(ImmutableSegmentImpl segment, MutableRoaringBitmap validDocIdsSnapshot) { + String segmentName = segment.getSegmentName(); + _logger.info("Skip preloading segment: {} because it's out of TTL", segmentName); + MutableRoaringBitmap queryableDocIds = getQueryableDocIds(segment, validDocIdsSnapshot); + segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(validDocIdsSnapshot), + queryableDocIds != null ? new ThreadSafeMutableRoaringBitmap(queryableDocIds) : null); + // Return true if segment is skipped. This boolean value allows subclass to decide whether to skip. + return true; + } + protected void doAddSegment(ImmutableSegmentImpl segment) { String segmentName = segment.getSegmentName(); _logger.info("Adding segment: {}, current primary key count: {}", segmentName, getNumPrimaryKeys()); + if (isOutOfMetadataTTL(segment) && skipAddSegmentOutOfTTL(segment)) { + return; + } long startTimeMs = System.currentTimeMillis(); if (!_enableSnapshot) { segment.deleteValidDocIdsSnapshot(); @@ -479,6 +498,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName, _tableNameWithType); + if (isTTLEnabled()) { + updateWatermark(segment); + } if (!startOperation()) { _logger.info("Skip preloading segment: {} because metadata manager is already stopped", segmentName); return; @@ -508,7 +530,9 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps segment.enableUpsert(this, new ThreadSafeMutableRoaringBitmap(), null); return; } - + if (isOutOfMetadataTTL(segment) && skipPreloadSegmentOutOfTTL(segment, validDocIds)) { + return; + } try (UpsertUtils.RecordInfoReader recordInfoReader = new UpsertUtils.RecordInfoReader(segment, _primaryKeyColumns, _comparisonColumns, _deleteRecordColumn)) { doPreloadSegment(segment, null, null, UpsertUtils.getRecordInfoIterator(recordInfoReader, validDocIds)); @@ -780,21 +804,17 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps _logger.info("Skip removing untracked (replaced or empty) segment: {}", segmentName); return; } + if (!startOperation()) { + _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName); + return; + } // Skip removing the upsert metadata of segment that has max comparison value smaller than // (largestSeenComparisonValue - TTL), i.e. out of metadata TTL. The expired metadata is removed while creating // new consuming segment in batches. boolean skipRemoveMetadata = false; - if (_metadataTTL > 0 && _largestSeenComparisonValue.get() > 0) { - Number maxComparisonValue = - (Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)).getMaxValue(); - if (maxComparisonValue.doubleValue() < _largestSeenComparisonValue.get() - _metadataTTL) { - _logger.info("Skip removing segment: {} because it's out of TTL", segmentName); - skipRemoveMetadata = true; - } - } - if (!startOperation()) { - _logger.info("Skip removing segment: {} because metadata manager is already stopped", segmentName); - return; + if (isOutOfMetadataTTL(segment)) { + _logger.info("Skip removing segment: {} because it's out of TTL", segmentName); + skipRemoveMetadata = true; } if (_enableSnapshot) { _snapshotLock.readLock().lock(); @@ -989,6 +1009,12 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps } } _updatedSegmentsSinceLastSnapshot.clear(); + // Persist TTL watermark after taking snapshots if TTL is enabled, so that segments out of TTL can be loaded with + // updated validDocIds bitmaps. If the TTL watermark is persisted first, segments out of TTL may get loaded with + // stale bitmaps or even no bitmap snapshots to use. + if (isTTLEnabled()) { + persistWatermark(_largestSeenComparisonValue.get()); + } _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments); _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, @@ -1020,7 +1046,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps _logger.warn("Caught exception while loading watermark file: {}, skipping", watermarkFile); } } - return Double.MIN_VALUE; + return TTL_WATERMARK_NOT_SET; } /** @@ -1061,9 +1087,26 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId); } + protected void updateWatermark(ImmutableSegment segment) { + double maxComparisonValue = + ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_comparisonColumns.get(0)) + .getMaxValue()).doubleValue(); + _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, maxComparisonValue)); + } + + @VisibleForTesting + double getWatermark() { + return _largestSeenComparisonValue.get(); + } + + @VisibleForTesting + void setWatermark(double watermark) { + _largestSeenComparisonValue.set(watermark); + } + @Override public void removeExpiredPrimaryKeys() { - if (_metadataTTL <= 0 && _deletedKeysTTL <= 0) { + if (!isTTLEnabled()) { return; } if (!startOperation()) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index b1fc92a2bf..5552a6c65c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -192,19 +192,10 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp AtomicInteger numTotalKeysMarkForDeletion = new AtomicInteger(); AtomicInteger numDeletedKeysWithinTTLWindow = new AtomicInteger(); double largestSeenComparisonValue = _largestSeenComparisonValue.get(); - double metadataTTLKeysThreshold; - if (_metadataTTL > 0) { - metadataTTLKeysThreshold = largestSeenComparisonValue - _metadataTTL; - } else { - metadataTTLKeysThreshold = Double.MIN_VALUE; - } - double deletedKeysThreshold; - if (_deletedKeysTTL > 0) { - deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL; - } else { - deletedKeysThreshold = Double.MIN_VALUE; - } - + double metadataTTLKeysThreshold = + _metadataTTL > 0 ? largestSeenComparisonValue - _metadataTTL : Double.NEGATIVE_INFINITY; + double deletedKeysThreshold = + _deletedKeysTTL > 0 ? largestSeenComparisonValue - _deletedKeysTTL : Double.NEGATIVE_INFINITY; _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> { double comparisonValue = ((Number) recordLocation.getComparisonValue()).doubleValue(); if (_metadataTTL > 0 && comparisonValue < metadataTTLKeysThreshold) { @@ -227,9 +218,6 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp } } }); - if (_metadataTTL > 0) { - persistWatermark(largestSeenComparisonValue); - } // Update metrics updatePrimaryKeyGauge(); @@ -266,7 +254,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp Comparable newComparisonValue = recordInfo.getComparisonValue(); // When TTL is enabled, update largestSeenComparisonValue when adding new record - if (_metadataTTL > 0 || _deletedKeysTTL > 0) { + if (isTTLEnabled()) { double comparisonValue = ((Number) newComparisonValue).doubleValue(); _largestSeenComparisonValue.getAndUpdate(v -> Math.max(v, comparisonValue)); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java index 8507d00cc9..a179a05fa7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes.java @@ -202,8 +202,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes segment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName, getNumPrimaryKeys()); long startTimeMs = System.currentTimeMillis(); - try ( - PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) { + try (PrimaryKeyReader primaryKeyReader = new PrimaryKeyReader(segment, _primaryKeyColumns)) { removeSegment(segment, UpsertUtils.getPrimaryKeyIterator(primaryKeyReader, segment.getSegmentMetadata().getTotalDocs())); } catch (Exception e) { @@ -292,13 +291,8 @@ public class ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes AtomicInteger numDeletedKeysWithinTTLWindow = new AtomicInteger(); AtomicInteger numDeletedTTLKeysInMultipleSegments = new AtomicInteger(); double largestSeenComparisonValue = _largestSeenComparisonValue.get(); - double deletedKeysThreshold; - if (_deletedKeysTTL > 0) { - deletedKeysThreshold = largestSeenComparisonValue - _deletedKeysTTL; - } else { - deletedKeysThreshold = Double.MIN_VALUE; - } - + double deletedKeysThreshold = + _deletedKeysTTL > 0 ? largestSeenComparisonValue - _deletedKeysTTL : Double.NEGATIVE_INFINITY; _primaryKeyToRecordLocationMap.forEach((primaryKey, recordLocation) -> { double comparisonValue = ((Number) recordLocation.getComparisonValue()).doubleValue(); // We need to verify that the record belongs to only one segment. If a record is part of multiple segments, diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index 4270e9547d..278f0f5ef5 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -163,20 +163,88 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { } @Test - public void testUpsertMetadataCleanupWithTTLConfig() + public void testRemoveExpiredPrimaryKeys() throws IOException { _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30); verifyRemoveExpiredPrimaryKeys(new Integer(80), new Integer(120)); verifyRemoveExpiredPrimaryKeys(new Float(80), new Float(120)); verifyRemoveExpiredPrimaryKeys(new Double(80), new Double(120)); verifyRemoveExpiredPrimaryKeys(new Long(80), new Long(120)); - verifyPersistAndLoadWatermark(); - verifyAddSegmentForTTL(new Integer(80)); - verifyAddSegmentForTTL(new Float(80)); - verifyAddSegmentForTTL(new Double(80)); - verifyAddSegmentForTTL(new Long(80)); - verifyAddOutOfTTLSegment(); - verifyAddOutOfTTLSegmentWithRecordDelete(); + } + + @Test + public void testAddSegmentOutOfTTL() + throws IOException { + _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30); + verifyAddSegmentOutOfTTL(new Integer(80)); + verifyAddSegmentOutOfTTL(new Float(80)); + verifyAddSegmentOutOfTTL(new Double(80)); + verifyAddSegmentOutOfTTL(new Long(80)); + verifyAddMultipleSegmentsWithOneOutOfTTL(); + verifyAddSegmentOutOfTTLWithRecordDelete(); + } + + @Test + public void testTTLWithNegativeComparisonValues() + throws IOException { + _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30); + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); + Map<Object, ConcurrentMapPartitionUpsertMetadataManager.RecordLocation> recordLocationMap = + upsertMetadataManager._primaryKeyToRecordLocationMap; + + // Add record to update largestSeenTimestamp, largest seen timestamp: comparisonValue + ThreadSafeMutableRoaringBitmap validDocIds0 = new ThreadSafeMutableRoaringBitmap(); + MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null); + upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, -80, false)); + checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, -80, HashFunction.NONE); + assertEquals(upsertMetadataManager.getWatermark(), -80); + + // add a segment with segmentEndTime = -200 so it will be skipped since it out-of-TTL + int numRecords = 4; + int[] primaryKeys = new int[]{0, 1, 2, 3}; + ThreadSafeMutableRoaringBitmap validDocIds1 = new ThreadSafeMutableRoaringBitmap(); + List<PrimaryKey> primaryKeys1 = getPrimaryKeyList(numRecords, primaryKeys); + ImmutableSegmentImpl segment1 = + mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, -200, null); + + // load segment1. + upsertMetadataManager.addSegment(segment1); + assertEquals(recordLocationMap.size(), 1); + checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, -80, HashFunction.NONE); + assertEquals(upsertMetadataManager.getWatermark(), -80); + + // Stop the metadata manager + upsertMetadataManager.stop(); + + // Close the metadata manager + upsertMetadataManager.close(); + } + + @Test + public void testManageWatermark() + throws IOException { + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); + + double currentTimeMs = System.currentTimeMillis(); + upsertMetadataManager.persistWatermark(currentTimeMs); + assertTrue(new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION + 0).exists()); + + double watermark = upsertMetadataManager.loadWatermark(); + assertEquals(watermark, currentTimeMs); + + ImmutableSegmentImpl segment = + mockImmutableSegmentWithEndTime(1, new ThreadSafeMutableRoaringBitmap(), null, new ArrayList<>(), + COMPARISON_COLUMNS, new Double(currentTimeMs + 1024), new MutableRoaringBitmap()); + upsertMetadataManager.updateWatermark(segment); + assertEquals(upsertMetadataManager.getWatermark(), currentTimeMs + 1024); + + // Stop the metadata manager + upsertMetadataManager.stop(); + + // Close the metadata manager + upsertMetadataManager.close(); } @Test @@ -1049,6 +1117,52 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2}); } + @Test + public void testPreloadSegmentOutOfTTL() { + _contextBuilder.setEnableSnapshot(true).setMetadataTTL(30); + verifyPreloadSegmentOutOfTTL(HashFunction.NONE); + verifyPreloadSegmentOutOfTTL(HashFunction.MD5); + verifyPreloadSegmentOutOfTTL(HashFunction.MURMUR3); + } + + private void verifyPreloadSegmentOutOfTTL(HashFunction hashFunction) { + ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = + new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, + _contextBuilder.setHashFunction(hashFunction).build()); + Map<Object, RecordLocation> recordLocationMap = upsertMetadataManager._primaryKeyToRecordLocationMap; + + int numRecords = 3; + int[] primaryKeys = new int[]{0, 1, 2}; + int[] docIds = new int[]{0, 1, 2}; + ThreadSafeMutableRoaringBitmap validDocIds = new ThreadSafeMutableRoaringBitmap(); + MutableRoaringBitmap snapshot = new MutableRoaringBitmap(); + snapshot.add(docIds); + ImmutableSegmentImpl segment1 = + mockImmutableSegmentWithEndTime(1, validDocIds, null, getPrimaryKeyList(numRecords, primaryKeys), + COMPARISON_COLUMNS, 80, snapshot); + + upsertMetadataManager.setWatermark(60); + upsertMetadataManager.doPreloadSegment(segment1); + assertEquals(recordLocationMap.keySet().size(), 3); + for (int key : primaryKeys) { + assertTrue(recordLocationMap.containsKey(HashUtils.hashPrimaryKey(makePrimaryKey(key), hashFunction)), + String.valueOf(key)); + } + + // Bump up the watermark, so that segment2 gets out of TTL and is skipped. + upsertMetadataManager.setWatermark(120); + primaryKeys = new int[]{10, 11, 12}; + ImmutableSegmentImpl segment2 = + mockImmutableSegmentWithEndTime(1, validDocIds, null, getPrimaryKeyList(numRecords, primaryKeys), + COMPARISON_COLUMNS, 80, snapshot); + upsertMetadataManager.doPreloadSegment(segment2); + assertEquals(recordLocationMap.keySet().size(), 3); + for (int key : primaryKeys) { + assertFalse(recordLocationMap.containsKey(HashUtils.hashPrimaryKey(makePrimaryKey(key), hashFunction)), + String.valueOf(key)); + } + } + @Test public void testAddRecordWithDeleteColumn() throws IOException { @@ -1260,6 +1374,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null); upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, earlierComparisonValue, false)); checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE); + assertEquals(upsertMetadataManager.getWatermark(), 80); // Add a segment with segmentEndTime = earlierComparisonValue, so it will not be skipped int numRecords = 4; @@ -1271,10 +1386,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, earlierComparisonValue, null); - int[] docIds1 = new int[]{0, 1, 2, 3}; - MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); - validDocIdsSnapshot1.add(docIds1); - // load segment1. upsertMetadataManager.addSegment(segment1, validDocIds1, null, getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator()); @@ -1284,6 +1395,9 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE); + // The watermark is updated by segment's max comparison value, although segment1 has a few docs with larger + // comparison values. This segment is created to simplify the tests here and it shouldn't exist in real world env. + assertEquals(upsertMetadataManager.getWatermark(), 80); // Add record to update largestSeenTimestamp, largest seen timestamp: largerComparisonValue upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 0, largerComparisonValue, false)); @@ -1293,6 +1407,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120, HashFunction.NONE); + assertEquals(upsertMetadataManager.getWatermark(), 120); // records before (largest seen timestamp - TTL) are expired and removed from upsertMetadata. upsertMetadataManager.removeExpiredPrimaryKeys(); @@ -1301,6 +1416,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 10, segment0, 0, 120, HashFunction.NONE); + assertEquals(upsertMetadataManager.getWatermark(), 120); // ValidDocIds for out-of-ttl records should not be removed. assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3}); @@ -1312,7 +1428,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.close(); } - private void verifyAddOutOfTTLSegment() + private void verifyAddMultipleSegmentsWithOneOutOfTTL() throws IOException { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); @@ -1334,10 +1450,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { ImmutableSegmentImpl segment1 = mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, new Double(80), null); - int[] docIds1 = new int[]{0, 1, 2, 3}; - MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); - validDocIdsSnapshot1.add(docIds1); - // load segment1 with segmentEndTime: 80, largest seen timestamp: 80. the segment will be loaded. upsertMetadataManager.addSegment(segment1, validDocIds1, null, getRecordInfoListForTTL(numRecords, primaryKeys, timestamps).iterator()); @@ -1348,6 +1460,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE); assertEquals(validDocIds1.getMutableRoaringBitmap().toArray(), new int[]{0, 1, 2, 3}); + assertEquals(upsertMetadataManager.getWatermark(), 80); // Add record to update largestSeenTimestamp, largest seen timestamp: 120 upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(0), 0, new Double(120), false)); @@ -1358,6 +1471,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE); checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE); + assertEquals(upsertMetadataManager.getWatermark(), 120); // Add an out-of-ttl segment, verify all the invalid docs should not show up again. // Add a segment with segmentEndTime: 80, largest seen timestamp: 120. the segment will be skipped. @@ -1372,6 +1486,12 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.addSegment(segment2); // out of ttl segment should not be added to recordLocationMap assertEquals(recordLocationMap.size(), 5); + checkRecordLocationForTTL(recordLocationMap, 0, segment0, 0, 120, HashFunction.NONE); + checkRecordLocationForTTL(recordLocationMap, 1, segment1, 1, 100, HashFunction.NONE); + checkRecordLocationForTTL(recordLocationMap, 2, segment1, 2, 120, HashFunction.NONE); + checkRecordLocationForTTL(recordLocationMap, 3, segment1, 3, 80, HashFunction.NONE); + checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE); + assertEquals(upsertMetadataManager.getWatermark(), 120); // Stop the metadata manager upsertMetadataManager.stop(); @@ -1380,7 +1500,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { upsertMetadataManager.close(); } - private void verifyAddOutOfTTLSegmentWithRecordDelete() + private void verifyAddSegmentOutOfTTLWithRecordDelete() throws IOException { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); @@ -1489,7 +1609,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { } } - private void verifyAddSegmentForTTL(Comparable comparisonValue) + private void verifyAddSegmentOutOfTTL(Comparable comparisonValue) throws IOException { ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); @@ -1501,6 +1621,7 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { MutableSegment segment0 = mockMutableSegment(1, validDocIds0, null); upsertMetadataManager.addRecord(segment0, new RecordInfo(makePrimaryKey(10), 1, comparisonValue, false)); checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE); + assertEquals(upsertMetadataManager.getWatermark(), 80); // add a segment with segmentEndTime = -1 so it will be skipped since it out-of-TTL int numRecords = 4; @@ -1510,14 +1631,11 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { ImmutableSegmentImpl segment1 = mockImmutableSegmentWithEndTime(1, validDocIds1, null, primaryKeys1, COMPARISON_COLUMNS, -1, null); - int[] docIds1 = new int[]{0, 1, 2, 3}; - MutableRoaringBitmap validDocIdsSnapshot1 = new MutableRoaringBitmap(); - validDocIdsSnapshot1.add(docIds1); - // load segment1. upsertMetadataManager.addSegment(segment1); assertEquals(recordLocationMap.size(), 1); checkRecordLocationForTTL(recordLocationMap, 10, segment0, 1, 80, HashFunction.NONE); + assertEquals(upsertMetadataManager.getWatermark(), 80); // Stop the metadata manager upsertMetadataManager.stop(); @@ -1547,25 +1665,6 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { assertEquals(((Number) recordLocation.getComparisonValue()).doubleValue(), comparisonValue.doubleValue()); } - private void verifyPersistAndLoadWatermark() - throws IOException { - ConcurrentMapPartitionUpsertMetadataManager upsertMetadataManager = - new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); - - double currentTimeMs = System.currentTimeMillis(); - upsertMetadataManager.persistWatermark(currentTimeMs); - assertTrue(new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION + 0).exists()); - - double watermark = upsertMetadataManager.loadWatermark(); - assertEquals(watermark, currentTimeMs); - - // Stop the metadata manager - upsertMetadataManager.stop(); - - // Close the metadata manager - upsertMetadataManager.close(); - } - @Test public void testHashPrimaryKey() { PrimaryKey pk = new PrimaryKey(new Object[]{"uuid-1", "uuid-2", "uuid-3"}); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org