This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 830083827a Make TableDataManagerProvider pluggable (#14470)
830083827a is described below

commit 830083827aee3f0d9c4699698c57a46b775c9892
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Nov 19 09:35:49 2024 -0800

    Make TableDataManagerProvider pluggable (#14470)
---
 .../core/data/manager/InstanceDataManager.java     |  5 +-
 .../manager/offline/DimensionTableDataManager.java |  1 +
 .../DefaultTableDataManagerProvider.java}          | 24 +++---
 .../manager/provider/TableDataManagerProvider.java | 50 +++++++++++++
 .../realtime/RealtimeSegmentDataManagerTest.java   |  9 ++-
 .../executor/QueryExecutorExceptionsTest.java      |  9 ++-
 .../core/query/executor/QueryExecutorTest.java     |  9 ++-
 .../pinot/queries/ExplainPlanQueriesTest.java      |  9 ++-
 .../queries/SegmentWithNullValueVectorTest.java    |  9 ++-
 .../segment/index/loader/IndexLoadingConfig.java   |  4 +-
 .../org/apache/pinot/server/conf/ServerConf.java   |  2 +-
 .../starter/helix/HelixInstanceDataManager.java    | 11 ++-
 .../helix/HelixInstanceDataManagerConfig.java      | 40 ++--------
 .../config/instance/InstanceDataManagerConfig.java |  2 +-
 .../apache/pinot/spi/utils/CommonConstants.java    | 85 ++++++++++++++--------
 15 files changed, 163 insertions(+), 106 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 95a135f1e4..ffacffc897 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -24,7 +24,6 @@ import java.util.Set;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -33,6 +32,7 @@ import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
 import org.apache.pinot.core.util.SegmentRefreshSemaphore;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.spi.SegmentMetadata;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -40,6 +40,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  * The <code>InstanceDataManager</code> class is the instance level data manager, which manages all tables and segments
  * served by the instance.
  */
+@InterfaceAudience.Private
 @ThreadSafe
 public interface InstanceDataManager {
 
@@ -49,7 +50,7 @@ public interface InstanceDataManager {
    * <p>NOTE: The config is the subset of server config with prefix 'pinot.server.instance'
    */
   void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics)
-      throws ConfigurationException;
+      throws Exception;
 
   /**
    * Returns the instance id.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 20759934ca..6ed4edfd48 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.ImmutableSegment;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
similarity index 84%
rename from pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
rename to pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index 302df13509..fff6232943 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.data.manager.offline;
+package org.apache.pinot.core.data.manager.provider;
 
 import com.google.common.cache.Cache;
 import java.util.Map;
@@ -28,6 +28,8 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
+import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.apache.pinot.segment.local.utils.SegmentLocks;
@@ -39,15 +41,16 @@ import org.apache.pinot.spi.utils.IngestionConfigUtils;
 
 
 /**
- * Factory for {@link TableDataManager}.
+ * Default implementation of {@link TableDataManagerProvider}.
  */
-public class TableDataManagerProvider {
-  private final InstanceDataManagerConfig _instanceDataManagerConfig;
-  private final HelixManager _helixManager;
-  private final SegmentLocks _segmentLocks;
-  private final Semaphore _segmentBuildSemaphore;
+public class DefaultTableDataManagerProvider implements TableDataManagerProvider {
+  private InstanceDataManagerConfig _instanceDataManagerConfig;
+  private HelixManager _helixManager;
+  private SegmentLocks _segmentLocks;
+  private Semaphore _segmentBuildSemaphore;
 
-  public TableDataManagerProvider(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager,
+  @Override
+  public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager,
       SegmentLocks segmentLocks) {
     _instanceDataManagerConfig = instanceDataManagerConfig;
     _helixManager = helixManager;
@@ -56,10 +59,7 @@ public class TableDataManagerProvider {
     _segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new Semaphore(maxParallelSegmentBuilds, true) : null;
   }
 
-  public TableDataManager getTableDataManager(TableConfig tableConfig) {
-    return getTableDataManager(tableConfig, null, null, () -> true);
-  }
-
+  @Override
   public TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
       Supplier<Boolean> isServerReadyToServeQueries) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
new file mode 100644
index 0000000000..6fc27bc4ac
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.provider;
+
+import com.google.common.cache.Cache;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixManager;
+import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentLocks;
+import org.apache.pinot.spi.annotations.InterfaceAudience;
+import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+
+
+/**
+ * Factory for {@link TableDataManager}.
+ */
+@InterfaceAudience.Private
+public interface TableDataManagerProvider {
+
+  void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks);
+
+  default TableDataManager getTableDataManager(TableConfig tableConfig) {
+    return getTableDataManager(tableConfig, null, null, () -> true);
+  }
+
+  TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor,
+      @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
+      Supplier<Boolean> isServerReadyToServeQueries);
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 910edf7d9b..f7ca4530bd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -43,7 +43,8 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder;
@@ -779,9 +780,9 @@ public class RealtimeSegmentDataManagerTest {
 
     InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
     when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
-    TableDataManager tableDataManager =
-        new TableDataManagerProvider(instanceDataManagerConfig, helixManager, new SegmentLocks()).getTableDataManager(
-            tableConfig);
+    TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
+    tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new SegmentLocks());
+    TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig);
     tableDataManager.start();
     tableDataManager.shutDown();
     Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown());
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index c664c2f013..12fdda9442 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -34,7 +34,8 @@ import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
@@ -133,9 +134,9 @@ public class QueryExecutorExceptionsTest {
     // Mock the instance data manager
     InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
     when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath());
-    TableDataManager tableDataManager =
-        new TableDataManagerProvider(instanceDataManagerConfig, mock(HelixManager.class),
-            new SegmentLocks()).getTableDataManager(tableConfig);
+    TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
+    tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks());
+    TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig);
     tableDataManager.start();
     //we don't add index segments to the data manager to simulate numSegmentsAcquired < numSegmentsQueried
     InstanceDataManager instanceDataManager = mock(InstanceDataManager.class);
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
index 404bd3efc3..4a171128c8 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java
@@ -40,7 +40,8 @@ import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.TimeSeriesContext;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock;
@@ -154,9 +155,9 @@ public class QueryExecutorTest {
     // Mock the instance data manager
     InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
     when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
-    TableDataManager tableDataManager =
-        new TableDataManagerProvider(instanceDataManagerConfig, mock(HelixManager.class),
-            new SegmentLocks()).getTableDataManager(tableConfig);
+    TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
+    tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks());
+    TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig);
     tableDataManager.start();
     for (ImmutableSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment(indexSegment);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
index dce00515d0..64d5d8e150 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java
@@ -41,7 +41,8 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.ExplainPlanRows;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
 import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl;
@@ -276,9 +277,9 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest {
     // Mock the instance data manager
     InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
     when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
-    TableDataManager tableDataManager =
-        new TableDataManagerProvider(instanceDataManagerConfig, mock(HelixManager.class),
-            new SegmentLocks()).getTableDataManager(TABLE_CONFIG);
+    TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
+    tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks());
+    TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(TABLE_CONFIG);
     tableDataManager.start();
     for (IndexSegment indexSegment : _indexSegments) {
       tableDataManager.addSegment((ImmutableSegment) indexSegment);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
index 3ef16168a2..04db06a140 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java
@@ -36,7 +36,8 @@ import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.InstanceRequest;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.core.operator.blocks.InstanceResponseBlock;
 import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock;
 import org.apache.pinot.core.query.executor.QueryExecutor;
@@ -138,9 +139,9 @@ public class SegmentWithNullValueVectorTest {
     // Mock the instance data manager
     InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class);
     when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath());
-    TableDataManager tableDataManager =
-        new TableDataManagerProvider(instanceDataManagerConfig, mock(HelixManager.class),
-            new SegmentLocks()).getTableDataManager(tableConfig);
+    TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider();
+    tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks());
+    TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig);
     tableDataManager.start();
     tableDataManager.addSegment(_segment);
     _instanceDataManager = mock(InstanceDataManager.class);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
