This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a1b37cff7c Adds Disaster Recovery modes for Pauseless (#16071)
a1b37cff7c is described below

commit a1b37cff7c6a22e986415cae5406cc70a4bb545c
Author: NOOB <43700604+noob-se...@users.noreply.github.com>
AuthorDate: Tue Jul 1 03:37:32 2025 +0530

    Adds Disaster Recovery modes for Pauseless (#16071)
---
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  53 ++++++--
 .../realtime/RealtimeSegmentDataManager.java       |  16 ++-
 .../realtime/RealtimeSegmentDataManagerTest.java   |   6 +-
 ...pRealtimeIngestionSegmentCommitFailureTest.java | 150 +++++++++++++++++++++
 ...sRealtimeIngestionSegmentCommitFailureTest.java |  12 +-
 ...FailureInjectingRealtimeSegmentDataManager.java |   7 +-
 .../FailureInjectingRealtimeTableDataManager.java  |   3 +-
 .../dedupPauselessIngestionTestData.tar.gz         | Bin 0 -> 467 bytes
 .../spi/config/table/DisasterRecoveryMode.java     |  34 +++++
 .../table/ingestion/StreamIngestionConfig.java     |  13 ++
 10 files changed, 268 insertions(+), 26 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 3afc78eeb5..5f7c950178 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -104,6 +104,8 @@ import org.apache.pinot.core.util.PeerServerSegmentFinder;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata;
 import org.apache.pinot.spi.config.table.ColumnPartitionConfig;
+import org.apache.pinot.spi.config.table.DedupConfig;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
 import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
@@ -2508,28 +2510,18 @@ public class PinotLLCRealtimeSegmentManager {
         segmentsInErrorStateInAtLeastOneReplica.size(), 
segmentsInErrorStateInAtLeastOneReplica,
         segmentsInErrorStateInAllReplicas.size(), 
segmentsInErrorStateInAllReplicas, realtimeTableName);
 
-    boolean isPartialUpsertEnabled =
-        tableConfig.getUpsertConfig() != null && 
tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL;
-    boolean isDedupEnabled = tableConfig.getDedupConfig() != null && 
tableConfig.getDedupConfig().isDedupEnabled();
-    if ((isPartialUpsertEnabled || isDedupEnabled) && 
!repairErrorSegmentsForPartialUpsertOrDedup) {
+    if 
(!allowRepairOfErrorSegments(repairErrorSegmentsForPartialUpsertOrDedup, 
tableConfig)) {
       // We do not run reingestion for dedup and partial upsert tables in 
pauseless as it can
       // lead to data inconsistencies
       _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
           ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, 
segmentsInErrorStateInAllReplicas.size());
-      LOGGER.error("Skipping repair for errored segments in table: {} because 
dedup or partial upsert is enabled.",
-          realtimeTableName);
       return;
     } else {
-      if ((isPartialUpsertEnabled || isDedupEnabled)) {
-        LOGGER.info(
-            "Repairing error segments in table: {} as 
repairErrorSegmentForPartialUpsertOrDedup is set to true",
-            realtimeTableName);
-      }
+      LOGGER.info("Repairing error segments in table: {}.", realtimeTableName);
       _controllerMetrics.setOrUpdateTableGauge(realtimeTableName,
           ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, 
segmentsInErrorStateInAllReplicas.size());
     }
 
-
     for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) {
       SegmentZKMetadata segmentZKMetadata = 
getSegmentZKMetadata(realtimeTableName, segmentName);
       if (segmentZKMetadata == null) {
@@ -2566,6 +2558,43 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  private boolean allowRepairOfErrorSegments(boolean 
repairErrorSegmentsForPartialUpsertOrDedup,
+      TableConfig tableConfig) {
+    if (repairErrorSegmentsForPartialUpsertOrDedup) {
+      // If API context has repairErrorSegmentsForPartialUpsertOrDedup=true, 
allow repair.
+      return true;
+    }
+
+    if ((tableConfig.getIngestionConfig() != null) && 
(tableConfig.getIngestionConfig().getStreamIngestionConfig()
+        != null)) {
+      DisasterRecoveryMode disasterRecoveryMode =
+          
tableConfig.getIngestionConfig().getStreamIngestionConfig().getDisasterRecoveryMode();
+      if (disasterRecoveryMode == DisasterRecoveryMode.ALWAYS) {
+        return true;
+      }
+    }
+
+    boolean isPartialUpsertEnabled = (tableConfig.getUpsertConfig() != null) 
&& (tableConfig.getUpsertConfig().getMode()
+        == UpsertConfig.Mode.PARTIAL);
+    if (isPartialUpsertEnabled) {
+      // If isPartialUpsert is enabled, do not allow repair.
+      LOGGER.warn("Skipping repair for errored segments in table: {} because 
partialUpsert is enabled",
+          tableConfig.getTableName());
+      return false;
+    }
+
+    DedupConfig dedupConfig = tableConfig.getDedupConfig();
+    boolean isDedupEnabled = (dedupConfig != null) && 
(dedupConfig.isDedupEnabled());
+    if (isDedupEnabled) {
+      // If dedup is enabled, do not allow repair of error segment.
+      LOGGER.warn("Skipping repair for errored segments in table: {} because 
dedup is enabled",
+          tableConfig.getTableName());
+      return false;
+    }
+
+    return true;
+  }
+
   /**
    * Invokes the server's reingestSegment API via a POST request with JSON 
payload,
    * using Simple HTTP APIs.
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index beea808a66..5ce71e81f7 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1306,7 +1306,7 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     SegmentBuildDescriptor descriptor;
     try {
       descriptor = buildSegmentInternal(false);
-    } catch (SegmentBuildFailureException e) {
+    } catch (Exception e) {
       return false;
     }
     if (descriptor == null) {
@@ -1475,9 +1475,10 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
               } else if (_currentOffset.compareTo(endOffset) == 0) {
                 _segmentLogger
                     .info("Current offset {} matches offset in zk {}. 
Replacing segment", _currentOffset, endOffset);
-                boolean replaced = buildSegmentAndReplace();
-                if (!replaced) {
-                  throw new RuntimeException("Failed to build the segment: " + 
_segmentNameStr);
+                if (!buildSegmentAndReplace()) {
+                  _segmentLogger.warn("Failed to build the segment: {} and 
replace. Downloading to replace",
+                      _segmentNameStr);
+                  downloadSegmentAndReplace(segmentZKMetadata);
                 }
               } else {
                 boolean success = false;
@@ -1495,9 +1496,10 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
 
                 if (success) {
                   _segmentLogger.info("Caught up to offset {}", 
_currentOffset);
-                  boolean replaced = buildSegmentAndReplace();
-                  if (!replaced) {
-                    throw new RuntimeException("Failed to build the segment: " 
+ _segmentNameStr);
+                  if (!buildSegmentAndReplace()) {
+                    _segmentLogger.warn("Failed to build the segment: {} after 
catchup. Downloading to replace",
+                        _segmentNameStr);
+                    downloadSegmentAndReplace(segmentZKMetadata);
                   }
                 } else {
                   _segmentLogger
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index ba3f8579fc..073ba7c15e 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -583,13 +583,15 @@ public class RealtimeSegmentDataManagerTest {
       Assert.assertFalse(segmentDataManager._buildAndReplaceCalled);
     }
 
-    // Test Runtime Exception is thrown when build segment fails.
+    // Test downloadAndReplace is called after buildAndReplace fails.
     try (FakeRealtimeSegmentDataManager segmentDataManager = 
createFakeSegmentManager()) {
       segmentDataManager._failSegmentBuildAndReplace = true;
       segmentDataManager._stopWaitTimeMs = 0;
       segmentDataManager._state.set(segmentDataManager, 
RealtimeSegmentDataManager.State.HOLDING);
       segmentDataManager.setCurrentOffset(finalOffsetValue);
-      Assert.expectThrows(RuntimeException.class, () -> 
segmentDataManager.goOnlineFromConsuming(metadata));
+      segmentDataManager.goOnlineFromConsuming(metadata);
+      Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled);
+      Assert.assertTrue(segmentDataManager._buildAndReplaceCalled);
     }
   }
 
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
new file mode 100644
index 0000000000..925d7a7379
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.List;
+import org.apache.pinot.common.utils.PauselessConsumptionUtils;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import 
org.apache.pinot.spi.config.table.ingestion.ParallelSegmentConsumptionPolicy;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
+import org.testng.annotations.BeforeClass;
+
+import static org.testng.Assert.assertNotNull;
+
+
+public class PauselessDedupRealtimeIngestionSegmentCommitFailureTest
+    extends PauselessRealtimeIngestionSegmentCommitFailureTest {
+
+  private static final int NUM_PARTITIONS = 2;
+
+  @Override
+  protected String getAvroTarFileName() {
+    return "dedupPauselessIngestionTestData.tar.gz";
+  }
+
+  @Override
+  protected int getRealtimeSegmentFlushSize() {
+    return 2;
+  }
+
+  @Override
+  protected String getSchemaFileName() {
+    return "dedupIngestionTestSchema.schema";
+  }
+
+  @Override
+  protected long getCountStarResult() {
+    // Two distinct records are expected with pk values of 0, 1.
+    return 2;
+  }
+
+  @Override
+  protected String getPartitionColumn() {
+    return "id";
+  }
+
+  @Override
+  protected int getNumReplicas() {
+    return 2;
+  }
+
+  @Override
+  protected IngestionConfig getIngestionConfig() {
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setStreamIngestionConfig(new 
StreamIngestionConfig(List.of(getStreamConfigs())));
+    assert ingestionConfig.getStreamIngestionConfig() != null;
+    ingestionConfig.getStreamIngestionConfig()
+        
.setParallelSegmentConsumptionPolicy(ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY);
+    
ingestionConfig.getStreamIngestionConfig().setEnforceConsumptionInOrder(true);
+    return ingestionConfig;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
+
+    // Start the Pinot cluster
+    startZk();
+    // Start a customized controller with more frequent realtime segment 
validation
+    startController();
+    startBroker();
+    startServers(2);
+
+    // load data in kafka
+    List<File> avroFiles = unpackAvroData(_tempDir);
+    startKafka();
+    pushAvroIntoKafka(avroFiles);
+
+    setMaxSegmentCompletionTimeMillis();
+    // create schema for non-pauseless table
+    Schema schema = createSchema();
+    schema.setSchemaName(getNonPauselessTableName());
+    addSchema(schema);
+
+    // add non-pauseless table
+    TableConfig tableConfig2 = createDedupTableConfig(avroFiles.get(0));
+    
tableConfig2.setTableName(TableNameBuilder.REALTIME.tableNameWithType(getNonPauselessTableName()));
+    tableConfig2.getValidationConfig().setRetentionTimeUnit("DAYS");
+    tableConfig2.getValidationConfig().setRetentionTimeValue("100000");
+    addTableConfig(tableConfig2);
+    waitForDocsLoaded(600_000L, true, tableConfig2.getTableName());
+
+    // create schema for pauseless table
+    schema.setSchemaName(getPauselessTableName());
+    addSchema(schema);
+
+    // add pauseless table
+    TableConfig tableConfig = createDedupTableConfig(avroFiles.get(0));
+    
tableConfig.setTableName(TableNameBuilder.REALTIME.tableNameWithType(getPauselessTableName()));
+    tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS");
+    tableConfig.getValidationConfig().setRetentionTimeValue("100000");
+    assertNotNull(tableConfig.getIngestionConfig());
+    StreamIngestionConfig streamIngestionConfig = 
tableConfig.getIngestionConfig().getStreamIngestionConfig();
+    assertNotNull(streamIngestionConfig);
+    streamIngestionConfig.getStreamConfigMaps()
+        .get(0)
+        
.put(StreamConfigProperties.PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS, "10");
+    streamIngestionConfig.setPauselessConsumptionEnabled(true);
+    streamIngestionConfig.setDisasterRecoveryMode(DisasterRecoveryMode.ALWAYS);
+    
tableConfig.getValidationConfig().setPeerSegmentDownloadScheme(CommonConstants.HTTP_PROTOCOL);
+
+    addTableConfig(tableConfig);
+    String realtimeTableName = tableConfig.getTableName();
+    TestUtils.waitForCondition(aVoid -> 
getNumErrorSegmentsInEV(realtimeTableName) > 0, 600_000L,
+        "Segments still not in error state");
+  }
+
+  protected TableConfig createDedupTableConfig(File sampleAvroFile) {
+    TableConfig tableConfig = super.createDedupTableConfig(sampleAvroFile, 
getPartitionColumn(), NUM_PARTITIONS);
+    assertNotNull(tableConfig.getDedupConfig());
+    if (PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) {
+      
tableConfig.getValidationConfig().setPeerSegmentDownloadScheme(CommonConstants.HTTP_PROTOCOL);
+    }
+    return tableConfig;
+  }
+}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
index 821031e8f9..7ecb5e5c34 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java
@@ -140,7 +140,7 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
         "Segments still not in error state");
   }
 
-  private void setMaxSegmentCompletionTimeMillis() {
+  protected void setMaxSegmentCompletionTimeMillis() {
     PinotLLCRealtimeSegmentManager realtimeSegmentManager = 
_helixResourceManager.getRealtimeSegmentManager();
     if (realtimeSegmentManager instanceof 
FailureInjectingPinotLLCRealtimeSegmentManager) {
       ((FailureInjectingPinotLLCRealtimeSegmentManager) 
realtimeSegmentManager).setMaxSegmentCompletionTimeoutMs(
@@ -148,7 +148,7 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
     }
   }
 
-  private int getNumErrorSegmentsInEV(String realtimeTableName) {
+  protected int getNumErrorSegmentsInEV(String realtimeTableName) {
     ExternalView externalView = _helixResourceManager.getHelixAdmin()
         .getResourceExternalView(_helixResourceManager.getHelixClusterName(), 
realtimeTableName);
     if (externalView == null) {
@@ -238,6 +238,14 @@ public class 
PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus
     return segmentZKMetadataMap;
   }
 
+  protected String getNonPauselessTableName() {
+    return DEFAULT_TABLE_NAME_2;
+  }
+
+  protected String getPauselessTableName() {
+    return DEFAULT_TABLE_NAME;
+  }
+
   @AfterClass
   public void tearDown()
       throws IOException {
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
index 2467429466..ee13b49e08 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.integration.tests.realtime.utils;
 
+import java.util.function.BooleanSupplier;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -25,6 +26,7 @@ import 
org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import 
org.apache.pinot.core.data.manager.realtime.SegmentBuildFailureException;
+import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
 import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
@@ -47,12 +49,13 @@ public class FailureInjectingRealtimeSegmentDataManager 
extends RealtimeSegmentD
   public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata 
segmentZKMetadata, TableConfig tableConfig,
       RealtimeTableDataManager realtimeTableDataManager, String 
resourceDataDir, IndexLoadingConfig indexLoadingConfig,
       Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator 
consumerCoordinator,
-      ServerMetrics serverMetrics, boolean failCommit)
+      ServerMetrics serverMetrics, boolean failCommit, 
PartitionDedupMetadataManager partitionDedupMetadataManager,
+      BooleanSupplier isTableReadyToConsumeData)
       throws AttemptsExceededException, RetriableOperationException {
     // Pass through to the real parent constructor
     super(segmentZKMetadata, tableConfig, realtimeTableDataManager, 
resourceDataDir, indexLoadingConfig, schema,
         llcSegmentName, consumerCoordinator, serverMetrics, null /* no 
PartitionUpsertMetadataManager */,
-        null /* no PartitionDedupMetadataManager */, () -> true /* 
isReadyToConsumeData always true for tests */);
+        partitionDedupMetadataManager, isTableReadyToConsumeData);
 
     _failCommit = failCommit;
   }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
