This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push: new 61c9b8e580 Allow segment upload to deepstore only when server segment store uri is configured (#10216) 61c9b8e580 is described below commit 61c9b8e5800677c9aa82baef7101604935d7ec32 Author: Navina Ramesh <nav...@apache.org> AuthorDate: Mon Feb 6 15:09:27 2023 -0800 Allow segment upload to deepstore only when server segment store uri is configured (#10216) --- .../core/data/manager/offline/TableDataManagerProvider.java | 13 +++++++++++++ .../data/manager/realtime/LLRealtimeSegmentDataManager.java | 2 ++ .../core/data/manager/realtime/PinotFSSegmentUploader.java | 2 ++ .../core/data/manager/realtime/SegmentCommitterFactory.java | 6 ++++-- .../manager/realtime/LLRealtimeSegmentDataManagerTest.java | 1 + .../data/manager/realtime/SegmentCommitterFactoryTest.java | 6 ++++-- .../pinot/tools/admin/command/StartControllerCommand.java | 1 + .../pinot/tools/admin/command/StartServerCommand.java | 1 + 8 files changed, 28 insertions(+), 4 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java index 2a2671047a..6b83d33dce 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java @@ -19,8 +19,10 @@ package org.apache.pinot.core.data.manager.offline; import com.google.common.cache.LoadingCache; +import java.util.Map; import java.util.concurrent.Semaphore; import java.util.function.Supplier; +import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -32,6 +34,9 @@ import org.apache.pinot.segment.local.data.manager.TableDataManager; import 
org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; import org.apache.pinot.segment.local.data.manager.TableDataManagerParams; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.stream.StreamConfigProperties; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.IngestionConfigUtils; /** @@ -72,6 +77,14 @@ public class TableDataManagerProvider { } break; case REALTIME: + Map<String, String> streamConfigMap = IngestionConfigUtils.getStreamConfigMap( + tableDataManagerConfig.getTableConfig()); + if (Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE)) + && StringUtils.isEmpty(tableDataManagerConfig.getInstanceDataManagerConfig().getSegmentStoreUri())) { + throw new IllegalStateException(String.format("Table has enabled %s config. But the server has not " + + "configured the segmentstore uri. Configure the server config %s", + StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI)); + } tableDataManager = new RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries); break; default: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 7fe3239dcd..6e43a77202 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -1000,6 +1000,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { SegmentCompletionProtocol.Response commitResponse = commit(controllerVipUrl, isSplitCommit); if (!commitResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS)) { + 
_segmentLogger.warn("Controller response was {} and not {}", commitResponse.getStatus(), + SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS); return false; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java index e71cd468f1..2325a44e19 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java @@ -54,6 +54,8 @@ public class PinotFSSegmentUploader implements SegmentUploader { public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) { if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) { + LOGGER.error("Missing segment store uri. Failed to upload segment file {} for {}.", segmentFile.getName(), + segmentName.getSegmentName()); return null; } Callable<URI> uploadTask = () -> { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java index 5a52103efc..37ff2e5a82 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java @@ -69,9 +69,11 @@ public class SegmentCommitterFactory { boolean uploadToFs = _streamConfig.isServerUploadToDeepStore(); String peerSegmentDownloadScheme = _tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(); - // TODO: exists for backwards compatibility. 
remove peerDownloadScheme non-null check once users have migrated + String segmentStoreUri = _indexLoadingConfig.getSegmentStoreURI(); + if (uploadToFs || peerSegmentDownloadScheme != null) { - segmentUploader = new PinotFSSegmentUploader(_indexLoadingConfig.getSegmentStoreURI(), + // TODO: peer scheme non-null check exists for backwards compatibility. remove check once users have migrated + segmentUploader = new PinotFSSegmentUploader(segmentStoreUri, PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS); } else { segmentUploader = new Server2ControllerSegmentUploader(_logger, diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java index fd75a9a0f0..95f19b419e 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java @@ -841,6 +841,7 @@ public class LLRealtimeSegmentDataManagerTest { when(tableDataManagerConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME); when(tableDataManagerConfig.getTableType()).thenReturn(TableType.REALTIME); when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath()); + when(tableDataManagerConfig.getTableConfig()).thenReturn(tableConfig); InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4); when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java index c815d48a94..8e60b1e414 100644 --- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java @@ -99,9 +99,11 @@ public class SegmentCommitterFactoryTest { Map<String, String> streamConfigMap = new HashMap<>(getMinimumStreamConfigMap()); streamConfigMap.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, "true"); TableConfig config = createRealtimeTableConfig("testDeepStoreConfig", streamConfigMap).build(); + IndexLoadingConfig indexLoadingConfig = Mockito.mock(IndexLoadingConfig.class); + Mockito.when(indexLoadingConfig.getSegmentStoreURI()).thenReturn("file:///path/to/segment/store.txt"); SegmentCommitterFactory factory = new SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config, - Mockito.mock(IndexLoadingConfig.class), Mockito.mock(ServerMetrics.class)); + indexLoadingConfig, Mockito.mock(ServerMetrics.class)); SegmentCommitter committer = factory.createSegmentCommitter(true, requestParams, controllerVipUrl); Assert.assertNotNull(committer); Assert.assertTrue(committer instanceof SplitSegmentCommitter); @@ -115,7 +117,7 @@ public class SegmentCommitterFactoryTest { .build(); factory = new SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config1, - Mockito.mock(IndexLoadingConfig.class), Mockito.mock(ServerMetrics.class)); + indexLoadingConfig, Mockito.mock(ServerMetrics.class)); committer = factory.createSegmentCommitter(true, requestParams, controllerVipUrl); Assert.assertNotNull(committer); Assert.assertTrue(committer instanceof SplitSegmentCommitter); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java index 660ab1bfc4..1de1ca80f1 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java +++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java @@ -73,6 +73,7 @@ public class StartControllerCommand extends AbstractBaseAdminCommand implements // This can be set via the set method, or via config file input. private boolean _tenantIsolation = true; + @CommandLine.Option(names = {"-configOverride"}, required = false, split = ",") private Map<String, Object> _configOverrides = new HashMap<>(); @Override diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java index 80b2cd70bd..e515d13497 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java @@ -86,6 +86,7 @@ public class StartServerCommand extends AbstractBaseAdminCommand implements Comm // TODO support forbids = {"-serverHost", "-serverPort", "-dataDir", "-segmentDir"} private String _configFileName; + @CommandLine.Option(names = {"-configOverride"}, required = false, split = ",") private Map<String, Object> _configOverrides = new HashMap<>(); @Override --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org