This is an automated email from the ASF dual-hosted git repository.

manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new df089db2e30 Adds Disaster Recovery Mode as Controller/Cluster config 
(#17243)
df089db2e30 is described below

commit df089db2e30a958ef27439fc065534b2fbcf0180
Author: NOOB <[email protected]>
AuthorDate: Thu Dec 11 15:41:54 2025 +0530

    Adds Disaster Recovery Mode as Controller/Cluster config (#17243)
    
    * Adds Disaster Recovery Mode as controller config
    
    * fix bug
    
    * updates test
    
    * Adds tests
    
    * Addresses PR comments and fixes tests
    
    * Adds log for parse failure
    
    * Fixes tests
---
 .../apache/pinot/controller/ControllerConf.java    | 14 ++++++
 .../realtime/PinotLLCRealtimeSegmentManager.java   |  6 ++-
 .../RealtimeSegmentValidationManager.java          | 50 ++++++++++++++++------
 .../pinot/controller/ControllerConfTest.java       | 13 ++++++
 .../PinotLLCRealtimeSegmentManagerTest.java        | 10 +++++
 .../RealtimeSegmentValidationManagerTest.java      | 18 +++++++-
 ...pRealtimeIngestionSegmentCommitFailureTest.java | 16 ++++++-
 7 files changed, 110 insertions(+), 17 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index b99023230e6..a0decdabdde 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -29,11 +29,13 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import javax.annotation.Nullable;
 import org.apache.commons.configuration2.Configuration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -222,6 +224,7 @@ public class ControllerConf extends PinotConfiguration {
         "controller.segment.level.validation.intervalPeriod";
     public static final String AUTO_RESET_ERROR_SEGMENTS_VALIDATION =
         "controller.segment.error.autoReset";
+    public static final String DISASTER_RECOVERY_MODE_CONFIG_KEY = 
"controller.segment.disaster.recovery.mode";
 
     // Initial delays
     public static final String STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS =
@@ -1130,6 +1133,17 @@ public class ControllerConf extends PinotConfiguration {
     return 
getProperty(ControllerPeriodicTasksConf.AUTO_RESET_ERROR_SEGMENTS_VALIDATION, 
true);
   }
 
+  public DisasterRecoveryMode getDisasterRecoveryMode() {
+    return 
getDisasterRecoveryMode(getProperty(ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY));
+  }
+
+  public static DisasterRecoveryMode getDisasterRecoveryMode(@Nullable String 
disasterRecoveryModeString) {
+    if (disasterRecoveryModeString == null) {
+      return DisasterRecoveryMode.DEFAULT;
+    }
+    return DisasterRecoveryMode.valueOf(disasterRecoveryModeString);
+  }
+
   public long getStatusCheckerInitialDelayInSeconds() {
     return 
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS,
         ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 061e769eb57..2f8b6d8b307 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2808,6 +2808,10 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  public boolean 
shouldRepairErrorSegmentsForPartialUpsertOrDedup(DisasterRecoveryMode 
disasterRecoveryMode) {
+    return disasterRecoveryMode == DisasterRecoveryMode.ALWAYS;
+  }
+
   /**
    * Whether to allow repairing the ERROR segments with ZK status: COMMITTING
    * This method is only useful for pauseless ingestion with features like 
dedup enabled (Since repairing committing
@@ -2827,7 +2831,7 @@ public class PinotLLCRealtimeSegmentManager {
         != null)) {
       DisasterRecoveryMode disasterRecoveryMode =
           
tableConfig.getIngestionConfig().getStreamIngestionConfig().getDisasterRecoveryMode();
-      if (disasterRecoveryMode == DisasterRecoveryMode.ALWAYS) {
+      if 
(shouldRepairErrorSegmentsForPartialUpsertOrDedup(disasterRecoveryMode)) {
         return true;
       }
     }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 330b382565a..d10884bf6ad 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -22,8 +22,10 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerGauge;
@@ -37,6 +39,7 @@ import 
org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
 import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.stream.OffsetCriteria;
@@ -54,20 +57,20 @@ import org.slf4j.LoggerFactory;
  */
 public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<RealtimeSegmentValidationManager.Context> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
+  public static final String OFFSET_CRITERIA = "offsetCriteria";
+  public static final String REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP 
=
+      "repairErrorSegmentsForPartialUpsertOrDedup";
 
   private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
   private final ValidationMetrics _validationMetrics;
   private final ControllerMetrics _controllerMetrics;
   private final StorageQuotaChecker _storageQuotaChecker;
   private final ResourceUtilizationManager _resourceUtilizationManager;
-
   private final int _segmentLevelValidationIntervalInSeconds;
-  private long _lastSegmentLevelValidationRunTimeMs = 0L;
   private final boolean _segmentAutoResetOnErrorAtValidation;
 
-  public static final String OFFSET_CRITERIA = "offsetCriteria";
-  public static final String REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP 
=
-      "repairErrorSegmentsForPartialUpsertOrDedup";
+  private long _lastSegmentLevelValidationRunTimeMs = 0L;
+  private volatile DisasterRecoveryMode _disasterRecoveryMode;
 
   public RealtimeSegmentValidationManager(ControllerConf config, 
PinotHelixResourceManager pinotHelixResourceManager,
       LeadControllerManager leadControllerManager, 
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -84,6 +87,7 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
 
     _segmentLevelValidationIntervalInSeconds = 
config.getSegmentLevelValidationIntervalInSeconds();
     _segmentAutoResetOnErrorAtValidation = 
config.isAutoResetErrorSegmentsOnValidationEnabled();
+    _disasterRecoveryMode = config.getDisasterRecoveryMode();
     Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
   }
 
@@ -281,15 +285,19 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
   }
 
   private boolean shouldRepairErrorSegmentsForPartialUpsertOrDedup(Properties 
periodicTaskProperties) {
-    return 
Optional.ofNullable(periodicTaskProperties.getProperty(REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP))
-        .map(value -> {
-          try {
-            return Boolean.parseBoolean(value);
-          } catch (Exception e) {
-            return false;
-          }
-        })
-        .orElse(false);
+    String property = 
periodicTaskProperties.getProperty(REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP);
+
+    if (property == null) {
+      return 
_llcRealtimeSegmentManager.shouldRepairErrorSegmentsForPartialUpsertOrDedup(_disasterRecoveryMode);
+    }
+
+    try {
+      return Boolean.parseBoolean(property);
+    } catch (Exception e) {
+      LOGGER.warn("Failed to parse property '{}' for '{}'. Returning false.", 
property,
+          REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP, e);
+      return false;
+    }
   }
 
   @Override
@@ -317,6 +325,20 @@ public class RealtimeSegmentValidationManager extends 
ControllerPeriodicTask<Rea
     _validationMetrics.unregisterAllMetrics();
   }
 
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> 
clusterConfigs) {
+    if 
(changedConfigs.contains(ControllerConf.ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY))
 {
+      String disasterRecoveryModeString =
+          
clusterConfigs.get(ControllerConf.ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY);
+      _disasterRecoveryMode = 
ControllerConf.getDisasterRecoveryMode(disasterRecoveryModeString);
+    }
+  }
+
+  @VisibleForTesting
+  DisasterRecoveryMode getDisasterRecoveryMode() {
+    return _disasterRecoveryMode;
+  }
+
   public static final class Context {
     private boolean _runSegmentLevelValidation;
     private boolean _repairErrorSegmentsForPartialUpsertOrDedup;
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerConfTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerConfTest.java
index 4b93525682b..22be2d820de 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerConfTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerConfTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
 import org.apache.pinot.spi.utils.Enablement;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.testng.Assert;
@@ -200,6 +201,18 @@ public class ControllerConfTest {
     Assert.assertEquals(conf.getSegmentRelocatorBatchSizePerServer(), 42);
   }
 
+  @Test
+  public void testGetDisasterRecoveryMode() {
+    Map<String, Object> controllerConfig = new HashMap<>();
+    ControllerConf conf = new ControllerConf(controllerConfig);
+    Assert.assertEquals(conf.getDisasterRecoveryMode(), 
DisasterRecoveryMode.DEFAULT);
+
+    controllerConfig = new HashMap<>();
+    controllerConfig.put(DISASTER_RECOVERY_MODE_CONFIG_KEY, "ALWAYS");
+    conf = new ControllerConf(controllerConfig);
+    Assert.assertEquals(conf.getDisasterRecoveryMode(), 
DisasterRecoveryMode.ALWAYS);
+  }
+
   @Test
   public void shouldBeAbleToDisableUsingNewConfig() {
     Map<String, Object> controllerConfig = new HashMap<>();
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 842eea188cb..0506b08b558 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -72,6 +72,7 @@ import 
org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.segment.spi.creator.SegmentVersion;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -1956,6 +1957,15 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertFalse(segmentManager.syncCommittingSegments(realtimeTableName, 
newSegments));
   }
 
+  @Test
+  public void testShouldRepairErrorSegmentsForPartialUpsertOrDedup() {
+    PinotLLCRealtimeSegmentManager pinotLLCRealtimeSegmentManager = new 
FakePinotLLCRealtimeSegmentManager();
+    Assert.assertFalse(
+        
pinotLLCRealtimeSegmentManager.shouldRepairErrorSegmentsForPartialUpsertOrDedup(DisasterRecoveryMode.DEFAULT));
+    Assert.assertTrue(
+        
pinotLLCRealtimeSegmentManager.shouldRepairErrorSegmentsForPartialUpsertOrDedup(DisasterRecoveryMode.ALWAYS));
+  }
+
   
//////////////////////////////////////////////////////////////////////////////////
   // Fake classes
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
index fb511b1feed..115d4b751f5 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
@@ -18,12 +18,18 @@
  */
 package org.apache.pinot.controller.validation;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.api.resources.PauseStatusDetails;
 import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
 import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
@@ -38,7 +44,6 @@ import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.*;
 
-
 public class RealtimeSegmentValidationManagerTest {
   @Mock
   private PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
@@ -185,4 +190,15 @@ public class RealtimeSegmentValidationManagerTest {
 
     Assert.assertEquals(result, expectedResult);
   }
+
+  @Test
+  public void testConfigChange() {
+    
Assert.assertEquals(_realtimeSegmentValidationManager.getDisasterRecoveryMode(),
 DisasterRecoveryMode.DEFAULT);
+    Map<String, String> newConfig = new HashMap<>();
+    
newConfig.put(ControllerConf.ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY,
 "ALWAYS");
+    Set<String> changedConfigSet =
+        new 
HashSet<>(List.of(ControllerConf.ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY));
+    _realtimeSegmentValidationManager.onChange(changedConfigSet, newConfig);
+    
Assert.assertEquals(_realtimeSegmentValidationManager.getDisasterRecoveryMode(),
 DisasterRecoveryMode.ALWAYS);
+  }
 }
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
index 925d7a7379b..bba8191954a 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
@@ -20,6 +20,8 @@ package org.apache.pinot.integration.tests;
 
 import java.io.File;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import org.apache.pinot.common.utils.PauselessConsumptionUtils;
 import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
 import org.apache.pinot.spi.config.table.TableConfig;
@@ -33,6 +35,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.BeforeClass;
 
+import static 
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY;
 import static org.testng.Assert.assertNotNull;
 
 
@@ -40,6 +43,7 @@ public class 
PauselessDedupRealtimeIngestionSegmentCommitFailureTest
     extends PauselessRealtimeIngestionSegmentCommitFailureTest {
 
   private static final int NUM_PARTITIONS = 2;
+  private final double _randomDouble = new Random().nextDouble();
 
   @Override
   protected String getAvroTarFileName() {
@@ -83,6 +87,14 @@ public class 
PauselessDedupRealtimeIngestionSegmentCommitFailureTest
     return ingestionConfig;
   }
 
+  @Override
+  protected void overrideControllerConf(Map<String, Object> properties) {
+    super.overrideControllerConf(properties);
+    if (_randomDouble > 0.5) {
+      properties.put(DISASTER_RECOVERY_MODE_CONFIG_KEY, "ALWAYS");
+    }
+  }
+
   @BeforeClass
   public void setUp()
       throws Exception {
@@ -130,7 +142,9 @@ public class 
PauselessDedupRealtimeIngestionSegmentCommitFailureTest
         .get(0)
         
.put(StreamConfigProperties.PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS, "10");
     streamIngestionConfig.setPauselessConsumptionEnabled(true);
-    streamIngestionConfig.setDisasterRecoveryMode(DisasterRecoveryMode.ALWAYS);
+    if (_randomDouble <= 0.5) {
+      
streamIngestionConfig.setDisasterRecoveryMode(DisasterRecoveryMode.ALWAYS);
+    }
     
tableConfig.getValidationConfig().setPeerSegmentDownloadScheme(CommonConstants.HTTP_PROTOCOL);
 
     addTableConfig(tableConfig);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to