This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 61c9b8e580 Allow segment upload to deepstore only when server segment 
store uri is configured (#10216)
61c9b8e580 is described below

commit 61c9b8e5800677c9aa82baef7101604935d7ec32
Author: Navina Ramesh <nav...@apache.org>
AuthorDate: Mon Feb 6 15:09:27 2023 -0800

    Allow segment upload to deepstore only when server segment store uri is 
configured (#10216)
---
 .../core/data/manager/offline/TableDataManagerProvider.java | 13 +++++++++++++
 .../data/manager/realtime/LLRealtimeSegmentDataManager.java |  2 ++
 .../core/data/manager/realtime/PinotFSSegmentUploader.java  |  2 ++
 .../core/data/manager/realtime/SegmentCommitterFactory.java |  6 ++++--
 .../manager/realtime/LLRealtimeSegmentDataManagerTest.java  |  1 +
 .../data/manager/realtime/SegmentCommitterFactoryTest.java  |  6 ++++--
 .../pinot/tools/admin/command/StartControllerCommand.java   |  1 +
 .../pinot/tools/admin/command/StartServerCommand.java       |  1 +
 8 files changed, 28 insertions(+), 4 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 2a2671047a..6b83d33dce 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -19,8 +19,10 @@
 package org.apache.pinot.core.data.manager.offline;
 
 import com.google.common.cache.LoadingCache;
+import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.function.Supplier;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -32,6 +34,9 @@ import 
org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig;
 import org.apache.pinot.segment.local.data.manager.TableDataManagerParams;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 
 
 /**
@@ -72,6 +77,14 @@ public class TableDataManagerProvider {
         }
         break;
       case REALTIME:
+        Map<String, String> streamConfigMap = 
IngestionConfigUtils.getStreamConfigMap(
+            tableDataManagerConfig.getTableConfig());
+        if 
(Boolean.parseBoolean(streamConfigMap.get(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE))
+            && 
StringUtils.isEmpty(tableDataManagerConfig.getInstanceDataManagerConfig().getSegmentStoreUri()))
 {
+          throw new IllegalStateException(String.format("Table has enabled %s 
config. But the server has not "
+              + "configured the segmentstore uri. Configure the server config 
%s",
+              StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, 
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
+        }
         tableDataManager = new 
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries);
         break;
       default:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 7fe3239dcd..6e43a77202 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -1000,6 +1000,8 @@ public class LLRealtimeSegmentDataManager extends 
RealtimeSegmentDataManager {
     SegmentCompletionProtocol.Response commitResponse = 
commit(controllerVipUrl, isSplitCommit);
 
     if 
(!commitResponse.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS))
 {
+      _segmentLogger.warn("Controller response was {} and not {}", 
commitResponse.getStatus(),
+          SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_SUCCESS);
       return false;
     }
 
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
index e71cd468f1..2325a44e19 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/PinotFSSegmentUploader.java
@@ -54,6 +54,8 @@ public class PinotFSSegmentUploader implements 
SegmentUploader {
 
   public URI uploadSegment(File segmentFile, LLCSegmentName segmentName) {
     if (_segmentStoreUriStr == null || _segmentStoreUriStr.isEmpty()) {
+      LOGGER.error("Missing segment store uri. Failed to upload segment file 
{} for {}.", segmentFile.getName(),
+          segmentName.getSegmentName());
       return null;
     }
     Callable<URI> uploadTask = () -> {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
index 5a52103efc..37ff2e5a82 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactory.java
@@ -69,9 +69,11 @@ public class SegmentCommitterFactory {
 
     boolean uploadToFs = _streamConfig.isServerUploadToDeepStore();
     String peerSegmentDownloadScheme = 
_tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
-    // TODO: exists for backwards compatibility. remove peerDownloadScheme 
non-null check once users have migrated
+    String segmentStoreUri = _indexLoadingConfig.getSegmentStoreURI();
+
     if (uploadToFs || peerSegmentDownloadScheme != null) {
-      segmentUploader = new 
PinotFSSegmentUploader(_indexLoadingConfig.getSegmentStoreURI(),
+      // TODO: peer scheme non-null check exists for backwards compatibility. 
remove check once users have migrated
+      segmentUploader = new PinotFSSegmentUploader(segmentStoreUri,
           PinotFSSegmentUploader.DEFAULT_SEGMENT_UPLOAD_TIMEOUT_MILLIS);
     } else {
       segmentUploader = new Server2ControllerSegmentUploader(_logger,
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
index fd75a9a0f0..95f19b419e 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManagerTest.java
@@ -841,6 +841,7 @@ public class LLRealtimeSegmentDataManagerTest {
     
when(tableDataManagerConfig.getTableName()).thenReturn(REALTIME_TABLE_NAME);
     when(tableDataManagerConfig.getTableType()).thenReturn(TableType.REALTIME);
     
when(tableDataManagerConfig.getDataDir()).thenReturn(FileUtils.getTempDirectoryPath());
+    when(tableDataManagerConfig.getTableConfig()).thenReturn(tableConfig);
     InstanceDataManagerConfig instanceDataManagerConfig = 
mock(InstanceDataManagerConfig.class);
     
when(instanceDataManagerConfig.getMaxParallelSegmentBuilds()).thenReturn(4);
     
when(instanceDataManagerConfig.getStreamSegmentDownloadUntarRateLimit()).thenReturn(-1L);
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
index c815d48a94..8e60b1e414 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/SegmentCommitterFactoryTest.java
@@ -99,9 +99,11 @@ public class SegmentCommitterFactoryTest {
     Map<String, String> streamConfigMap = new 
HashMap<>(getMinimumStreamConfigMap());
     streamConfigMap.put(StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, 
"true");
     TableConfig config = createRealtimeTableConfig("testDeepStoreConfig", 
streamConfigMap).build();
+    IndexLoadingConfig indexLoadingConfig = 
Mockito.mock(IndexLoadingConfig.class);
+    
Mockito.when(indexLoadingConfig.getSegmentStoreURI()).thenReturn("file:///path/to/segment/store.txt");
 
     SegmentCommitterFactory factory = new 
SegmentCommitterFactory(Mockito.mock(Logger.class), protocolHandler, config,
-        Mockito.mock(IndexLoadingConfig.class), 
Mockito.mock(ServerMetrics.class));
+        indexLoadingConfig, Mockito.mock(ServerMetrics.class));
     SegmentCommitter committer = factory.createSegmentCommitter(true, 
requestParams, controllerVipUrl);
     Assert.assertNotNull(committer);
     Assert.assertTrue(committer instanceof SplitSegmentCommitter);
@@ -115,7 +117,7 @@ public class SegmentCommitterFactoryTest {
         .build();
 
     factory = new SegmentCommitterFactory(Mockito.mock(Logger.class), 
protocolHandler, config1,
-        Mockito.mock(IndexLoadingConfig.class), 
Mockito.mock(ServerMetrics.class));
+        indexLoadingConfig, Mockito.mock(ServerMetrics.class));
     committer = factory.createSegmentCommitter(true, requestParams, 
controllerVipUrl);
     Assert.assertNotNull(committer);
     Assert.assertTrue(committer instanceof SplitSegmentCommitter);
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
index 660ab1bfc4..1de1ca80f1 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartControllerCommand.java
@@ -73,6 +73,7 @@ public class StartControllerCommand extends 
AbstractBaseAdminCommand implements
   // This can be set via the set method, or via config file input.
   private boolean _tenantIsolation = true;
 
+  @CommandLine.Option(names = {"-configOverride"}, required = false, split = 
",")
   private Map<String, Object> _configOverrides = new HashMap<>();
 
   @Override
diff --git 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
index 80b2cd70bd..e515d13497 100644
--- 
a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
+++ 
b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/StartServerCommand.java
@@ -86,6 +86,7 @@ public class StartServerCommand extends 
AbstractBaseAdminCommand implements Comm
   // TODO support forbids = {"-serverHost", "-serverPort", "-dataDir", 
"-segmentDir"}
   private String _configFileName;
 
+  @CommandLine.Option(names = {"-configOverride"}, required = false, split = 
",")
   private Map<String, Object> _configOverrides = new HashMap<>();
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to