This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new e4a9f15697 Add watermark for dedup TTL (#14137) e4a9f15697 is described below commit e4a9f15697b9e13eeab60a0c89c60acb753af72e Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Wed Oct 2 09:30:24 2024 -0700 Add watermark for dedup TTL (#14137) * store/load watermark for dedup ttl * skip segments out of TTL for all add/replace/remove methods --- .../dedup/BasePartitionDedupMetadataManager.java | 69 ++++++++++++----- ...ConcurrentMapPartitionDedupMetadataManager.java | 11 ++- .../ConcurrentMapTableDedupMetadataManager.java | 1 - .../upsert/BasePartitionUpsertMetadataManager.java | 66 ++-------------- .../pinot/segment/local/utils/WatermarkUtils.java | 88 ++++++++++++++++++++++ ...apPartitionDedupMetadataManagerWithTTLTest.java | 31 +++++--- ...artitionDedupMetadataManagerWithoutTTLTest.java | 16 +++- .../mutable/MutableSegmentDedupeTest.java | 16 ++-- ...rrentMapPartitionUpsertMetadataManagerTest.java | 5 +- 9 files changed, 197 insertions(+), 106 deletions(-) diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java index 3009875c3e..3892d36d92 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java @@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.dedup; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AtomicDouble; +import java.io.File; import java.io.IOException; import java.util.Iterator; import java.util.List; @@ -30,7 +31,9 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.metrics.ServerTimer; import 
org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl; +import org.apache.pinot.segment.local.utils.WatermarkUtils; import org.apache.pinot.segment.spi.IndexSegment; +import org.apache.pinot.segment.spi.V1Constants; import org.apache.pinot.spi.config.table.HashFunction; import org.apache.pinot.spi.data.readers.PrimaryKey; import org.slf4j.Logger; @@ -38,6 +41,8 @@ import org.slf4j.LoggerFactory; public abstract class BasePartitionDedupMetadataManager implements PartitionDedupMetadataManager { + // The special value to indicate the largest seen time is not set yet, assuming times are positive. + protected static final double TTL_WATERMARK_NOT_SET = 0; protected final String _tableNameWithType; protected final List<String> _primaryKeyColumns; protected final int _partitionId; @@ -45,7 +50,8 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu protected final HashFunction _hashFunction; protected final double _metadataTTL; protected final String _dedupTimeColumn; - protected final AtomicDouble _largestSeenTime = new AtomicDouble(0); + protected final AtomicDouble _largestSeenTime; + protected final File _tableIndexDir; protected final Logger _logger; // The following variables are always accessed within synchronized block private boolean _stopped; @@ -61,12 +67,17 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu _serverMetrics = dedupContext.getServerMetrics(); _metadataTTL = dedupContext.getMetadataTTL() >= 0 ? 
dedupContext.getMetadataTTL() : 0; _dedupTimeColumn = dedupContext.getDedupTimeColumn(); + _tableIndexDir = dedupContext.getTableIndexDir(); + _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); if (_metadataTTL > 0) { Preconditions.checkArgument(_dedupTimeColumn != null, "When metadataTTL is configured, metadata time column must be configured for dedup enabled table: %s", tableNameWithType); + _largestSeenTime = new AtomicDouble(WatermarkUtils.loadWatermark(getWatermarkFile(), TTL_WATERMARK_NOT_SET)); + } else { + _largestSeenTime = new AtomicDouble(TTL_WATERMARK_NOT_SET); + WatermarkUtils.deleteWatermark(getWatermarkFile()); } - _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName()); } @Override @@ -85,17 +96,6 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl, "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName, _tableNameWithType); - // If metadataTTL is enabled, we can skip adding segment that's already getting out of the TTL. 
- if (_metadataTTL > 0) { - double maxDedupTime = ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_dedupTimeColumn) - .getMaxValue()).doubleValue(); - _largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime)); - if (isOutOfMetadataTTL(maxDedupTime)) { - _logger.info("Skip adding segment: {} as max dedupTime: {} is out of metadataTTL: {}", segmentName, - _dedupTimeColumn, _metadataTTL); - return; - } - } if (!startOperation()) { _logger.info("Skip adding segment: {} because dedup metadata manager is already stopped", segment.getSegmentName()); @@ -133,6 +133,17 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment) throws IOException { + // If metadataTTL is enabled, we can skip adding dedup metadata for segment that's already out of the TTL. + if (_metadataTTL > 0) { + double maxDedupTime = getMaxDedupTime(newSegment); + _largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime)); + if (isOutOfMetadataTTL(maxDedupTime)) { + String action = oldSegment == null ? "adding" : "replacing"; + _logger.info("Skip {} segment: {} as max dedupTime: {} is out of TTL: {}", action, newSegment.getSegmentName(), + maxDedupTime, _metadataTTL); + return; + } + } try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(newSegment, _primaryKeyColumns, _dedupTimeColumn)) { Iterator<DedupRecordInfo> dedupRecordInfoIterator = @@ -158,6 +169,15 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu _logger.info("Skip removing segment: {} because metadata manager is already stopped", segment.getSegmentName()); return; } + // Skip removing the dedup metadata of segment out of TTL. The expired metadata is removed in batches. 
+ if (_metadataTTL > 0) { + double maxDedupTime = getMaxDedupTime(segment); + if (isOutOfMetadataTTL(maxDedupTime)) { + _logger.info("Skip removing segment: {} as max dedupTime: {} is out of TTL: {}", segment.getSegmentName(), + maxDedupTime, _metadataTTL); + return; + } + } try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment, _primaryKeyColumns, _dedupTimeColumn)) { Iterator<DedupRecordInfo> dedupRecordInfoIterator = @@ -175,8 +195,26 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu protected abstract void doRemoveSegment(IndexSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator); + protected boolean isOutOfMetadataTTL(double dedupTime) { + return _metadataTTL > 0 && dedupTime < _largestSeenTime.get() - _metadataTTL; + } + + protected double getMaxDedupTime(IndexSegment segment) { + return ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_dedupTimeColumn) + .getMaxValue()).doubleValue(); + } + + protected File getWatermarkFile() { + // Use 'dedup' suffix to avoid conflicts with upsert watermark file, as it's possible that a table is changed + // from using dedup to upsert and the watermark should be re-calculated based on upsert comparison column. 
+ return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId + ".dedup"); + } + @Override public void removeExpiredPrimaryKeys() { + if (_metadataTTL <= 0) { + return; + } if (!startOperation()) { _logger.info("Skip removing expired primary keys because metadata manager is already stopped"); return; @@ -184,6 +222,7 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu try { long startTime = System.currentTimeMillis(); doRemoveExpiredPrimaryKeys(); + WatermarkUtils.persistWatermark(_largestSeenTime.get(), getWatermarkFile()); long duration = System.currentTimeMillis() - startTime; _serverMetrics.addTimedTableValue(_tableNameWithType, ServerTimer.DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS, duration, TimeUnit.MILLISECONDS); @@ -251,10 +290,6 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu _logger.info("Closed the metadata manager"); } - protected boolean isOutOfMetadataTTL(double dedupTime) { - return _metadataTTL > 0 && dedupTime < _largestSeenTime.get() - _metadataTTL; - } - protected abstract long getNumPrimaryKeys(); protected void updatePrimaryKeyGauge(long numPrimaryKeys) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java index 5cfb2cea42..4461266e35 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java @@ -45,7 +45,6 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad while (dedupRecordInfoIteratorOfNewSegment.hasNext()) { DedupRecordInfo dedupRecordInfo = dedupRecordInfoIteratorOfNewSegment.next(); double dedupTime = 
dedupRecordInfo.getDedupTime(); - _largestSeenTime.getAndUpdate(time -> Math.max(time, dedupTime)); _primaryKeyToSegmentAndTimeMap.compute(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(), _hashFunction), (primaryKey, segmentAndTime) -> { // Stale metadata is treated as not existing when checking for deduplicates. @@ -94,10 +93,8 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad @Override protected void doRemoveExpiredPrimaryKeys() { - if (_metadataTTL > 0) { - double smallestTimeToKeep = _largestSeenTime.get() - _metadataTTL; - _primaryKeyToSegmentAndTimeMap.entrySet().removeIf(entry -> entry.getValue().getRight() < smallestTimeToKeep); - } + double smallestTimeToKeep = _largestSeenTime.get() - _metadataTTL; + _primaryKeyToSegmentAndTimeMap.entrySet().removeIf(entry -> entry.getValue().getRight() < smallestTimeToKeep); } @Override @@ -108,7 +105,9 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad return true; } try { - _largestSeenTime.getAndUpdate(time -> Math.max(time, dedupRecordInfo.getDedupTime())); + if (_metadataTTL > 0) { + _largestSeenTime.getAndUpdate(time -> Math.max(time, dedupRecordInfo.getDedupTime())); + } AtomicBoolean present = new AtomicBoolean(false); _primaryKeyToSegmentAndTimeMap.compute(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(), _hashFunction), (primaryKey, segmentAndTime) -> { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java index 4bb7929ffe..ea9ac9117b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java @@ -18,7 +18,6 @@ */ package 
org.apache.pinot.segment.local.dedup; - class ConcurrentMapTableDedupMetadataManager extends BaseTableDedupMetadataManager { protected PartitionDedupMetadataManager createPartitionDedupMetadataManager(Integer partitionId) { return new ConcurrentMapPartitionDedupMetadataManager(_tableNameWithType, partitionId, _dedupContext); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index 18142e2981..9a868b34c5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -21,12 +21,8 @@ package org.apache.pinot.segment.local.upsert; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.AtomicDouble; -import java.io.DataOutputStream; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStream; -import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -45,7 +41,6 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; import org.apache.helix.model.IdealState; import org.apache.pinot.common.Utils; @@ -66,6 +61,7 @@ import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader; import org.apache.pinot.segment.local.utils.HashUtils; +import 
org.apache.pinot.segment.local.utils.WatermarkUtils; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.MutableSegment; @@ -184,10 +180,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps Preconditions.checkState(_comparisonColumns.size() == 1, "Upsert TTL does not work with multiple comparison columns"); Preconditions.checkState(_metadataTTL <= 0 || _enableSnapshot, "Upsert metadata TTL must have snapshot enabled"); - _largestSeenComparisonValue = new AtomicDouble(loadWatermark()); + _largestSeenComparisonValue = + new AtomicDouble(WatermarkUtils.loadWatermark(getWatermarkFile(), TTL_WATERMARK_NOT_SET)); } else { _largestSeenComparisonValue = new AtomicDouble(TTL_WATERMARK_NOT_SET); - deleteWatermark(); + WatermarkUtils.deleteWatermark(getWatermarkFile()); } } @@ -1013,7 +1010,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps // updated validDocIds bitmaps. If the TTL watermark is persisted first, segments out of TTL may get loaded with // stale bitmaps or even no bitmap snapshots to use. if (isTTLEnabled()) { - persistWatermark(_largestSeenComparisonValue.get()); + WatermarkUtils.persistWatermark(_largestSeenComparisonValue.get(), getWatermarkFile()); } _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId, ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments); @@ -1030,59 +1027,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps numConsumingSegments, System.currentTimeMillis() - startTimeMs); } - /** - * Loads watermark from the file if exists. 
- */ - protected double loadWatermark() { - File watermarkFile = getWatermarkFile(); - if (watermarkFile.exists()) { - try { - byte[] bytes = FileUtils.readFileToByteArray(watermarkFile); - double watermark = ByteBuffer.wrap(bytes).getDouble(); - _logger.info("Loaded watermark: {} from file for table: {} partition_id: {}", watermark, _tableNameWithType, - _partitionId); - return watermark; - } catch (Exception e) { - _logger.warn("Caught exception while loading watermark file: {}, skipping", watermarkFile); - } - } - return TTL_WATERMARK_NOT_SET; - } - - /** - * Persists watermark to the file. - */ - protected void persistWatermark(double watermark) { - File watermarkFile = getWatermarkFile(); - try { - if (watermarkFile.exists()) { - if (!FileUtils.deleteQuietly(watermarkFile)) { - _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile); - return; - } - } - try (OutputStream outputStream = new FileOutputStream(watermarkFile, false); - DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) { - dataOutputStream.writeDouble(watermark); - } - _logger.info("Persisted watermark: {} to file: {}", watermark, watermarkFile); - } catch (Exception e) { - _logger.warn("Caught exception while persisting watermark file: {}, skipping", watermarkFile); - } - } - - /** - * Deletes the watermark file. 
- */ - protected void deleteWatermark() { - File watermarkFile = getWatermarkFile(); - if (watermarkFile.exists()) { - if (!FileUtils.deleteQuietly(watermarkFile)) { - _logger.warn("Cannot delete watermark file: {}, skipping", watermarkFile); - } - } - } - protected File getWatermarkFile() { return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/WatermarkUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/WatermarkUtils.java new file mode 100644 index 0000000000..d19591db4e --- /dev/null +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/WatermarkUtils.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.local.utils; + +import java.io.DataOutputStream; +import java.io.File; +import java.io.FileOutputStream; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Utils methods to manage the TTL watermark for dedup and upsert tables, as both share very similar logic. 
+ */ +public class WatermarkUtils { + private static final Logger LOGGER = LoggerFactory.getLogger(WatermarkUtils.class); + + private WatermarkUtils() { + } + + /** + * Loads watermark from the file if exists. + */ + public static double loadWatermark(File watermarkFile, double defaultWatermark) { + if (watermarkFile.exists()) { + try { + byte[] bytes = FileUtils.readFileToByteArray(watermarkFile); + double watermark = ByteBuffer.wrap(bytes).getDouble(); + LOGGER.info("Loaded watermark: {} from file: {}", watermark, watermarkFile); + return watermark; + } catch (Exception e) { + LOGGER.warn("Failed to load watermark from file: {}, skipping", watermarkFile); + } + } + return defaultWatermark; + } + + /** + * Persists watermark to the file. + */ + public static void persistWatermark(double watermark, File watermarkFile) { + try { + if (watermarkFile.exists()) { + if (!FileUtils.deleteQuietly(watermarkFile)) { + LOGGER.warn("Cannot delete watermark file: {} to persist watermark: {}, skipping", watermarkFile, watermark); + return; + } + } + try (OutputStream outputStream = new FileOutputStream(watermarkFile, false); + DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) { + dataOutputStream.writeDouble(watermark); + } + LOGGER.info("Persisted watermark: {} to file: {}", watermark, watermarkFile); + } catch (Exception e) { + LOGGER.warn("Failed to persist watermark: {} to file: {}, skipping", watermark, watermarkFile); + } + } + + /** + * Deletes the watermark file. 
+ */ + public static void deleteWatermark(File watermarkFile) { + if (watermarkFile.exists()) { + if (!FileUtils.deleteQuietly(watermarkFile)) { + LOGGER.warn("Cannot delete watermark file: {}, skipping", watermarkFile); + } + } + } +} diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java index c7855c1eb3..c0697eb4c3 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.TreeMap; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -30,6 +31,7 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader; import org.apache.pinot.segment.local.utils.HashUtils; +import org.apache.pinot.segment.local.utils.WatermarkUtils; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; @@ -38,6 +40,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.PrimaryKey; import org.mockito.Mockito; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import 
org.testng.annotations.Test; @@ -50,17 +53,27 @@ import static org.testng.Assert.assertTrue; public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), + ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.class.getSimpleName()); private static final int METADATA_TTL = 10000; private static final String DEDUP_TIME_COLUMN_NAME = "dedupTimeColumn"; private DedupContext.Builder _dedupContextBuilder; @BeforeMethod - public void setUpContextBuilder() { + public void setUpContextBuilder() + throws IOException { + FileUtils.forceMkdir(TEMP_DIR); _dedupContextBuilder = new DedupContext.Builder(); _dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class)) .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setMetadataTTL(METADATA_TTL) .setDedupTimeColumn(DEDUP_TIME_COLUMN_NAME).setTableIndexDir(mock(File.class)) - .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class)); + .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class)) + .setTableIndexDir(TEMP_DIR); + } + + @AfterMethod + public void cleanup() { + FileUtils.deleteQuietly(TEMP_DIR); } @Test @@ -134,7 +147,6 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest { dedupRecordInfoIterator = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10); metadataManager.doRemoveSegment(segment, dedupRecordInfoIterator); assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 0); - assertEquals(metadataManager._largestSeenTime.get(), 9000); metadataManager.stop(); metadataManager.close(); @@ -204,7 +216,6 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest { private void verifyInitialSegmentAddition(ConcurrentMapPartitionDedupMetadataManager metadataManager, IndexSegment segment, HashFunction hashFunction) { - assertEquals(metadataManager._largestSeenTime.get(), 9000); 
assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10); verifyInMemoryState(metadataManager, 0, 10, segment, hashFunction); } @@ -245,23 +256,23 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest { Iterator<DedupRecordInfo> dedupRecordInfoIterator2 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10); metadataManager.doAddOrReplaceSegment(null, segment2, dedupRecordInfoIterator2); + + metadataManager._largestSeenTime.set(19000); metadataManager.removeExpiredPrimaryKeys(); assertEquals(metadataManager.getNumPrimaryKeys(), 11); assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11); verifyInMemoryState(metadataManager, 9, 1, segment1, hashFunction); verifyInMemoryState(metadataManager, 10, 10, segment2, hashFunction); - assertEquals(metadataManager._largestSeenTime.get(), 19000); + assertEquals(WatermarkUtils.loadWatermark(metadataManager.getWatermarkFile(), -1), 19000); dedupRecordInfoIterator1 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10); metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1); assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10); verifyInMemoryState(metadataManager, 10, 10, segment2, hashFunction); - assertEquals(metadataManager._largestSeenTime.get(), 19000); dedupRecordInfoIterator2 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10); metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2); assertTrue(metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty()); - assertEquals(metadataManager._largestSeenTime.get(), 19000); metadataManager.stop(); metadataManager.close(); @@ -294,22 +305,21 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest { Iterator<DedupRecordInfo> dedupRecordInfoIterator2 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10); metadataManager.doAddOrReplaceSegment(null, segment2, dedupRecordInfoIterator2); + 
metadataManager._largestSeenTime.set(19000); metadataManager.removeExpiredPrimaryKeys(); assertEquals(metadataManager.getNumPrimaryKeys(), 11); assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11); verifyInMemoryState(metadataManager, 10, 10, segment2, hashFunction); - assertEquals(metadataManager._largestSeenTime.get(), 19000); + assertEquals(WatermarkUtils.loadWatermark(metadataManager.getWatermarkFile(), -1), 19000); dedupRecordInfoIterator2 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10); metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2); assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1); verifyInMemoryState(metadataManager, 9, 1, segment1, hashFunction); - assertEquals(metadataManager._largestSeenTime.get(), 19000); dedupRecordInfoIterator1 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10); metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1); assertTrue(metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty()); - assertEquals(metadataManager._largestSeenTime.get(), 19000); metadataManager.stop(); metadataManager.close(); @@ -351,7 +361,6 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest { assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1); assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKeyHash), Pair.of(immutableSegment, 25000.0)); - assertEquals(metadataManager._largestSeenTime.get(), 25000); metadataManager.stop(); metadataManager.close(); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java index bab3682b63..f7eadeaf09 100644 --- 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java @@ -23,6 +23,7 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map; +import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -36,6 +37,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.mockito.Mockito; import org.testng.Assert; +import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -45,14 +47,24 @@ import static org.testng.Assert.assertSame; public class ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest { + private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), + ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.class.getSimpleName()); private DedupContext.Builder _dedupContextBuilder; @BeforeMethod - public void setUpContextBuilder() { + public void setUpContextBuilder() + throws IOException { + FileUtils.forceMkdir(TEMP_DIR); _dedupContextBuilder = new DedupContext.Builder(); _dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class)) .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setTableIndexDir(mock(File.class)) - .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class)); + .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class)) + .setTableIndexDir(TEMP_DIR); + } + + @AfterMethod + public void cleanup() { + FileUtils.deleteQuietly(TEMP_DIR); } @Test diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java index 36c5627902..8c4594dc91 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java @@ -26,6 +26,7 @@ import java.net.URL; import java.util.Collections; import java.util.List; import java.util.Map; +import org.apache.commons.io.FileUtils; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; @@ -49,6 +50,8 @@ import org.testng.annotations.Test; public class MutableSegmentDedupeTest { + private static final File TEMP_DIR = + new File(FileUtils.getTempDirectory(), MutableSegmentDedupeTest.class.getSimpleName()); private static final String SCHEMA_FILE_PATH = "data/test_dedup_schema.json"; private static final String DATA_FILE_PATH = "data/test_dedup_data.json"; private MutableSegmentImpl _mutableSegmentImpl; @@ -88,17 +91,18 @@ public class MutableSegmentDedupeTest { TableConfig tableConfig = Mockito.mock(TableConfig.class); Mockito.when(tableConfig.getTableName()).thenReturn("testTable_REALTIME"); Mockito.when(tableConfig.getDedupConfig()).thenReturn(dedupConfig); - SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig - = Mockito.mock(SegmentsValidationAndRetentionConfig.class); + SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig = + Mockito.mock(SegmentsValidationAndRetentionConfig.class); Mockito.when(tableConfig.getValidationConfig()).thenReturn(segmentsValidationAndRetentionConfig); 
Mockito.when(segmentsValidationAndRetentionConfig.getTimeColumnName()).thenReturn("secondsSinceEpoch"); TableDataManager tableDataManager = Mockito.mock(TableDataManager.class); - Mockito.when(tableDataManager.getTableDataDir()).thenReturn(Mockito.mock(File.class)); + Mockito.when(tableDataManager.getTableDataDir()).thenReturn(TEMP_DIR); return TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, Mockito.mock(ServerMetrics.class)); } - public List<Map<String, String>> loadJsonFile(String filePath) throws IOException { + public List<Map<String, String>> loadJsonFile(String filePath) + throws IOException { URL resourceUrl = this.getClass().getClassLoader().getResource(filePath); if (resourceUrl == null) { throw new IllegalArgumentException("File not found: " + filePath); @@ -165,8 +169,8 @@ public class MutableSegmentDedupeTest { } } - private void verifyGeneratedSegmentDataAgainstRawData( - int docId, int rawDataIndex, List<Map<String, String>> rawData) { + private void verifyGeneratedSegmentDataAgainstRawData(int docId, int rawDataIndex, + List<Map<String, String>> rawData) { for (String columnName : rawData.get(0).keySet()) { Assert.assertEquals(String.valueOf(_mutableSegmentImpl.getValue(docId, columnName)), String.valueOf(rawData.get(rawDataIndex).get(columnName))); diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java index 278f0f5ef5..90c03bb5de 100644 --- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java +++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java @@ -40,6 +40,7 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp import 
org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader; import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation; import org.apache.pinot.segment.local.utils.HashUtils; +import org.apache.pinot.segment.local.utils.WatermarkUtils; import org.apache.pinot.segment.spi.ColumnMetadata; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.MutableSegment; @@ -228,10 +229,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest { new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build()); double currentTimeMs = System.currentTimeMillis(); - upsertMetadataManager.persistWatermark(currentTimeMs); + WatermarkUtils.persistWatermark(currentTimeMs, upsertMetadataManager.getWatermarkFile()); assertTrue(new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION + 0).exists()); - double watermark = upsertMetadataManager.loadWatermark(); + double watermark = WatermarkUtils.loadWatermark(upsertMetadataManager.getWatermarkFile(), -1); assertEquals(watermark, currentTimeMs); ImmutableSegmentImpl segment = --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org