This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ee84c9de1 Add a config to skip updating dedup metadata for non-default tier segments (#15576)
9ee84c9de1 is described below

commit 9ee84c9de13cd32ed6d4f0bfc938f292737c0016
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Wed Apr 16 19:05:40 2025 -0600

    Add a config to skip updating dedup metadata for non-default tier segments (#15576)
---
 .../core/data/manager/BaseTableDataManager.java     | 10 +++++-----
 .../manager/offline/DimensionTableDataManager.java  |  5 +++--
 .../manager/realtime/RealtimeTableDataManager.java  | 11 +++++++----
 .../local/data/manager/TableDataManager.java        | 18 +++++++++++++++++-
 .../local/dedup/BaseTableDedupMetadataManager.java  |  4 ++++
 .../pinot/segment/local/dedup/DedupContext.java     | 21 +++++++++++++++++----
 .../apache/pinot/spi/config/table/DedupConfig.java  | 13 +++++++++++++
 .../org/apache/pinot/spi/utils/CommonConstants.java |  1 +
 8 files changed, 67 insertions(+), 16 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index c7773c8b09..67088db6ab 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -324,7 +324,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
    * @param immutableSegment Immutable segment to add
    */
   @Override
-  public void addSegment(ImmutableSegment immutableSegment) {
+  public void addSegment(ImmutableSegment immutableSegment, @Nullable 
SegmentZKMetadata zkMetadata) {
     String segmentName = immutableSegment.getSegmentName();
     Preconditions.checkState(!_shutDown, "Table data manager is already shut 
down, cannot add segment: %s to table: %s",
         segmentName, _tableNameWithType);
@@ -437,7 +437,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     String segmentName = zkMetadata.getSegmentName();
     _logger.info("Downloading and loading segment: {}", segmentName);
     File indexDir = downloadSegment(zkMetadata);
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler));
+    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler), zkMetadata);
     _logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}", 
segmentName, zkMetadata.getCrc(),
         TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
   }
@@ -817,7 +817,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
           _logger.info("Reloading segment: {} using existing segment directory 
as no reprocessing needed", segmentName);
           // No reprocessing needed, reuse the same segment
           ImmutableSegment segment = 
ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig);
-          addSegment(segment);
+          addSegment(segment, zkMetadata);
           return;
         }
         // Create backup directory to handle failure of segment reloading.
@@ -838,7 +838,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       _logger.info("Loading segment: {} from indexDir: {} to tier: {}", 
segmentName, indexDir,
           TierConfigUtils.normalizeTierName(zkMetadata.getTier()));
       ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, 
indexLoadingConfig, _segmentOperationsThrottler);
-      addSegment(segment);
+      addSegment(segment, zkMetadata);
 
       // Remove backup directory to mark the completion of segment reloading.
       removeBackup(indexDir);
@@ -1217,7 +1217,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
         segmentDirectory = initSegmentDirectory(segmentName, 
String.valueOf(zkMetadata.getCrc()), indexLoadingConfig);
       }
       ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, 
indexLoadingConfig);
-      addSegment(segment);
+      addSegment(segment, zkMetadata);
       _logger.info("Loaded existing segment: {} with CRC: {} on tier: {}", 
segmentName, zkMetadata.getCrc(),
           TierConfigUtils.normalizeTierName(segmentTier));
       return true;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 442c62567c..0ac31abc3b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
@@ -142,8 +143,8 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
   }
 
   @Override
