This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new e4a9f15697 Add watermark for dedup TTL (#14137)
e4a9f15697 is described below

commit e4a9f15697b9e13eeab60a0c89c60acb753af72e
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Wed Oct 2 09:30:24 2024 -0700

    Add watermark for dedup TTL (#14137)
    
    * store/load watermark for dedup ttl
    
    * skip segments out of TTL for all add/replace/remove methods
---
 .../dedup/BasePartitionDedupMetadataManager.java   | 69 ++++++++++++-----
 ...ConcurrentMapPartitionDedupMetadataManager.java | 11 ++-
 .../ConcurrentMapTableDedupMetadataManager.java    |  1 -
 .../upsert/BasePartitionUpsertMetadataManager.java | 66 ++--------------
 .../pinot/segment/local/utils/WatermarkUtils.java  | 88 ++++++++++++++++++++++
 ...apPartitionDedupMetadataManagerWithTTLTest.java | 31 +++++---
 ...artitionDedupMetadataManagerWithoutTTLTest.java | 16 +++-
 .../mutable/MutableSegmentDedupeTest.java          | 16 ++--
 ...rrentMapPartitionUpsertMetadataManagerTest.java |  5 +-
 9 files changed, 197 insertions(+), 106 deletions(-)
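
Context for the diffs that follow: the dedup TTL watermark is simply the largest dedup time observed so far, and a segment whose max dedup time falls below (watermark - TTL) is skipped when adding, replacing, or removing dedup metadata. A minimal, hypothetical sketch of that check (the real logic is isOutOfMetadataTTL() in BasePartitionDedupMetadataManager below; the constants mirror the test values):

    public class DedupTtlCheckSketch {
      public static void main(String[] args) {
        double metadataTTL = 10000;         // configured dedup metadata TTL, as in the tests below
        double largestSeenTime = 19000;     // watermark: largest dedup time seen so far
        double segmentMaxDedupTime = 8000;  // max dedup time of an incoming segment
        // Anything older than (watermark - TTL) is out of TTL and skipped.
        boolean outOfTtl = metadataTTL > 0 && segmentMaxDedupTime < largestSeenTime - metadataTTL;
        System.out.println("skip segment: " + outOfTtl);  // true, since 8000 < 19000 - 10000
      }
    }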

diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
index 3009875c3e..3892d36d92 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.dedup;
 
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AtomicDouble;
+import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
@@ -30,7 +31,9 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.utils.WatermarkUtils;
 import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.slf4j.Logger;
@@ -38,6 +41,8 @@ import org.slf4j.LoggerFactory;
 
 
 public abstract class BasePartitionDedupMetadataManager implements PartitionDedupMetadataManager {
+  // The special value to indicate the largest seen time is not set yet, assuming times are positive.
+  protected static final double TTL_WATERMARK_NOT_SET = 0;
   protected final String _tableNameWithType;
   protected final List<String> _primaryKeyColumns;
   protected final int _partitionId;
@@ -45,7 +50,8 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
   protected final HashFunction _hashFunction;
   protected final double _metadataTTL;
   protected final String _dedupTimeColumn;
-  protected final AtomicDouble _largestSeenTime = new AtomicDouble(0);
+  protected final AtomicDouble _largestSeenTime;
+  protected final File _tableIndexDir;
   protected final Logger _logger;
   // The following variables are always accessed within synchronized block
   private boolean _stopped;
@@ -61,12 +67,17 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
     _serverMetrics = dedupContext.getServerMetrics();
     _metadataTTL = dedupContext.getMetadataTTL() >= 0 ? dedupContext.getMetadataTTL() : 0;
     _dedupTimeColumn = dedupContext.getDedupTimeColumn();
+    _tableIndexDir = dedupContext.getTableIndexDir();
+    _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
     if (_metadataTTL > 0) {
       Preconditions.checkArgument(_dedupTimeColumn != null,
           "When metadataTTL is configured, metadata time column must be 
configured for dedup enabled table: %s",
           tableNameWithType);
+      _largestSeenTime = new 
AtomicDouble(WatermarkUtils.loadWatermark(getWatermarkFile(), 
TTL_WATERMARK_NOT_SET));
+    } else {
+      _largestSeenTime = new AtomicDouble(TTL_WATERMARK_NOT_SET);
+      WatermarkUtils.deleteWatermark(getWatermarkFile());
     }
-    _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
   }
 
   @Override
@@ -85,17 +96,6 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
     Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
         "Got unsupported segment implementation: %s for segment: %s, table: 
%s", segment.getClass(), segmentName,
         _tableNameWithType);
-    // If metadataTTL is enabled, we can skip adding segment that's already getting out of the TTL.
-    if (_metadataTTL > 0) {
-      double maxDedupTime = ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_dedupTimeColumn)
-          .getMaxValue()).doubleValue();
-      _largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime));
-      if (isOutOfMetadataTTL(maxDedupTime)) {
-        _logger.info("Skip adding segment: {} as max dedupTime: {} is out of metadataTTL: {}", segmentName,
-            _dedupTimeColumn, _metadataTTL);
-        return;
-      }
-    }
     if (!startOperation()) {
       _logger.info("Skip adding segment: {} because dedup metadata manager is 
already stopped",
           segment.getSegmentName());
@@ -133,6 +133,17 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
 
   private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment)
       throws IOException {
+    // If metadataTTL is enabled, we can skip adding dedup metadata for segment that's already out of the TTL.
+    if (_metadataTTL > 0) {
+      double maxDedupTime = getMaxDedupTime(newSegment);
+      _largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime));
+      if (isOutOfMetadataTTL(maxDedupTime)) {
+        String action = oldSegment == null ? "adding" : "replacing";
+        _logger.info("Skip {} segment: {} as max dedupTime: {} is out of TTL: {}", action, newSegment.getSegmentName(),
+            maxDedupTime, _metadataTTL);
+        return;
+      }
+    }
     try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(newSegment,
         _primaryKeyColumns, _dedupTimeColumn)) {
       Iterator<DedupRecordInfo> dedupRecordInfoIterator =
@@ -158,6 +169,15 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
       _logger.info("Skip removing segment: {} because metadata manager is 
already stopped", segment.getSegmentName());
       return;
     }
+    // Skip removing the dedup metadata of segment out of TTL. The expired metadata is removed in batches.
+    if (_metadataTTL > 0) {
+      double maxDedupTime = getMaxDedupTime(segment);
+      if (isOutOfMetadataTTL(maxDedupTime)) {
+        _logger.info("Skip removing segment: {} as max dedupTime: {} is out of TTL: {}", segment.getSegmentName(),
+            maxDedupTime, _metadataTTL);
+        return;
+      }
+    }
     try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
         _primaryKeyColumns, _dedupTimeColumn)) {
       Iterator<DedupRecordInfo> dedupRecordInfoIterator =
@@ -175,8 +195,26 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
 
   protected abstract void doRemoveSegment(IndexSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator);
 
+  protected boolean isOutOfMetadataTTL(double dedupTime) {
+    return _metadataTTL > 0 && dedupTime < _largestSeenTime.get() - _metadataTTL;
+  }
+
+  protected double getMaxDedupTime(IndexSegment segment) {
+    return ((Number) segment.getSegmentMetadata().getColumnMetadataMap().get(_dedupTimeColumn)
+        .getMaxValue()).doubleValue();
+  }
+
+  protected File getWatermarkFile() {
+    // Use 'dedup' suffix to avoid conflicts with upsert watermark file, as it's possible that a table is changed
+    // from using dedup to upsert and the watermark should be re-calculated based on upsert comparison column.
+    return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId + ".dedup");
+  }
+
   @Override
   public void removeExpiredPrimaryKeys() {
+    if (_metadataTTL <= 0) {
+      return;
+    }
     if (!startOperation()) {
       _logger.info("Skip removing expired primary keys because metadata 
manager is already stopped");
       return;
@@ -184,6 +222,7 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
     try {
       long startTime = System.currentTimeMillis();
       doRemoveExpiredPrimaryKeys();
+      WatermarkUtils.persistWatermark(_largestSeenTime.get(), getWatermarkFile());
       long duration = System.currentTimeMillis() - startTime;
       _serverMetrics.addTimedTableValue(_tableNameWithType, ServerTimer.DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS,
           duration, TimeUnit.MILLISECONDS);
@@ -251,10 +290,6 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
     _logger.info("Closed the metadata manager");
   }
 
-  protected boolean isOutOfMetadataTTL(double dedupTime) {
-    return _metadataTTL > 0 && dedupTime < _largestSeenTime.get() - _metadataTTL;
-  }
-
   protected abstract long getNumPrimaryKeys();
 
   protected void updatePrimaryKeyGauge(long numPrimaryKeys) {
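
For reference, a standalone round-trip of the watermark file layout assumed by this patch (a single big-endian double, matching WatermarkUtils further down; the temp-file name is illustrative, the real name comes from V1Constants.TTL_WATERMARK_TABLE_PARTITION plus the partition id):

    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.nio.ByteBuffer;
    import java.nio.file.Files;

    public class WatermarkFileFormatSketch {
      public static void main(String[] args) throws Exception {
        File file = File.createTempFile("ttl.watermark.partition.0", ".dedup");  // illustrative name
        try (DataOutputStream out = new DataOutputStream(new FileOutputStream(file, false))) {
          out.writeDouble(19000.0);  // persist the largest seen dedup time
        }
        double watermark = ByteBuffer.wrap(Files.readAllBytes(file.toPath())).getDouble();
        System.out.println(watermark);  // 19000.0
        file.delete();
      }
    }
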
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
index 5cfb2cea42..4461266e35 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
@@ -45,7 +45,6 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad
     while (dedupRecordInfoIteratorOfNewSegment.hasNext()) {
       DedupRecordInfo dedupRecordInfo = dedupRecordInfoIteratorOfNewSegment.next();
       double dedupTime = dedupRecordInfo.getDedupTime();
-      _largestSeenTime.getAndUpdate(time -> Math.max(time, dedupTime));
       _primaryKeyToSegmentAndTimeMap.compute(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(), _hashFunction),
           (primaryKey, segmentAndTime) -> {
             // Stale metadata is treated as not existing when checking for deduplicates.
@@ -94,10 +93,8 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad
 
   @Override
   protected void doRemoveExpiredPrimaryKeys() {
-    if (_metadataTTL > 0) {
-      double smallestTimeToKeep = _largestSeenTime.get() - _metadataTTL;
-      _primaryKeyToSegmentAndTimeMap.entrySet().removeIf(entry -> entry.getValue().getRight() < smallestTimeToKeep);
-    }
+    double smallestTimeToKeep = _largestSeenTime.get() - _metadataTTL;
+    _primaryKeyToSegmentAndTimeMap.entrySet().removeIf(entry -> entry.getValue().getRight() < smallestTimeToKeep);
   }
 
   @Override
@@ -108,7 +105,9 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad
       return true;
     }
     try {
-      _largestSeenTime.getAndUpdate(time -> Math.max(time, dedupRecordInfo.getDedupTime()));
+      if (_metadataTTL > 0) {
+        _largestSeenTime.getAndUpdate(time -> Math.max(time, dedupRecordInfo.getDedupTime()));
+      }
       AtomicBoolean present = new AtomicBoolean(false);
       _primaryKeyToSegmentAndTimeMap.compute(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(), _hashFunction),
           (primaryKey, segmentAndTime) -> {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
index 4bb7929ffe..ea9ac9117b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapTableDedupMetadataManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.segment.local.dedup;
 
-
 class ConcurrentMapTableDedupMetadataManager extends BaseTableDedupMetadataManager {
   protected PartitionDedupMetadataManager createPartitionDedupMetadataManager(Integer partitionId) {
     return new ConcurrentMapPartitionDedupMetadataManager(_tableNameWithType, partitionId, _dedupContext);
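
A plain-map illustration of the batch expiry done by doRemoveExpiredPrimaryKeys() above (hypothetical sketch; the real map values are Pair<segment, dedupTime>, not bare doubles):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    public class ExpiryPurgeSketch {
      public static void main(String[] args) {
        double largestSeenTime = 19000;
        double metadataTTL = 10000;
        Map<String, Double> primaryKeyToTime = new ConcurrentHashMap<>();
        primaryKeyToTime.put("pk1", 8000.0);   // 8000 < 9000, expired
        primaryKeyToTime.put("pk2", 12000.0);  // kept
        double smallestTimeToKeep = largestSeenTime - metadataTTL;
        primaryKeyToTime.entrySet().removeIf(entry -> entry.getValue() < smallestTimeToKeep);
        System.out.println(primaryKeyToTime);  // {pk2=12000.0}
      }
    }
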
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index 18142e2981..9a868b34c5 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -21,12 +21,8 @@ package org.apache.pinot.segment.local.upsert;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.AtomicDouble;
-import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -45,7 +41,6 @@ import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.commons.io.FileUtils;
 import org.apache.helix.HelixManager;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.Utils;
@@ -66,6 +61,7 @@ import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.WatermarkUtils;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
@@ -184,10 +180,11 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
       Preconditions.checkState(_comparisonColumns.size() == 1,
           "Upsert TTL does not work with multiple comparison columns");
       Preconditions.checkState(_metadataTTL <= 0 || _enableSnapshot, "Upsert metadata TTL must have snapshot enabled");
-      _largestSeenComparisonValue = new AtomicDouble(loadWatermark());
+      _largestSeenComparisonValue =
+          new AtomicDouble(WatermarkUtils.loadWatermark(getWatermarkFile(), TTL_WATERMARK_NOT_SET));
     } else {
       _largestSeenComparisonValue = new AtomicDouble(TTL_WATERMARK_NOT_SET);
-      deleteWatermark();
+      WatermarkUtils.deleteWatermark(getWatermarkFile());
     }
   }
 
@@ -1013,7 +1010,7 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
     // updated validDocIds bitmaps. If the TTL watermark is persisted first, segments out of TTL may get loaded with
     // stale bitmaps or even no bitmap snapshots to use.
     if (isTTLEnabled()) {
-      persistWatermark(_largestSeenComparisonValue.get());
+      WatermarkUtils.persistWatermark(_largestSeenComparisonValue.get(), getWatermarkFile());
     }
     _serverMetrics.setValueOfPartitionGauge(_tableNameWithType, _partitionId,
         ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, numImmutableSegments);
@@ -1030,59 +1027,6 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps
         numConsumingSegments, System.currentTimeMillis() - startTimeMs);
   }
 
-  /**
-   * Loads watermark from the file if exists.
-   */
-  protected double loadWatermark() {
-    File watermarkFile = getWatermarkFile();
-    if (watermarkFile.exists()) {
-      try {
-        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
-        double watermark = ByteBuffer.wrap(bytes).getDouble();
-        _logger.info("Loaded watermark: {} from file for table: {} 
partition_id: {}", watermark, _tableNameWithType,
-            _partitionId);
-        return watermark;
-      } catch (Exception e) {
-        _logger.warn("Caught exception while loading watermark file: {}, 
skipping", watermarkFile);
-      }
-    }
-    return TTL_WATERMARK_NOT_SET;
-  }
-
-  /**
-   * Persists watermark to the file.
-   */
-  protected void persistWatermark(double watermark) {
-    File watermarkFile = getWatermarkFile();
-    try {
-      if (watermarkFile.exists()) {
-        if (!FileUtils.deleteQuietly(watermarkFile)) {
-          _logger.warn("Cannot delete watermark file: {}, skipping", 
watermarkFile);
-          return;
-        }
-      }
-      try (OutputStream outputStream = new FileOutputStream(watermarkFile, false);
-          DataOutputStream dataOutputStream = new DataOutputStream(outputStream)) {
-        dataOutputStream.writeDouble(watermark);
-      }
-      _logger.info("Persisted watermark: {} to file: {}", watermark, 
watermarkFile);
-    } catch (Exception e) {
-      _logger.warn("Caught exception while persisting watermark file: {}, 
skipping", watermarkFile);
-    }
-  }
-
-  /**
-   * Deletes the watermark file.
-   */
-  protected void deleteWatermark() {
-    File watermarkFile = getWatermarkFile();
-    if (watermarkFile.exists()) {
-      if (!FileUtils.deleteQuietly(watermarkFile)) {
-        _logger.warn("Cannot delete watermark file: {}, skipping", 
watermarkFile);
-      }
-    }
-  }
-
   protected File getWatermarkFile() {
     return new File(_tableIndexDir, V1Constants.TTL_WATERMARK_TABLE_PARTITION + _partitionId);
   }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/WatermarkUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/WatermarkUtils.java
new file mode 100644
index 0000000000..d19591db4e
--- /dev/null
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/WatermarkUtils.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.utils;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utils methods to manage the TTL watermark for dedup and upsert tables, as both share very similar logic.
+ */
+public class WatermarkUtils {
+  private static final Logger LOGGER = LoggerFactory.getLogger(WatermarkUtils.class);
+
+  private WatermarkUtils() {
+  }
+
+  /**
+   * Loads the watermark from the file if it exists.
+   */
+  public static double loadWatermark(File watermarkFile, double defaultWatermark) {
+    if (watermarkFile.exists()) {
+      try {
+        byte[] bytes = FileUtils.readFileToByteArray(watermarkFile);
+        double watermark = ByteBuffer.wrap(bytes).getDouble();
+        LOGGER.info("Loaded watermark: {} from file: {}", watermark, watermarkFile);
+        return watermark;
+      } catch (Exception e) {
+        LOGGER.warn("Failed to load watermark from file: {}, skipping", watermarkFile);
+      }
+    }
+    return defaultWatermark;
+  }
+
+  /**
+   * Persists watermark to the file.
+   */
+  public static void persistWatermark(double watermark, File watermarkFile) {
+    try {
+      if (watermarkFile.exists()) {
+        if (!FileUtils.deleteQuietly(watermarkFile)) {
+          LOGGER.warn("Cannot delete watermark file: {} to persist watermark: 
{}, skipping", watermarkFile, watermark);
+          return;
+        }
+      }
+      try (OutputStream outputStream = new FileOutputStream(watermarkFile, 
false);
+          DataOutputStream dataOutputStream = new 
DataOutputStream(outputStream)) {
+        dataOutputStream.writeDouble(watermark);
+      }
+      LOGGER.info("Persisted watermark: {} to file: {}", watermark, 
watermarkFile);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to persist watermark: {} to file: {}, skipping", 
watermark, watermarkFile);
+    }
+  }
+
+  /**
+   * Deletes the watermark file.
+   */
+  public static void deleteWatermark(File watermarkFile) {
+    if (watermarkFile.exists()) {
+      if (!FileUtils.deleteQuietly(watermarkFile)) {
+        LOGGER.warn("Cannot delete watermark file: {}, skipping", 
watermarkFile);
+      }
+    }
+  }
+}
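
A small usage sketch of the new utility (method signatures as defined above; the watermark file location is illustrative):

    import java.io.File;
    import org.apache.pinot.segment.local.utils.WatermarkUtils;

    public class WatermarkUtilsUsageSketch {
      public static void main(String[] args) {
        File watermarkFile = new File(System.getProperty("java.io.tmpdir"), "ttl.watermark.partition.0.dedup");
        WatermarkUtils.persistWatermark(19000.0, watermarkFile);
        // Returns the persisted value, or the given default (-1 here) if the file is missing or unreadable.
        double watermark = WatermarkUtils.loadWatermark(watermarkFile, -1);
        System.out.println(watermark);  // 19000.0
        WatermarkUtils.deleteWatermark(watermarkFile);
      }
    }
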
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
index c7855c1eb3..c0697eb4c3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.TreeMap;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -30,6 +31,7 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.segment.readers.PrimaryKeyReader;
 import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.WatermarkUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
@@ -38,6 +40,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 import org.mockito.Mockito;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -50,17 +53,27 @@ import static org.testng.Assert.assertTrue;
 
 
 public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
+      ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.class.getSimpleName());
   private static final int METADATA_TTL = 10000;
   private static final String DEDUP_TIME_COLUMN_NAME = "dedupTimeColumn";
   private DedupContext.Builder _dedupContextBuilder;
 
   @BeforeMethod
-  public void setUpContextBuilder() {
+  public void setUpContextBuilder()
+      throws IOException {
+    FileUtils.forceMkdir(TEMP_DIR);
     _dedupContextBuilder = new DedupContext.Builder();
     _dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
         .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setMetadataTTL(METADATA_TTL)
         .setDedupTimeColumn(DEDUP_TIME_COLUMN_NAME).setTableIndexDir(mock(File.class))
-        .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class));
+        .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
+        .setTableIndexDir(TEMP_DIR);
+  }
+
+  @AfterMethod
+  public void cleanup() {
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 
   @Test
@@ -134,7 +147,6 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
     dedupRecordInfoIterator = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, 10);
     metadataManager.doRemoveSegment(segment, dedupRecordInfoIterator);
     assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 0);
-    assertEquals(metadataManager._largestSeenTime.get(), 9000);
 
     metadataManager.stop();
     metadataManager.close();
@@ -204,7 +216,6 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
 
   private void verifyInitialSegmentAddition(ConcurrentMapPartitionDedupMetadataManager metadataManager,
       IndexSegment segment, HashFunction hashFunction) {
-    assertEquals(metadataManager._largestSeenTime.get(), 9000);
     assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10);
     verifyInMemoryState(metadataManager, 0, 10, segment, hashFunction);
   }
@@ -245,23 +256,23 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
     Iterator<DedupRecordInfo> dedupRecordInfoIterator2 =
         DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
     metadataManager.doAddOrReplaceSegment(null, segment2, dedupRecordInfoIterator2);
+
+    metadataManager._largestSeenTime.set(19000);
     metadataManager.removeExpiredPrimaryKeys();
     assertEquals(metadataManager.getNumPrimaryKeys(), 11);
     assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
     verifyInMemoryState(metadataManager, 9, 1, segment1, hashFunction);
     verifyInMemoryState(metadataManager, 10, 10, segment2, hashFunction);
-    assertEquals(metadataManager._largestSeenTime.get(), 19000);
+    assertEquals(WatermarkUtils.loadWatermark(metadataManager.getWatermarkFile(), -1), 19000);
 
     dedupRecordInfoIterator1 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
     metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1);
     assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 10);
     verifyInMemoryState(metadataManager, 10, 10, segment2, hashFunction);
-    assertEquals(metadataManager._largestSeenTime.get(), 19000);
 
     dedupRecordInfoIterator2 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
     metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2);
     assertTrue(metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
-    assertEquals(metadataManager._largestSeenTime.get(), 19000);
 
     metadataManager.stop();
     metadataManager.close();
@@ -294,22 +305,21 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
     Iterator<DedupRecordInfo> dedupRecordInfoIterator2 =
         DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
     metadataManager.doAddOrReplaceSegment(null, segment2, dedupRecordInfoIterator2);
+    metadataManager._largestSeenTime.set(19000);
     metadataManager.removeExpiredPrimaryKeys();
     assertEquals(metadataManager.getNumPrimaryKeys(), 11);
     assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 11);
     verifyInMemoryState(metadataManager, 10, 10, segment2, hashFunction);
-    assertEquals(metadataManager._largestSeenTime.get(), 19000);
+    assertEquals(WatermarkUtils.loadWatermark(metadataManager.getWatermarkFile(), -1), 19000);
 
     dedupRecordInfoIterator2 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader2, 10);
     metadataManager.doRemoveSegment(segment2, dedupRecordInfoIterator2);
     assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
     verifyInMemoryState(metadataManager, 9, 1, segment1, hashFunction);
-    assertEquals(metadataManager._largestSeenTime.get(), 19000);
 
     dedupRecordInfoIterator1 = DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader1, 10);
     metadataManager.doRemoveSegment(segment1, dedupRecordInfoIterator1);
     assertTrue(metadataManager._primaryKeyToSegmentAndTimeMap.isEmpty());
-    assertEquals(metadataManager._largestSeenTime.get(), 19000);
 
     metadataManager.stop();
     metadataManager.close();
@@ -351,7 +361,6 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
     assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.size(), 1);
     assertEquals(metadataManager._primaryKeyToSegmentAndTimeMap.get(primaryKeyHash),
         Pair.of(immutableSegment, 25000.0));
-    assertEquals(metadataManager._largestSeenTime.get(), 25000);
 
     metadataManager.stop();
     metadataManager.close();
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
index bab3682b63..f7eadeaf09 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -36,6 +37,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.mockito.Mockito;
 import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -45,14 +47,24 @@ import static org.testng.Assert.assertSame;
 
 
 public class ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(),
+      ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.class.getSimpleName());
   private DedupContext.Builder _dedupContextBuilder;
 
   @BeforeMethod
-  public void setUpContextBuilder() {
+  public void setUpContextBuilder()
+      throws IOException {
+    FileUtils.forceMkdir(TEMP_DIR);
     _dedupContextBuilder = new DedupContext.Builder();
     _dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
         .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setTableIndexDir(mock(File.class))
-        .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class));
+        .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
+        .setTableIndexDir(TEMP_DIR);
+  }
+
+  @AfterMethod
+  public void cleanup() {
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 
   @Test
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index 36c5627902..8c4594dc91 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -26,6 +26,7 @@ import java.net.URL;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
@@ -49,6 +50,8 @@ import org.testng.annotations.Test;
 
 
 public class MutableSegmentDedupeTest {
+  private static final File TEMP_DIR =
+      new File(FileUtils.getTempDirectory(), MutableSegmentDedupeTest.class.getSimpleName());
   private static final String SCHEMA_FILE_PATH = "data/test_dedup_schema.json";
   private static final String DATA_FILE_PATH = "data/test_dedup_data.json";
   private MutableSegmentImpl _mutableSegmentImpl;
@@ -88,17 +91,18 @@ public class MutableSegmentDedupeTest {
     TableConfig tableConfig = Mockito.mock(TableConfig.class);
     Mockito.when(tableConfig.getTableName()).thenReturn("testTable_REALTIME");
     Mockito.when(tableConfig.getDedupConfig()).thenReturn(dedupConfig);
-    SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig
-        = Mockito.mock(SegmentsValidationAndRetentionConfig.class);
+    SegmentsValidationAndRetentionConfig segmentsValidationAndRetentionConfig =
+        Mockito.mock(SegmentsValidationAndRetentionConfig.class);
     Mockito.when(tableConfig.getValidationConfig()).thenReturn(segmentsValidationAndRetentionConfig);
     Mockito.when(segmentsValidationAndRetentionConfig.getTimeColumnName()).thenReturn("secondsSinceEpoch");
     TableDataManager tableDataManager = Mockito.mock(TableDataManager.class);
-    Mockito.when(tableDataManager.getTableDataDir()).thenReturn(Mockito.mock(File.class));
+    Mockito.when(tableDataManager.getTableDataDir()).thenReturn(TEMP_DIR);
     return TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager,
         Mockito.mock(ServerMetrics.class));
   }
 
-  public List<Map<String, String>> loadJsonFile(String filePath) throws IOException {
+  public List<Map<String, String>> loadJsonFile(String filePath)
+      throws IOException {
     URL resourceUrl = this.getClass().getClassLoader().getResource(filePath);
     if (resourceUrl == null) {
       throw new IllegalArgumentException("File not found: " + filePath);
@@ -165,8 +169,8 @@ public class MutableSegmentDedupeTest {
     }
   }
 
-  private void verifyGeneratedSegmentDataAgainstRawData(
-      int docId, int rawDataIndex, List<Map<String, String>> rawData) {
+  private void verifyGeneratedSegmentDataAgainstRawData(int docId, int rawDataIndex,
+      List<Map<String, String>> rawData) {
     for (String columnName : rawData.get(0).keySet()) {
       Assert.assertEquals(String.valueOf(_mutableSegmentImpl.getValue(docId, columnName)),
           String.valueOf(rawData.get(rawDataIndex).get(columnName)));
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
index 278f0f5ef5..90c03bb5de 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManagerTest.java
@@ -40,6 +40,7 @@ import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImp
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
 import org.apache.pinot.segment.local.upsert.ConcurrentMapPartitionUpsertMetadataManager.RecordLocation;
 import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.local.utils.WatermarkUtils;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.MutableSegment;
@@ -228,10 +229,10 @@ public class ConcurrentMapPartitionUpsertMetadataManagerTest {
         new ConcurrentMapPartitionUpsertMetadataManager(REALTIME_TABLE_NAME, 0, _contextBuilder.build());
 
     double currentTimeMs = System.currentTimeMillis();
-    upsertMetadataManager.persistWatermark(currentTimeMs);
+    WatermarkUtils.persistWatermark(currentTimeMs, upsertMetadataManager.getWatermarkFile());
     assertTrue(new File(INDEX_DIR, V1Constants.TTL_WATERMARK_TABLE_PARTITION + 0).exists());
 
-    double watermark = upsertMetadataManager.loadWatermark();
+    double watermark = WatermarkUtils.loadWatermark(upsertMetadataManager.getWatermarkFile(), -1);
     assertEquals(watermark, currentTimeMs);
 
     ImmutableSegmentImpl segment =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

