This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 767f32fc11 add preloading support for dedup tables (#14187)
767f32fc11 is described below

commit 767f32fc113dbf5c027bc10b1cb695611194614e
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Tue Oct 22 08:25:03 2024 -0700

    add preloading support for dedup tables (#14187)
    
    * add preloading support for dedup tables
---
 .../apache/pinot/common/metrics/ServerMeter.java   |   1 +
 .../apache/pinot/common/metrics/ServerTimer.java   |   1 +
 .../common/utils/config/TableConfigSerDeTest.java  |   7 +-
 .../manager/realtime/RealtimeTableDataManager.java |  59 +++++--
 .../tests/DedupPreloadIntegrationTest.java         | 162 +++++++++++++++++++
 .../dedup/BasePartitionDedupMetadataManager.java   | 171 +++++++++++++++++----
 .../local/dedup/BaseTableDedupMetadataManager.java |  33 ++--
 ...ConcurrentMapPartitionDedupMetadataManager.java |  11 ++
 .../pinot/segment/local/dedup/DedupContext.java    |  33 ++--
 .../local/dedup/PartitionDedupMetadataManager.java |  18 +++
 .../local/dedup/TableDedupMetadataManager.java     |   2 +
 .../BasePartitionDedupMetadataManagerTest.java     |  86 +++++++++++
 ...apPartitionDedupMetadataManagerWithTTLTest.java |   7 +-
 ...artitionDedupMetadataManagerWithoutTTLTest.java |   4 +-
 .../TableDedupMetadataManagerFactoryTest.java      |  68 ++++++++
 .../mutable/MutableSegmentDedupeTest.java          |   2 +-
 .../apache/pinot/spi/config/table/DedupConfig.java |  14 +-
 17 files changed, 592 insertions(+), 87 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 77595d6181..a68e77f144 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -52,6 +52,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   // number of times partition of a record did not match the partition of the stream
   REALTIME_PARTITION_MISMATCH("mismatch", false),
   REALTIME_DEDUP_DROPPED("rows", false),
+  DEDUP_PRELOAD_FAILURE("count", false),
   UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false),
   PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
   PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 63b42440a6..6738dbe194 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -58,6 +58,7 @@ public enum ServerTimer implements AbstractMetrics.Timer {
 
   DEDUP_REMOVE_EXPIRED_PRIMARY_KEYS_TIME_MS("milliseconds", false,
       "Total time taken to delete expired dedup primary keys based on metadataTTL or deletedKeysTTL"),
+  DEDUP_PRELOAD_TIME_MS("milliseconds", false, "Total time taken to preload a table partition of a dedup table"),
 
   SECONDARY_Q_WAIT_TIME_MS("milliseconds", false,
       "Time spent waiting in the secondary queue when BinaryWorkloadScheduler is used."),
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
index 47de3d6225..5972994e2d 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeTest.java
@@ -65,6 +65,7 @@ import static org.testng.Assert.*;
 
 public class TableConfigSerDeTest {
   private static final double NO_DICTIONARY_THRESHOLD_RATIO = 0.72;
+
   @Test
   public void testSerDe()
       throws IOException {
@@ -192,8 +193,8 @@ public class TableConfigSerDeTest {
     }
     {
       // With query config
-      QueryConfig queryConfig = new QueryConfig(1000L, true, true, Collections.singletonMap("func(a)", "b"), null,
-          null);
+      QueryConfig queryConfig =
+          new QueryConfig(1000L, true, true, Collections.singletonMap("func(a)", "b"), null, null);
       TableConfig tableConfig = tableConfigBuilder.setQueryConfig(queryConfig).build();
 
       checkQueryConfig(tableConfig);
@@ -270,7 +271,7 @@ public class TableConfigSerDeTest {
     }
     {
       // with dedup config - with metadata ttl and metadata time column
-      DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5, null, null, 10, "dedupTimeColumn");
+      DedupConfig dedupConfig = new DedupConfig(true, HashFunction.MD5, null, null, 10, "dedupTimeColumn", false);
       TableConfig tableConfig = tableConfigBuilder.setDedupConfig(dedupConfig).build();
       // Serialize then de-serialize
       checkTableConfigWithDedupConfigWithTTL(JsonUtils.stringToObject(tableConfig.toJsonString(), TableConfig.class));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 297254529d..b4b33baa02 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -402,6 +402,15 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
         && _tableUpsertMetadataManager.getUpsertMode() == UpsertConfig.Mode.PARTIAL;
   }
 
+  private void handleSegmentPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
+    // Today a table can use either upsert or dedup but not both at the same time, so preloading is done by either the
+    // upsert manager or the dedup manager.
+    // TODO: if a table can enable both dedup and upsert in the future, we need to revisit the preloading logic here,
+    //       as we can only preload segments once but have to restore metadata for both dedup and upsert managers.
+    handleUpsertPreload(zkMetadata, indexLoadingConfig);
+    handleDedupPreload(zkMetadata, indexLoadingConfig);
+  }
+
   /**
    * Handles upsert preload if the upsert preload is enabled.
    */
@@ -417,6 +426,21 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     _tableUpsertMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
   }
 
+  /**
+   * Handles dedup preload if the dedup preload is enabled.
+   */
+  private void handleDedupPreload(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) {
+    if (_tableDedupMetadataManager == null || !_tableDedupMetadataManager.isEnablePreload()) {
+      return;
+    }
+    String segmentName = zkMetadata.getSegmentName();
+    Integer partitionId = SegmentUtils.getRealtimeSegmentPartitionId(segmentName, zkMetadata, null);
+    Preconditions.checkState(partitionId != null,
+        String.format("Failed to get partition id for segment: %s in dedup-enabled table: %s", segmentName,
+            _tableNameWithType));
+    _tableDedupMetadataManager.getOrCreatePartitionManager(partitionId).preloadSegments(indexLoadingConfig);
+  }
+
   protected void doAddOnlineSegment(String segmentName)
       throws Exception {
     SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
@@ -424,7 +448,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
         "Segment: %s of table: %s is not committed, cannot make it ONLINE", segmentName, _tableNameWithType);
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
     indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
-    handleUpsertPreload(zkMetadata, indexLoadingConfig);
+    handleSegmentPreload(zkMetadata, indexLoadingConfig);
     SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
     if (segmentDataManager == null) {
       addNewOnlineSegment(zkMetadata, indexLoadingConfig);
@@ -470,7 +494,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
       return;
     }
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
-    handleUpsertPreload(zkMetadata, indexLoadingConfig);
+    handleSegmentPreload(zkMetadata, indexLoadingConfig);
     SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
     if (segmentDataManager != null) {
       _logger.warn("Segment: {} ({}) already exists, skipping adding it as CONSUMING segment", segmentName,
@@ -567,22 +591,29 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
   private void handleDedup(ImmutableSegmentImpl immutableSegment) {
     // TODO(saurabh) refactor commons code with handleUpsert
     String segmentName = immutableSegment.getSegmentName();
-    Integer partitionGroupId =
+    _logger.info("Adding immutable segment: {} with dedup enabled", segmentName);
+    Integer partitionId =
         SegmentUtils.getRealtimeSegmentPartitionId(segmentName, _tableNameWithType, _helixManager, null);
-    Preconditions.checkNotNull(partitionGroupId,
-        String.format("PartitionGroupId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
+    Preconditions.checkNotNull(partitionId,
+        String.format("PartitionId is not available for segment: '%s' (dedup-enabled table: %s)", segmentName,
             _tableNameWithType));
     PartitionDedupMetadataManager partitionDedupMetadataManager =
-        _tableDedupMetadataManager.getOrCreatePartitionManager(partitionGroupId);
+        _tableDedupMetadataManager.getOrCreatePartitionManager(partitionId);
     immutableSegment.enableDedup(partitionDedupMetadataManager);
     SegmentDataManager oldSegmentManager = _segmentDataManagerMap.get(segmentName);
-    if (oldSegmentManager != null) {
-      LOGGER.info("Replacing mutable segment: {} with immutable segment: {} in partition dedup metadata manager",
-          oldSegmentManager.getSegment().getSegmentName(), segmentName);
-      partitionDedupMetadataManager.replaceSegment(oldSegmentManager.getSegment(), immutableSegment);
-    } else {
-      LOGGER.info("Adding immutable segment: {} to partition dedup metadata manager", segmentName);
+    if (partitionDedupMetadataManager.isPreloading()) {
+      partitionDedupMetadataManager.preloadSegment(immutableSegment);
+      LOGGER.info("Preloaded immutable segment: {} with dedup enabled", segmentName);
+      return;
+    }
+    if (oldSegmentManager == null) {
       partitionDedupMetadataManager.addSegment(immutableSegment);
+      LOGGER.info("Added new immutable segment: {} with dedup enabled", segmentName);
+    } else {
+      IndexSegment oldSegment = oldSegmentManager.getSegment();
+      partitionDedupMetadataManager.replaceSegment(oldSegment, immutableSegment);
+      LOGGER.info("Replaced {} segment: {} with dedup enabled",
+          oldSegment instanceof ImmutableSegment ? "immutable" : "mutable", segmentName);
     }
   }
 
@@ -603,8 +634,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager {
     _serverMetrics.addValueToTableGauge(_tableNameWithType, ServerGauge.SEGMENT_COUNT, 1L);
     ImmutableSegmentDataManager newSegmentManager = new ImmutableSegmentDataManager(immutableSegment);
     if (partitionUpsertMetadataManager.isPreloading()) {
-      // Preloading segment is ensured to be handled by a single thread, so no need to take the segment upsert lock.
-      // Besides, preloading happens before the table partition is made ready for any queries.
+      // Register segment after it is preloaded and has initialized its validDocIds. The order of preloading and
+      // registering segment doesn't matter much as preloading happens before table partition is ready for queries.
       partitionUpsertMetadataManager.preloadSegment(immutableSegment);
       registerSegment(segmentName, newSegmentManager, partitionUpsertMetadataManager);
       _logger.info("Preloaded immutable segment: {} with upsert enabled", segmentName);
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupPreloadIntegrationTest.java
new file mode 100644
index 0000000000..c2589bb520
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupPreloadIntegrationTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig;
+import org.apache.pinot.spi.config.table.RoutingConfig;
+import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class DedupPreloadIntegrationTest extends BaseClusterIntegrationTestSet {
+
+  private List<File> _avroFiles;
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    // Start a customized controller with more frequent realtime segment validation
+    startController();
+    startBroker();
+    startServer();
+
+    _avroFiles = unpackAvroData(_tempDir);
+    startKafka();
+    pushAvroIntoKafka(_avroFiles);
+
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createDedupTableConfig(_avroFiles.get(0), "id", getNumKafkaPartitions());
+    addTableConfig(tableConfig);
+
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  @Override
+  protected void overrideServerConf(PinotConfiguration serverConf) {
+    serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads",
+        "1");
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    dropRealtimeTable(getTableName());
+    stopServer();
+    stopBroker();
+    stopController();
+    stopKafka();
+    stopZk();
+    FileUtils.deleteDirectory(_tempDir);
+  }
+
+  @Override
+  protected int getRealtimeSegmentFlushSize() {
+    // Create > 1 segments
+    return 2;
+  }
+
+  @Override
+  protected String getSchemaFileName() {
+    return "dedupIngestionTestSchema.schema";
+  }
+
+  @Override
+  protected String getAvroTarFileName() {
+    return "dedupIngestionTestData.tar.gz";
+  }
+
+  @Override
+  protected String getPartitionColumn() {
+    return "id";
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    // Five distinct records are expected after dedup, with id values 0-4
+    return 5;
+  }
+
+  @Test
+  public void testValues()
+      throws Exception {
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
+
+    // Validate that the older values persist
+    for (int i = 0; i < getCountStarResult(); i++) {
+      assertEquals(
+          getPinotConnection().execute("SELECT name FROM " + getTableName() + " WHERE id = " + i).getResultSet(0)
+              .getString(0), "" + i);
+    }
+
+    // Restart the servers and check again
+    restartServers();
+    waitForAllDocsLoaded(600_000L);
+
+    // Validate that the older values persist
+    for (int i = 0; i < getCountStarResult(); i++) {
+      assertEquals(
+          getPinotConnection().execute("SELECT name FROM " + getTableName() + " WHERE id = " + i).getResultSet(0)
+              .getString(0), "" + i);
+    }
+  }
+
+  @Override
+  protected TableConfig createDedupTableConfig(File sampleAvroFile, String primaryKeyColumn, int numPartitions) {
+    AvroFileSchemaKafkaAvroMessageDecoder._avroFile = sampleAvroFile;
+    Map<String, ColumnPartitionConfig> columnPartitionConfigMap = new HashMap<>();
+    columnPartitionConfigMap.put(primaryKeyColumn, new ColumnPartitionConfig("Murmur", numPartitions));
+
+    DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, 0, null, true);
+
+    return new TableConfigBuilder(TableType.REALTIME).setTableName(getTableName())
+        .setTimeColumnName(getTimeColumnName()).setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas())
+        .setSegmentVersion(getSegmentVersion()).setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig())
+        .setBrokerTenant(getBrokerTenant()).setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig())
+        .setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).setRoutingConfig(
+            new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+        .setSegmentPartitionConfig(new SegmentPartitionConfig(columnPartitionConfigMap))
+        .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(primaryKeyColumn, 1)).setDedupConfig(dedupConfig)
+        .build();
+  }
+}
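
For context on the test above: dedup preloading only kicks in when two knobs agree, the enablePreload flag in the table's DedupConfig (its trailing constructor argument) and a non-zero segment preload thread pool on the server, which is what overrideServerConf sets via max.segment.preload.threads. A minimal sketch of wiring both knobs together, mirroring what this test does; treat it as an illustration rather than a verified recipe:

    // Hedged sketch: both settings mirror DedupPreloadIntegrationTest in this commit.
    PinotConfiguration serverConf = new PinotConfiguration();
    // Server side: without preload threads, the dedup manager leaves preloading disabled.
    serverConf.setProperty(
        CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads", "4");
    // Table side: the new trailing boolean of DedupConfig turns preloading on for this table.
    DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, 0, null, true);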
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
index 3892d36d92..08ca8633a7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManager.java
@@ -24,14 +24,23 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 import javax.annotation.Nullable;
+import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerTimer;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.local.utils.SegmentPreloadUtils;
 import org.apache.pinot.segment.local.utils.WatermarkUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.segment.spi.V1Constants;
 import org.apache.pinot.spi.config.table.HashFunction;
@@ -46,6 +55,7 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
   protected final String _tableNameWithType;
   protected final List<String> _primaryKeyColumns;
   protected final int _partitionId;
+  protected final DedupContext _context;
   protected final ServerMetrics _serverMetrics;
   protected final HashFunction _hashFunction;
   protected final double _metadataTTL;
@@ -58,16 +68,21 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
   // Initialize with 1 pending operation to indicate the metadata manager can take more operations
   private int _numPendingOperations = 1;
   private boolean _closed;
+  // The lock and boolean flag ensure only one thread can start preloading and preloading happens only once.
+  private final Lock _preloadLock = new ReentrantLock();
+  private volatile boolean _isPreloading;
 
   protected BasePartitionDedupMetadataManager(String tableNameWithType, int partitionId, DedupContext dedupContext) {
     _tableNameWithType = tableNameWithType;
     _partitionId = partitionId;
+    _context = dedupContext;
     _primaryKeyColumns = dedupContext.getPrimaryKeyColumns();
     _hashFunction = dedupContext.getHashFunction();
-    _serverMetrics = dedupContext.getServerMetrics();
+    _isPreloading = dedupContext.isPreloadEnabled();
     _metadataTTL = dedupContext.getMetadataTTL() >= 0 ? dedupContext.getMetadataTTL() : 0;
     _dedupTimeColumn = dedupContext.getDedupTimeColumn();
     _tableIndexDir = dedupContext.getTableIndexDir();
+    _serverMetrics = ServerMetrics.get();
     _logger = LoggerFactory.getLogger(tableNameWithType + "-" + partitionId + "-" + getClass().getSimpleName());
     if (_metadataTTL > 0) {
       Preconditions.checkArgument(_dedupTimeColumn != null,
@@ -87,7 +102,58 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
   }
 
   @Override
-  public void addSegment(IndexSegment segment) {
+  public boolean isPreloading() {
+    return _isPreloading;
+  }
+
+  @Override
+  public void preloadSegments(IndexLoadingConfig indexLoadingConfig) {
+    if (!_isPreloading) {
+      return;
+    }
+    TableDataManager tableDataManager = _context.getTableDataManager();
+    Preconditions.checkNotNull(tableDataManager, "Preloading segments requires tableDataManager");
+    HelixManager helixManager = tableDataManager.getHelixManager();
+    ExecutorService segmentPreloadExecutor = tableDataManager.getSegmentPreloadExecutor();
+    // Preloading the segments for dedup table for fast metadata recovery, as done for upsert table.
+    _preloadLock.lock();
+    try {
+      // Check the flag again to ensure preloading happens only once.
+      if (!_isPreloading) {
+        return;
+      }
+      // From now on, the _isPreloading flag is true until the segments are preloaded.
+      long startTime = System.currentTimeMillis();
+      doPreloadSegments(tableDataManager, indexLoadingConfig, helixManager, segmentPreloadExecutor);
+      long duration = System.currentTimeMillis() - startTime;
+      _serverMetrics.addTimedTableValue(_tableNameWithType, ServerTimer.DEDUP_PRELOAD_TIME_MS, duration,
+          TimeUnit.MILLISECONDS);
+    } catch (Exception e) {
+      // We should continue even if preloading fails, so that segments not being preloaded successfully can get
+      // loaded via the normal segment loading logic as done on the Helix task threads.
+      _logger.warn("Failed to preload segments from partition: {} of table: {}, skipping", _partitionId,
+          _tableNameWithType, e);
+      _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.DEDUP_PRELOAD_FAILURE, 1);
+      if (e instanceof InterruptedException) {
+        // Restore the interrupted status in case the upper callers want to check.
+        Thread.currentThread().interrupt();
+      }
+    } finally {
+      _isPreloading = false;
+      _preloadLock.unlock();
+    }
+  }
+
+  // Keep this hook method for subclasses to modify the preloading logic.
+  protected void doPreloadSegments(TableDataManager tableDataManager, IndexLoadingConfig indexLoadingConfig,
+      HelixManager helixManager, ExecutorService segmentPreloadExecutor)
+      throws Exception {
+    SegmentPreloadUtils.preloadSegments(tableDataManager, _partitionId, indexLoadingConfig, helixManager,
+        segmentPreloadExecutor, null);
+  }
+
+  @Override
+  public void preloadSegment(ImmutableSegment segment) {
     String segmentName = segment.getSegmentName();
     if (segment instanceof EmptyIndexSegment) {
       _logger.info("Skip adding empty segment: {}", segmentName);
@@ -97,21 +163,58 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
         "Got unsupported segment implementation: %s for segment: %s, table: %s", segment.getClass(), segmentName,
         _tableNameWithType);
     if (!startOperation()) {
-      _logger.info("Skip adding segment: {} because dedup metadata manager is already stopped",
-          segment.getSegmentName());
+      _logger.info("Skip preloading segment: {} because dedup metadata manager is already stopped", segmentName);
       return;
     }
     try {
-      addOrReplaceSegment(null, segment);
+      if (skipSegmentOutOfTTL(segment, true)) {
+        return;
+      }
+      try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
+          _primaryKeyColumns, _dedupTimeColumn)) {
+        Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+            DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
+        doPreloadSegment(segment, dedupRecordInfoIterator);
+        updatePrimaryKeyGauge();
+      }
     } catch (Exception e) {
       throw new RuntimeException(
-          String.format("Caught exception while adding segment: %s of table: %s to %s", segment.getSegmentName(),
+          String.format("Caught exception while preloading segment: %s of table: %s in %s", segmentName,
               _tableNameWithType, this.getClass().getSimpleName()), e);
     } finally {
       finishOperation();
     }
   }
 
+  protected abstract void doPreloadSegment(ImmutableSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator);
+
+  @Override
+  public void addSegment(IndexSegment segment) {
+    String segmentName = segment.getSegmentName();
+    if (segment instanceof EmptyIndexSegment) {
+      _logger.info("Skip adding empty segment: {}", segmentName);
+      return;
+    }
+    Preconditions.checkArgument(segment instanceof ImmutableSegmentImpl,
+        "Got unsupported segment implementation: %s for segment: %s, table: 
%s", segment.getClass(), segmentName,
+        _tableNameWithType);
+    if (!startOperation()) {
+      _logger.info("Skip adding segment: {} because dedup metadata manager is 
already stopped", segmentName);
+      return;
+    }
+    try {
+      if (!skipSegmentOutOfTTL(segment, true)) {
+        addOrReplaceSegment(null, segment);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(
+          String.format("Caught exception while adding segment: %s of table: 
%s to %s", segmentName, _tableNameWithType,
+              this.getClass().getSimpleName()), e);
+    } finally {
+      finishOperation();
+    }
+  }
+
   @Override
   public void replaceSegment(IndexSegment oldSegment, IndexSegment newSegment) {
     if (!startOperation()) {
@@ -120,7 +223,9 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
       return;
     }
     try {
-      addOrReplaceSegment(oldSegment, newSegment);
+      if (!skipSegmentOutOfTTL(newSegment, true)) {
+        addOrReplaceSegment(oldSegment, newSegment);
+      }
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while replacing segment: %s with 
segment: %s of table: %s in %s",
@@ -131,19 +236,27 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
     }
   }
 
-  private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment)
-      throws IOException {
-    // If metadataTTL is enabled, we can skip adding dedup metadata for segment that's already out of the TTL.
-    if (_metadataTTL > 0) {
-      double maxDedupTime = getMaxDedupTime(newSegment);
+  protected boolean skipSegmentOutOfTTL(IndexSegment segment, boolean updateWatermark) {
+    if (_metadataTTL <= 0) {
+      return false;
+    }
+    // If metadataTTL is enabled, we can skip adding dedup metadata for segment already out of the TTL. Different
+    // from upsert table, there is no need to initialize things like validDocIds bitmap for those skipped segments.
+    double maxDedupTime = getMaxDedupTime(segment);
+    if (updateWatermark) {
       _largestSeenTime.getAndUpdate(time -> Math.max(time, maxDedupTime));
-      if (isOutOfMetadataTTL(maxDedupTime)) {
-        String action = oldSegment == null ? "adding" : "replacing";
-        _logger.info("Skip {} segment: {} as max dedupTime: {} is out of TTL: {}", action, newSegment.getSegmentName(),
-            maxDedupTime, _metadataTTL);
-        return;
-      }
     }
+    if (!isOutOfMetadataTTL(maxDedupTime)) {
+      return false;
+    }
+    _logger.info("Skip segment: {} as max dedupTime: {} is out of TTL: {}", segment.getSegmentName(), maxDedupTime,
+        _metadataTTL);
+    // Return true if skipped. Boolean value allows subclasses to disable skipping.
+    return true;
+  }
+
+  private void addOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment)
+      throws IOException {
     try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(newSegment,
         _primaryKeyColumns, _dedupTimeColumn)) {
       Iterator<DedupRecordInfo> dedupRecordInfoIterator =
@@ -169,21 +282,17 @@ public abstract class BasePartitionDedupMetadataManager implements PartitionDedu
       _logger.info("Skip removing segment: {} because metadata manager is already stopped", segment.getSegmentName());
       return;
     }
-    // Skip removing the dedup metadata of segment out of TTL. The expired metadata is removed in batches.
-    if (_metadataTTL > 0) {
-      double maxDedupTime = getMaxDedupTime(segment);
-      if (isOutOfMetadataTTL(maxDedupTime)) {
-        _logger.info("Skip removing segment: {} as max dedupTime: {} is out of TTL: {}", segment.getSegmentName(),
-            maxDedupTime, _metadataTTL);
+    try {
+      if (skipSegmentOutOfTTL(segment, false)) {
         return;
       }
-    }
-    try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
-        _primaryKeyColumns, _dedupTimeColumn)) {
-      Iterator<DedupRecordInfo> dedupRecordInfoIterator =
-          DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
-      doRemoveSegment(segment, dedupRecordInfoIterator);
-      updatePrimaryKeyGauge();
+      try (DedupUtils.DedupRecordInfoReader dedupRecordInfoReader = new DedupUtils.DedupRecordInfoReader(segment,
+          _primaryKeyColumns, _dedupTimeColumn)) {
+        Iterator<DedupRecordInfo> dedupRecordInfoIterator =
+            DedupUtils.getDedupRecordInfoIterator(dedupRecordInfoReader, segment.getSegmentMetadata().getTotalDocs());
+        doRemoveSegment(segment, dedupRecordInfoIterator);
+        updatePrimaryKeyGauge();
+      }
     } catch (Exception e) {
       throw new RuntimeException(
           String.format("Caught exception while removing segment: %s of table: %s from %s", segment.getSegmentName(),
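
A note on the concurrency guard in preloadSegments above: it is a double-checked pattern built from a volatile flag plus an explicit lock, so concurrent Helix task threads race to the lock but only the first one runs the preload, and the flag drops exactly once (even on failure) in the finally block. A standalone sketch of the same idiom, with illustrative names rather than Pinot APIs:

    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    class PreloadOnceGuard {
      private final Lock _preloadLock = new ReentrantLock();
      private volatile boolean _isPreloading = true; // starts true only when preload is enabled

      void preload(Runnable doPreload) {
        if (!_isPreloading) {
          return; // fast path: preloading already finished or never enabled
        }
        _preloadLock.lock();
        try {
          if (!_isPreloading) {
            return; // another thread completed preloading while we waited on the lock
          }
          doPreload.run();
        } finally {
          _isPreloading = false; // cleared exactly once, even if doPreload threw
          _preloadLock.unlock();
        }
      }
    }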
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index 80639ebd5e..8172bb86f3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.segment.local.dedup;
 
 import com.google.common.base.Preconditions;
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
@@ -27,14 +28,20 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 public abstract class BaseTableDedupMetadataManager implements TableDedupMetadataManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(BaseTableDedupMetadataManager.class);
+
   protected final Map<Integer, PartitionDedupMetadataManager> _partitionMetadataManagerMap = new ConcurrentHashMap<>();
   protected String _tableNameWithType;
   protected DedupContext _dedupContext;
+  private boolean _enablePreload;
 
   @Override
   public void init(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager,
@@ -57,19 +64,18 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat
           "When metadataTTL is configured, metadata time column or time column must be configured for "
               + "dedup enabled table: %s", _tableNameWithType);
     }
-
+    _enablePreload = dedupConfig.isEnablePreload() && tableDataManager.getSegmentPreloadExecutor() != null;
+    HashFunction hashFunction = dedupConfig.getHashFunction();
+    File tableIndexDir = tableDataManager.getTableDataDir();
     DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
-    dedupContextBuider
-        .setTableConfig(tableConfig)
-        .setSchema(schema)
-        .setPrimaryKeyColumns(primaryKeyColumns)
-        .setHashFunction(dedupConfig.getHashFunction())
-        .setMetadataTTL(metadataTTL)
-        .setDedupTimeColumn(dedupTimeColumn)
-        .setTableIndexDir(tableDataManager.getTableDataDir())
-        .setTableDataManager(tableDataManager)
-        .setServerMetrics(serverMetrics);
+    dedupContextBuider.setTableConfig(tableConfig).setSchema(schema).setPrimaryKeyColumns(primaryKeyColumns)
+        .setHashFunction(hashFunction).setEnablePreload(_enablePreload).setMetadataTTL(metadataTTL)
+        .setDedupTimeColumn(dedupTimeColumn).setTableIndexDir(tableIndexDir).setTableDataManager(tableDataManager);
     _dedupContext = dedupContextBuider.build();
+    LOGGER.info(
+        "Initialized {} for table: {} with primary key columns: {}, hash function: {}, enable preload: {}, metadata "
+            + "TTL: {}, dedup time column: {}, table index dir: {}", getClass().getSimpleName(), _tableNameWithType,
+        primaryKeyColumns, hashFunction, _enablePreload, metadataTTL, dedupTimeColumn, tableIndexDir);
 
     initCustomVariables();
   }
@@ -89,6 +95,11 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat
   protected void initCustomVariables() {
   }
 
+  @Override
+  public boolean isEnablePreload() {
+    return _enablePreload;
+  }
+
   @Override
   public void stop() {
     for (PartitionDedupMetadataManager metadataManager : _partitionMetadataManagerMap.values()) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
index 4461266e35..b4ef9ca63a 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManager.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.segment.local.utils.HashUtils;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 
 
@@ -38,6 +39,16 @@ class ConcurrentMapPartitionDedupMetadataManager extends BasePartitionDedupMetad
     super(tableNameWithType, partitionId, dedupContext);
   }
 
+  @Override
+  protected void doPreloadSegment(ImmutableSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+    while (dedupRecordInfoIterator.hasNext()) {
+      DedupRecordInfo dedupRecordInfo = dedupRecordInfoIterator.next();
+      double dedupTime = dedupRecordInfo.getDedupTime();
+      _primaryKeyToSegmentAndTimeMap.put(HashUtils.hashPrimaryKey(dedupRecordInfo.getPrimaryKey(), _hashFunction),
+          Pair.of(segment, dedupTime));
+    }
+  }
+
   @Override
   protected void doAddOrReplaceSegment(IndexSegment oldSegment, IndexSegment newSegment,
       Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
index a523f26957..4407676ad7 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
@@ -22,7 +22,6 @@ import com.google.common.base.Preconditions;
 import java.io.File;
 import java.util.List;
 import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.spi.config.table.HashFunction;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -34,24 +33,24 @@ public class DedupContext {
   private final Schema _schema;
   private final List<String> _primaryKeyColumns;
   private final HashFunction _hashFunction;
+  private final boolean _enablePreload;
   private final double _metadataTTL;
   private final String _dedupTimeColumn;
   private final File _tableIndexDir;
   private final TableDataManager _tableDataManager;
-  private final ServerMetrics _serverMetrics;
 
   private DedupContext(TableConfig tableConfig, Schema schema, List<String> primaryKeyColumns,
-      HashFunction hashFunction, double metadataTTL, String dedupTimeColumn, File tableIndexDir,
-      TableDataManager tableDataManager, ServerMetrics serverMetrics) {
+      HashFunction hashFunction, boolean enablePreload, double metadataTTL, String dedupTimeColumn, File tableIndexDir,
+      TableDataManager tableDataManager) {
     _tableConfig = tableConfig;
     _schema = schema;
     _primaryKeyColumns = primaryKeyColumns;
     _hashFunction = hashFunction;
+    _enablePreload = enablePreload;
     _metadataTTL = metadataTTL;
     _dedupTimeColumn = dedupTimeColumn;
     _tableIndexDir = tableIndexDir;
     _tableDataManager = tableDataManager;
-    _serverMetrics = serverMetrics;
   }
 
   public TableConfig getTableConfig() {
@@ -70,6 +69,10 @@ public class DedupContext {
     return _hashFunction;
   }
 
+  public boolean isPreloadEnabled() {
+    return _enablePreload;
+  }
+
   public double getMetadataTTL() {
     return _metadataTTL;
   }
@@ -86,20 +89,16 @@ public class DedupContext {
     return _tableDataManager;
   }
 
-  public ServerMetrics getServerMetrics() {
-    return _serverMetrics;
-  }
-
   public static class Builder {
     private TableConfig _tableConfig;
     private Schema _schema;
     private List<String> _primaryKeyColumns;
     private HashFunction _hashFunction;
+    private boolean _enablePreload;
     private double _metadataTTL;
     private String _dedupTimeColumn;
     private File _tableIndexDir;
     private TableDataManager _tableDataManager;
-    private ServerMetrics _serverMetrics;
 
     public Builder setTableConfig(TableConfig tableConfig) {
       _tableConfig = tableConfig;
@@ -121,6 +120,11 @@ public class DedupContext {
       return this;
     }
 
+    public Builder setEnablePreload(boolean enablePreload) {
+      _enablePreload = enablePreload;
+      return this;
+    }
+
     public Builder setMetadataTTL(double metadataTTL) {
       _metadataTTL = metadataTTL;
       return this;
@@ -141,19 +145,14 @@ public class DedupContext {
       return this;
     }
 
-    public Builder setServerMetrics(ServerMetrics serverMetrics) {
-      _serverMetrics = serverMetrics;
-      return this;
-    }
-
     public DedupContext build() {
       Preconditions.checkState(_tableConfig != null, "Table config must be set");
       Preconditions.checkState(_schema != null, "Schema must be set");
       Preconditions.checkState(CollectionUtils.isNotEmpty(_primaryKeyColumns), "Primary key columns must be set");
       Preconditions.checkState(_hashFunction != null, "Hash function must be set");
       Preconditions.checkState(_tableIndexDir != null, "Table index directory must be set");
-      return new DedupContext(_tableConfig, _schema, _primaryKeyColumns, _hashFunction, _metadataTTL, _dedupTimeColumn,
-          _tableIndexDir, _tableDataManager, _serverMetrics);
+      return new DedupContext(_tableConfig, _schema, _primaryKeyColumns, _hashFunction, _enablePreload, _metadataTTL,
+          _dedupTimeColumn, _tableIndexDir, _tableDataManager);
     }
   }
 }
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
index 835ce6dfa7..ff2667ec14 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/PartitionDedupMetadataManager.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.segment.local.dedup;
 
 import java.io.Closeable;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
 import org.apache.pinot.spi.data.readers.PrimaryKey;
 
@@ -37,6 +39,22 @@ public interface PartitionDedupMetadataManager extends Closeable {
     addSegment(newSegment);
   }
 
+  /**
+   * Preload segments for the table partition. Segments can be added differently during preloading.
+   * TODO: As commented in PartitionUpsertMetadataManager, revisit this method and see if we can use the same
+   *       IndexLoadingConfig for all segments. Tier info might be different for different segments.
+   */
+  void preloadSegments(IndexLoadingConfig indexLoadingConfig);
+
+  boolean isPreloading();
+
+  /**
+   * Different from adding a segment, when preloading a segment, the dedup metadata may be updated more efficiently.
+   * Basically the dedup metadata can be directly updated for each primary key, without doing the more costly
+   * read-compare-update.
+   */
+  void preloadSegment(ImmutableSegment immutableSegment);
+
   /**
    * Removes the dedup metadata for the given segment.
    */
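
To make the preloadSegment vs. addSegment distinction above concrete: during preload the partition is not yet serving queries and segments are replayed from consistent local state, so each primary key can be written blindly, while the regular add path must perform the atomic read-compare-update the javadoc mentions. A toy illustration of the two write disciplines (not the Pinot implementation, which keeps richer per-key state):

    import java.util.concurrent.ConcurrentHashMap;

    class ToyDedupStore {
      private final ConcurrentHashMap<Object, String> _pkToSegment = new ConcurrentHashMap<>();

      // Preload path: blind write, no contention expected before the partition serves queries.
      void preloadKey(Object pk, String segmentName) {
        _pkToSegment.put(pk, segmentName);
      }

      // Ingestion path: the key may already exist, so use an atomic check-then-insert;
      // returns false when the record is a duplicate and should be dropped.
      boolean addKeyIfAbsent(Object pk, String segmentName) {
        return _pkToSegment.putIfAbsent(pk, segmentName) == null;
      }
    }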
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index 5c0bd1d830..949f7ab669 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -36,6 +36,8 @@ public interface TableDedupMetadataManager extends Closeable {
    */
   PartitionDedupMetadataManager getOrCreatePartitionManager(int partitionId);
 
+  boolean isEnablePreload();
+
   /**
    * Stops the metadata manager. After invoking this method, no access to the metadata will be accepted.
    */
diff --git 
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
new file mode 100644
index 0000000000..475d4a3929
--- /dev/null
+++ 
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/BasePartitionDedupMetadataManagerTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import java.io.IOException;
+import java.util.Iterator;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class BasePartitionDedupMetadataManagerTest {
+  @Test
+  public void testPreloadSegments()
+      throws IOException {
+    String realtimeTableName = "testTable_REALTIME";
+    DedupContext dedupContext = mock(DedupContext.class);
+    when(dedupContext.isPreloadEnabled()).thenReturn(true);
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+    when(dedupContext.getTableDataManager()).thenReturn(tableDataManager);
+    IndexLoadingConfig indexLoadingConfig = mock(IndexLoadingConfig.class);
+    when(indexLoadingConfig.getTableConfig()).thenReturn(mock(TableConfig.class));
+
+    try (DummyPartitionDedupMetadataManager dedupMetadataManager = new DummyPartitionDedupMetadataManager(
+        realtimeTableName, 0, dedupContext)) {
+      assertTrue(dedupMetadataManager.isPreloading());
+      dedupMetadataManager.preloadSegments(indexLoadingConfig);
+      assertFalse(dedupMetadataManager.isPreloading());
+      dedupMetadataManager.stop();
+    }
+  }
+
+  private static class DummyPartitionDedupMetadataManager extends BasePartitionDedupMetadataManager {
+
+    protected DummyPartitionDedupMetadataManager(String tableNameWithType, int partitionId, DedupContext context) {
+      super(tableNameWithType, partitionId, context);
+    }
+
+    @Override
+    protected void doPreloadSegment(ImmutableSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+    }
+
+    @Override
+    protected void doAddOrReplaceSegment(@Nullable IndexSegment oldSegment, IndexSegment newSegment,
+        Iterator<DedupRecordInfo> dedupRecordInfoIteratorOfNewSegment) {
+    }
+
+    @Override
+    protected void doRemoveSegment(IndexSegment segment, Iterator<DedupRecordInfo> dedupRecordInfoIterator) {
+    }
+
+    @Override
+    protected void doRemoveExpiredPrimaryKeys() {
+    }
+
+    @Override
+    protected long getNumPrimaryKeys() {
+      return 0;
+    }
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
index c0697eb4c3..2823d68d34 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithTTLTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.TreeMap;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -67,8 +66,7 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
     _dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
         .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setMetadataTTL(METADATA_TTL)
         .setDedupTimeColumn(DEDUP_TIME_COLUMN_NAME).setTableIndexDir(mock(File.class))
-        .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
-        .setTableIndexDir(TEMP_DIR);
+        .setTableDataManager(mock(TableDataManager.class)).setTableIndexDir(TEMP_DIR);
   }
 
   @AfterMethod
@@ -81,8 +79,7 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithTTLTest {
     DedupContext.Builder dedupContextBuider = new DedupContext.Builder();
     dedupContextBuider.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
         .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setHashFunction(HashFunction.NONE).setMetadataTTL(1)
-        .setDedupTimeColumn(null).setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class))
-        .setServerMetrics(mock(ServerMetrics.class));
+        .setDedupTimeColumn(null).setTableIndexDir(mock(File.class)).setTableDataManager(mock(TableDataManager.class));
     DedupContext dedupContext = dedupContextBuider.build();
     assertThrows(IllegalArgumentException.class,
         () -> new ConcurrentMapPartitionDedupMetadataManager(DedupTestUtils.REALTIME_TABLE_NAME, 0, dedupContext));
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
index f7eadeaf09..90c68f1f42 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
-import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
@@ -58,8 +57,7 @@ public class ConcurrentMapPartitionDedupMetadataManagerWithoutTTLTest {
     _dedupContextBuilder = new DedupContext.Builder();
     _dedupContextBuilder.setTableConfig(mock(TableConfig.class)).setSchema(mock(Schema.class))
         .setPrimaryKeyColumns(List.of("primaryKeyColumn")).setTableIndexDir(mock(File.class))
-        .setTableDataManager(mock(TableDataManager.class)).setServerMetrics(mock(ServerMetrics.class))
-        .setTableIndexDir(TEMP_DIR);
+        .setTableDataManager(mock(TableDataManager.class)).setTableIndexDir(TEMP_DIR);
   }
 
   @AfterMethod
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
new file mode 100644
index 0000000000..f3247c8227
--- /dev/null
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.dedup;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.HashFunction;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+
+
+public class TableDedupMetadataManagerFactoryTest {
+  @Test
+  public void testEnablePreload() {
+    DedupConfig dedupConfig =
+        new DedupConfig(true, HashFunction.MD5, null, Collections.emptyMap(), 10, "timeCol", true);
+    Schema schema =
+        new Schema.SchemaBuilder().setSchemaName("mytable").addSingleValueDimension("myCol", FieldSpec.DataType.STRING)
+            .setPrimaryKeyColumns(Lists.newArrayList("myCol")).build();
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName("mytable").setDedupConfig(dedupConfig).build();
+
+    // Preloading is not enabled as there is no preloading thread.
+    TableDataManager tableDataManager = mock(TableDataManager.class);
+    when(tableDataManager.getTableDataDir()).thenReturn(new File("mytable"));
+    when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(null);
+    TableDedupMetadataManager tableDedupMetadataManager =
+        TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null);
+    assertNotNull(tableDedupMetadataManager);
+    assertFalse(tableDedupMetadataManager.isEnablePreload());
+
+    // Enabled as enablePreload is true and there is a preloading thread.
+    tableDataManager = mock(TableDataManager.class);
+    when(tableDataManager.getTableDataDir()).thenReturn(new File("mytable"));
+    when(tableDataManager.getSegmentPreloadExecutor()).thenReturn(mock(ExecutorService.class));
+    tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, tableDataManager, null);
+    assertNotNull(tableDedupMetadataManager);
+  }
+}
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
index 8c4594dc91..b4544979e3 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupeTest.java
@@ -65,7 +65,7 @@ public class MutableSegmentDedupeTest {
         .setDedupConfig(new DedupConfig(dedupEnabled, HashFunction.NONE)).build();
     CompositeTransformer recordTransformer = CompositeTransformer.getDefaultTransformer(tableConfig, schema);
     File jsonFile = new File(dataResourceUrl.getFile());
-    DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, metadataTTL, dedupTimeColumn);
+    DedupConfig dedupConfig = new DedupConfig(true, HashFunction.NONE, null, null, metadataTTL, dedupTimeColumn, false);
     PartitionDedupMetadataManager partitionDedupMetadataManager =
         (dedupEnabled) ? getTableDedupMetadataManager(schema, dedupConfig).getOrCreatePartitionManager(0) : null;
     _mutableSegmentImpl =
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
index 78d7b3b9f0..dfc8151e35 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonPropertyDescription;
 import java.util.Map;
 import org.apache.pinot.spi.config.BaseJsonConfig;
 
+
 public class DedupConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Whether dedup is enabled or not.")
   private final boolean _dedupEnabled;
@@ -43,9 +44,12 @@ public class DedupConfig extends BaseJsonConfig {
       + " from the table config will be used.")
   private final String _dedupTimeColumn;
 
+  @JsonPropertyDescription("Whether to preload segments for fast dedup 
metadata recovery")
+  private final boolean _enablePreload;
+
   public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) boolean dedupEnabled,
       @JsonProperty(value = "hashFunction") HashFunction hashFunction) {
-    this(dedupEnabled, hashFunction, null, null, 0, null);
+    this(dedupEnabled, hashFunction, null, null, 0, null, false);
   }
 
   @JsonCreator
@@ -54,13 +58,15 @@ public class DedupConfig extends BaseJsonConfig {
       @JsonProperty(value = "metadataManagerClass") String 
metadataManagerClass,
       @JsonProperty(value = "metadataManagerConfigs") Map<String, String> 
metadataManagerConfigs,
       @JsonProperty(value = "metadataTTL") double metadataTTL,
-      @JsonProperty(value = "dedupTimeColumn") String dedupTimeColumn) {
+      @JsonProperty(value = "dedupTimeColumn") String dedupTimeColumn,
+      @JsonProperty(value = "enablePreload") boolean enablePreload) {
     _dedupEnabled = dedupEnabled;
     _hashFunction = hashFunction == null ? HashFunction.NONE : hashFunction;
     _metadataManagerClass = metadataManagerClass;
     _metadataManagerConfigs = metadataManagerConfigs;
     _metadataTTL = metadataTTL;
     _dedupTimeColumn = dedupTimeColumn;
+    _enablePreload = enablePreload;
   }
 
   public HashFunction getHashFunction() {
@@ -86,4 +92,8 @@ public class DedupConfig extends BaseJsonConfig {
   public String getDedupTimeColumn() {
     return _dedupTimeColumn;
   }
+
+  public boolean isEnablePreload() {
+    return _enablePreload;
+  }
 }
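
To see the new flag in use: enablePreload is the trailing argument of the @JsonCreator constructor above, and per its @JsonProperty annotation it maps to the "enablePreload" key under dedupConfig in a JSON table config. A small usage sketch, mirroring how the tests in this commit build the config ("myTable" is a placeholder name):

    DedupConfig dedupConfig =
        new DedupConfig(true, HashFunction.MD5, null, null, 10, "dedupTimeColumn", /* enablePreload */ true);
    TableConfig tableConfig =
        new TableConfigBuilder(TableType.REALTIME).setTableName("myTable").setDedupConfig(dedupConfig).build();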