index 56558f8c3b..77512a2a38 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java
@@ -42,7 +42,6 @@ import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.TimestampIndexUtils;
 
@@ -160,8 +159,7 @@ public class IndexLoadingConfig {
     if (avgMultiValueCount != null) {
       _realtimeAvgMultiValueCount = Integer.parseInt(avgMultiValueCount);
     }
-    _segmentStoreURI =
-        _instanceDataManagerConfig.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI);
+    _segmentStoreURI = _instanceDataManagerConfig.getSegmentStoreUri();
     _segmentDirectoryLoader = _instanceDataManagerConfig.getSegmentDirectoryLoader();
 
     Map<String, Map<String, String>> tierConfigs = _instanceDataManagerConfig.getTierConfigs();
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
index a02df2aba6..fb6120be4b 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java
@@ -105,7 +105,7 @@ public class ServerConf {
   }
 
   public String getInstanceDataManagerClassName() {
-    return _serverConf.getProperty(CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS, DEFAULT_DATA_MANAGER_CLASS);
+    return _serverConf.getProperty(CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS, DEFAULT_INSTANCE_DATA_MANAGER_CLASS);
   }
 
   public double getQueryLogMaxRate() {
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 9476f27858..88fdfa1590 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.function.Supplier;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
-import org.apache.commons.configuration2.ex.ConfigurationException;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixManager;
@@ -49,7 +48,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
-import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider;
+import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader;
 import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
 import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
@@ -68,6 +67,7 @@ import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -110,14 +110,17 @@ public class HelixInstanceDataManager implements InstanceDataManager {
 
   @Override
   public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics)
-      throws ConfigurationException {
+      throws Exception {
     LOGGER.info("Initializing Helix instance data manager");
 
     _instanceDataManagerConfig = new HelixInstanceDataManagerConfig(config);
     LOGGER.info("HelixInstanceDataManagerConfig: {}", _instanceDataManagerConfig.getConfig());
     _instanceId = _instanceDataManagerConfig.getInstanceId();
     _helixManager = helixManager;
-    _tableDataManagerProvider = new TableDataManagerProvider(_instanceDataManagerConfig, helixManager, _segmentLocks);
+    String tableDataManagerProviderClass = _instanceDataManagerConfig.getTableDataManagerProviderClass();
+    LOGGER.info("Initializing table data manager provider of class: {}", tableDataManagerProviderClass);
+    _tableDataManagerProvider = PluginManager.get().createInstance(tableDataManagerProviderClass);
+    _tableDataManagerProvider.init(_instanceDataManagerConfig, helixManager, _segmentLocks);
     _segmentUploader = new PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(),
         ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), serverMetrics);
 
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
index a959e0b509..aade26f339 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java
@@ -29,7 +29,6 @@ import org.apache.pinot.common.utils.TarCompressionUtils;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry;
 import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig;
 import org.apache.pinot.spi.env.PinotConfiguration;
-import org.apache.pinot.spi.utils.CommonConstants.Server;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -46,22 +45,6 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   // Average number of values in multi-valued columns in any table in this instance.
   // This value is used to allocate initial memory for multi-valued columns in realtime segments in consuming state.
   private static final String AVERAGE_MV_COUNT = "realtime.averageMultiValueEntriesPerRow";
-  // Key of instance id
-  public static final String INSTANCE_ID = "id";
-  // Key of instance data directory
-  public static final String INSTANCE_DATA_DIR = "dataDir";
-  // Key of consumer directory
-  public static final String CONSUMER_DIR = "consumerDir";
-  // Key of instance segment tar directory
-  public static final String INSTANCE_SEGMENT_TAR_DIR = "segmentTarDir";
-  // Key of segment directory
-  public static final String INSTANCE_BOOTSTRAP_SEGMENT_DIR = "bootstrap.segment.dir";
-  // Key of instance level segment read mode
-  public static final String READ_MODE = "readMode";
-  // Key of the segment format this server can read
-  public static final String SEGMENT_FORMAT_VERSION = "segment.format.version";
-  // Key of whether to enable reloading consuming segments
-  public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT = "reload.consumingSegment";
   // Key of segment directory loader
   public static final String SEGMENT_DIRECTORY_LOADER = "segment.directory.loader";
   // Prefix for upsert config
@@ -99,13 +82,6 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   private static final String ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR = "segment.stream.download.untar";
   private static final boolean DEFAULT_ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR = false;
 
-  // Whether memory for realtime consuming segments should be allocated off-heap.
-  private static final String REALTIME_OFFHEAP_ALLOCATION = "realtime.alloc.offheap";
-  // And whether the allocation should be direct (default is to allocate via mmap)
-  // Direct memory allocation may mean setting heap size appropriately when starting JVM.
-  // The metric ServerGauge.REALTIME_OFFHEAP_MEMORY_USED should indicate how much memory is needed.
-  private static final String DIRECT_REALTIME_OFFHEAP_ALLOCATION = "realtime.alloc.offheap.direct";
-
   // Number of simultaneous segments that can be refreshed on one server.
   // Segment refresh works by loading the old as well as new versions of segments in memory, assigning
   // new incoming queries to use the new version. The old version is dropped when all the queries that
@@ -206,18 +182,18 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
   }
 
   @Override