-  public void addSegment(ImmutableSegment immutableSegment) {
-    super.addSegment(immutableSegment);
+  public void addSegment(ImmutableSegment immutableSegment, @Nullable 
SegmentZKMetadata zkMetadata) {
+    super.addSegment(immutableSegment, zkMetadata);
     String segmentName = immutableSegment.getSegmentName();
     if (loadLookupTable()) {
       _logger.info("Successfully loaded lookup table after adding segment: 
{}", segmentName);
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 7471a5b69f..375e518f1b 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -688,19 +688,22 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   }
 
   @Override
-  public void addSegment(ImmutableSegment immutableSegment) {
+  public void addSegment(ImmutableSegment immutableSegment, @Nullable 
SegmentZKMetadata zkMetadata) {
     String segmentName = immutableSegment.getSegmentName();
     Preconditions.checkState(!_shutDown, "Table data manager is already shut 
down, cannot add segment: %s to table: %s",
         segmentName, _tableNameWithType);
+
     if (isUpsertEnabled()) {
       handleUpsert(immutableSegment);
       return;
     }
 
-    if (isDedupEnabled() && immutableSegment instanceof ImmutableSegmentImpl) {
+    if (_tableDedupMetadataManager != null && immutableSegment instanceof 
ImmutableSegmentImpl && (zkMetadata == null
+        || zkMetadata.getTier() == null || 
!_tableDedupMetadataManager.getContext().isIgnoreNonDefaultTiers())) {
       handleDedup((ImmutableSegmentImpl) immutableSegment);
     }
-    super.addSegment(immutableSegment);
+
+    super.addSegment(immutableSegment, zkMetadata);
   }
 
   private void handleDedup(ImmutableSegmentImpl immutableSegment) {
@@ -821,7 +824,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     // Get a new index loading config with latest table config and schema to 
load the segment
     IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig();
     indexLoadingConfig.setSegmentTier(zkMetadata.getTier());
-    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler));
+    addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, 
_segmentOperationsThrottler), zkMetadata);
     _logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName);
   }
 
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index e8351e5cbe..727dcf6c92 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -94,10 +94,26 @@ public interface TableDataManager {
 
   /**
    * Adds a loaded immutable segment into the table.
+   * See {@link #addSegment(ImmutableSegment, SegmentZKMetadata)} for details.
+   */
+  @VisibleForTesting
+  default void addSegment(ImmutableSegment immutableSegment) {
+    addSegment(immutableSegment, null);
+  }
+
+  /**
+   * Adds a loaded immutable segment into the table.
+   * <p>If one segment already exists with the same name, replaces it with the 
new one.
+   * <p>Ensures that reference count of the old segment (if replaced) is 
reduced by 1, so that the last user of the old
+   * segment (or the calling thread, if there are none) remove the segment.
+   * <p>The new segment is added with reference count of 1, so that is never 
removed until a drop command comes through.
+   * <p>Segment ZK metadata might not be available when replacing a CONSUMING 
segment with the locally sealed one or
+   * invoked from tests.
+   *
    * NOTE: This method is not designed to be directly used by the production 
code, but can be handy to set up tests.
    */
   @VisibleForTesting
-  void addSegment(ImmutableSegment immutableSegment);
+  void addSegment(ImmutableSegment immutableSegment, @Nullable 
SegmentZKMetadata zkMetadata);
 
   /**
    * Adds an ONLINE segment into a table.
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index f2487aa715..e2c8f986e2 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -75,6 +75,9 @@ public abstract class BaseTableDedupMetadataManager 
implements TableDedupMetadat
       }
     }
 
+    boolean ignoreNonDefaultTiers = dedupConfig.getIgnoreNonDefaultTiers()
+        .isEnabled(() -> 
instanceDedupConfig.getProperty(Dedup.DEFAULT_IGNORE_NON_DEFAULT_TIERS, false));
+
     // NOTE: This field doesn't follow enablement override, and always enabled 
if enabled at instance level.
     boolean allowDedupConsumptionDuringCommit = 
dedupConfig.isAllowDedupConsumptionDuringCommit();
     if (!allowDedupConsumptionDuringCommit) {
@@ -91,6 +94,7 @@ public abstract class BaseTableDedupMetadataManager 
implements TableDedupMetadat
         .setMetadataTTL(metadataTTL)
         .setDedupTimeColumn(dedupTimeColumn)
         .setEnablePreload(enablePreload)
+        .setIgnoreNonDefaultTiers(ignoreNonDefaultTiers)
         .setMetadataManagerConfigs(dedupConfig.getMetadataManagerConfigs())
         
.setAllowDedupConsumptionDuringCommit(allowDedupConsumptionDuringCommit)
         .build();
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
index fb0ba48c6a..687fe9559d 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java
@@ -40,6 +40,7 @@ public class DedupContext {
   @Nullable
   private final String _dedupTimeColumn;
   private final boolean _enablePreload;
+  private final boolean _ignoreNonDefaultTiers;
   @Nullable
   private final Map<String, String> _metadataManagerConfigs;
 
@@ -54,8 +55,8 @@ public class DedupContext {
 
   private DedupContext(TableConfig tableConfig, Schema schema, List<String> 
primaryKeyColumns,
       HashFunction hashFunction, double metadataTTL, @Nullable String 
dedupTimeColumn, boolean enablePreload,
-      @Nullable Map<String, String> metadataManagerConfigs, boolean 
allowDedupConsumptionDuringCommit,
-      @Nullable TableDataManager tableDataManager, File tableIndexDir) {
+      boolean ignoreNonDefaultTiers, @Nullable Map<String, String> 
metadataManagerConfigs,
+      boolean allowDedupConsumptionDuringCommit, @Nullable TableDataManager 
tableDataManager, File tableIndexDir) {
     _tableConfig = tableConfig;
     _schema = schema;
     _primaryKeyColumns = primaryKeyColumns;
@@ -63,6 +64,7 @@ public class DedupContext {
     _metadataTTL = metadataTTL;
     _dedupTimeColumn = dedupTimeColumn;
     _enablePreload = enablePreload;
+    _ignoreNonDefaultTiers = ignoreNonDefaultTiers;
     _metadataManagerConfigs = metadataManagerConfigs;
     _allowDedupConsumptionDuringCommit = allowDedupConsumptionDuringCommit;
     _tableDataManager = tableDataManager;
@@ -98,6 +100,10 @@ public class DedupContext {
     return _enablePreload;
   }
 
+  public boolean isIgnoreNonDefaultTiers() {
+    return _ignoreNonDefaultTiers;
+  }
+
   @Nullable
   public Map<String, String> getMetadataManagerConfigs() {
     return _metadataManagerConfigs;
@@ -125,6 +131,7 @@ public class DedupContext {
         .append("metadataTTL", _metadataTTL)
         .append("dedupTimeColumn", _dedupTimeColumn)
         .append("enablePreload", _enablePreload)
+        .append("ignoreNonDefaultTiers", _ignoreNonDefaultTiers)
         .append("metadataManagerConfigs", _metadataManagerConfigs)
         .append("allowDedupConsumptionDuringCommit", 
_allowDedupConsumptionDuringCommit)
         .append("tableIndexDir", _tableIndexDir)
@@ -139,6 +146,7 @@ public class DedupContext {
     private double _metadataTTL;
     private String _dedupTimeColumn;
     private boolean _enablePreload;
+    private boolean _ignoreNonDefaultTiers;
     private Map<String, String> _metadataManagerConfigs;
     @Deprecated
     private boolean _allowDedupConsumptionDuringCommit;
@@ -180,6 +188,11 @@ public class DedupContext {
       return this;
     }
 
+    public Builder setIgnoreNonDefaultTiers(boolean ignoreNonDefaultTiers) {
+      _ignoreNonDefaultTiers = ignoreNonDefaultTiers;
+      return this;
+    }
+
     public Builder setMetadataManagerConfigs(Map<String, String> 
metadataManagerConfigs) {
       _metadataManagerConfigs = metadataManagerConfigs;
       return this;
@@ -211,8 +224,8 @@ public class DedupContext {
         _tableIndexDir = _tableDataManager.getTableDataDir();
       }
       return new DedupContext(_tableConfig, _schema, _primaryKeyColumns, 
_hashFunction, _metadataTTL, _dedupTimeColumn,
-          _enablePreload, _metadataManagerConfigs, 
_allowDedupConsumptionDuringCommit, _tableDataManager,
-          _tableIndexDir);
+          _enablePreload, _ignoreNonDefaultTiers, _metadataManagerConfigs, 
_allowDedupConsumptionDuringCommit,
+          _tableDataManager, _tableIndexDir);
     }
   }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
index d46cc998bf..3eb03fece3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
@@ -49,6 +49,9 @@ public class DedupConfig extends BaseJsonConfig {
       + "ENABLE, DISABLE and DEFAULT (use instance level default behavior).")
   private Enablement _preload = Enablement.DEFAULT;
 
+  @JsonPropertyDescription("Whether to ignore segments from non-default tiers 
when constructing dedup metadata.")
+  private Enablement _ignoreNonDefaultTiers = Enablement.DEFAULT;
+
   @JsonPropertyDescription("Custom class for dedup metadata manager. If not 
specified, the default implementation "
       + "ConcurrentMapTableDedupMetadataManager will be used.")
   @Nullable
@@ -133,6 +136,16 @@ public class DedupConfig extends BaseJsonConfig {
     _preload = preload;
   }
 
+  public Enablement getIgnoreNonDefaultTiers() {
+    return _ignoreNonDefaultTiers;
+  }
+
+  public void setIgnoreNonDefaultTiers(Enablement ignoreNonDefaultTiers) {
+    Preconditions.checkArgument(ignoreNonDefaultTiers != null,
+        "Ignore non-default tiers cannot be null, must be one of ENABLE, 
DISABLE or DEFAULT");
+    _ignoreNonDefaultTiers = ignoreNonDefaultTiers;
+  }
+
   @Nullable
   public String getMetadataManagerClass() {
     return _metadataManagerClass;
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 7f975d93c7..1bcbc1126e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1115,6 +1115,7 @@ public class CommonConstants {
       public static final String CONFIG_PREFIX = "dedup";
       public static final String DEFAULT_METADATA_MANAGER_CLASS = 
"default.metadata.manager.class";
       public static final String DEFAULT_ENABLE_PRELOAD = 
"default.enable.preload";
+      public static final String DEFAULT_IGNORE_NON_DEFAULT_TIERS = 
"default.ignore.non.default.tiers";
 
       /// @deprecated use {@link 
org.apache.pinot.spi.config.table.ingestion.ParallelSegmentConsumptionPolicy)} 
instead.
       @Deprecated


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to