This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9ee84c9de1 Add a config to skip updating dedup metadata for non-default tier segments (#15576) 9ee84c9de1 is described below commit 9ee84c9de13cd32ed6d4f0bfc938f292737c0016 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Wed Apr 16 19:05:40 2025 -0600 Add a config to skip updating dedup metadata for non-default tier segments (#15576) --- .../core/data/manager/BaseTableDataManager.java | 10 +++++----- .../manager/offline/DimensionTableDataManager.java | 5 +++-- .../manager/realtime/RealtimeTableDataManager.java | 11 +++++++---- .../local/data/manager/TableDataManager.java | 18 +++++++++++++++++- .../local/dedup/BaseTableDedupMetadataManager.java | 4 ++++ .../pinot/segment/local/dedup/DedupContext.java | 21 +++++++++++++++++---- .../apache/pinot/spi/config/table/DedupConfig.java | 13 +++++++++++++ .../org/apache/pinot/spi/utils/CommonConstants.java | 1 + 8 files changed, 67 insertions(+), 16 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index c7773c8b09..67088db6ab 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -324,7 +324,7 @@ public abstract class BaseTableDataManager implements TableDataManager { * @param immutableSegment Immutable segment to add */ @Override - public void addSegment(ImmutableSegment immutableSegment) { + public void addSegment(ImmutableSegment immutableSegment, @Nullable SegmentZKMetadata zkMetadata) { String segmentName = immutableSegment.getSegmentName(); Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", segmentName, _tableNameWithType); @@ -437,7 +437,7 @@ public 
abstract class BaseTableDataManager implements TableDataManager { String segmentName = zkMetadata.getSegmentName(); _logger.info("Downloading and loading segment: {}", segmentName); File indexDir = downloadSegment(zkMetadata); - addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler)); + addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler), zkMetadata); _logger.info("Downloaded and loaded segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(), TierConfigUtils.normalizeTierName(zkMetadata.getTier())); } @@ -817,7 +817,7 @@ public abstract class BaseTableDataManager implements TableDataManager { _logger.info("Reloading segment: {} using existing segment directory as no reprocessing needed", segmentName); // No reprocessing needed, reuse the same segment ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig); - addSegment(segment); + addSegment(segment, zkMetadata); return; } // Create backup directory to handle failure of segment reloading. @@ -838,7 +838,7 @@ public abstract class BaseTableDataManager implements TableDataManager { _logger.info("Loading segment: {} from indexDir: {} to tier: {}", segmentName, indexDir, TierConfigUtils.normalizeTierName(zkMetadata.getTier())); ImmutableSegment segment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler); - addSegment(segment); + addSegment(segment, zkMetadata); // Remove backup directory to mark the completion of segment reloading. 
removeBackup(indexDir); @@ -1217,7 +1217,7 @@ public abstract class BaseTableDataManager implements TableDataManager { segmentDirectory = initSegmentDirectory(segmentName, String.valueOf(zkMetadata.getCrc()), indexLoadingConfig); } ImmutableSegment segment = ImmutableSegmentLoader.load(segmentDirectory, indexLoadingConfig); - addSegment(segment); + addSegment(segment, zkMetadata); _logger.info("Loaded existing segment: {} with CRC: {} on tier: {}", segmentName, zkMetadata.getCrc(), TierConfigUtils.normalizeTierName(segmentTier)); return true; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index 442c62567c..0ac31abc3b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -35,6 +35,7 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; @@ -142,8 +143,8 @@ public class DimensionTableDataManager extends OfflineTableDataManager { } @Override - public void addSegment(ImmutableSegment immutableSegment) { - super.addSegment(immutableSegment); + public void addSegment(ImmutableSegment immutableSegment, @Nullable SegmentZKMetadata zkMetadata) { + super.addSegment(immutableSegment, zkMetadata); String segmentName = immutableSegment.getSegmentName(); if (loadLookupTable()) { _logger.info("Successfully loaded lookup table 
after adding segment: {}", segmentName); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 7471a5b69f..375e518f1b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -688,19 +688,22 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } @Override - public void addSegment(ImmutableSegment immutableSegment) { + public void addSegment(ImmutableSegment immutableSegment, @Nullable SegmentZKMetadata zkMetadata) { String segmentName = immutableSegment.getSegmentName(); Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add segment: %s to table: %s", segmentName, _tableNameWithType); + if (isUpsertEnabled()) { handleUpsert(immutableSegment); return; } - if (isDedupEnabled() && immutableSegment instanceof ImmutableSegmentImpl) { + if (_tableDedupMetadataManager != null && immutableSegment instanceof ImmutableSegmentImpl && (zkMetadata == null + || zkMetadata.getTier() == null || !_tableDedupMetadataManager.getContext().isIgnoreNonDefaultTiers())) { handleDedup((ImmutableSegmentImpl) immutableSegment); } - super.addSegment(immutableSegment); + + super.addSegment(immutableSegment, zkMetadata); } private void handleDedup(ImmutableSegmentImpl immutableSegment) { @@ -821,7 +824,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // Get a new index loading config with latest table config and schema to load the segment IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig(); indexLoadingConfig.setSegmentTier(zkMetadata.getTier()); - addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler)); + 
addSegment(ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, _segmentOperationsThrottler), zkMetadata); _logger.info("Downloaded and replaced CONSUMING segment: {}", segmentName); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index e8351e5cbe..727dcf6c92 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -94,10 +94,26 @@ public interface TableDataManager { /** * Adds a loaded immutable segment into the table. + * See {@link #addSegment(ImmutableSegment, SegmentZKMetadata)} for details. + */ + @VisibleForTesting + default void addSegment(ImmutableSegment immutableSegment) { + addSegment(immutableSegment, null); + } + + /** + * Adds a loaded immutable segment into the table. + * <p>If one segment already exists with the same name, replaces it with the new one. + * <p>Ensures that reference count of the old segment (if replaced) is reduced by 1, so that the last user of the old + * segment (or the calling thread, if there are none) removes the segment. + * <p>The new segment is added with reference count of 1, so that it is never removed until a drop command comes through. + * <p>Segment ZK metadata might not be available when replacing a CONSUMING segment with the locally sealed one or + * when invoked from tests. + * * NOTE: This method is not designed to be directly used by the production code, but can be handy to set up tests. */ @VisibleForTesting - void addSegment(ImmutableSegment immutableSegment); + void addSegment(ImmutableSegment immutableSegment, @Nullable SegmentZKMetadata zkMetadata); /** * Adds an ONLINE segment into a table. 
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java index f2487aa715..e2c8f986e2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java @@ -75,6 +75,9 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat } } + boolean ignoreNonDefaultTiers = dedupConfig.getIgnoreNonDefaultTiers() + .isEnabled(() -> instanceDedupConfig.getProperty(Dedup.DEFAULT_IGNORE_NON_DEFAULT_TIERS, false)); + // NOTE: This field doesn't follow enablement override, and always enabled if enabled at instance level. boolean allowDedupConsumptionDuringCommit = dedupConfig.isAllowDedupConsumptionDuringCommit(); if (!allowDedupConsumptionDuringCommit) { @@ -91,6 +94,7 @@ public abstract class BaseTableDedupMetadataManager implements TableDedupMetadat .setMetadataTTL(metadataTTL) .setDedupTimeColumn(dedupTimeColumn) .setEnablePreload(enablePreload) + .setIgnoreNonDefaultTiers(ignoreNonDefaultTiers) .setMetadataManagerConfigs(dedupConfig.getMetadataManagerConfigs()) .setAllowDedupConsumptionDuringCommit(allowDedupConsumptionDuringCommit) .build(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java index fb0ba48c6a..687fe9559d 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/DedupContext.java @@ -40,6 +40,7 @@ public class DedupContext { @Nullable private final String _dedupTimeColumn; private final boolean _enablePreload; + private final boolean 
_ignoreNonDefaultTiers; @Nullable private final Map<String, String> _metadataManagerConfigs; @@ -54,8 +55,8 @@ public class DedupContext { private DedupContext(TableConfig tableConfig, Schema schema, List<String> primaryKeyColumns, HashFunction hashFunction, double metadataTTL, @Nullable String dedupTimeColumn, boolean enablePreload, - @Nullable Map<String, String> metadataManagerConfigs, boolean allowDedupConsumptionDuringCommit, - @Nullable TableDataManager tableDataManager, File tableIndexDir) { + boolean ignoreNonDefaultTiers, @Nullable Map<String, String> metadataManagerConfigs, + boolean allowDedupConsumptionDuringCommit, @Nullable TableDataManager tableDataManager, File tableIndexDir) { _tableConfig = tableConfig; _schema = schema; _primaryKeyColumns = primaryKeyColumns; @@ -63,6 +64,7 @@ public class DedupContext { _metadataTTL = metadataTTL; _dedupTimeColumn = dedupTimeColumn; _enablePreload = enablePreload; + _ignoreNonDefaultTiers = ignoreNonDefaultTiers; _metadataManagerConfigs = metadataManagerConfigs; _allowDedupConsumptionDuringCommit = allowDedupConsumptionDuringCommit; _tableDataManager = tableDataManager; @@ -98,6 +100,10 @@ public class DedupContext { return _enablePreload; } + public boolean isIgnoreNonDefaultTiers() { + return _ignoreNonDefaultTiers; + } + @Nullable public Map<String, String> getMetadataManagerConfigs() { return _metadataManagerConfigs; @@ -125,6 +131,7 @@ public class DedupContext { .append("metadataTTL", _metadataTTL) .append("dedupTimeColumn", _dedupTimeColumn) .append("enablePreload", _enablePreload) + .append("ignoreNonDefaultTiers", _ignoreNonDefaultTiers) .append("metadataManagerConfigs", _metadataManagerConfigs) .append("allowDedupConsumptionDuringCommit", _allowDedupConsumptionDuringCommit) .append("tableIndexDir", _tableIndexDir) @@ -139,6 +146,7 @@ public class DedupContext { private double _metadataTTL; private String _dedupTimeColumn; private boolean _enablePreload; + private boolean _ignoreNonDefaultTiers; private 
Map<String, String> _metadataManagerConfigs; @Deprecated private boolean _allowDedupConsumptionDuringCommit; @@ -180,6 +188,11 @@ public class DedupContext { return this; } + public Builder setIgnoreNonDefaultTiers(boolean ignoreNonDefaultTiers) { + _ignoreNonDefaultTiers = ignoreNonDefaultTiers; + return this; + } + public Builder setMetadataManagerConfigs(Map<String, String> metadataManagerConfigs) { _metadataManagerConfigs = metadataManagerConfigs; return this; @@ -211,8 +224,8 @@ public class DedupContext { _tableIndexDir = _tableDataManager.getTableDataDir(); } return new DedupContext(_tableConfig, _schema, _primaryKeyColumns, _hashFunction, _metadataTTL, _dedupTimeColumn, - _enablePreload, _metadataManagerConfigs, _allowDedupConsumptionDuringCommit, _tableDataManager, - _tableIndexDir); + _enablePreload, _ignoreNonDefaultTiers, _metadataManagerConfigs, _allowDedupConsumptionDuringCommit, + _tableDataManager, _tableIndexDir); } } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java index d46cc998bf..3eb03fece3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java @@ -49,6 +49,9 @@ public class DedupConfig extends BaseJsonConfig { + "ENABLE, DISABLE and DEFAULT (use instance level default behavior).") private Enablement _preload = Enablement.DEFAULT; + @JsonPropertyDescription("Whether to ignore segments from non-default tiers when constructing dedup metadata.") + private Enablement _ignoreNonDefaultTiers = Enablement.DEFAULT; + @JsonPropertyDescription("Custom class for dedup metadata manager. 
If not specified, the default implementation " + "ConcurrentMapTableDedupMetadataManager will be used.") @Nullable @@ -133,6 +136,16 @@ public class DedupConfig extends BaseJsonConfig { _preload = preload; } + public Enablement getIgnoreNonDefaultTiers() { + return _ignoreNonDefaultTiers; + } + + public void setIgnoreNonDefaultTiers(Enablement ignoreNonDefaultTiers) { + Preconditions.checkArgument(ignoreNonDefaultTiers != null, + "Ignore non-default tiers cannot be null, must be one of ENABLE, DISABLE or DEFAULT"); + _ignoreNonDefaultTiers = ignoreNonDefaultTiers; + } + @Nullable public String getMetadataManagerClass() { return _metadataManagerClass; diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 7f975d93c7..1bcbc1126e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -1115,6 +1115,7 @@ public class CommonConstants { public static final String CONFIG_PREFIX = "dedup"; public static final String DEFAULT_METADATA_MANAGER_CLASS = "default.metadata.manager.class"; public static final String DEFAULT_ENABLE_PRELOAD = "default.enable.preload"; + public static final String DEFAULT_IGNORE_NON_DEFAULT_TIERS = "default.ignore.non.default.tiers"; /// @deprecated use {@link org.apache.pinot.spi.config.table.ingestion.ParallelSegmentConsumptionPolicy} instead. @Deprecated --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org