This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7eae993760 Do not allow consumption for Dedup Tables enabled during segment download and replacement (#15268) 7eae993760 is described below commit 7eae993760f99582aec9eca6d4a6bf54f4f19116 Author: Chaitanya Deepthi <45308220+deepthi...@users.noreply.github.com> AuthorDate: Sat Mar 15 12:22:52 2025 -0400 Do not allow consumption for Dedup Tables enabled during segment download and replacement (#15268) * Do not allow consumption for dedup tables and full upsert tables with out of order entries enabled. * Add server and table level settings as well for dedup to tune the consumption during commit --- .../realtime/RealtimeSegmentDataManager.java | 22 +++++++++++++++++----- .../dedup/TableDedupMetadataManagerFactory.java | 8 ++++++++ .../apache/pinot/spi/config/table/DedupConfig.java | 11 +++++++++++ 3 files changed, 36 insertions(+), 5 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index a57a3cc278..17c425996f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1687,11 +1687,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { setConsumeEndTime(segmentZKMetadata, _consumeStartTime); _segmentCommitterFactory = new SegmentCommitterFactory(_segmentLogger, _protocolHandler, tableConfig, indexLoadingConfig, serverMetrics); - _segmentLogger - .info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", llcSegmentName, - _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC)); - _allowConsumptionDuringCommit = !_realtimeTableDataManager.isPartialUpsertEnabled() ?
true - : _tableConfig.getUpsertConfig().isAllowPartialUpsertConsumptionDuringCommit(); + _segmentLogger.info("Starting consumption on realtime consuming segment {} maxRowCount {} maxEndTime {}", + llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, DateTimeZone.UTC)); + _allowConsumptionDuringCommit = isConsumptionAllowedDuringCommit(); } catch (Throwable t) { // In case of exception thrown here, segment goes to ERROR state. Then any attempt to reset the segment from // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the semaphore is acquired, but not released. @@ -1725,6 +1723,20 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } } + // Consumption while downloading and replacing the slow replicas is not allowed for the following tables: + // 1. Partial Upserts + // 2. Dedup Tables + // For the above table types, we would be looking into the metadata information when inserting a new record, + // so it is not right to allow consumption while downloading and replacing the consuming segment as we might see + // duplicates in dedup tables and inconsistent entries compared to lead replicas for partial + // upsert tables. 
If tables are dedup/partial upsert enabled check for table and server config properties to see if + // consumption is allowed + private boolean isConsumptionAllowedDuringCommit() { + return (!_realtimeTableDataManager.isDedupEnabled() || _tableConfig.getDedupConfig() + .isAllowDedupConsumptionDuringCommit()) && (!_realtimeTableDataManager.isPartialUpsertEnabled() + || _tableConfig.getUpsertConfig().isAllowPartialUpsertConsumptionDuringCommit()); + } + private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long now) { long maxConsumeTimeMillis = _streamConfig.getFlushThresholdTimeMillis(); _consumeEndTime = segmentZKMetadata.getCreationTime() + maxConsumeTimeMillis; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java index a94b4385a5..7d34286335 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java @@ -39,6 +39,9 @@ public class TableDedupMetadataManagerFactory { public static final String DEDUP_DEFAULT_METADATA_MANAGER_CLASS = "default.metadata.manager.class"; public static final String DEDUP_DEFAULT_ENABLE_PRELOAD = "default.enable.preload"; + public static final String DEDUP_DEFAULT_ALLOW_DEDUP_CONSUMPTION_DURING_COMMIT = + "default.allow.dedup.consumption.during.commit"; + public static TableDedupMetadataManager create(TableConfig tableConfig, Schema schema, TableDataManager tableDataManager, ServerMetrics serverMetrics, @Nullable PinotConfiguration instanceDedupConfig) { @@ -59,6 +62,11 @@ public class TableDedupMetadataManagerFactory { dedupConfig.setEnablePreload( Boolean.parseBoolean(instanceDedupConfig.getProperty(DEDUP_DEFAULT_ENABLE_PRELOAD, "false"))); } + // server level config 
honoured only when table level config is not set to true + if (!dedupConfig.isAllowDedupConsumptionDuringCommit()) { + dedupConfig.setAllowDedupConsumptionDuringCommit(Boolean.parseBoolean( + instanceDedupConfig.getProperty(DEDUP_DEFAULT_ALLOW_DEDUP_CONSUMPTION_DURING_COMMIT, "false"))); + } } if (StringUtils.isNotEmpty(metadataManagerClass)) { LOGGER.info("Creating TableDedupMetadataManager with class: {} for table: {}", metadataManagerClass, diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java index b1e6caec30..b3dc13f420 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java @@ -47,6 +47,9 @@ public class DedupConfig extends BaseJsonConfig { @JsonPropertyDescription("Whether to preload segments for fast dedup metadata recovery") private boolean _enablePreload; + @JsonPropertyDescription("Whether to pause dedup table's partition consumption during commit") + private boolean _allowDedupConsumptionDuringCommit; + public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) boolean dedupEnabled, @JsonProperty(value = "hashFunction") HashFunction hashFunction) { this(dedupEnabled, hashFunction, null, null, 0, null, false); @@ -100,4 +103,12 @@ public class DedupConfig extends BaseJsonConfig { public void setEnablePreload(boolean enablePreload) { _enablePreload = enablePreload; } + + public boolean isAllowDedupConsumptionDuringCommit() { + return _allowDedupConsumptionDuringCommit; + } + + public void setAllowDedupConsumptionDuringCommit(boolean allowDedupConsumptionDuringCommit) { + _allowDedupConsumptionDuringCommit = allowDedupConsumptionDuringCommit; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: 
commits-h...@pinot.apache.org