This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a1b37cff7c Adds Disaster Recovery modes for Pauseless (#16071) a1b37cff7c is described below commit a1b37cff7c6a22e986415cae5406cc70a4bb545c Author: NOOB <43700604+noob-se...@users.noreply.github.com> AuthorDate: Tue Jul 1 03:37:32 2025 +0530 Adds Disaster Recovery modes for Pauseless (#16071) --- .../realtime/PinotLLCRealtimeSegmentManager.java | 53 ++++++-- .../realtime/RealtimeSegmentDataManager.java | 16 ++- .../realtime/RealtimeSegmentDataManagerTest.java | 6 +- ...pRealtimeIngestionSegmentCommitFailureTest.java | 150 +++++++++++++++++++++ ...sRealtimeIngestionSegmentCommitFailureTest.java | 12 +- ...FailureInjectingRealtimeSegmentDataManager.java | 7 +- .../FailureInjectingRealtimeTableDataManager.java | 3 +- .../dedupPauselessIngestionTestData.tar.gz | Bin 0 -> 467 bytes .../spi/config/table/DisasterRecoveryMode.java | 34 +++++ .../table/ingestion/StreamIngestionConfig.java | 13 ++ 10 files changed, 268 insertions(+), 26 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 3afc78eeb5..5f7c950178 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -104,6 +104,8 @@ import org.apache.pinot.core.util.PeerServerSegmentFinder; import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata; import org.apache.pinot.spi.config.table.ColumnPartitionConfig; +import org.apache.pinot.spi.config.table.DedupConfig; +import org.apache.pinot.spi.config.table.DisasterRecoveryMode; import org.apache.pinot.spi.config.table.PauseState; import org.apache.pinot.spi.config.table.SegmentPartitionConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; @@ -2508,28 +2510,18 @@ public class PinotLLCRealtimeSegmentManager { segmentsInErrorStateInAtLeastOneReplica.size(), segmentsInErrorStateInAtLeastOneReplica, segmentsInErrorStateInAllReplicas.size(), segmentsInErrorStateInAllReplicas, realtimeTableName); - boolean isPartialUpsertEnabled = - tableConfig.getUpsertConfig() != null && tableConfig.getUpsertConfig().getMode() == UpsertConfig.Mode.PARTIAL; - boolean isDedupEnabled = tableConfig.getDedupConfig() != null && tableConfig.getDedupConfig().isDedupEnabled(); - if ((isPartialUpsertEnabled || isDedupEnabled) && !repairErrorSegmentsForPartialUpsertOrDedup) { + if (!allowRepairOfErrorSegments(repairErrorSegmentsForPartialUpsertOrDedup, tableConfig)) { // We do not run reingestion for dedup and partial upsert tables in pauseless as it can // lead to data inconsistencies _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.PAUSELESS_SEGMENTS_IN_UNRECOVERABLE_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size()); - LOGGER.error("Skipping repair for errored segments in table: {} because dedup or partial upsert is enabled.", - realtimeTableName); return; } else { - if ((isPartialUpsertEnabled || isDedupEnabled)) { - LOGGER.info( - "Repairing error segments in table: {} as repairErrorSegmentForPartialUpsertOrDedup is set to true", - realtimeTableName); - } + LOGGER.info("Repairing error segments in table: {}.", realtimeTableName); _controllerMetrics.setOrUpdateTableGauge(realtimeTableName, ControllerGauge.PAUSELESS_SEGMENTS_IN_ERROR_COUNT, segmentsInErrorStateInAllReplicas.size()); } - for (String segmentName : segmentsInErrorStateInAtLeastOneReplica) { SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(realtimeTableName, segmentName); if (segmentZKMetadata == null) { @@ -2566,6 +2558,43 @@ public class PinotLLCRealtimeSegmentManager { } } + private boolean allowRepairOfErrorSegments(boolean repairErrorSegmentsForPartialUpsertOrDedup, + TableConfig tableConfig) { + if (repairErrorSegmentsForPartialUpsertOrDedup) { + // If API context has repairErrorSegmentsForPartialUpsertOrDedup=true, allow repair. + return true; + } + + if ((tableConfig.getIngestionConfig() != null) && (tableConfig.getIngestionConfig().getStreamIngestionConfig() + != null)) { + DisasterRecoveryMode disasterRecoveryMode = + tableConfig.getIngestionConfig().getStreamIngestionConfig().getDisasterRecoveryMode(); + if (disasterRecoveryMode == DisasterRecoveryMode.ALWAYS) { + return true; + } + } + + boolean isPartialUpsertEnabled = (tableConfig.getUpsertConfig() != null) && (tableConfig.getUpsertConfig().getMode() + == UpsertConfig.Mode.PARTIAL); + if (isPartialUpsertEnabled) { + // If isPartialUpsert is enabled, do not allow repair. + LOGGER.warn("Skipping repair for errored segments in table: {} because partialUpsert is enabled", + tableConfig.getTableName()); + return false; + } + + DedupConfig dedupConfig = tableConfig.getDedupConfig(); + boolean isDedupEnabled = (dedupConfig != null) && (dedupConfig.isDedupEnabled()); + if (isDedupEnabled) { + // If dedup is enabled, do not allow repair of error segment. + LOGGER.warn("Skipping repair for errored segments in table: {} because dedup is enabled", + tableConfig.getTableName()); + return false; + } + + return true; + } + /** * Invokes the server's reingestSegment API via a POST request with JSON payload, * using Simple HTTP APIs. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index beea808a66..5ce71e81f7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1306,7 +1306,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { SegmentBuildDescriptor descriptor; try { descriptor = buildSegmentInternal(false); - } catch (SegmentBuildFailureException e) { + } catch (Exception e) { return false; } if (descriptor == null) { @@ -1475,9 +1475,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { } else if (_currentOffset.compareTo(endOffset) == 0) { _segmentLogger .info("Current offset {} matches offset in zk {}. Replacing segment", _currentOffset, endOffset); - boolean replaced = buildSegmentAndReplace(); - if (!replaced) { - throw new RuntimeException("Failed to build the segment: " + _segmentNameStr); + if (!buildSegmentAndReplace()) { + _segmentLogger.warn("Failed to build the segment: {} and replace. Downloading to replace", + _segmentNameStr); + downloadSegmentAndReplace(segmentZKMetadata); } } else { boolean success = false; @@ -1495,9 +1496,10 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { if (success) { _segmentLogger.info("Caught up to offset {}", _currentOffset); - boolean replaced = buildSegmentAndReplace(); - if (!replaced) { - throw new RuntimeException("Failed to build the segment: " + _segmentNameStr); + if (!buildSegmentAndReplace()) { + _segmentLogger.warn("Failed to build the segment: {} after catchup. Downloading to replace", + _segmentNameStr); + downloadSegmentAndReplace(segmentZKMetadata); } } else { _segmentLogger diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index ba3f8579fc..073ba7c15e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -583,13 +583,15 @@ public class RealtimeSegmentDataManagerTest { Assert.assertFalse(segmentDataManager._buildAndReplaceCalled); } - // Test Runtime Exception is thrown when build segment fails. + // Test downloadAndReplace is called after buildAndReplace fails. try (FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager()) { segmentDataManager._failSegmentBuildAndReplace = true; segmentDataManager._stopWaitTimeMs = 0; segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.HOLDING); segmentDataManager.setCurrentOffset(finalOffsetValue); - Assert.expectThrows(RuntimeException.class, () -> segmentDataManager.goOnlineFromConsuming(metadata)); + segmentDataManager.goOnlineFromConsuming(metadata); + Assert.assertTrue(segmentDataManager._downloadAndReplaceCalled); + Assert.assertTrue(segmentDataManager._buildAndReplaceCalled); } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java new file mode 100644 index 0000000000..925d7a7379 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessDedupRealtimeIngestionSegmentCommitFailureTest.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.util.List; +import org.apache.pinot.common.utils.PauselessConsumptionUtils; +import org.apache.pinot.spi.config.table.DisasterRecoveryMode; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.ParallelSegmentConsumptionPolicy; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.annotations.BeforeClass; + +import static org.testng.Assert.assertNotNull; + + +public class PauselessDedupRealtimeIngestionSegmentCommitFailureTest + extends PauselessRealtimeIngestionSegmentCommitFailureTest { + + private static final int NUM_PARTITIONS = 2; + + @Override + protected String getAvroTarFileName() { + return "dedupPauselessIngestionTestData.tar.gz"; + } + + @Override + protected int getRealtimeSegmentFlushSize() { + return 2; + } + + @Override + protected String getSchemaFileName() { + return "dedupIngestionTestSchema.schema"; + } + + @Override + protected long getCountStarResult() { + // Two distinct records are expected with pk values of 0, 1. + return 2; + } + + @Override + protected String getPartitionColumn() { + return "id"; + } + + @Override + protected int getNumReplicas() { + return 2; + } + + @Override + protected IngestionConfig getIngestionConfig() { + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig(new StreamIngestionConfig(List.of(getStreamConfigs()))); + assert ingestionConfig.getStreamIngestionConfig() != null; + ingestionConfig.getStreamIngestionConfig() + .setParallelSegmentConsumptionPolicy(ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY); + ingestionConfig.getStreamIngestionConfig().setEnforceConsumptionInOrder(true); + return ingestionConfig; + } + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + // Start the Pinot cluster + startZk(); + // Start a customized controller with more frequent realtime segment validation + startController(); + startBroker(); + startServers(2); + + // load data in kafka + List<File> avroFiles = unpackAvroData(_tempDir); + startKafka(); + pushAvroIntoKafka(avroFiles); + + setMaxSegmentCompletionTimeMillis(); + // create schema for non-pauseless table + Schema schema = createSchema(); + schema.setSchemaName(getNonPauselessTableName()); + addSchema(schema); + + // add non-pauseless table + TableConfig tableConfig2 = createDedupTableConfig(avroFiles.get(0)); + tableConfig2.setTableName(TableNameBuilder.REALTIME.tableNameWithType(getNonPauselessTableName())); + tableConfig2.getValidationConfig().setRetentionTimeUnit("DAYS"); + tableConfig2.getValidationConfig().setRetentionTimeValue("100000"); + addTableConfig(tableConfig2); + waitForDocsLoaded(600_000L, true, tableConfig2.getTableName()); + + // create schema for pauseless table + schema.setSchemaName(getPauselessTableName()); + addSchema(schema); + + // add pauseless table + TableConfig tableConfig = createDedupTableConfig(avroFiles.get(0)); + tableConfig.setTableName(TableNameBuilder.REALTIME.tableNameWithType(getPauselessTableName())); + tableConfig.getValidationConfig().setRetentionTimeUnit("DAYS"); + tableConfig.getValidationConfig().setRetentionTimeValue("100000"); + assertNotNull(tableConfig.getIngestionConfig()); + StreamIngestionConfig streamIngestionConfig = tableConfig.getIngestionConfig().getStreamIngestionConfig(); + assertNotNull(streamIngestionConfig); + streamIngestionConfig.getStreamConfigMaps() + .get(0) + .put(StreamConfigProperties.PAUSELESS_SEGMENT_DOWNLOAD_TIMEOUT_SECONDS, "10"); + streamIngestionConfig.setPauselessConsumptionEnabled(true); + streamIngestionConfig.setDisasterRecoveryMode(DisasterRecoveryMode.ALWAYS); + tableConfig.getValidationConfig().setPeerSegmentDownloadScheme(CommonConstants.HTTP_PROTOCOL); + + addTableConfig(tableConfig); + String realtimeTableName = tableConfig.getTableName(); + TestUtils.waitForCondition(aVoid -> getNumErrorSegmentsInEV(realtimeTableName) > 0, 600_000L, + "Segments still not in error state"); + } + + protected TableConfig createDedupTableConfig(File sampleAvroFile) { + TableConfig tableConfig = super.createDedupTableConfig(sampleAvroFile, getPartitionColumn(), NUM_PARTITIONS); + assertNotNull(tableConfig.getDedupConfig()); + if (PauselessConsumptionUtils.isPauselessEnabled(tableConfig)) { + tableConfig.getValidationConfig().setPeerSegmentDownloadScheme(CommonConstants.HTTP_PROTOCOL); + } + return tableConfig; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java index 821031e8f9..7ecb5e5c34 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PauselessRealtimeIngestionSegmentCommitFailureTest.java @@ -140,7 +140,7 @@ public class PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus "Segments still not in error state"); } - private void setMaxSegmentCompletionTimeMillis() { + protected void setMaxSegmentCompletionTimeMillis() { PinotLLCRealtimeSegmentManager realtimeSegmentManager = _helixResourceManager.getRealtimeSegmentManager(); if (realtimeSegmentManager instanceof FailureInjectingPinotLLCRealtimeSegmentManager) { ((FailureInjectingPinotLLCRealtimeSegmentManager) realtimeSegmentManager).setMaxSegmentCompletionTimeoutMs( @@ -148,7 +148,7 @@ public class PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus } } - private int getNumErrorSegmentsInEV(String realtimeTableName) { + protected int getNumErrorSegmentsInEV(String realtimeTableName) { ExternalView externalView = _helixResourceManager.getHelixAdmin() .getResourceExternalView(_helixResourceManager.getHelixClusterName(), realtimeTableName); if (externalView == null) { @@ -238,6 +238,14 @@ public class PauselessRealtimeIngestionSegmentCommitFailureTest extends BaseClus return segmentZKMetadataMap; } + protected String getNonPauselessTableName() { + return DEFAULT_TABLE_NAME_2; + } + + protected String getPauselessTableName() { + return DEFAULT_TABLE_NAME; + } + @AfterClass public void tearDown() throws IOException { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java index 2467429466..ee13b49e08 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeSegmentDataManager.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.integration.tests.realtime.utils; +import java.util.function.BooleanSupplier; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; @@ -25,6 +26,7 @@ import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.core.data.manager.realtime.SegmentBuildFailureException; +import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; @@ -47,12 +49,13 @@ public class FailureInjectingRealtimeSegmentDataManager extends RealtimeSegmentD public FailureInjectingRealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConfig tableConfig, RealtimeTableDataManager realtimeTableDataManager, String resourceDataDir, IndexLoadingConfig indexLoadingConfig, Schema schema, LLCSegmentName llcSegmentName, ConsumerCoordinator consumerCoordinator, - ServerMetrics serverMetrics, boolean failCommit) + ServerMetrics serverMetrics, boolean failCommit, PartitionDedupMetadataManager partitionDedupMetadataManager, + BooleanSupplier isTableReadyToConsumeData) throws AttemptsExceededException, RetriableOperationException { // Pass through to the real parent constructor super(segmentZKMetadata, tableConfig, realtimeTableDataManager, resourceDataDir, indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, serverMetrics, null /* no PartitionUpsertMetadataManager */, - null /* no PartitionDedupMetadataManager */, () -> true /* isReadyToConsumeData always true for tests */); + partitionDedupMetadataManager, isTableReadyToConsumeData); _failCommit = failCommit; } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java index af65cd1a79..994800ea70 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java @@ -62,6 +62,7 @@ public class FailureInjectingRealtimeTableDataManager extends RealtimeTableDataM addFailureToCommits = false; } return new FailureInjectingRealtimeSegmentDataManager(zkMetadata, tableConfig, this, _indexDir.getAbsolutePath(), - indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, _serverMetrics, addFailureToCommits); + indexLoadingConfig, schema, llcSegmentName, consumerCoordinator, _serverMetrics, addFailureToCommits, + partitionDedupMetadataManager, isTableReadyToConsumeData); } } diff --git a/pinot-integration-tests/src/test/resources/dedupPauselessIngestionTestData.tar.gz b/pinot-integration-tests/src/test/resources/dedupPauselessIngestionTestData.tar.gz new file mode 100644 index 0000000000..1a34d77ec3 Binary files /dev/null and b/pinot-integration-tests/src/test/resources/dedupPauselessIngestionTestData.tar.gz differ diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DisasterRecoveryMode.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DisasterRecoveryMode.java new file mode 100644 index 0000000000..4876332c46 --- /dev/null +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/DisasterRecoveryMode.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.spi.config.table; + +/** + * Recovery mode which is used to decide how to recover a segment online in IS but having no completed (immutable) + * replica on any server in pause-less ingestion + */ +public enum DisasterRecoveryMode { + // ALWAYS means Pinot will always run the Disaster Recovery Job. + ALWAYS, + // DEFAULT means Pinot will skip the Disaster Recovery Job for tables like Dedup/Partial-Upsert where consistency + // of data is higher in priority than availability. Features like Dedup/Partial-Upsert requires ingestion to only + // happen in strict order (i.e. a segment ingested in past cannot be re-ingested if server has consumed the + // following segments to it). So in case of above Disaster scenario, there's only a fix requiring bulk delete of + // segments. + DEFAULT +} diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java index da17a2c020..4dcea288c9 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import javax.annotation.Nullable; import org.apache.pinot.spi.config.BaseJsonConfig; +import org.apache.pinot.spi.config.table.DisasterRecoveryMode; /** @@ -54,6 +55,10 @@ public class StreamIngestionConfig extends BaseJsonConfig { @JsonPropertyDescription("Policy to determine the behaviour of parallel consumption.") private ParallelSegmentConsumptionPolicy _parallelSegmentConsumptionPolicy; + @JsonPropertyDescription("Recovery mode which is used to decide how to recover a segment online in IS but having no" + + " completed (immutable) replica on any server in pause-less ingestion") + private DisasterRecoveryMode _disasterRecoveryMode = DisasterRecoveryMode.DEFAULT; + @JsonCreator public StreamIngestionConfig(@JsonProperty("streamConfigMaps") List<Map<String, String>> streamConfigMaps) { _streamConfigMaps = streamConfigMaps; @@ -111,4 +116,12 @@ public class StreamIngestionConfig extends BaseJsonConfig { public void setParallelSegmentConsumptionPolicy(ParallelSegmentConsumptionPolicy parallelSegmentConsumptionPolicy) { _parallelSegmentConsumptionPolicy = parallelSegmentConsumptionPolicy; } + + public DisasterRecoveryMode getDisasterRecoveryMode() { + return _disasterRecoveryMode; + } + + public void setDisasterRecoveryMode(DisasterRecoveryMode disasterRecoveryMode) { + _disasterRecoveryMode = disasterRecoveryMode; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org