index af65cd1a79..994800ea70 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
@@ -62,6 +62,7 @@ public class FailureInjectingRealtimeTableDataManager extends 
RealtimeTableDataM
       addFailureToCommits = false;
     }
     return new FailureInjectingRealtimeSegmentDataManager(zkMetadata, 
tableConfig, this, _indexDir.getAbsolutePath(),
-        indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, 
_serverMetrics, addFailureToCommits);
+        indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, 
_serverMetrics, addFailureToCommits,
+        partitionDedupMetadataManager, isTableReadyToConsumeData);
   }
 }
diff --git 
a/pinot-integration-tests/src/test/resources/dedupPauselessIngestionTestData.tar.gz
 
b/pinot-integration-tests/src/test/resources/dedupPauselessIngestionTestData.tar.gz
new file mode 100644
index 0000000000..1a34d77ec3
Binary files /dev/null and 
b/pinot-integration-tests/src/test/resources/dedupPauselessIngestionTestData.tar.gz
 differ
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DisasterRecoveryMode.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DisasterRecoveryMode.java
new file mode 100644
index 0000000000..4876332c46
--- /dev/null
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DisasterRecoveryMode.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.config.table;
+
+/**
+ * Recovery mode used to decide how to recover a segment that is online in IS 
but has no completed (immutable)
+ * replica on any server in pauseless ingestion.
+ */
+public enum DisasterRecoveryMode {
+  // ALWAYS means Pinot will always run the Disaster Recovery Job.
+  ALWAYS,
+  // DEFAULT means Pinot will skip the Disaster Recovery Job for tables like 
Dedup/Partial-Upsert where consistency
+  // of data has higher priority than availability. Features like 
Dedup/Partial-Upsert require ingestion to happen
+  // only in strict order (i.e. a segment ingested in the past cannot be 
re-ingested if the server has consumed the
+  // segments following it). So in the above disaster scenario, the only fix 
requires a bulk delete of
+  // segments.
+}
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
index da17a2c020..4dcea288c9 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.config.BaseJsonConfig;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
 
 
 /**
@@ -54,6 +55,10 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   @JsonPropertyDescription("Policy to determine the behaviour of parallel 
consumption.")
   private ParallelSegmentConsumptionPolicy _parallelSegmentConsumptionPolicy;
 
+  @JsonPropertyDescription("Recovery mode which is used to decide how to 
recover a segment online in IS but having no"
+      + " completed (immutable) replica on any server in pause-less ingestion")
+  private DisasterRecoveryMode _disasterRecoveryMode = 
DisasterRecoveryMode.DEFAULT;
+
   @JsonCreator
   public StreamIngestionConfig(@JsonProperty("streamConfigMaps") 
List<Map<String, String>> streamConfigMaps) {
     _streamConfigMaps = streamConfigMaps;
@@ -111,4 +116,12 @@ public class StreamIngestionConfig extends BaseJsonConfig {
   public void 
setParallelSegmentConsumptionPolicy(ParallelSegmentConsumptionPolicy 
parallelSegmentConsumptionPolicy) {
     _parallelSegmentConsumptionPolicy = parallelSegmentConsumptionPolicy;
   }
+
+  public DisasterRecoveryMode getDisasterRecoveryMode() {
+    return _disasterRecoveryMode;
+  }
+
+  public void setDisasterRecoveryMode(DisasterRecoveryMode 
disasterRecoveryMode) {
+    _disasterRecoveryMode = disasterRecoveryMode;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to