This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new df089db2e30 Adds Disaster Recover Mode as Controller/Cluster config
(#17243)
df089db2e30 is described below
commit df089db2e30a958ef27439fc065534b2fbcf0180
Author: NOOB <[email protected]>
AuthorDate: Thu Dec 11 15:41:54 2025 +0530
Adds Disaster Recover Mode as Controller/Cluster config (#17243)
* Adds Disaster Recover Mode as controller config
* fix bug
* updates test
* Adds tests
* Addresses PR comments and fixes tests
* Adds log for parse failure
* Fixes tests
---
.../apache/pinot/controller/ControllerConf.java | 14 ++++++
.../realtime/PinotLLCRealtimeSegmentManager.java | 6 ++-
.../RealtimeSegmentValidationManager.java | 50 ++++++++++++++++------
.../pinot/controller/ControllerConfTest.java | 13 ++++++
.../PinotLLCRealtimeSegmentManagerTest.java | 10 +++++
.../RealtimeSegmentValidationManagerTest.java | 18 +++++++-
...pRealtimeIngestionSegmentCommitFailureTest.java | 16 ++++++-
7 files changed, 110 insertions(+), 17 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
index b99023230e6..a0decdabdde 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerConf.java
@@ -29,11 +29,13 @@ import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.commons.configuration2.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.utils.CommonConstants;
@@ -222,6 +224,7 @@ public class ControllerConf extends PinotConfiguration {
"controller.segment.level.validation.intervalPeriod";
public static final String AUTO_RESET_ERROR_SEGMENTS_VALIDATION =
"controller.segment.error.autoReset";
+ public static final String DISASTER_RECOVERY_MODE_CONFIG_KEY =
"controller.segment.disaster.recovery.mode";
// Initial delays
public static final String STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS =
@@ -1130,6 +1133,17 @@ public class ControllerConf extends PinotConfiguration {
return
getProperty(ControllerPeriodicTasksConf.AUTO_RESET_ERROR_SEGMENTS_VALIDATION,
true);
}
+  /**
+   * Returns the controller-wide {@link DisasterRecoveryMode} configured under
+   * {@link ControllerPeriodicTasksConf#DISASTER_RECOVERY_MODE_CONFIG_KEY}, or
+   * {@link DisasterRecoveryMode#DEFAULT} when the key is not set.
+   *
+   * @throws IllegalArgumentException if the configured value does not name a {@link DisasterRecoveryMode}
+   */
+  public DisasterRecoveryMode getDisasterRecoveryMode() {
+    return getDisasterRecoveryMode(getProperty(ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY));
+  }
+
+  /**
+   * Parses a raw disaster recovery mode value (from controller config or cluster config).
+   *
+   * <p>Leading/trailing whitespace is ignored and the value is matched case-insensitively against the
+   * enum constant names, so hand-typed cluster config values such as {@code "always"} are accepted.
+   * A {@code null} or blank value means "unset" and maps to {@link DisasterRecoveryMode#DEFAULT}.
+   *
+   * @param disasterRecoveryModeString raw config value, may be {@code null}
+   * @return the parsed mode
+   * @throws IllegalArgumentException if the value is non-blank and does not name a {@link DisasterRecoveryMode}
+   */
+  public static DisasterRecoveryMode getDisasterRecoveryMode(@Nullable String disasterRecoveryModeString) {
+    if (StringUtils.isBlank(disasterRecoveryModeString)) {
+      return DisasterRecoveryMode.DEFAULT;
+    }
+    String normalized = disasterRecoveryModeString.trim();
+    for (DisasterRecoveryMode mode : DisasterRecoveryMode.values()) {
+      if (mode.name().equalsIgnoreCase(normalized)) {
+        return mode;
+      }
+    }
+    // Same exception type Enum.valueOf throws, but the message names the config key and the allowed values.
+    throw new IllegalArgumentException(
+        "Invalid value '" + disasterRecoveryModeString + "' for config '"
+            + ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY + "', expected one of: "
+            + java.util.Arrays.toString(DisasterRecoveryMode.values()));
+  }
+
public long getStatusCheckerInitialDelayInSeconds() {
return
getProperty(ControllerPeriodicTasksConf.STATUS_CHECKER_INITIAL_DELAY_IN_SECONDS,
ControllerPeriodicTasksConf.getRandomInitialDelayInSeconds());
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 061e769eb57..2f8b6d8b307 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -2808,6 +2808,10 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+  /**
+   * Decides whether ERROR segments of partial-upsert or dedup tables may be repaired under the given
+   * disaster recovery mode. Only {@link DisasterRecoveryMode#ALWAYS} permits it; every other value,
+   * including {@code null}, does not.
+   *
+   * @param disasterRecoveryMode the mode to evaluate, may be {@code null}
+   * @return {@code true} only for {@link DisasterRecoveryMode#ALWAYS}
+   */
+  public boolean shouldRepairErrorSegmentsForPartialUpsertOrDedup(DisasterRecoveryMode disasterRecoveryMode) {
+    return DisasterRecoveryMode.ALWAYS.equals(disasterRecoveryMode);
+  }
+
/**
* Whether to allow repairing the ERROR segments with ZK status: COMMITTING
* This method is only useful for pauseless ingestion with features like
dedup enabled (Since repairing committing
@@ -2827,7 +2831,7 @@ public class PinotLLCRealtimeSegmentManager {
!= null)) {
DisasterRecoveryMode disasterRecoveryMode =
tableConfig.getIngestionConfig().getStreamIngestionConfig().getDisasterRecoveryMode();
- if (disasterRecoveryMode == DisasterRecoveryMode.ALWAYS) {
+ if
(shouldRepairErrorSegmentsForPartialUpsertOrDedup(disasterRecoveryMode)) {
return true;
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
index 330b382565a..d10884bf6ad 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManager.java
@@ -22,8 +22,10 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerGauge;
@@ -37,6 +39,7 @@ import
org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.stream.OffsetCriteria;
@@ -54,20 +57,20 @@ import org.slf4j.LoggerFactory;
*/
public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<RealtimeSegmentValidationManager.Context> {
private static final Logger LOGGER =
LoggerFactory.getLogger(RealtimeSegmentValidationManager.class);
+ public static final String OFFSET_CRITERIA = "offsetCriteria";
+ public static final String REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP
=
+ "repairErrorSegmentsForPartialUpsertOrDedup";
private final PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
private final ValidationMetrics _validationMetrics;
private final ControllerMetrics _controllerMetrics;
private final StorageQuotaChecker _storageQuotaChecker;
private final ResourceUtilizationManager _resourceUtilizationManager;
-
private final int _segmentLevelValidationIntervalInSeconds;
- private long _lastSegmentLevelValidationRunTimeMs = 0L;
private final boolean _segmentAutoResetOnErrorAtValidation;
- public static final String OFFSET_CRITERIA = "offsetCriteria";
- public static final String REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP
=
- "repairErrorSegmentsForPartialUpsertOrDedup";
+ private long _lastSegmentLevelValidationRunTimeMs = 0L;
+ private volatile DisasterRecoveryMode _disasterRecoveryMode;
public RealtimeSegmentValidationManager(ControllerConf config,
PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager,
PinotLLCRealtimeSegmentManager llcRealtimeSegmentManager,
@@ -84,6 +87,7 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
_segmentLevelValidationIntervalInSeconds =
config.getSegmentLevelValidationIntervalInSeconds();
_segmentAutoResetOnErrorAtValidation =
config.isAutoResetErrorSegmentsOnValidationEnabled();
+ _disasterRecoveryMode = config.getDisasterRecoveryMode();
Preconditions.checkState(_segmentLevelValidationIntervalInSeconds > 0);
}
@@ -281,15 +285,19 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
}
+  /**
+   * Resolves whether this validation run should repair ERROR segments of partial-upsert / dedup tables.
+   *
+   * <p>An explicit {@code repairErrorSegmentsForPartialUpsertOrDedup} task property wins. When it is absent,
+   * the controller-level disaster recovery mode decides. A property value other than "true"/"false"
+   * (ignoring case and surrounding whitespace) is logged and treated as {@code false}.
+   */
 private boolean shouldRepairErrorSegmentsForPartialUpsertOrDedup(Properties periodicTaskProperties) {
-    return Optional.ofNullable(periodicTaskProperties.getProperty(REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP))
-        .map(value -> {
-          try {
-            return Boolean.parseBoolean(value);
-          } catch (Exception e) {
-            return false;
-          }
-        })
-        .orElse(false);
+    String property = periodicTaskProperties.getProperty(REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP);
+    if (property == null) {
+      // Not requested explicitly for this run: fall back to the controller-level disaster recovery mode.
+      return _llcRealtimeSegmentManager.shouldRepairErrorSegmentsForPartialUpsertOrDedup(_disasterRecoveryMode);
+    }
+    // Boolean.parseBoolean never throws (it returns false for anything but "true"), so a try/catch cannot
+    // detect bad input. Check explicitly so a typo is reported instead of silently meaning "false".
+    String trimmed = property.trim();
+    if (!"true".equalsIgnoreCase(trimmed) && !"false".equalsIgnoreCase(trimmed)) {
+      LOGGER.warn("Failed to parse property '{}' for '{}'. Returning false.", property,
+          REPAIR_ERROR_SEGMENTS_FOR_PARTIAL_UPSERT_OR_DEDUP);
+      return false;
+    }
+    return Boolean.parseBoolean(trimmed);
 }
@Override
@@ -317,6 +325,20 @@ public class RealtimeSegmentValidationManager extends
ControllerPeriodicTask<Rea
_validationMetrics.unregisterAllMetrics();
}
+  /**
+   * Applies cluster config updates to the controller-level disaster recovery mode.
+   *
+   * <p>Only reacts when {@code DISASTER_RECOVERY_MODE_CONFIG_KEY} is among the changed keys. An unparseable
+   * value is logged and ignored, so that the exception does not propagate out of this callback, and the
+   * previously effective mode stays in place. If the key is absent from {@code clusterConfigs} (e.g. it was
+   * deleted), the mode reverts to {@link DisasterRecoveryMode#DEFAULT}.
+   * NOTE(review): that fallback is DEFAULT, not the value from the static controller config; confirm intended.
+   */
+  @Override
+  public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
+    String configKey = ControllerConf.ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY;
+    if (!changedConfigs.contains(configKey)) {
+      return;
+    }
+    String disasterRecoveryModeString = clusterConfigs.get(configKey);
+    try {
+      _disasterRecoveryMode = ControllerConf.getDisasterRecoveryMode(disasterRecoveryModeString);
+      LOGGER.info("Disaster recovery mode set to {} from cluster config '{}'", _disasterRecoveryMode, configKey);
+    } catch (IllegalArgumentException e) {
+      LOGGER.warn("Ignoring invalid value '{}' for cluster config '{}', keeping disaster recovery mode {}",
+          disasterRecoveryModeString, configKey, _disasterRecoveryMode, e);
+    }
+  }
+
+  /** Returns the currently effective controller-level disaster recovery mode; exposed for tests. */
+  @VisibleForTesting
+  DisasterRecoveryMode getDisasterRecoveryMode() {
+    return _disasterRecoveryMode;
+  }
+
public static final class Context {
private boolean _runSegmentLevelValidation;
private boolean _repairErrorSegmentsForPartialUpsertOrDedup;
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerConfTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerConfTest.java
index 4b93525682b..22be2d820de 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerConfTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/ControllerConfTest.java
@@ -25,6 +25,7 @@ import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.TimeUtils;
import org.testng.Assert;
@@ -200,6 +201,18 @@ public class ControllerConfTest {
Assert.assertEquals(conf.getSegmentRelocatorBatchSizePerServer(), 42);
}
+  @Test
+  public void testGetDisasterRecoveryMode() {
+    // Key not set -> DEFAULT
+    Map<String, Object> controllerConfig = new HashMap<>();
+    ControllerConf conf = new ControllerConf(controllerConfig);
+    Assert.assertEquals(conf.getDisasterRecoveryMode(), DisasterRecoveryMode.DEFAULT);
+
+    controllerConfig = new HashMap<>();
+    controllerConfig.put(DISASTER_RECOVERY_MODE_CONFIG_KEY, "ALWAYS");
+    conf = new ControllerConf(controllerConfig);
+    Assert.assertEquals(conf.getDisasterRecoveryMode(), DisasterRecoveryMode.ALWAYS);
+
+    // Static parser that is also used for cluster config updates
+    Assert.assertEquals(ControllerConf.getDisasterRecoveryMode(null), DisasterRecoveryMode.DEFAULT);
+    Assert.assertEquals(ControllerConf.getDisasterRecoveryMode("ALWAYS"), DisasterRecoveryMode.ALWAYS);
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> ControllerConf.getDisasterRecoveryMode("NOT_A_DISASTER_RECOVERY_MODE"));
+  }
+
@Test
public void shouldBeAbleToDisableUsingNewConfig() {
Map<String, Object> controllerConfig = new HashMap<>();
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 842eea188cb..0506b08b558 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -72,6 +72,7 @@ import
org.apache.pinot.core.data.manager.realtime.SegmentCompletionUtils;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.segment.spi.creator.SegmentVersion;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -1956,6 +1957,15 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertFalse(segmentManager.syncCommittingSegments(realtimeTableName,
newSegments));
}
+  @Test
+  public void testShouldRepairErrorSegmentsForPartialUpsertOrDedup() {
+    PinotLLCRealtimeSegmentManager segmentManager = new FakePinotLLCRealtimeSegmentManager();
+    // Only ALWAYS permits repairing ERROR segments for partial-upsert / dedup tables
+    boolean repairWithDefault =
+        segmentManager.shouldRepairErrorSegmentsForPartialUpsertOrDedup(DisasterRecoveryMode.DEFAULT);
+    boolean repairWithAlways =
+        segmentManager.shouldRepairErrorSegmentsForPartialUpsertOrDedup(DisasterRecoveryMode.ALWAYS);
+    Assert.assertFalse(repairWithDefault);
+    Assert.assertTrue(repairWithAlways);
+  }
+
//////////////////////////////////////////////////////////////////////////////////
// Fake classes
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
index fb511b1feed..115d4b751f5 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/validation/RealtimeSegmentValidationManagerTest.java
@@ -18,12 +18,18 @@
*/
package org.apache.pinot.controller.validation;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
+import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -38,7 +44,6 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.*;
-
public class RealtimeSegmentValidationManagerTest {
@Mock
private PinotLLCRealtimeSegmentManager _llcRealtimeSegmentManager;
@@ -185,4 +190,15 @@ public class RealtimeSegmentValidationManagerTest {
Assert.assertEquals(result, expectedResult);
}
+
+  @Test
+  public void testConfigChange() {
+    String configKey = ControllerConf.ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY;
+    Assert.assertEquals(_realtimeSegmentValidationManager.getDisasterRecoveryMode(), DisasterRecoveryMode.DEFAULT);
+
+    // Setting the key switches the mode
+    Map<String, String> newConfig = new HashMap<>();
+    newConfig.put(configKey, "ALWAYS");
+    Set<String> changedConfigSet = new HashSet<>(List.of(configKey));
+    _realtimeSegmentValidationManager.onChange(changedConfigSet, newConfig);
+    Assert.assertEquals(_realtimeSegmentValidationManager.getDisasterRecoveryMode(), DisasterRecoveryMode.ALWAYS);
+
+    // A change to an unrelated key must leave the mode untouched
+    _realtimeSegmentValidationManager.onChange(new HashSet<>(List.of("some.unrelated.config")), new HashMap<>());
+    Assert.assertEquals(_realtimeSegmentValidationManager.getDisasterRecoveryMode(), DisasterRecoveryMode.ALWAYS);
+
+    // Removing the key reverts to DEFAULT; this also restores the initial state in case the manager
+    // instance is shared with other tests (NOTE(review): confirm how it is set up)
+    _realtimeSegmentValidationManager.onChange(new HashSet<>(List.of(configKey)), new HashMap<>());
+    Assert.assertEquals(_realtimeSegmentValidationManager.getDisasterRecoveryMode(), DisasterRecoveryMode.DEFAULT);
+  }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
index 925d7a7379b..bba8191954a 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java
@@ -20,6 +20,8 @@ package org.apache.pinot.integration.tests;
import java.io.File;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
import org.apache.pinot.common.utils.PauselessConsumptionUtils;
import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -33,6 +35,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.BeforeClass;
+import static
org.apache.pinot.controller.ControllerConf.ControllerPeriodicTasksConf.DISASTER_RECOVERY_MODE_CONFIG_KEY;
import static org.testng.Assert.assertNotNull;
@@ -40,6 +43,7 @@ public class
PauselessDedupRealtimeIngestionSegmentCommitFailureTest
extends PauselessRealtimeIngestionSegmentCommitFailureTest {
private static final int NUM_PARTITIONS = 2;
+ private final double _randomDouble = new Random().nextDouble();
@Override
protected String getAvroTarFileName() {
@@ -83,6 +87,14 @@ public class
PauselessDedupRealtimeIngestionSegmentCommitFailureTest
return ingestionConfig;
}
+  @Override
+  protected void overrideControllerConf(Map<String, Object> properties) {
+    super.overrideControllerConf(properties);
+    // Randomly exercise the controller-level config path; otherwise the table config enables ALWAYS in setUp()
+    boolean useControllerLevelMode = _randomDouble > 0.5;
+    if (!useControllerLevelMode) {
+      return;
+    }
+    properties.put(DISASTER_RECOVERY_MODE_CONFIG_KEY, "ALWAYS");
+  }
+
@BeforeClass
public void setUp()
throws Exception {
@@ -130,7 +142,9 @@ public class
PauselessDedupRealtimeIngestionSegmentCommitFailureTest
.get(0)
.put(StreamConfigProperties.PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS, "10");
streamIngestionConfig.setPauselessConsumptionEnabled(true);
- streamIngestionConfig.setDisasterRecoveryMode(DisasterRecoveryMode.ALWAYS);
+ if (_randomDouble <= 0.5) {
+
streamIngestionConfig.setDisasterRecoveryMode(DisasterRecoveryMode.ALWAYS);
+ }
tableConfig.getValidationConfig().setPeerSegmentDownloadScheme(CommonConstants.HTTP_PROTOCOL);
addTableConfig(tableConfig);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]