This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 830083827a Make TableDataManagerProvider pluggable (#14470) 830083827a is described below commit 830083827aee3f0d9c4699698c57a46b775c9892 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Nov 19 09:35:49 2024 -0800 Make TableDataManagerProvider pluggable (#14470) --- .../core/data/manager/InstanceDataManager.java | 5 +- .../manager/offline/DimensionTableDataManager.java | 1 + .../DefaultTableDataManagerProvider.java} | 24 +++--- .../manager/provider/TableDataManagerProvider.java | 50 +++++++++++++ .../realtime/RealtimeSegmentDataManagerTest.java | 9 ++- .../executor/QueryExecutorExceptionsTest.java | 9 ++- .../core/query/executor/QueryExecutorTest.java | 9 ++- .../pinot/queries/ExplainPlanQueriesTest.java | 9 ++- .../queries/SegmentWithNullValueVectorTest.java | 9 ++- .../segment/index/loader/IndexLoadingConfig.java | 4 +- .../org/apache/pinot/server/conf/ServerConf.java | 2 +- .../starter/helix/HelixInstanceDataManager.java | 11 ++- .../helix/HelixInstanceDataManagerConfig.java | 40 ++-------- .../config/instance/InstanceDataManagerConfig.java | 2 +- .../apache/pinot/spi/utils/CommonConstants.java | 85 ++++++++++++++-------- 15 files changed, 163 insertions(+), 106 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index 95a135f1e4..ffacffc897 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -24,7 +24,6 @@ import java.util.Set; import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -33,6 +32,7 @@ import org.apache.pinot.core.data.manager.realtime.SegmentUploader; import org.apache.pinot.core.util.SegmentRefreshSemaphore; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.spi.SegmentMetadata; +import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.env.PinotConfiguration; @@ -40,6 +40,7 @@ import org.apache.pinot.spi.env.PinotConfiguration; * The <code>InstanceDataManager</code> class is the instance level data manager, which manages all tables and segments * served by the instance. */ +@InterfaceAudience.Private @ThreadSafe public interface InstanceDataManager { @@ -49,7 +50,7 @@ public interface InstanceDataManager { * <p>NOTE: The config is the subset of server config with prefix 'pinot.server.instance' */ void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics) - throws ConfigurationException; + throws Exception; /** * Returns the instance id. diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index 20759934ca..6ed4edfd48 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -32,6 +32,7 @@ import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.segment.spi.ImmutableSegment; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java similarity index 84% rename from pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java rename to pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java index 302df13509..fff6232943 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.pinot.core.data.manager.offline; +package org.apache.pinot.core.data.manager.provider; import com.google.common.cache.Cache; import java.util.Map; @@ -28,6 +28,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.HelixManager; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager; +import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.utils.SegmentLocks; @@ -39,15 +41,16 @@ import org.apache.pinot.spi.utils.IngestionConfigUtils; /** - * Factory for {@link TableDataManager}. + * Default implementation of {@link TableDataManagerProvider}. */ -public class TableDataManagerProvider { - private final InstanceDataManagerConfig _instanceDataManagerConfig; - private final HelixManager _helixManager; - private final SegmentLocks _segmentLocks; - private final Semaphore _segmentBuildSemaphore; +public class DefaultTableDataManagerProvider implements TableDataManagerProvider { + private InstanceDataManagerConfig _instanceDataManagerConfig; + private HelixManager _helixManager; + private SegmentLocks _segmentLocks; + private Semaphore _segmentBuildSemaphore; - public TableDataManagerProvider(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, + @Override + public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks) { _instanceDataManagerConfig = instanceDataManagerConfig; _helixManager = helixManager; @@ -56,10 +59,7 @@ public class TableDataManagerProvider { _segmentBuildSemaphore = maxParallelSegmentBuilds > 0 ? new Semaphore(maxParallelSegmentBuilds, true) : null; } - public TableDataManager getTableDataManager(TableConfig tableConfig) { - return getTableDataManager(tableConfig, null, null, () -> true); - } - + @Override public TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java new file mode 100644 index 0000000000..6fc27bc4ac --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.data.manager.provider; + +import com.google.common.cache.Cache; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; +import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.HelixManager; +import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.local.utils.SegmentLocks; +import org.apache.pinot.spi.annotations.InterfaceAudience; +import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; +import org.apache.pinot.spi.config.table.TableConfig; + + +/** + * Factory for {@link TableDataManager}. + */ +@InterfaceAudience.Private +public interface TableDataManagerProvider { + + void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks); + + default TableDataManager getTableDataManager(TableConfig tableConfig) { + return getTableDataManager(tableConfig, null, null, () -> true); + } + + TableDataManager getTableDataManager(TableConfig tableConfig, @Nullable ExecutorService segmentPreloadExecutor, + @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, + Supplier<Boolean> isServerReadyToServeQueries); +} diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index 910edf7d9b..f7ca4530bd 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -43,7 +43,8 @@ import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.config.TableConfigUtils; -import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamMessageDecoder; @@ -779,9 +780,9 @@ public class RealtimeSegmentDataManagerTest { InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); - TableDataManager tableDataManager = - new TableDataManagerProvider(instanceDataManagerConfig, helixManager, new SegmentLocks()).getTableDataManager( - tableConfig); + TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); + tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new SegmentLocks()); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); tableDataManager.start(); tableDataManager.shutDown(); Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown()); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java index c664c2f013..12fdda9442 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java @@ -34,7 +34,8 @@ import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.request.ServerQueryRequest; import org.apache.pinot.segment.local.data.manager.TableDataManager; @@ -133,9 +134,9 @@ public class QueryExecutorExceptionsTest { // Mock the instance data manager InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath()); - TableDataManager tableDataManager = - new TableDataManagerProvider(instanceDataManagerConfig, mock(HelixManager.class), - new SegmentLocks()).getTableDataManager(tableConfig); + TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); + tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks()); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); tableDataManager.start(); //we don't add index segments to the data manager to simulate numSegmentsAcquired < numSegmentsQueried InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 404bd3efc3..4a171128c8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -40,7 +40,8 @@ import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.common.request.context.ExpressionContext; import org.apache.pinot.common.request.context.TimeSeriesContext; import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; import org.apache.pinot.core.operator.blocks.results.TimeSeriesResultsBlock; @@ -154,9 +155,9 @@ public class QueryExecutorTest { // Mock the instance data manager InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); - TableDataManager tableDataManager = - new TableDataManagerProvider(instanceDataManagerConfig, mock(HelixManager.class), - new SegmentLocks()).getTableDataManager(tableConfig); + TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); + tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks()); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); tableDataManager.start(); for (ImmutableSegment indexSegment : _indexSegments) { tableDataManager.addSegment(indexSegment); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java index dce00515d0..64d5d8e150 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java @@ -41,7 +41,8 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.DataSchema.ColumnDataType; import org.apache.pinot.core.common.ExplainPlanRows; import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl; @@ -276,9 +277,9 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { // Mock the instance data manager InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); - TableDataManager tableDataManager = - new TableDataManagerProvider(instanceDataManagerConfig, mock(HelixManager.class), - new SegmentLocks()).getTableDataManager(TABLE_CONFIG); + TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); + tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks()); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(TABLE_CONFIG); tableDataManager.start(); for (IndexSegment indexSegment : _indexSegments) { tableDataManager.addSegment((ImmutableSegment) indexSegment); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java index 3ef16168a2..04db06a140 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java @@ -36,7 +36,8 @@ import org.apache.helix.HelixManager; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.InstanceRequest; import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; import org.apache.pinot.core.operator.blocks.results.AggregationResultsBlock; import org.apache.pinot.core.query.executor.QueryExecutor; @@ -138,9 +139,9 @@ public class SegmentWithNullValueVectorTest { // Mock the instance data manager InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); - TableDataManager tableDataManager = - new TableDataManagerProvider(instanceDataManagerConfig, mock(HelixManager.class), - new SegmentLocks()).getTableDataManager(tableConfig); + TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); + tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks()); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); tableDataManager.start(); tableDataManager.addSegment(_segment); _instanceDataManager = mock(InstanceDataManager.class); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java index 56558f8c3b..77512a2a38 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/IndexLoadingConfig.java @@ -42,7 +42,6 @@ import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.spi.utils.TimestampIndexUtils; @@ -160,8 +159,7 @@ public class IndexLoadingConfig { if (avgMultiValueCount != null) { _realtimeAvgMultiValueCount = Integer.parseInt(avgMultiValueCount); } - _segmentStoreURI = - _instanceDataManagerConfig.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI); + _segmentStoreURI = _instanceDataManagerConfig.getSegmentStoreUri(); _segmentDirectoryLoader = _instanceDataManagerConfig.getSegmentDirectoryLoader(); Map<String, Map<String, String>> tierConfigs = _instanceDataManagerConfig.getTierConfigs(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java index a02df2aba6..fb6120be4b 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/conf/ServerConf.java @@ -105,7 +105,7 @@ public class ServerConf { } public String getInstanceDataManagerClassName() { - return _serverConf.getProperty(CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS, DEFAULT_DATA_MANAGER_CLASS); + return _serverConf.getProperty(CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS, DEFAULT_INSTANCE_DATA_MANAGER_CLASS); } public double getQueryLogMaxRate() { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 9476f27858..88fdfa1590 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -38,7 +38,6 @@ import java.util.concurrent.locks.Lock; import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; -import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.HelixManager; @@ -49,7 +48,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; import org.apache.pinot.core.data.manager.InstanceDataManager; -import org.apache.pinot.core.data.manager.offline.TableDataManagerProvider; +import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.data.manager.realtime.PinotFSSegmentUploader; import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager; @@ -68,6 +67,7 @@ import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -110,14 +110,17 @@ public class HelixInstanceDataManager implements InstanceDataManager { @Override public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics) - throws ConfigurationException { + throws Exception { LOGGER.info("Initializing Helix instance data manager"); _instanceDataManagerConfig = new HelixInstanceDataManagerConfig(config); LOGGER.info("HelixInstanceDataManagerConfig: {}", _instanceDataManagerConfig.getConfig()); _instanceId = _instanceDataManagerConfig.getInstanceId(); _helixManager = helixManager; - _tableDataManagerProvider = new TableDataManagerProvider(_instanceDataManagerConfig, helixManager, _segmentLocks); + String tableDataManagerProviderClass = _instanceDataManagerConfig.getTableDataManagerProviderClass(); + LOGGER.info("Initializing table data manager provider of class: {}", tableDataManagerProviderClass); + _tableDataManagerProvider = PluginManager.get().createInstance(tableDataManagerProviderClass); + _tableDataManagerProvider.init(_instanceDataManagerConfig, helixManager, _segmentLocks); _segmentUploader = new PinotFSSegmentUploader(_instanceDataManagerConfig.getSegmentStoreUri(), ServerSegmentCompletionProtocolHandler.getSegmentUploadRequestTimeoutMs(), serverMetrics); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java index a959e0b509..aade26f339 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerConfig.java @@ -29,7 +29,6 @@ import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.env.PinotConfiguration; -import org.apache.pinot.spi.utils.CommonConstants.Server; import org.apache.pinot.spi.utils.ReadMode; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,22 +45,6 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig // Average number of values in multi-valued columns in any table in this instance. // This value is used to allocate initial memory for multi-valued columns in realtime segments in consuming state. private static final String AVERAGE_MV_COUNT = "realtime.averageMultiValueEntriesPerRow"; - // Key of instance id - public static final String INSTANCE_ID = "id"; - // Key of instance data directory - public static final String INSTANCE_DATA_DIR = "dataDir"; - // Key of consumer directory - public static final String CONSUMER_DIR = "consumerDir"; - // Key of instance segment tar directory - public static final String INSTANCE_SEGMENT_TAR_DIR = "segmentTarDir"; - // Key of segment directory - public static final String INSTANCE_BOOTSTRAP_SEGMENT_DIR = "bootstrap.segment.dir"; - // Key of instance level segment read mode - public static final String READ_MODE = "readMode"; - // Key of the segment format this server can read - public static final String SEGMENT_FORMAT_VERSION = "segment.format.version"; - // Key of whether to enable reloading consuming segments - public static final String INSTANCE_RELOAD_CONSUMING_SEGMENT = "reload.consumingSegment"; // Key of segment directory loader public static final String SEGMENT_DIRECTORY_LOADER = "segment.directory.loader"; // Prefix for upsert config @@ -99,13 +82,6 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig private static final String ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR = "segment.stream.download.untar"; private static final boolean DEFAULT_ENABLE_STREAM_SEGMENT_DOWNLOAD_UNTAR = false; - // Whether memory for realtime consuming segments should be allocated off-heap. - private static final String REALTIME_OFFHEAP_ALLOCATION = "realtime.alloc.offheap"; - // And whether the allocation should be direct (default is to allocate via mmap) - // Direct memory allocation may mean setting heap size appropriately when starting JVM. - // The metric ServerGauge.REALTIME_OFFHEAP_MEMORY_USED should indicate how much memory is needed. - private static final String DIRECT_REALTIME_OFFHEAP_ALLOCATION = "realtime.alloc.offheap.direct"; - // Number of simultaneous segments that can be refreshed on one server. // Segment refresh works by loading the old as well as new versions of segments in memory, assigning // new incoming queries to use the new version. The old version is dropped when all the queries that @@ -206,18 +182,18 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig } @Override - public String getConsumerClientIdSuffix() { - return _serverConfig.getProperty(CONFIG_OF_REALTIME_SEGMENT_CONSUMER_CLIENT_ID_SUFFIX); + public String getTableDataManagerProviderClass() { + return _serverConfig.getProperty(TABLE_DATA_MANAGER_PROVIDER_CLASS, DEFAULT_TABLE_DATA_MANAGER_PROVIDER_CLASS); } @Override - public String getInstanceBootstrapSegmentDir() { - return _serverConfig.getProperty(INSTANCE_BOOTSTRAP_SEGMENT_DIR); + public String getConsumerClientIdSuffix() { + return _serverConfig.getProperty(CONSUMER_CLIENT_ID_SUFFIX); } @Override public String getSegmentStoreUri() { - return _serverConfig.getProperty(CONFIG_OF_SEGMENT_STORE_URI); + return _serverConfig.getProperty(SEGMENT_STORE_URI); } @Override @@ -232,16 +208,16 @@ public class HelixInstanceDataManagerConfig implements InstanceDataManagerConfig @Override public boolean isRealtimeOffHeapAllocation() { - return _serverConfig.getProperty(REALTIME_OFFHEAP_ALLOCATION, true); + return _serverConfig.getProperty(REALTIME_OFFHEAP_ALLOCATION, DEFAULT_REALTIME_OFFHEAP_ALLOCATION); } @Override public boolean isDirectRealtimeOffHeapAllocation() { - return _serverConfig.getProperty(DIRECT_REALTIME_OFFHEAP_ALLOCATION, false); + return _serverConfig.getProperty(REALTIME_OFFHEAP_DIRECT_ALLOCATION, DEFAULT_REALTIME_OFFHEAP_DIRECT_ALLOCATION); } public boolean shouldReloadConsumingSegment() { - return _serverConfig.getProperty(INSTANCE_RELOAD_CONSUMING_SEGMENT, Server.DEFAULT_RELOAD_CONSUMING_SEGMENT); + return _serverConfig.getProperty(RELOAD_CONSUMING_SEGMENT, DEFAULT_RELOAD_CONSUMING_SEGMENT); } @Override diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java index c9d406d19e..52e9b6f9f2 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/instance/InstanceDataManagerConfig.java @@ -35,7 +35,7 @@ public interface InstanceDataManagerConfig { String getInstanceSegmentTarDir(); - String getInstanceBootstrapSegmentDir(); + String getTableDataManagerProviderClass(); String getSegmentStoreUri(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 21800684b9..89046dec81 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -24,6 +24,7 @@ import java.math.BigDecimal; import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; import org.apache.pinot.spi.config.instance.InstanceType; @@ -379,8 +380,8 @@ public class CommonConstants { // to determine whether the query could have successfully been run on the v2 / multi-stage query engine. If not, // a counter metric will be incremented - if this counter remains 0 during regular query workload execution, it // signals that users can potentially migrate their query workload to the multistage query engine. - public static final String CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC - = "pinot.broker.enable.multistage.migration.metric"; + public static final String CONFIG_OF_BROKER_ENABLE_MULTISTAGE_MIGRATION_METRIC = + "pinot.broker.enable.multistage.migration.metric"; public static final boolean DEFAULT_ENABLE_MULTISTAGE_MIGRATION_METRIC = false; public static class Request { @@ -614,14 +615,55 @@ public class CommonConstants { public static final String QUERY_EXECUTOR_CONFIG_PREFIX = "pinot.server.query.executor"; public static final String METRICS_CONFIG_PREFIX = "pinot.server.metrics"; - public static final String CONFIG_OF_INSTANCE_ID = "pinot.server.instance.id"; - public static final String CONFIG_OF_INSTANCE_DATA_DIR = "pinot.server.instance.dataDir"; - public static final String CONFIG_OF_CONSUMER_DIR = "pinot.server.instance.consumerDir"; - public static final String CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR = "pinot.server.instance.segmentTarDir"; - public static final String CONFIG_OF_INSTANCE_READ_MODE = "pinot.server.instance.readMode"; - public static final String CONFIG_OF_INSTANCE_RELOAD_CONSUMING_SEGMENT = - "pinot.server.instance.reload.consumingSegment"; public static final String CONFIG_OF_INSTANCE_DATA_MANAGER_CLASS = "pinot.server.instance.data.manager.class"; + public static final String DEFAULT_INSTANCE_DATA_MANAGER_CLASS = + "org.apache.pinot.server.starter.helix.HelixInstanceDataManager"; + // Following configs are used in HelixInstanceDataManagerConfig, where the config prefix is trimmed. We keep the + // full config for reference purpose. + public static final String INSTANCE_ID = "id"; + public static final String CONFIG_OF_INSTANCE_ID = INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + INSTANCE_ID; + public static final String INSTANCE_DATA_DIR = "dataDir"; + public static final String CONFIG_OF_INSTANCE_DATA_DIR = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + INSTANCE_DATA_DIR; + public static final String DEFAULT_INSTANCE_BASE_DIR = + FileUtils.getTempDirectoryPath() + File.separator + "PinotServer"; + public static final String DEFAULT_INSTANCE_DATA_DIR = DEFAULT_INSTANCE_BASE_DIR + File.separator + "index"; + public static final String CONSUMER_DIR = "consumerDir"; + public static final String CONFIG_OF_CONSUMER_DIR = INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + CONSUMER_DIR; + public static final String INSTANCE_SEGMENT_TAR_DIR = "segmentTarDir"; + public static final String CONFIG_OF_INSTANCE_SEGMENT_TAR_DIR = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + INSTANCE_SEGMENT_TAR_DIR; + public static final String DEFAULT_INSTANCE_SEGMENT_TAR_DIR = + DEFAULT_INSTANCE_BASE_DIR + File.separator + "segmentTar"; + public static final String CONSUMER_CLIENT_ID_SUFFIX = "consumer.client.id.suffix"; + public static final String CONFIG_OF_CONSUMER_CLIENT_ID_SUFFIX = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + CONSUMER_CLIENT_ID_SUFFIX; + public static final String SEGMENT_STORE_URI = "segment.store.uri"; + public static final String CONFIG_OF_SEGMENT_STORE_URI = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + SEGMENT_STORE_URI; + public static final String TABLE_DATA_MANAGER_PROVIDER_CLASS = "table.data.manager.provider.class"; + public static final String CONFIG_OF_TABLE_DATA_MANAGER_PROVIDER_CLASS = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + TABLE_DATA_MANAGER_PROVIDER_CLASS; + public static final String DEFAULT_TABLE_DATA_MANAGER_PROVIDER_CLASS = + "org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider"; + public static final String READ_MODE = "readMode"; + public static final String CONFIG_OF_READ_MODE = INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + READ_MODE; + public static final String DEFAULT_READ_MODE = "mmap"; + public static final String SEGMENT_FORMAT_VERSION = "segment.format.version"; + public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + SEGMENT_FORMAT_VERSION; + public static final String REALTIME_OFFHEAP_ALLOCATION = "realtime.alloc.offheap"; + public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + REALTIME_OFFHEAP_ALLOCATION; + public static final boolean DEFAULT_REALTIME_OFFHEAP_ALLOCATION = true; + public static final String REALTIME_OFFHEAP_DIRECT_ALLOCATION = "realtime.alloc.offheap.direct"; + public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + REALTIME_OFFHEAP_DIRECT_ALLOCATION; + public static final boolean DEFAULT_REALTIME_OFFHEAP_DIRECT_ALLOCATION = false; + public static final String RELOAD_CONSUMING_SEGMENT = "reload.consumingSegment"; + public static final String CONFIG_OF_RELOAD_CONSUMING_SEGMENT = + INSTANCE_DATA_MANAGER_CONFIG_PREFIX + "." + RELOAD_CONSUMING_SEGMENT; + public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true; // Query logger related configs public static final String CONFIG_OF_QUERY_LOG_MAX_RATE = "pinot.server.query.log.maxRatePerSecond"; @@ -671,10 +713,6 @@ public class CommonConstants { public static final String CONFIG_OF_SERVER_RESOURCE_PACKAGES = "server.restlet.api.resource.packages"; public static final String DEFAULT_SERVER_RESOURCE_PACKAGES = "org.apache.pinot.server.api.resources"; - public static final String CONFIG_OF_SEGMENT_FORMAT_VERSION = "pinot.server.instance.segment.format.version"; - public static final String CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION = "pinot.server.instance.realtime.alloc.offheap"; - public static final String CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION = - "pinot.server.instance.realtime.alloc.offheap.direct"; public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.server.storage.factory"; public static final String PREFIX_OF_CONFIG_OF_PINOT_CRYPTER = "pinot.server.crypter"; public static final String CONFIG_OF_VALUE_PRUNER_IN_PREDICATE_THRESHOLD = @@ -726,17 +764,7 @@ public class CommonConstants { // Default to 0.0 (no limit) public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0; - public static final String DEFAULT_READ_MODE = "mmap"; public static final String CONFIG_OF_MMAP_DEFAULT_ADVICE = "pinot.server.mmap.advice.default"; - // Whether to reload consuming segment on scheme update - public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true; - public static final String DEFAULT_INSTANCE_BASE_DIR = - System.getProperty("java.io.tmpdir") + File.separator + "PinotServer"; - public static final String DEFAULT_INSTANCE_DATA_DIR = DEFAULT_INSTANCE_BASE_DIR + File.separator + "index"; - public static final String DEFAULT_INSTANCE_SEGMENT_TAR_DIR = - DEFAULT_INSTANCE_BASE_DIR + File.separator + "segmentTar"; - public static final String DEFAULT_DATA_MANAGER_CLASS = - "org.apache.pinot.server.starter.helix.HelixInstanceDataManager"; public static final String DEFAULT_QUERY_EXECUTOR_CLASS = "org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl"; // The order of the pruners matters. Pruning with segment metadata ahead of those using segment data like bloom @@ -795,12 +823,8 @@ public class CommonConstants { public static final String SERVER_GRPCTLS_PREFIX = "pinot.server.grpctls"; public static final String SERVER_NETTY_PREFIX = "pinot.server.netty"; - // The complete config key is pinot.server.instance.segment.store.uri - public static final String CONFIG_OF_SEGMENT_STORE_URI = "segment.store.uri"; public static final String CONFIG_OF_LOGGER_ROOT_DIR = "pinot.server.logger.root.dir"; - public static final String CONFIG_OF_REALTIME_SEGMENT_CONSUMER_CLIENT_ID_SUFFIX = "consumer.client.id.suffix"; - public static final String LUCENE_MAX_REFRESH_THREADS = "pinot.server.lucene.max.refresh.threads"; public static final int DEFAULT_LUCENE_MAX_REFRESH_THREADS = 1; public static final String LUCENE_MIN_REFRESH_INTERVAL_MS = "pinot.server.lucene.min.refresh.interval.ms"; @@ -1022,8 +1046,7 @@ public class CommonConstants { public static final String CONFIG_OF_QUERY_KILLED_METRIC_ENABLED = "accounting.query.killed.metric.enabled"; public static final boolean DEFAULT_QUERY_KILLED_METRIC_ENABLED = false; - public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE = - "accounting.enable.thread.sampling.mse.debug"; + public static final String CONFIG_OF_ENABLE_THREAD_SAMPLING_MSE = "accounting.enable.thread.sampling.mse.debug"; public static final Boolean DEFAULT_ENABLE_THREAD_SAMPLING_MSE = true; } @@ -1256,8 +1279,8 @@ public class CommonConstants { public static final int V1 = 1; } - public static final String KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN - = "pinot.query.multistage.explain.include.segment.plan"; + public static final String KEY_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN = + "pinot.query.multistage.explain.include.segment.plan"; public static final boolean DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN = false; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org