-  public String getConsumerClientIdSuffix() {
-    return _serverConfig.getProperty(CONFIG_OF_REALTIME_SEGMENT_CONSUMER_CLIENT_ID_SUFFIX);
+  public String getTableDataManagerProviderClass() {
+    return _serverConfig.getProperty(TABLE_DATA_MANAGER_PROVIDER_CLASS, DEFAULT_TABLE_DATA_MANAGER_PROVIDER_CLASS);
   }
 
   @Override
-  public String getInstanceBootstrapSegmentDir() {
-    return _serverConfig.getProperty(INSTANCE_BOOTSTRAP_SEGMENT_DIR);
+  public String getConsumerClientIdSuffix() {
+    return _serverConfig.getProperty(CONSUMER_CLIENT_ID_SUFFIX);
   }
 
   @Override
   public String getSegmentStoreUri() {
-    return _serverConfig.getProperty(CONFIG_OF_SEGMENT_STORE_URI);
+    return _serverConfig.getProperty(SEGMENT_STORE_URI);
   }
 
   @Override
@@ -232,16 +208,16 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig
 
   @Override
   public boolean isRealtimeOffHeapAllocation() {
-    return _serverConfig.getProperty(REALTIME_OFFHEAP_ALLOCATION, true);
+    return _serverConfig.getProperty(REALTIME_OFFHEAP_ALLOCATION, DEFAULT_REALTIME_OFFHEAP_ALLOCATION);
   }
 
   @Override
   public boolean isDirectRealtimeOffHeapAllocation() {
-    return _serverConfig.getProperty(DIRECT_REALTIME_OFFHEAP_ALLOCATION, false);
+    return _serverConfig.getProperty(REALTIME_OFFHEAP_DIRECT_ALLOCATION, DEFAULT_REALTIME_OFFHEAP_DIRECT_ALLOCATION);
   }
 
   public boolean shouldReloadConsumingSegment() {
-    return _serverConfig.getProperty(INSTANCE_RELOAD_CONSUMING_SEGMENT, Server.DEFAULT_RELOAD_CONSUMING_SEGMENT);
+    return _serverConfig.getProperty(RELOAD_CONSUMING_SEGMENT, DEFAULT_RELOAD_CONSUMING_SEGMENT);
   }
 
   @Override
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
index c9d406d19e..52e9b6f9f2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java
@@ -35,7 +35,7 @@ public interface InstanceDataManagerConfig {
 
   String getInstanceSegmentTarDir();
 
-  String getInstanceBootstrapSegmentDir();
+  String getTableDataManagerProviderClass();
 
   String getSegmentStoreUri();
 
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 21800684b9..89046dec81 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -24,6 +24,7 @@ import java.math.BigDecimal;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
 import org.apache.pinot.spi.config.instance.InstanceType;
 
 
@@ -379,8 +380,8 @@ public class CommonConstants {
     // to determine whether the query could have successfully been run on the 
v2 / multi-stage query engine. If not,
     // a counter metric will be incremented - if this counter remains 0 during 
regular query workload execution, it
     // signals that users can potentially migrate their query workload to the 
multistage query engine.
-    public static final String 
CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC
-        = "pinot.broker.enable.multistage.migration.metric";
+    public static final String 
CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC =
+        "pinot.broker.enable.multistage.migration.metric";
     public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC = 
false;
 
     public static class Request {
@@ -614,14 +615,55 @@ public class CommonConstants {
     public static final String QUERY_EXECUTOR_CONFIG_PREFIX = 
"pinot.server.query.executor";
     public static final String METRICS_CONFIG_PREFIX = "pinot.server.metrics";
 
-    public static final String CONFIG_OF_INSTANCE_ID = 
"pinot.server.instance.id";
-    public static final String CONFIG_OF_INSTANCE_DATA_DIR = 
"pinot.server.instance.dataDir";
-    public static final String CONFIG_OF_CONSUMER_DIR = 
"pinot.server.instance.consumerDir";
-    public static final String CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR = 
"pinot.server.instance.segmentTarDir";
-    public static final String CONFIG_OF_INSTANCE_READ_MODE = 
"pinot.server.instance.readMode";
-    public static final String CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT =
-        "pinot.server.instance.reload.consumingSegment";
     public static final String CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS = 
"pinot.server.instance.data.manager.class";
+    public static final String DEFAULT_INSTANCE_DATA_MANAGER_CLASS =
+        "org.apache.pinot.server.starter.helix.HelixInstanceDataManager";
+    // Following configs are used in HelixInstanceDataManagerConfig, where the 
config prefix is trimmed. We keep the
+    // full config for reference purpose.
+    public static final String INSTANCE_ID = "id";
+    public static final String CONFIG_OF_INSTANCE_ID = 
INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + INSTANCE_ID;
+    public static final String INSTANCE_DATA_DIR = "dataDir";
+    public static final String CONFIG_OF_INSTANCE_DATA_DIR =
+        INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + INSTANCE_DATA_DIR;
+    public static final String DEFAULT_INSTANCE_BASE_DIR =
+        FileUtils.getTempDirectoryPath() + File.separator + "PinotServer";
+    public static final String DEFAULT_INSTANCE_DATA_DIR = 
DEFAULT_INSTANCE_BASE_DIR + File.separator + "index";
+    public static final String CONSUMER_DIR = "consumerDir";
+    public static final String CONFIG_OF_CONSUMER_DIR = 
INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + CONSUMER_DIR;
+    public static final String INSTANCE_SEGMENT_TAR_DIR = "segmentTarDir";
+    public static final String CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR =
+        INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + INSTANCE_SEGMENT_TAR_DIR;
+    public static final String DEFAULT_INSTANCE_SEGMENT_TAR_DIR =
+        DEFAULT_INSTANCE_BASE_DIR + File.separator + "segmentTar";
+    public static final String CONSUMER_CLIENT_ID_SUFFIX = 
"consumer.client.id.suffix";
+    public static final String CONFIG_OF_CONSUMER_CLIENT_ID_SUFFIX =
+        INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + CONSUMER_CLIENT_ID_SUFFIX;
+    public static final String SEGMENT_STORE_URI = "segment.store.uri";
+    public static final String CONFIG_OF_SEGMENT_STORE_URI =
+        INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + SEGMENT_STORE_URI;
+    public static final String TABLE_DATA_MANAGER_PROVIDER_CLASS = 
"table.data.manager.provider.class";
+    public static final String CONFIG_OF_TABLE_DATA_MANAGER_PROVIDER_CLASS =
+        INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + 
TABLE_DATA_MANAGER_PROVIDER_CLASS;
+    public static final String DEFAULT_TABLE_DATA_MANAGER_PROVIDER_CLASS =
+        
"org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider";
+    public static final String READ_MODE = "readMode";
+    public static final String CONFIG_OF_READ_MODE = 
INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + READ_MODE;
+    public static final String DEFAULT_READ_MODE = "mmap";
+    public static final String SEGMENT_FORMAT_VERSION = 
"segment.format.version";
+    public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION =
+        INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + SEGMENT_FORMAT_VERSION;
+    public static final String REALTIME_OFFHEAP_ALLOCATION = 
"realtime.alloc.offheap";
+    public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION =
+        INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + 
REALTIME_OFFHEAP_ALLOCATION;
+    public static final boolean DEFAULT_REALTIME_OFFHEAP_ALLOCATION = true;
+    public static final String REALTIME_OFFHEAP_DIRECT_ALLOCATION = 
"realtime.alloc.offheap.direct";
+    public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION =
+        INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + 
REALTIME_OFFHEAP_DIRECT_ALLOCATION;
+    public static final boolean DEFAULT_REALTIME_OFFHEAP_DIRECT_ALLOCATION = 
false;
+    public static final String RELOAD_CONSUMING_SEGMENT = 
"reload.consumingSegment";
+    public static final String CONFIG_OF_RELOAD_CONSUMING_SEGMENT =
+        INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + RELOAD_CONSUMING_SEGMENT;
+    public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true;
 
     // Query logger related configs
     public static final String CONFIG_OF_QUERY_LOG_MAX_RATE = 
"pinot.server.query.log.maxRatePerSecond";
@@ -671,10 +713,6 @@ public class CommonConstants {
     public static final String CONFIG_OF_SERVER_RESOURCE_PACKAGES = 
"server.restlet.api.resource.packages";
     public static final String DEFAULT_SERVER_RESOURCE_PACKAGES = 
"org.apache.pinot.server.api.resources";
 
-    public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION = 
"pinot.server.instance.segment.format.version";
-    public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION = 
"pinot.server.instance.realtime.alloc.offheap";
-    public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION =
-        "pinot.server.instance.realtime.alloc.offheap.direct";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = 
"pinot.server.storage.factory";
     public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = 
"pinot.server.crypter";
     public static final String CONFIG_OF_VALUE_PRUNER_IN_PREDICATE_THRESHOLD =
@@ -726,17 +764,7 @@ public class CommonConstants {
     // Default to 0.0 (no limit)
     public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0;
 
-    public static final String DEFAULT_READ_MODE = "mmap";
     public static final String CONFIG_OF_MMAP_DEFAULT_ADVICE = 
"pinot.server.mmap.advice.default";
-    // Whether to reload consuming segment on scheme update
-    public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true;
-    public static final String DEFAULT_INSTANCE_BASE_DIR =
-        System.getProperty("java.io.tmpdir") + File.separator + "PinotServer";
-    public static final String DEFAULT_INSTANCE_DATA_DIR = 
DEFAULT_INSTANCE_BASE_DIR + File.separator + "index";
-    public static final String DEFAULT_INSTANCE_SEGMENT_TAR_DIR =
-        DEFAULT_INSTANCE_BASE_DIR + File.separator + "segmentTar";
-    public static final String DEFAULT_DATA_MANAGER_CLASS =
-        "org.apache.pinot.server.starter.helix.HelixInstanceDataManager";
     public static final String DEFAULT_QUERY_EXECUTOR_CLASS =
         "org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl";
     // The order of the pruners matters. Pruning with segment metadata ahead 
of those using segment data like bloom
@@ -795,12 +823,8 @@ public class CommonConstants {
     public static final String SERVER_GRPCTLS_PREFIX = "pinot.server.grpctls";
     public static final String SERVER_NETTY_PREFIX = "pinot.server.netty";
 
-    // The complete config key is pinot.server.instance.segment.store.uri
-    public static final String CONFIG_OF_SEGMENT_STORE_URI = 
"segment.store.uri";
     public static final String CONFIG_OF_LOGGER_ROOT_DIR = 
"pinot.server.logger.root.dir";
 
-    public static final String 
CONFIG_OF_REALTIME_SEGMENT_CONSUMER_CLIENT_ID_SUFFIX = 
"consumer.client.id.suffix";
-
     public static final String LUCENE_MAX_REFRESH_THREADS = 
"pinot.server.lucene.max.refresh.threads";
     public static final int DEFAULT_LUCENE_MAX_REFRESH_THREADS = 1;
     public static final String LUCENE_MIN_REFRESH_INTERVAL_MS = 
"pinot.server.lucene.min.refresh.interval.ms";
@@ -1022,8 +1046,7 @@ public class CommonConstants {
     public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = 
"accounting.query.killed.metric.enabled";
     public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false;
 
-    public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE =
-        "accounting.enable.thread.sampling.mse.debug";
+    public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE = 
"accounting.enable.thread.sampling.mse.debug";
     public static final Boolean DEFAULT_ENABLE_THREAD_SAMPLING_MSE = true;
   }
 
@@ -1256,8 +1279,8 @@ public class CommonConstants {
       public static final int V1 = 1;
     }
 
-    public static final String KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN
-        = "pinot.query.multistage.explain.include.segment.plan";
+    public static final String KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN =
+        "pinot.query.multistage.explain.include.segment.plan";
     public static final boolean 
DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN = false;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org


Reply via email to