This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 667fba096f Cache IndexLoadingConfig in TableDataManager (#15443) 667fba096f is described below commit 667fba096f0e0c721df1c1a9943ad57a3bacef3f Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Apr 4 12:20:56 2025 -0600 Cache IndexLoadingConfig in TableDataManager (#15443) --- .../core/data/manager/BaseTableDataManager.java | 46 ++-- .../manager/offline/DimensionTableDataManager.java | 28 +-- .../provider/DefaultTableDataManagerProvider.java | 10 +- .../manager/provider/TableDataManagerProvider.java | 12 +- .../manager/realtime/RealtimeTableDataManager.java | 31 +-- .../BaseTableDataManagerAcquireSegmentTest.java | 10 +- .../BaseTableDataManagerNeedRefreshTest.java | 275 ++++++++++++--------- .../data/manager/BaseTableDataManagerTest.java | 3 +- .../offline/DimensionTableDataManagerTest.java | 12 +- .../realtime/RealtimeSegmentDataManagerTest.java | 16 +- .../executor/QueryExecutorExceptionsTest.java | 2 +- .../core/query/executor/QueryExecutorTest.java | 2 +- .../pinot/queries/ExplainPlanQueriesTest.java | 2 +- .../queries/SegmentWithNullValueVectorTest.java | 2 +- .../FailureInjectingTableDataManagerProvider.java | 11 +- .../perf/BenchmarkDimensionTableOverhead.java | 2 +- .../local/data/manager/TableDataManager.java | 24 +- .../pinot/server/api/resources/TablesResource.java | 5 +- .../starter/helix/HelixInstanceDataManager.java | 7 +- .../apache/pinot/server/api/BaseResourceTest.java | 25 +- 20 files changed, 286 insertions(+), 239 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index c506dc336c..c7773c8b09 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -119,7 +119,6 @@ public abstract class BaseTableDataManager implements TableDataManager { protected HelixManager _helixManager; protected ZkHelixPropertyStore<ZNRecord> _propertyStore; protected SegmentLocks _segmentLocks; - protected TableConfig _tableConfig; protected String _tableNameWithType; protected String _tableDataDir; protected File _indexDir; @@ -136,9 +135,10 @@ public abstract class BaseTableDataManager implements TableDataManager { protected boolean _isStreamSegmentDownloadUntar; @Nullable protected SegmentOperationsThrottler _segmentOperationsThrottler; + // Semaphore to restrict the maximum number of parallel segment downloads from deep store for a table - private Semaphore _segmentDownloadSemaphore; - private AtomicInteger _numSegmentsAcquiredDownloadSemaphore; + protected Semaphore _segmentDownloadSemaphore; + protected AtomicInteger _numSegmentsAcquiredDownloadSemaphore; // Fixed size LRU cache with TableName - SegmentName pair as key, and segment related errors as the value. @Nullable @@ -146,11 +146,14 @@ public abstract class BaseTableDataManager implements TableDataManager { // Cache used for identifying segments which could not be acquired since they were recently deleted. protected Cache<String, String> _recentlyDeletedSegments; + // Caches the latest IndexLoadingConfig. The cached IndexLoadingConfig should not be modified. + protected volatile IndexLoadingConfig _indexLoadingConfig; + protected volatile boolean _shutDown; @Override public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, - SegmentLocks segmentLocks, TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, + SegmentLocks segmentLocks, TableConfig tableConfig, Schema schema, SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadExecutor, @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, @Nullable SegmentOperationsThrottler segmentOperationsThrottler) { @@ -161,14 +164,13 @@ public abstract class BaseTableDataManager implements TableDataManager { _helixManager = helixManager; _propertyStore = helixManager.getHelixPropertyStore(); _segmentLocks = segmentLocks; - _tableConfig = tableConfig; _segmentReloadSemaphore = segmentReloadSemaphore; _segmentReloadExecutor = segmentReloadExecutor; _segmentPreloadExecutor = segmentPreloadExecutor; - _authProvider = AuthProviderUtils.extractAuthProvider(_instanceDataManagerConfig.getAuthConfig(), null); + _authProvider = AuthProviderUtils.extractAuthProvider(instanceDataManagerConfig.getAuthConfig(), null); _tableNameWithType = tableConfig.getTableName(); - _tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + File.separator + _tableNameWithType; + _tableDataDir = instanceDataManagerConfig.getInstanceDataDir() + File.separator + _tableNameWithType; _indexDir = new File(_tableDataDir); if (!_indexDir.exists()) { Preconditions.checkState(_indexDir.mkdirs(), "Unable to create index directory at %s. " @@ -224,6 +226,7 @@ public abstract class BaseTableDataManager implements TableDataManager { _numSegmentsAcquiredDownloadSemaphore = null; } _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + getClass().getSimpleName()); + createAndCacheIndexLoadingConfig(tableConfig, schema); doInit(); @@ -379,21 +382,26 @@ public abstract class BaseTableDataManager implements TableDataManager { } @Override - public Pair<TableConfig, Schema> fetchTableConfigAndSchema() { + public IndexLoadingConfig fetchIndexLoadingConfig() { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); - return Pair.of(tableConfig, schema); + return createAndCacheIndexLoadingConfig(tableConfig, schema); } - @Override - public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema schema) { + private IndexLoadingConfig createAndCacheIndexLoadingConfig(TableConfig tableConfig, Schema schema) { IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema); indexLoadingConfig.setTableDataDir(_tableDataDir); + _indexLoadingConfig = indexLoadingConfig; return indexLoadingConfig; } + @Override + public IndexLoadingConfig getIndexLoadingConfig() { + return _indexLoadingConfig; + } + @Override public void addNewOnlineSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) throws Exception { @@ -1258,13 +1266,14 @@ public abstract class BaseTableDataManager implements TableDataManager { } @Override - public List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema) { + public List<StaleSegment> getStaleSegments() { + IndexLoadingConfig indexLoadingConfig = fetchIndexLoadingConfig(); List<StaleSegment> staleSegments = new ArrayList<>(); List<SegmentDataManager> segmentDataManagers = acquireAllSegments(); - final long startTime = System.currentTimeMillis(); + long startTimeMs = System.currentTimeMillis(); try { for (SegmentDataManager segmentDataManager : segmentDataManagers) { - StaleSegment response = isSegmentStale(tableConfig, schema, segmentDataManager); + StaleSegment response = isSegmentStale(indexLoadingConfig, segmentDataManager); if (response.isStale()) { staleSegments.add(response); } @@ -1273,13 +1282,18 @@ public abstract class BaseTableDataManager implements TableDataManager { for (SegmentDataManager segmentDataManager : segmentDataManagers) { releaseSegment(segmentDataManager); } - LOGGER.info("Time Taken to get stale segments: {} ms", System.currentTimeMillis() - startTime); + LOGGER.info("Time Taken to get stale segments: {} ms", System.currentTimeMillis() - startTimeMs); } return staleSegments; } - protected StaleSegment isSegmentStale(TableConfig tableConfig, Schema schema, SegmentDataManager segmentDataManager) { + @VisibleForTesting + StaleSegment isSegmentStale(IndexLoadingConfig indexLoadingConfig, SegmentDataManager segmentDataManager) { + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + Schema schema = indexLoadingConfig.getSchema(); + assert tableConfig != null && schema != null; + String tableNameWithType = tableConfig.getTableName(); Map<String, FieldIndexConfigs> indexConfigsMap = FieldIndexConfigsUtil.createIndexConfigsByColName(tableConfig, schema); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index 5e36d847e4..442c62567c 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -35,9 +35,9 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; @@ -108,20 +108,20 @@ public class DimensionTableDataManager extends OfflineTableDataManager { @Override protected void doInit() { super.doInit(); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType); + IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig; + Schema schema = indexLoadingConfig.getSchema(); + assert schema != null; List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); - if (tableConfig != null) { - DimensionTableConfig dimensionTableConfig = tableConfig.getDimensionTableConfig(); - if (dimensionTableConfig != null) { - _disablePreload = dimensionTableConfig.isDisablePreload(); - _errorOnDuplicatePrimaryKey = dimensionTableConfig.isErrorOnDuplicatePrimaryKey(); - } + TableConfig tableConfig = _indexLoadingConfig.getTableConfig(); + assert tableConfig != null; + DimensionTableConfig dimensionTableConfig = tableConfig.getDimensionTableConfig(); + if (dimensionTableConfig != null) { + _disablePreload = dimensionTableConfig.isDisablePreload(); + _errorOnDuplicatePrimaryKey = dimensionTableConfig.isErrorOnDuplicatePrimaryKey(); } if (_disablePreload) { @@ -206,8 +206,8 @@ public class DimensionTableDataManager extends OfflineTableDataManager { // loading is in progress. int token = _loadToken.incrementAndGet(); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType); + Schema schema = _indexLoadingConfig.getSchema(); + assert schema != null; List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); @@ -282,8 +282,8 @@ public class DimensionTableDataManager extends OfflineTableDataManager { // loading is in progress. int token = _loadToken.incrementAndGet(); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType); + Schema schema = _indexLoadingConfig.getSchema(); + assert schema != null; List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java index 186736002d..b750016f74 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java @@ -37,6 +37,7 @@ import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -64,8 +65,9 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider } @Override - public TableDataManager getTableDataManager(TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, - ExecutorService segmentReloadExecutor, @Nullable ExecutorService segmentPreloadExecutor, + public TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema, + SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadExecutor, + @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) { TableDataManager tableDataManager; @@ -90,8 +92,8 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider default: throw new IllegalStateException(); } - tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, segmentReloadSemaphore, - segmentReloadExecutor, segmentPreloadExecutor, errorCache, _segmentOperationsThrottler); + tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, schema, + segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, errorCache, _segmentOperationsThrottler); return tableDataManager; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java index 59e5a75f8c..63bfed0b0e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java @@ -34,6 +34,7 @@ import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; /** @@ -45,14 +46,15 @@ public interface TableDataManagerProvider { void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks, @Nullable SegmentOperationsThrottler segmentOperationsThrottler); - TableDataManager getTableDataManager(TableConfig tableConfig, SegmentReloadSemaphore segmentRefreshSemaphore, - ExecutorService segmentRefreshExecutor, @Nullable ExecutorService segmentPreloadExecutor, + TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema, + SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService segmentRefreshExecutor, + @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries); @VisibleForTesting - default TableDataManager getTableDataManager(TableConfig tableConfig) { - return getTableDataManager(tableConfig, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, - null, () -> true); + default TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema) { + return getTableDataManager(tableConfig, schema, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), + null, null, () -> true); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 5d212530b2..0db35b57a4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -43,7 +43,6 @@ import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.Utils; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; @@ -201,28 +200,30 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // Set up dedup/upsert metadata manager // NOTE: Dedup/upsert has to be set up when starting the server. Changing the table config without restarting the // server won't enable/disable them on the fly. - DedupConfig dedupConfig = _tableConfig.getDedupConfig(); + IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig; + TableConfig tableConfig = indexLoadingConfig.getTableConfig(); + assert tableConfig != null; + DedupConfig dedupConfig = tableConfig.getDedupConfig(); boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled(); if (dedupEnabled) { - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); - + Schema schema = indexLoadingConfig.getSchema(); + assert schema != null; List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns), "Primary key columns must be configured for dedup"); - _tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(_tableConfig, schema, this, _serverMetrics, + _tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics, _instanceDataManagerConfig.getDedupConfig()); } - UpsertConfig upsertConfig = _tableConfig.getUpsertConfig(); + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) { Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both enabled for table: %s", _tableUpsertMetadataManager); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); + Schema schema = indexLoadingConfig.getSchema(); + assert schema != null; _tableUpsertMetadataManager = - TableUpsertMetadataManagerFactory.create(_tableConfig, _instanceDataManagerConfig.getUpsertConfig()); - _tableUpsertMetadataManager.init(_tableConfig, schema, this); + TableUpsertMetadataManagerFactory.create(tableConfig, _instanceDataManagerConfig.getUpsertConfig()); + _tableUpsertMetadataManager.init(tableConfig, schema, this); } _enforceConsumptionInOrder = isEnforceConsumptionInOrder(); @@ -594,8 +595,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { String segmentName = zkMetadata.getSegmentName(); Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s for segment: %s to be downloaded", status, segmentName); - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); - Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); + TableConfig tableConfig = _indexLoadingConfig.getTableConfig(); + assert tableConfig != null; long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig); long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs; while (System.currentTimeMillis() < deadlineMs) { @@ -868,7 +869,9 @@ public class RealtimeTableDataManager extends BaseTableDataManager { @Nullable public StreamIngestionConfig getStreamIngestionConfig() { - IngestionConfig ingestionConfig = _tableConfig.getIngestionConfig(); + TableConfig tableConfig = _indexLoadingConfig.getTableConfig(); + assert tableConfig != null; + IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); return ingestionConfig != null ? ingestionConfig.getStreamIngestionConfig() : null; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java index ad43c7c299..c861c13fc4 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java @@ -47,6 +47,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.util.TestUtils; import org.testng.Assert; @@ -123,11 +124,12 @@ public class BaseTableDataManagerAcquireSegmentTest { when(instanceDataManagerConfig.getDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE); when(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); - SegmentOperationsThrottler segmentOperationsThrottler = new SegmentOperationsThrottler( - new SegmentAllIndexPreprocessThrottler(8, 10, true), new SegmentStarTreePreprocessThrottler(4, 8, true), - new SegmentDownloadThrottler(10, 20, true)); + Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME).build(); + SegmentOperationsThrottler segmentOperationsThrottler = + new SegmentOperationsThrottler(new SegmentAllIndexPreprocessThrottler(8, 10, true), + new SegmentStarTreePreprocessThrottler(4, 8, true), new SegmentDownloadThrottler(10, 20, true)); TableDataManager tableDataManager = new OfflineTableDataManager(); - tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, + tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, schema, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, segmentOperationsThrottler); tableDataManager.start(); Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap"); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java index 5c4d4d347b..451d5ed9cf 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerNeedRefreshTest.java @@ -84,6 +84,7 @@ public class BaseTableDataManagerNeedRefreshTest { private static final TableConfig TABLE_CONFIG; private static final Schema SCHEMA; + private static final IndexLoadingConfig INDEX_LOADING_CONFIG; private static final ImmutableSegmentDataManager IMMUTABLE_SEGMENT_DATA_MANAGER; private static final BaseTableDataManager BASE_TABLE_DATA_MANAGER; @@ -93,8 +94,9 @@ public class BaseTableDataManagerNeedRefreshTest { try { TABLE_CONFIG = getTableConfigBuilder().build(); SCHEMA = getSchema(); + INDEX_LOADING_CONFIG = new IndexLoadingConfig(TABLE_CONFIG, SCHEMA); IMMUTABLE_SEGMENT_DATA_MANAGER = - createImmutableSegmentDataManager(TABLE_CONFIG, SCHEMA, "basicSegment", generateRows()); + createImmutableSegmentDataManager(INDEX_LOADING_CONFIG, "basicSegment", generateRows()); BASE_TABLE_DATA_MANAGER = BaseTableDataManagerTest.createTableManager(); } catch (Exception e) { throw new RuntimeException(e); @@ -103,7 +105,8 @@ public class BaseTableDataManagerNeedRefreshTest { protected static TableConfigBuilder getTableConfigBuilder() { return new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME) - .setTimeColumnName(DEFAULT_TIME_COLUMN_NAME).setNullHandlingEnabled(true) + .setTimeColumnName(DEFAULT_TIME_COLUMN_NAME) + .setNullHandlingEnabled(true) .setNoDictionaryColumns(List.of(TEXT_INDEX_COLUMN)); } @@ -119,7 +122,8 @@ public class BaseTableDataManagerNeedRefreshTest { .addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING) .addSingleValueDimension(NULL_VALUE_COLUMN, FieldSpec.DataType.STRING) .addSingleValueDimension(DISTANCE_COLUMN_NAME, FieldSpec.DataType.INT) - .addSingleValueDimension(CARRIER_COLUMN_NAME, FieldSpec.DataType.STRING).build(); + .addSingleValueDimension(CARRIER_COLUMN_NAME, FieldSpec.DataType.STRING) + .build(); } protected static List<GenericRow> generateRows() { @@ -159,10 +163,10 @@ public class BaseTableDataManagerNeedRefreshTest { return List.of(row0, row2, row1); } - private static File createSegment(TableConfig tableConfig, Schema schema, - String segmentName, List<GenericRow> rows) + private static File createSegment(IndexLoadingConfig indexLoadingConfig, String segmentName, List<GenericRow> rows) throws Exception { - SegmentGeneratorConfig config = new SegmentGeneratorConfig(tableConfig, schema); + SegmentGeneratorConfig config = + new SegmentGeneratorConfig(indexLoadingConfig.getTableConfig(), indexLoadingConfig.getSchema()); config.setOutDir(TABLE_DATA_DIR.getAbsolutePath()); config.setSegmentName(segmentName); config.setSegmentVersion(SegmentVersion.v3); @@ -174,14 +178,12 @@ public class BaseTableDataManagerNeedRefreshTest { return new File(TABLE_DATA_DIR, segmentName); } - private static ImmutableSegmentDataManager createImmutableSegmentDataManager(TableConfig tableConfig, Schema schema, + private static ImmutableSegmentDataManager createImmutableSegmentDataManager(IndexLoadingConfig indexLoadingConfig, String segmentName, List<GenericRow> rows) throws Exception { ImmutableSegmentDataManager segmentDataManager = mock(ImmutableSegmentDataManager.class); when(segmentDataManager.getSegmentName()).thenReturn(segmentName); - File indexDir = createSegment(tableConfig, schema, segmentName, rows); - - IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, schema); + File indexDir = createSegment(indexLoadingConfig, segmentName, rows); ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(indexDir, indexLoadingConfig, BaseTableDataManagerTest.SEGMENT_OPERATIONS_THROTTLER); when(segmentDataManager.getSegment()).thenReturn(immutableSegment); @@ -196,13 +198,15 @@ public class BaseTableDataManagerNeedRefreshTest { @Test void testAddTimeColumn() throws Exception { - TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME).setNullHandlingEnabled(true) - .setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN)).build(); - + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(DEFAULT_TABLE_NAME) + .setNullHandlingEnabled(true) + .setNoDictionaryColumns(Collections.singletonList(TEXT_INDEX_COLUMN)) + .build(); Schema schema = new Schema.SchemaBuilder().addSingleValueDimension(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING) .addSingleValueDimension(JSON_INDEX_COLUMN, FieldSpec.DataType.JSON) - .addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING).build(); + .addSingleValueDimension(FST_TEST_COLUMN, FieldSpec.DataType.STRING) + .build(); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, schema); GenericRow row = new GenericRow(); row.putValue(TEXT_INDEX_COLUMN, "text_index_column"); @@ -210,23 +214,22 @@ public class BaseTableDataManagerNeedRefreshTest { row.putValue(FST_TEST_COLUMN, "fst_test_column"); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, schema, "noChanges", List.of(row)); + createImmutableSegmentDataManager(indexLoadingConfig, "noChanges", List.of(row)); BaseTableDataManager tableDataManager = BaseTableDataManagerTest.createTableManager(); - StaleSegment response = - tableDataManager.isSegmentStale(tableConfig, schema, segmentDataManager); + StaleSegment response = tableDataManager.isSegmentStale(indexLoadingConfig, segmentDataManager); assertFalse(response.isStale()); // Test new time column - response = tableDataManager.isSegmentStale(getTableConfigBuilder().build(), getSchema(), segmentDataManager); + response = tableDataManager.isSegmentStale(INDEX_LOADING_CONFIG, segmentDataManager); assertTrue(response.isStale()); assertEquals(response.getReason(), "time column"); } @Test void testChangeTimeColumn() { - StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale( - getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build(), SCHEMA, + TableConfig tableConfig = getTableConfigBuilder().setTimeColumnName(MS_SINCE_EPOCH_COLUMN_NAME).build(); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA), IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "time column"); @@ -237,8 +240,8 @@ public class BaseTableDataManagerNeedRefreshTest { throws Exception { Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN); - StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "column deleted: textColumn"); } @@ -249,9 +252,8 @@ public class BaseTableDataManagerNeedRefreshTest { Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN); schema.addField(new MetricFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, true)); - - StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "field type changed: textColumn"); } @@ -262,9 +264,8 @@ public class BaseTableDataManagerNeedRefreshTest { Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN); schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.INT, true)); - - StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "data type changed: textColumn"); } @@ -275,9 +276,8 @@ public class BaseTableDataManagerNeedRefreshTest { Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN); schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN, FieldSpec.DataType.STRING, false)); - - StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "single / multi value changed: textColumn"); } @@ -288,9 +288,8 @@ public class BaseTableDataManagerNeedRefreshTest { Schema schema = getSchema(); schema.removeField(TEXT_INDEX_COLUMN_MV); schema.addField(new DimensionFieldSpec(TEXT_INDEX_COLUMN_MV, FieldSpec.DataType.STRING, true)); - - StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, schema, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(TABLE_CONFIG, schema), + IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "single / multi value changed: textColumnMV"); } @@ -298,16 +297,14 @@ public class BaseTableDataManagerNeedRefreshTest { @Test void testSortColumnMismatch() { // Check with a column that is not sorted - StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale( - getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build(), - SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); + TableConfig tableConfig = getTableConfigBuilder().setSortedColumn(MS_SINCE_EPOCH_COLUMN_NAME).build(); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, SCHEMA); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "sort column changed: MilliSecondsSinceEpoch"); // Check with a column that is sorted - assertFalse( - BASE_TABLE_DATA_MANAGER.isSegmentStale(getTableConfigBuilder().setSortedColumn(TEXT_INDEX_COLUMN).build(), - SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale()); + tableConfig.getIndexingConfig().setSortedColumn(List.of(TEXT_INDEX_COLUMN)); + assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER).isStale()); } @DataProvider(name = "testFilterArgs") @@ -325,9 +322,9 @@ public class BaseTableDataManagerNeedRefreshTest { null, null))).build(), "text index changed: textColumn" }, { "withFstIndex", getTableConfigBuilder().setFieldConfigList(List.of( - new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.FST), - null, Map.of(FieldConfig.TEXT_FST_TYPE, FieldConfig.TEXT_NATIVE_FST_LITERAL)))).build(), - "fst index changed: DestCityName" + new FieldConfig(FST_TEST_COLUMN, FieldConfig.EncodingType.DICTIONARY, List.of(FieldConfig.IndexType.FST), null, + Map.of(FieldConfig.TEXT_FST_TYPE, + FieldConfig.TEXT_NATIVE_FST_LITERAL)))).build(), "fst index changed: DestCityName" }, { "withRangeFilter", getTableConfigBuilder().setRangeIndexColumns( List.of(MS_SINCE_EPOCH_COLUMN_NAME)).build(), "range index changed: MilliSecondsSinceEpoch" @@ -338,22 +335,22 @@ public class BaseTableDataManagerNeedRefreshTest { @Test(dataProvider = "testFilterArgs") void testFilter(String segmentName, TableConfig tableConfigWithFilter, String expectedReason) throws Exception { + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfigWithFilter, SCHEMA); ImmutableSegmentDataManager segmentWithFilter = - createImmutableSegmentDataManager(tableConfigWithFilter, SCHEMA, segmentName, generateRows()); + createImmutableSegmentDataManager(indexLoadingConfig, segmentName, generateRows()); // When TableConfig has a filter but segment does not have, needRefresh is true. - StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), expectedReason); // When TableConfig does not have a filter but segment has, needRefresh is true - response = BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithFilter); + response = BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithFilter); assertTrue(response.isStale()); assertEquals(response.getReason(), expectedReason); // When TableConfig has a filter AND segment also has a filter, needRefresh is false - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfigWithFilter, SCHEMA, segmentWithFilter).isStale()); + assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, segmentWithFilter).isStale()); } @Test @@ -361,23 +358,23 @@ public class BaseTableDataManagerNeedRefreshTest { throws Exception { TableConfig partitionedTableConfig = getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig( Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, NUM_PARTITIONS)))).build(); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(partitionedTableConfig, SCHEMA); ImmutableSegmentDataManager segmentWithPartition = - createImmutableSegmentDataManager(partitionedTableConfig, SCHEMA, "partitionWithModulo", generateRows()); + createImmutableSegmentDataManager(indexLoadingConfig, "partitionWithModulo", generateRows()); // when segment has no partition AND tableConfig has partitions then needRefresh = true - StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "partition function added: partitionedColumn"); // when segment has partitions AND tableConfig has no partitions, then needRefresh = false - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithPartition).isStale()); + assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithPartition).isStale()); // when # of partitions is different, then needRefresh = true TableConfig partitionedTableConfig40 = getTableConfigBuilder().setSegmentPartitionConfig(new SegmentPartitionConfig( Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig(PARTITION_FUNCTION_NAME, 40)))).build(); - - response = BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfig40, SCHEMA, segmentWithPartition); + response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(partitionedTableConfig40, SCHEMA), + segmentWithPartition); assertTrue(response.isStale()); assertEquals(response.getReason(), "num partitions changed: partitionedColumn"); @@ -385,8 +382,8 @@ public class BaseTableDataManagerNeedRefreshTest { TableConfig partitionedTableConfigMurmur = getTableConfigBuilder().setSegmentPartitionConfig( new SegmentPartitionConfig( Map.of(PARTITIONED_COLUMN_NAME, new ColumnPartitionConfig("murmur", NUM_PARTITIONS)))).build(); - - response = BASE_TABLE_DATA_MANAGER.isSegmentStale(partitionedTableConfigMurmur, SCHEMA, segmentWithPartition); + response = BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(partitionedTableConfigMurmur, SCHEMA), + segmentWithPartition); assertTrue(response.isStale()); assertEquals(response.getReason(), "partition function name changed: partitionedColumn"); } @@ -395,36 +392,33 @@ public class BaseTableDataManagerNeedRefreshTest { void testNullValueVector() throws Exception { TableConfig withoutNullHandling = getTableConfigBuilder().setNullHandlingEnabled(false).build(); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(withoutNullHandling, SCHEMA); ImmutableSegmentDataManager segmentWithoutNullHandling = - createImmutableSegmentDataManager(withoutNullHandling, SCHEMA, "withoutNullHandling", generateRows()); + createImmutableSegmentDataManager(indexLoadingConfig, "withoutNullHandling", generateRows()); // If null handling is removed from table config AND segment has NVV, then NVV can be removed. needRefresh = true - StaleSegment response = - BASE_TABLE_DATA_MANAGER.isSegmentStale(withoutNullHandling, SCHEMA, IMMUTABLE_SEGMENT_DATA_MANAGER); + StaleSegment response = BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, IMMUTABLE_SEGMENT_DATA_MANAGER); assertTrue(response.isStale()); assertEquals(response.getReason(), "null value vector index removed from column: NullValueColumn"); // if NVV is added to table config AND segment does not have NVV, then it cannot be added. needRefresh = false - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(TABLE_CONFIG, SCHEMA, segmentWithoutNullHandling).isStale()); + assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentWithoutNullHandling).isStale()); } @Test - // Test 1 : Adding a StarTree index should trigger segment refresh. - public void addStartreeIndex() - throws Exception { + public void addStartreeIndex() { + // Test 1 : Adding a StarTree index should trigger segment refresh. + StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Carrier"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); - TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); - ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(getTableConfigBuilder().build(), SCHEMA, _testName, generateRows()); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfig, SCHEMA, segmentDataManager).isStale()); + assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(tableConfig, SCHEMA), + IMMUTABLE_SEGMENT_DATA_MANAGER).isStale()); } @Test public void testStarTreeIndexWithDifferentColumn() throws Exception { - // Test 2: Adding a new StarTree index with split dimension column of same size but with different element should // trigger segment refresh. @@ -433,20 +427,21 @@ public class BaseTableDataManagerNeedRefreshTest { Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); // Create a StarTree index on Distance. StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Distance"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, segmentDataManager).isStale()); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test public void testStarTreeIndexWithManyColumns() throws Exception { - // Test 3: Adding a new StarTree index with split dimension columns of different size should trigger segment // refresh. @@ -455,19 +450,20 @@ public class BaseTableDataManagerNeedRefreshTest { Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, segmentDataManager).isStale()); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test public void testStartIndexWithDifferentOrder() throws Exception { - // Test 4: Adding a new StarTree index with the differently ordered split dimension columns should trigger // segment refresh. @@ -476,33 +472,39 @@ public class BaseTableDataManagerNeedRefreshTest { Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); // Create a StarTree index. StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Distance", "Carrier"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test void testStarTreeIndexWithSkipDimCols() throws Exception { - // Test 5: Adding a new StarTree index with skipped dimension columns should trigger segment refresh. // Create a segment with StarTree index on Carrier, Distance. + StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); // Create a StarTree index. StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"), Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test @@ -510,35 +512,43 @@ public class BaseTableDataManagerNeedRefreshTest { throws Exception { // Test 6: Adding a new StarTree index with skipped dimension columns in different order should not trigger // segment refresh. + StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"), Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Distance", "Carrier"), Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertFalse( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test void testStarTreeIndexRemoveSkipDimCols() throws Exception { // Test 7: Adding a new StarTree index with removed skipped-dimension column should trigger segment refresh. + StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), Arrays.asList("Carrier", "Distance"), Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test @@ -548,15 +558,17 @@ public class BaseTableDataManagerNeedRefreshTest { StarTreeIndexConfig starTreeIndex = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); - TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndex)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); - StarTreeIndexConfig starTreeIndexAddAggFn = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, + StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Arrays.asList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName(), "MAX__Distance"), null, 100); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexAddAggFn)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test @@ -569,12 +581,15 @@ public class BaseTableDataManagerNeedRefreshTest { Arrays.asList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName(), "MAX__Distance"), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Arrays.asList("MAX__Distance", AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertFalse( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test @@ -587,32 +602,39 @@ public class BaseTableDataManagerNeedRefreshTest { TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null, List.of(new StarTreeAggregationConfig("Distance", "MAX")), 100); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test void testStarTreeIndexNewMetricAgg() throws Exception { // Test 11 : Adding a new metric aggregation function through functionColumnPairs should trigger segment refresh. + StarTreeAggregationConfig aggregationConfig = new StarTreeAggregationConfig("Distance", "MAX"); StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null, List.of(aggregationConfig), 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); // Create a StarTree index. StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), Collections.singletonList(aggregationConfig), 100); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test @@ -627,14 +649,17 @@ public class BaseTableDataManagerNeedRefreshTest { Collections.singletonList(aggregationConfig), 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); StarTreeAggregationConfig starTreeAggregationConfig2 = new StarTreeAggregationConfig("*", "count"); StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Arrays.asList("Carrier", "Distance"), null, null, Arrays.asList(starTreeAggregationConfig2, aggregationConfig), 100); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertFalse( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test @@ -644,12 +669,15 @@ public class BaseTableDataManagerNeedRefreshTest { Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Carrier"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 10); - TableConfig newConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newConfig, SCHEMA, segmentDataManager).isStale()); + TableConfig newTableConfig = + getTableConfigBuilder().setStarTreeIndexConfigs(List.of(newStarTreeIndexConfig)).build(); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test @@ -659,9 +687,8 @@ public class BaseTableDataManagerNeedRefreshTest { Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); - assertTrue( - BASE_TABLE_DATA_MANAGER.isSegmentStale(getTableConfigBuilder().build(), SCHEMA, segmentDataManager).isStale()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); + assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(INDEX_LOADING_CONFIG, segmentDataManager).isStale()); } @Test @@ -673,13 +700,15 @@ public class BaseTableDataManagerNeedRefreshTest { Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); StarTreeIndexConfig newStarTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Distance"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig, newStarTreeIndexConfig)).build(); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, segmentDataManager).isStale()); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test @@ -691,11 +720,13 @@ public class BaseTableDataManagerNeedRefreshTest { Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); newTableConfig.getIndexingConfig().setEnableDefaultStarTree(true); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, segmentDataManager).isStale()); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } @Test @@ -706,10 +737,10 @@ public class BaseTableDataManagerNeedRefreshTest { StarTreeIndexConfig starTreeIndexConfig = new StarTreeIndexConfig(Collections.singletonList("Carrier"), null, Collections.singletonList(AggregationFunctionColumnPair.COUNT_STAR.toColumnName()), null, 100); TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); + IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, SCHEMA); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); - - assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(tableConfig, SCHEMA, segmentDataManager).isStale()); + createImmutableSegmentDataManager(indexLoadingConfig, _testName, generateRows()); + assertFalse(BASE_TABLE_DATA_MANAGER.isSegmentStale(indexLoadingConfig, segmentDataManager).isStale()); } @Test @@ -722,10 +753,12 @@ public class BaseTableDataManagerNeedRefreshTest { TableConfig tableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); tableConfig.getIndexingConfig().setEnableDefaultStarTree(true); ImmutableSegmentDataManager segmentDataManager = - createImmutableSegmentDataManager(tableConfig, SCHEMA, _testName, generateRows()); + createImmutableSegmentDataManager(new IndexLoadingConfig(tableConfig, SCHEMA), _testName, generateRows()); TableConfig newTableConfig = getTableConfigBuilder().setStarTreeIndexConfigs(List.of(starTreeIndexConfig)).build(); newTableConfig.getIndexingConfig().setEnableDefaultStarTree(false); - assertTrue(BASE_TABLE_DATA_MANAGER.isSegmentStale(newTableConfig, SCHEMA, segmentDataManager).isStale()); + assertTrue( + BASE_TABLE_DATA_MANAGER.isSegmentStale(new IndexLoadingConfig(newTableConfig, SCHEMA), segmentDataManager) + .isStale()); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index c992c800da..163cd8a0c5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -664,7 +664,8 @@ public class BaseTableDataManagerTest { private static OfflineTableDataManager createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) { OfflineTableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), DEFAULT_TABLE_CONFIG, - new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, SEGMENT_OPERATIONS_THROTTLER); + SCHEMA, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, + SEGMENT_OPERATIONS_THROTTLER); return tableDataManager; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java index 372d5ce9c2..cd5c0c7c9b 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java @@ -89,7 +89,6 @@ public class DimensionTableDataManagerTest { new SegmentDownloadThrottler(1, 2, true)); private File _indexDir; - private SegmentMetadata _segmentMetadata; private SegmentZKMetadata _segmentZKMetadata; @BeforeClass @@ -120,9 +119,9 @@ public class DimensionTableDataManagerTest { String segmentName = driver.getSegmentName(); _indexDir = new File(tableDataDir, segmentName); - _segmentMetadata = new SegmentMetadataImpl(_indexDir); + SegmentMetadata segmentMetadata = new SegmentMetadataImpl(_indexDir); _segmentZKMetadata = new SegmentZKMetadata(segmentName); - _segmentZKMetadata.setCrc(Long.parseLong(_segmentMetadata.getCrc())); + _segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc())); } @AfterClass @@ -174,7 +173,7 @@ public class DimensionTableDataManagerTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); DimensionTableDataManager tableDataManager = DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME); - tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, + tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, schema, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, SEGMENT_OPERATIONS_THROTTLER); tableDataManager.start(); return tableDataManager; @@ -294,8 +293,9 @@ public class DimensionTableDataManagerTest { Schema schemaWithExtraColumn = getSchemaWithExtraColumn(); when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn( SchemaUtils.toZNRecord(schemaWithExtraColumn)); - tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(), - new IndexLoadingConfig(tableConfig, schemaWithExtraColumn), _segmentZKMetadata, _segmentMetadata, false); + when(propertyStore.get("/SEGMENTS/dimBaseballTeams_OFFLINE/" + _segmentZKMetadata.getSegmentName(), null, + AccessOption.PERSISTENT)).thenReturn(_segmentZKMetadata.toZNRecord()); + tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(), false); // Confirm the new column is available for lookup teamCitySpec = tableDataManager.getColumnFieldSpec("teamCity"); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index 8bf32c576b..1aa0367421 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -35,12 +35,10 @@ import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; import org.apache.helix.HelixManager; -import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.protocols.SegmentCompletionProtocol; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.core.data.manager.provider.DefaultTableDataManagerProvider; import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; @@ -74,7 +72,6 @@ import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -750,18 +747,13 @@ public class RealtimeSegmentDataManagerTest { @Test public void testShutdownTableDataManagerWillNotShutdownLeaseExtenderExecutor() throws Exception { - TableConfig tableConfig = createTableConfig(); - tableConfig.setUpsertConfig(null); - ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); - when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig)); - HelixManager helixManager = mock(HelixManager.class); - when(helixManager.getHelixPropertyStore()).thenReturn(propertyStore); - InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); - tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); + tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); + TableConfig tableConfig = createTableConfig(); + Schema schema = Fixtures.createSchema(); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig, schema); tableDataManager.start(); tableDataManager.shutDown(); Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown()); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java index 9afed0b634..7c9bd2e4f4 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java @@ -136,7 +136,7 @@ public class QueryExecutorExceptionsTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig, schema); tableDataManager.start(); //we don't add index segments to the data manager to simulate numSegmentsAcquired < numSegmentsQueried InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 095a1c0a89..022d134d36 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -160,7 +160,7 @@ public class QueryExecutorTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig, schema); tableDataManager.start(); for (ImmutableSegment indexSegment : _indexSegments) { tableDataManager.addSegment(indexSegment); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java index aa2d9e2cc1..db1af9416c 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java @@ -279,7 +279,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(TABLE_CONFIG); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(TABLE_CONFIG, SCHEMA); tableDataManager.start(); for (IndexSegment indexSegment : _indexSegments) { tableDataManager.addSegment((ImmutableSegment) indexSegment); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java index 1c509ca5e8..ffbd50a3f5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java @@ -141,7 +141,7 @@ public class SegmentWithNullValueVectorTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig, _schema); tableDataManager.start(); tableDataManager.addSegment(_segment); _instanceDataManager = mock(InstanceDataManager.class); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java index 7fbc220806..67cca949e9 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java @@ -37,6 +37,7 @@ import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -62,8 +63,9 @@ public class FailureInjectingTableDataManagerProvider implements TableDataManage } @Override - public TableDataManager getTableDataManager(TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, - ExecutorService segmentReloadExecutor, @Nullable ExecutorService segmentPreloadExecutor, + public TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema, + SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadExecutor, + @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) { TableDataManager tableDataManager; @@ -89,9 +91,8 @@ public class FailureInjectingTableDataManagerProvider implements TableDataManage default: throw new IllegalStateException(); } - tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, segmentReloadSemaphore, - segmentReloadExecutor, segmentPreloadExecutor, - errorCache, null); + tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, schema, + segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, errorCache, null); return tableDataManager; } } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java index 3ed62ec35a..d2aa77321d 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java @@ -184,7 +184,7 @@ public class BenchmarkDimensionTableOverhead extends BaseQueriesTest { String tableName = TABLE_NAME + "_" + _iteration; _tableDataManager = DimensionTableDataManager.createInstanceByTableName(tableName); - _tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, + _tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, SCHEMA, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, SEGMENT_OPERATIONS_THROTTLER); _tableDataManager.start(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index ed1f96f4da..e8351e5cbe 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -53,8 +53,8 @@ public interface TableDataManager { * Initializes the table data manager. Should be called only once and before calling any other method. */ void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks, - TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadExecutor, - @Nullable ExecutorService segmentPreloadExecutor, + TableConfig tableConfig, Schema schema, SegmentReloadSemaphore segmentReloadSemaphore, + ExecutorService segmentReloadExecutor, @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, @Nullable SegmentOperationsThrottler segmentOperationsThrottler); @@ -294,23 +294,16 @@ public interface TableDataManager { */ SegmentZKMetadata fetchZKMetadata(String segmentName); - /** - * Fetches the table config and schema for the table from ZK. - */ - Pair<TableConfig, Schema> fetchTableConfigAndSchema(); - /** * Fetches the table config and schema for the table from ZK, then construct the index loading config with them. */ - default IndexLoadingConfig fetchIndexLoadingConfig() { - Pair<TableConfig, Schema> tableConfigSchemaPair = fetchTableConfigAndSchema(); - return getIndexLoadingConfig(tableConfigSchemaPair.getLeft(), tableConfigSchemaPair.getRight()); - } + IndexLoadingConfig fetchIndexLoadingConfig(); /** - * Constructs the index loading config for the table with the given table config and schema. + * Returns the cached latest {@link IndexLoadingConfig} for the table. The cache is refreshed when invoking + * {@link #fetchIndexLoadingConfig()}. */ - IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema schema); + IndexLoadingConfig getIndexLoadingConfig(); /** * Interface to handle segment state transitions from CONSUMING to DROPPED @@ -330,9 +323,8 @@ public interface TableDataManager { /** * Return list of segment names that are stale along with reason. - * @param tableConfig Table Config of the table - * @param schema Schema of the table + * * @return List of {@link StaleSegment} with segment names and reason why it is stale */ - List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema); + List<StaleSegment> getStaleSegments(); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index f7038ed11f..3df7a4290e 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -99,11 +99,9 @@ import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.server.access.AccessControlFactory; import org.apache.pinot.server.api.AdminApiApplication; import org.apache.pinot.server.starter.ServerInstance; -import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; -import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.JsonUtils; @@ -1188,8 +1186,7 @@ public class TablesResource { tableName = DatabaseUtils.translateTableName(tableName, headers); TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName); try { - Pair<TableConfig, Schema> tableConfigSchemaPair = tableDataManager.fetchTableConfigAndSchema(); - return tableDataManager.getStaleSegments(tableConfigSchemaPair.getLeft(), tableConfigSchemaPair.getRight()); + return tableDataManager.getStaleSegments(); } catch (Exception e) { throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index b79a759e76..b3ed24fa81 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -63,6 +63,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -294,9 +295,11 @@ public class HelixInstanceDataManager implements InstanceDataManager { tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); } + Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType); + Preconditions.checkState(schema != null, "Failed to find schema for table: %s", tableNameWithType); TableDataManager tableDataManager = - _tableDataManagerProvider.getTableDataManager(tableConfig, _segmentReloadSemaphore, _segmentReloadExecutor, - _segmentPreloadExecutor, _errorCache, _isServerReadyToServeQueries); + _tableDataManagerProvider.getTableDataManager(tableConfig, schema, _segmentReloadSemaphore, + _segmentReloadExecutor, _segmentPreloadExecutor, _errorCache, _isServerReadyToServeQueries); tableDataManager.start(); LOGGER.info("Created table data manager for table: {}", tableNameWithType); return tableDataManager; diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java index 1cc0b05b83..9967cade27 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java @@ -51,15 +51,16 @@ import org.apache.pinot.segment.spi.creator.SegmentIndexCreationDriver; import org.apache.pinot.server.access.AllowAllAccessFactory; import org.apache.pinot.server.starter.ServerInstance; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; -import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.NetUtils; import org.apache.pinot.spi.utils.ReadMode; import org.apache.pinot.spi.utils.StringUtil; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -68,6 +69,8 @@ import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; public abstract class BaseResourceTest { @@ -98,9 +101,9 @@ public abstract class BaseResourceTest { ServerMetrics.register(mock(ServerMetrics.class)); FileUtils.deleteQuietly(TEMP_DIR); - Assert.assertTrue(TEMP_DIR.mkdirs()); + assertTrue(TEMP_DIR.mkdirs()); URL resourceUrl = getClass().getClassLoader().getResource(AVRO_DATA_PATH); - Assert.assertNotNull(resourceUrl); + assertNotNull(resourceUrl); _avroFile = new File(resourceUrl.getFile()); // Mock the instance data manager @@ -197,13 +200,15 @@ public abstract class BaseResourceTest { protected void addTable(String tableNameWithType) { InstanceDataManagerConfig instanceDataManagerConfig = mock(InstanceDataManagerConfig.class); when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); - TableConfig tableConfig = mock(TableConfig.class); - when(tableConfig.getTableName()).thenReturn(tableNameWithType); - when(tableConfig.getValidationConfig()).thenReturn(mock(SegmentsValidationAndRetentionConfig.class)); - // NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table because RealtimeTableDataManager requires - // table config. + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + assertNotNull(tableType); + TableConfig tableConfig = new TableConfigBuilder(tableType).setTableName(tableNameWithType).build(); + Schema schema = + new Schema.SchemaBuilder().setSchemaName(TableNameBuilder.extractRawTableName(tableNameWithType)).build(); + // NOTE: Use OfflineTableDataManager for both OFFLINE and REALTIME table because RealtimeTableDataManager performs + // more checks TableDataManager tableDataManager = new OfflineTableDataManager(); - tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, + tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, schema, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, null); tableDataManager.start(); _tableDataManagerMap.put(tableNameWithType, tableDataManager); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org