This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eae993760 Do not allow consumption for Dedup Tables enabled during segment download and replacement  (#15268)
7eae993760 is described below

commit 7eae993760f99582aec9eca6d4a6bf54f4f19116
Author: Chaitanya Deepthi <45308220+deepthi...@users.noreply.github.com>
AuthorDate: Sat Mar 15 12:22:52 2025 -0400

    Do not allow consumption for Dedup Tables enabled during segment download and replacement  (#15268)
    
    * Do not allow consumption for dedup tables and full upsert tables with out-of-order entries enabled.
    
    * Add server- and table-level settings for dedup as well, to tune consumption during commit.
---
 .../realtime/RealtimeSegmentDataManager.java       | 22 +++++++++++++++++-----
 .../dedup/TableDedupMetadataManagerFactory.java    |  8 ++++++++
 .../apache/pinot/spi/config/table/DedupConfig.java | 11 +++++++++++
 3 files changed, 36 insertions(+), 5 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index a57a3cc278..17c425996f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1687,11 +1687,9 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
       setConsumeEndTime(segmentZKMetadata, _consumeStartTime);
       _segmentCommitterFactory =
           new SegmentCommitterFactory(_segmentLogger, _protocolHandler, 
tableConfig, indexLoadingConfig, serverMetrics);
-      _segmentLogger
-          .info("Starting consumption on realtime consuming segment {} 
maxRowCount {} maxEndTime {}", llcSegmentName,
-              _segmentMaxRowCount, new DateTime(_consumeEndTime, 
DateTimeZone.UTC));
-      _allowConsumptionDuringCommit = 
!_realtimeTableDataManager.isPartialUpsertEnabled() ? true
-          : 
_tableConfig.getUpsertConfig().isAllowPartialUpsertConsumptionDuringCommit();
+      _segmentLogger.info("Starting consumption on realtime consuming segment 
{} maxRowCount {} maxEndTime {}",
+          llcSegmentName, _segmentMaxRowCount, new DateTime(_consumeEndTime, 
DateTimeZone.UTC));
+      _allowConsumptionDuringCommit = isConsumptionAllowedDuringCommit();
     } catch (Throwable t) {
       // In case of exception thrown here, segment goes to ERROR state. Then 
any attempt to reset the segment from
       // ERROR -> OFFLINE -> CONSUMING via Helix Admin fails because the 
semaphore is acquired, but not released.
@@ -1725,6 +1723,20 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     }
   }
 
+  // Consumption while slow replicas download and replace the consuming segment is not allowed for these tables:
+  // 1. Partial upsert tables
+  // 2. Dedup tables
+  // For these table types, the metadata information is consulted when a new record is inserted, so it is not
+  // safe to keep consuming while the consuming segment is being downloaded and replaced: we might see duplicates
+  // in dedup tables, and entries that are inconsistent with the lead replicas in partial upsert tables.
+  // If a table has dedup or partial upsert enabled, check the table and server config properties to see whether
+  // consumption is allowed.
+  private boolean isConsumptionAllowedDuringCommit() {
+    return (!_realtimeTableDataManager.isDedupEnabled() || 
_tableConfig.getDedupConfig()
+        .isAllowDedupConsumptionDuringCommit()) && 
(!_realtimeTableDataManager.isPartialUpsertEnabled()
+        || 
_tableConfig.getUpsertConfig().isAllowPartialUpsertConsumptionDuringCommit());
+  }
+
   private void setConsumeEndTime(SegmentZKMetadata segmentZKMetadata, long 
now) {
     long maxConsumeTimeMillis = _streamConfig.getFlushThresholdTimeMillis();
     _consumeEndTime = segmentZKMetadata.getCreationTime() + 
maxConsumeTimeMillis;
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
index a94b4385a5..7d34286335 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
@@ -39,6 +39,9 @@ public class TableDedupMetadataManagerFactory {
   public static final String DEDUP_DEFAULT_METADATA_MANAGER_CLASS = 
"default.metadata.manager.class";
   public static final String DEDUP_DEFAULT_ENABLE_PRELOAD = 
"default.enable.preload";
 
+  public static final String 
DEDUP_DEFAULT_ALLOW_DEDUP_CONSUMPTION_DURING_COMMIT =
+      "default.allow.dedup.consumption.during.commit";
+
   public static TableDedupMetadataManager create(TableConfig tableConfig, 
Schema schema,
       TableDataManager tableDataManager, ServerMetrics serverMetrics,
       @Nullable PinotConfiguration instanceDedupConfig) {
@@ -59,6 +62,11 @@ public class TableDedupMetadataManagerFactory {
         dedupConfig.setEnablePreload(
             
Boolean.parseBoolean(instanceDedupConfig.getProperty(DEDUP_DEFAULT_ENABLE_PRELOAD,
 "false")));
       }
+      // Server-level config is honoured only when the table-level config is not set to true
+      if (!dedupConfig.isAllowDedupConsumptionDuringCommit()) {
+        dedupConfig.setAllowDedupConsumptionDuringCommit(Boolean.parseBoolean(
+            
instanceDedupConfig.getProperty(DEDUP_DEFAULT_ALLOW_DEDUP_CONSUMPTION_DURING_COMMIT,
 "false")));
+      }
     }
     if (StringUtils.isNotEmpty(metadataManagerClass)) {
       LOGGER.info("Creating TableDedupMetadataManager with class: {} for 
table: {}", metadataManagerClass,
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
index b1e6caec30..b3dc13f420 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DedupConfig.java
@@ -47,6 +47,9 @@ public class DedupConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Whether to preload segments for fast dedup 
metadata recovery")
   private boolean _enablePreload;
 
+  @JsonPropertyDescription("Whether to pause dedup table's partition 
consumption during commit")
+  private boolean _allowDedupConsumptionDuringCommit;
+
   public DedupConfig(@JsonProperty(value = "dedupEnabled", required = true) 
boolean dedupEnabled,
       @JsonProperty(value = "hashFunction") HashFunction hashFunction) {
     this(dedupEnabled, hashFunction, null, null, 0, null, false);
@@ -100,4 +103,12 @@ public class DedupConfig extends BaseJsonConfig {
   public void setEnablePreload(boolean enablePreload) {
     _enablePreload = enablePreload;
   }
+
+  public boolean isAllowDedupConsumptionDuringCommit() {
+    return _allowDedupConsumptionDuringCommit;
+  }
+
+  public void setAllowDedupConsumptionDuringCommit(boolean 
allowDedupConsumptionDuringCommit) {
+    _allowDedupConsumptionDuringCommit = allowDedupConsumptionDuringCommit;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to