This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch table_data_manager_cache_config in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 078808af4c92d417004684b44a90566d2b645991 Author: Xiaotian (Jackie) Jiang <jackie....@gmail.com> AuthorDate: Wed Apr 2 18:48:49 2025 -0700 Cache TableConfig and Schema in TableDataManager --- .../core/data/manager/BaseTableDataManager.java | 44 +++++++++++++++------- .../manager/offline/DimensionTableDataManager.java | 22 ++++------- .../provider/DefaultTableDataManagerProvider.java | 10 +++-- .../manager/provider/TableDataManagerProvider.java | 12 +++--- .../manager/realtime/RealtimeTableDataManager.java | 15 ++++---- .../BaseTableDataManagerAcquireSegmentTest.java | 10 +++-- .../data/manager/BaseTableDataManagerTest.java | 3 +- .../offline/DimensionTableDataManagerTest.java | 12 +++--- .../realtime/RealtimeSegmentDataManagerTest.java | 3 +- .../executor/QueryExecutorExceptionsTest.java | 2 +- .../core/query/executor/QueryExecutorTest.java | 2 +- .../pinot/queries/ExplainPlanQueriesTest.java | 2 +- .../queries/SegmentWithNullValueVectorTest.java | 2 +- .../FailureInjectingTableDataManagerProvider.java | 11 +++--- .../perf/BenchmarkDimensionTableOverhead.java | 2 +- .../local/data/manager/TableDataManager.java | 26 ++++++------- .../pinot/server/api/resources/TablesResource.java | 5 +-- .../starter/helix/HelixInstanceDataManager.java | 7 +++- .../apache/pinot/server/api/BaseResourceTest.java | 3 +- 19 files changed, 105 insertions(+), 88 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index c506dc336c..254b2eb58f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -103,6 +103,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.TimestampIndexUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -119,7 +120,6 @@ public abstract class BaseTableDataManager implements TableDataManager { protected HelixManager _helixManager; protected ZkHelixPropertyStore<ZNRecord> _propertyStore; protected SegmentLocks _segmentLocks; - protected TableConfig _tableConfig; protected String _tableNameWithType; protected String _tableDataDir; protected File _indexDir; @@ -136,6 +136,12 @@ public abstract class BaseTableDataManager implements TableDataManager { protected boolean _isStreamSegmentDownloadUntar; @Nullable protected SegmentOperationsThrottler _segmentOperationsThrottler; + + // Caches the latest TableConfig and Schema. The cached TableConfig and Schema already have timestamp index applied, + // and should not be modified. + protected volatile TableConfig _tableConfig; + protected volatile Schema _schema; + // Semaphore to restrict the maximum number of parallel segment downloads from deep store for a table private Semaphore _segmentDownloadSemaphore; private AtomicInteger _numSegmentsAcquiredDownloadSemaphore; @@ -150,7 +156,7 @@ public abstract class BaseTableDataManager implements TableDataManager { @Override public void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, - SegmentLocks segmentLocks, TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, + SegmentLocks segmentLocks, TableConfig tableConfig, Schema schema, SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadExecutor, @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, @Nullable SegmentOperationsThrottler segmentOperationsThrottler) { @@ -161,14 +167,16 @@ public abstract class BaseTableDataManager implements TableDataManager { _helixManager = helixManager; _propertyStore = helixManager.getHelixPropertyStore(); _segmentLocks = segmentLocks; + TimestampIndexUtils.applyTimestampIndex(tableConfig, schema); _tableConfig = tableConfig; + _schema = schema; _segmentReloadSemaphore = segmentReloadSemaphore; _segmentReloadExecutor = segmentReloadExecutor; _segmentPreloadExecutor = segmentPreloadExecutor; - _authProvider = AuthProviderUtils.extractAuthProvider(_instanceDataManagerConfig.getAuthConfig(), null); + _authProvider = AuthProviderUtils.extractAuthProvider(instanceDataManagerConfig.getAuthConfig(), null); _tableNameWithType = tableConfig.getTableName(); - _tableDataDir = _instanceDataManagerConfig.getInstanceDataDir() + File.separator + _tableNameWithType; + _tableDataDir = instanceDataManagerConfig.getInstanceDataDir() + File.separator + _tableNameWithType; _indexDir = new File(_tableDataDir); if (!_indexDir.exists()) { Preconditions.checkState(_indexDir.mkdirs(), "Unable to create index directory at %s. " @@ -379,21 +387,29 @@ public abstract class BaseTableDataManager implements TableDataManager { } @Override - public Pair<TableConfig, Schema> fetchTableConfigAndSchema() { + public IndexLoadingConfig fetchIndexLoadingConfig() { TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); - return Pair.of(tableConfig, schema); - } - - @Override - public IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema schema) { IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema); indexLoadingConfig.setTableDataDir(_tableDataDir); + // NOTE: Timestamp index is already applied in the constructor of IndexLoadingConfig. + _tableConfig = tableConfig; + _schema = schema; return indexLoadingConfig; } + @Override + public TableConfig getTableConfig() { + return _tableConfig; + } + + @Override + public Schema getSchema() { + return _schema; + } + @Override public void addNewOnlineSegment(SegmentZKMetadata zkMetadata, IndexLoadingConfig indexLoadingConfig) throws Exception { @@ -1258,10 +1274,12 @@ public abstract class BaseTableDataManager implements TableDataManager { } @Override - public List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema) { + public List<StaleSegment> getStaleSegments() { List<StaleSegment> staleSegments = new ArrayList<>(); List<SegmentDataManager> segmentDataManagers = acquireAllSegments(); - final long startTime = System.currentTimeMillis(); + TableConfig tableConfig = _tableConfig; + Schema schema = _schema; + long startTimeMs = System.currentTimeMillis(); try { for (SegmentDataManager segmentDataManager : segmentDataManagers) { StaleSegment response = isSegmentStale(tableConfig, schema, segmentDataManager); @@ -1273,7 +1291,7 @@ public abstract class BaseTableDataManager implements TableDataManager { for (SegmentDataManager segmentDataManager : segmentDataManagers) { releaseSegment(segmentDataManager); } - LOGGER.info("Time Taken to get stale segments: {} ms", System.currentTimeMillis() - startTime); + LOGGER.info("Time Taken to get stale segments: {} ms", System.currentTimeMillis() - startTimeMs); } return staleSegments; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index 5e36d847e4..0c0a7240ab 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -35,14 +35,12 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; -import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.config.table.DimensionTableConfig; -import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.data.readers.GenericRow; @@ -108,20 +106,16 @@ public class DimensionTableDataManager extends OfflineTableDataManager { @Override protected void doInit() { super.doInit(); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType); + Schema schema = _schema; List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); - if (tableConfig != null) { - DimensionTableConfig dimensionTableConfig = tableConfig.getDimensionTableConfig(); - if (dimensionTableConfig != null) { - _disablePreload = dimensionTableConfig.isDisablePreload(); - _errorOnDuplicatePrimaryKey = dimensionTableConfig.isErrorOnDuplicatePrimaryKey(); - } + DimensionTableConfig dimensionTableConfig = _tableConfig.getDimensionTableConfig(); + if (dimensionTableConfig != null) { + _disablePreload = dimensionTableConfig.isDisablePreload(); + _errorOnDuplicatePrimaryKey = dimensionTableConfig.isErrorOnDuplicatePrimaryKey(); } if (_disablePreload) { @@ -206,8 +200,7 @@ public class DimensionTableDataManager extends OfflineTableDataManager { // loading is in progress. int token = _loadToken.incrementAndGet(); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType); + Schema schema = _schema; List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); @@ -282,8 +275,7 @@ public class DimensionTableDataManager extends OfflineTableDataManager { // loading is in progress. int token = _loadToken.incrementAndGet(); - Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); - Preconditions.checkState(schema != null, "Failed to find schema for dimension table: %s", _tableNameWithType); + Schema schema = _schema; List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java index 186736002d..b750016f74 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java @@ -37,6 +37,7 @@ import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -64,8 +65,9 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider } @Override - public TableDataManager getTableDataManager(TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, - ExecutorService segmentReloadExecutor, @Nullable ExecutorService segmentPreloadExecutor, + public TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema, + SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadExecutor, + @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) { TableDataManager tableDataManager; @@ -90,8 +92,8 @@ public class DefaultTableDataManagerProvider implements TableDataManagerProvider default: throw new IllegalStateException(); } - tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, segmentReloadSemaphore, - segmentReloadExecutor, segmentPreloadExecutor, errorCache, _segmentOperationsThrottler); + tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, schema, + segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, errorCache, _segmentOperationsThrottler); return tableDataManager; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java index 59e5a75f8c..63bfed0b0e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java @@ -34,6 +34,7 @@ import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore; import org.apache.pinot.spi.annotations.InterfaceAudience; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; /** @@ -45,14 +46,15 @@ public interface TableDataManagerProvider { void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks, @Nullable SegmentOperationsThrottler segmentOperationsThrottler); - TableDataManager getTableDataManager(TableConfig tableConfig, SegmentReloadSemaphore segmentRefreshSemaphore, - ExecutorService segmentRefreshExecutor, @Nullable ExecutorService segmentPreloadExecutor, + TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema, + SegmentReloadSemaphore segmentRefreshSemaphore, ExecutorService segmentRefreshExecutor, + @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries); @VisibleForTesting - default TableDataManager getTableDataManager(TableConfig tableConfig) { - return getTableDataManager(tableConfig, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, - null, () -> true); + default TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema) { + return getTableDataManager(tableConfig, schema, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), + null, null, () -> true); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 5d212530b2..9c82d43fca 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -201,7 +201,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // Set up dedup/upsert metadata manager // NOTE: Dedup/upsert has to be set up when starting the server. Changing the table config without restarting the // server won't enable/disable them on the fly. - DedupConfig dedupConfig = _tableConfig.getDedupConfig(); + TableConfig tableConfig = _tableConfig; + DedupConfig dedupConfig = tableConfig.getDedupConfig(); boolean dedupEnabled = dedupConfig != null && dedupConfig.isDedupEnabled(); if (dedupEnabled) { Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); @@ -210,19 +211,19 @@ public class RealtimeTableDataManager extends BaseTableDataManager { List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(!CollectionUtils.isEmpty(primaryKeyColumns), "Primary key columns must be configured for dedup"); - _tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(_tableConfig, schema, this, _serverMetrics, + _tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(tableConfig, schema, this, _serverMetrics, _instanceDataManagerConfig.getDedupConfig()); } - UpsertConfig upsertConfig = _tableConfig.getUpsertConfig(); + UpsertConfig upsertConfig = tableConfig.getUpsertConfig(); if (upsertConfig != null && upsertConfig.getMode() != UpsertConfig.Mode.NONE) { Preconditions.checkState(!dedupEnabled, "Dedup and upsert cannot be both enabled for table: %s", _tableUpsertMetadataManager); Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); _tableUpsertMetadataManager = - TableUpsertMetadataManagerFactory.create(_tableConfig, _instanceDataManagerConfig.getUpsertConfig()); - _tableUpsertMetadataManager.init(_tableConfig, schema, this); + TableUpsertMetadataManagerFactory.create(tableConfig, _instanceDataManagerConfig.getUpsertConfig()); + _tableUpsertMetadataManager.init(tableConfig, schema, this); } _enforceConsumptionInOrder = isEnforceConsumptionInOrder(); @@ -594,9 +595,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { String segmentName = zkMetadata.getSegmentName(); Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s for segment: %s to be downloaded", status, segmentName); - TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, _tableNameWithType); - Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); - long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig); + long downloadTimeoutMs = getDownloadTimeoutMs(_tableConfig); long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs; while (System.currentTimeMillis() < deadlineMs) { // ZK Metadata may change during segment download process; fetch it on every retry. diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java index ad43c7c299..499c3e14a8 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerAcquireSegmentTest.java @@ -47,6 +47,7 @@ import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.util.TestUtils; import org.testng.Assert; @@ -123,12 +124,13 @@ public class BaseTableDataManagerAcquireSegmentTest { when(instanceDataManagerConfig.getDeletedSegmentsCacheSize()).thenReturn(DELETED_SEGMENTS_CACHE_SIZE); when(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes()).thenReturn(DELETED_SEGMENTS_TTL_MINUTES); TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); - SegmentOperationsThrottler segmentOperationsThrottler = new SegmentOperationsThrottler( - new SegmentAllIndexPreprocessThrottler(8, 10, true), new SegmentStarTreePreprocessThrottler(4, 8, true), - new SegmentDownloadThrottler(10, 20, true)); + SegmentOperationsThrottler segmentOperationsThrottler = + new SegmentOperationsThrottler(new SegmentAllIndexPreprocessThrottler(8, 10, true), + new SegmentStarTreePreprocessThrottler(4, 8, true), new SegmentDownloadThrottler(10, 20, true)); TableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, - new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, segmentOperationsThrottler); + mock(Schema.class), new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, + segmentOperationsThrottler); tableDataManager.start(); Field segsMapField = BaseTableDataManager.class.getDeclaredField("_segmentDataManagerMap"); segsMapField.setAccessible(true); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index c992c800da..163cd8a0c5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -664,7 +664,8 @@ public class BaseTableDataManagerTest { private static OfflineTableDataManager createTableManager(InstanceDataManagerConfig instanceDataManagerConfig) { OfflineTableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), DEFAULT_TABLE_CONFIG, - new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, SEGMENT_OPERATIONS_THROTTLER); + SCHEMA, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, + SEGMENT_OPERATIONS_THROTTLER); return tableDataManager; } diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java index 372d5ce9c2..cd5c0c7c9b 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManagerTest.java @@ -89,7 +89,6 @@ public class DimensionTableDataManagerTest { new SegmentDownloadThrottler(1, 2, true)); private File _indexDir; - private SegmentMetadata _segmentMetadata; private SegmentZKMetadata _segmentZKMetadata; @BeforeClass @@ -120,9 +119,9 @@ public class DimensionTableDataManagerTest { String segmentName = driver.getSegmentName(); _indexDir = new File(tableDataDir, segmentName); - _segmentMetadata = new SegmentMetadataImpl(_indexDir); + SegmentMetadata segmentMetadata = new SegmentMetadataImpl(_indexDir); _segmentZKMetadata = new SegmentZKMetadata(segmentName); - _segmentZKMetadata.setCrc(Long.parseLong(_segmentMetadata.getCrc())); + _segmentZKMetadata.setCrc(Long.parseLong(segmentMetadata.getCrc())); } @AfterClass @@ -174,7 +173,7 @@ public class DimensionTableDataManagerTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); DimensionTableDataManager tableDataManager = DimensionTableDataManager.createInstanceByTableName(OFFLINE_TABLE_NAME); - tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, + tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, schema, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, SEGMENT_OPERATIONS_THROTTLER); tableDataManager.start(); return tableDataManager; @@ -294,8 +293,9 @@ public class DimensionTableDataManagerTest { Schema schemaWithExtraColumn = getSchemaWithExtraColumn(); when(propertyStore.get("/SCHEMAS/dimBaseballTeams", null, AccessOption.PERSISTENT)).thenReturn( SchemaUtils.toZNRecord(schemaWithExtraColumn)); - tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(), - new IndexLoadingConfig(tableConfig, schemaWithExtraColumn), _segmentZKMetadata, _segmentMetadata, false); + when(propertyStore.get("/SEGMENTS/dimBaseballTeams_OFFLINE/" + _segmentZKMetadata.getSegmentName(), null, + AccessOption.PERSISTENT)).thenReturn(_segmentZKMetadata.toZNRecord()); + tableDataManager.reloadSegment(_segmentZKMetadata.getSegmentName(), false); // Confirm the new column is available for lookup teamCitySpec = tableDataManager.getColumnFieldSpec("teamCity"); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java index 8bf32c576b..887f8885e2 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java @@ -752,6 +752,7 @@ public class RealtimeSegmentDataManagerTest { throws Exception { TableConfig tableConfig = createTableConfig(); tableConfig.setUpsertConfig(null); + Schema schema = mock(Schema.class); ZkHelixPropertyStore propertyStore = mock(ZkHelixPropertyStore.class); when(propertyStore.get(anyString(), any(), anyInt())).thenReturn(TableConfigUtils.toZNRecord(tableConfig)); HelixManager helixManager = mock(HelixManager.class); @@ -761,7 +762,7 @@ public class RealtimeSegmentDataManagerTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); tableDataManagerProvider.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig, schema); tableDataManager.start(); tableDataManager.shutDown(); Assert.assertFalse(SegmentBuildTimeLeaseExtender.isExecutorShutdown()); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java index 9afed0b634..7c9bd2e4f4 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java @@ -136,7 +136,7 @@ public class QueryExecutorExceptionsTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(INDEX_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig, schema); tableDataManager.start(); //we don't add index segments to the data manager to simulate numSegmentsAcquired < numSegmentsQueried InstanceDataManager instanceDataManager = mock(InstanceDataManager.class); diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java index 095a1c0a89..022d134d36 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorTest.java @@ -160,7 +160,7 @@ public class QueryExecutorTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig, schema); tableDataManager.start(); for (ImmutableSegment indexSegment : _indexSegments) { tableDataManager.addSegment(indexSegment); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java index aa2d9e2cc1..db1af9416c 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/ExplainPlanQueriesTest.java @@ -279,7 +279,7 @@ public class ExplainPlanQueriesTest extends BaseQueriesTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(TABLE_CONFIG); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(TABLE_CONFIG, SCHEMA); tableDataManager.start(); for (IndexSegment indexSegment : _indexSegments) { tableDataManager.addSegment((ImmutableSegment) indexSegment); diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java index 1c509ca5e8..ffbd50a3f5 100644 --- a/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/queries/SegmentWithNullValueVectorTest.java @@ -141,7 +141,7 @@ public class SegmentWithNullValueVectorTest { when(instanceDataManagerConfig.getInstanceDataDir()).thenReturn(TEMP_DIR.getAbsolutePath()); TableDataManagerProvider tableDataManagerProvider = new DefaultTableDataManagerProvider(); tableDataManagerProvider.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), null); - TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig); + TableDataManager tableDataManager = tableDataManagerProvider.getTableDataManager(tableConfig, _schema); tableDataManager.start(); tableDataManager.addSegment(_segment); _instanceDataManager = mock(InstanceDataManager.class); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java index 7fbc220806..67cca949e9 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java @@ -37,6 +37,7 @@ import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler; import org.apache.pinot.segment.local.utils.SegmentReloadSemaphore; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.IngestionConfigUtils; @@ -62,8 +63,9 @@ public class FailureInjectingTableDataManagerProvider implements TableDataManage } @Override - public TableDataManager getTableDataManager(TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, - ExecutorService segmentReloadExecutor, @Nullable ExecutorService segmentPreloadExecutor, + public TableDataManager getTableDataManager(TableConfig tableConfig, Schema schema, + SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadExecutor, + @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) { TableDataManager tableDataManager; @@ -89,9 +91,8 @@ public class FailureInjectingTableDataManagerProvider implements TableDataManage default: throw new IllegalStateException(); } - tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, segmentReloadSemaphore, - segmentReloadExecutor, segmentPreloadExecutor, - errorCache, null); + tableDataManager.init(_instanceDataManagerConfig, _helixManager, _segmentLocks, tableConfig, schema, + segmentReloadSemaphore, segmentReloadExecutor, segmentPreloadExecutor, errorCache, null); return tableDataManager; } } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java index 3ed62ec35a..d2aa77321d 100644 --- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDimensionTableOverhead.java @@ -184,7 +184,7 @@ public class BenchmarkDimensionTableOverhead extends BaseQueriesTest { String tableName = TABLE_NAME + "_" + _iteration; _tableDataManager = DimensionTableDataManager.createInstanceByTableName(tableName); - _tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, + _tableDataManager.init(instanceDataManagerConfig, helixManager, new SegmentLocks(), tableConfig, SCHEMA, new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, SEGMENT_OPERATIONS_THROTTLER); _tableDataManager.start(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index ed1f96f4da..111c2ec4e5 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -53,8 +53,8 @@ public interface TableDataManager { * Initializes the table data manager. Should be called only once and before calling any other method. */ void init(InstanceDataManagerConfig instanceDataManagerConfig, HelixManager helixManager, SegmentLocks segmentLocks, - TableConfig tableConfig, SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService segmentReloadExecutor, - @Nullable ExecutorService segmentPreloadExecutor, + TableConfig tableConfig, Schema schema, SegmentReloadSemaphore segmentReloadSemaphore, + ExecutorService segmentReloadExecutor, @Nullable ExecutorService segmentPreloadExecutor, @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache, @Nullable SegmentOperationsThrottler segmentOperationsThrottler); @@ -295,22 +295,21 @@ public interface TableDataManager { SegmentZKMetadata fetchZKMetadata(String segmentName); /** - * Fetches the table config and schema for the table from ZK. + * Fetches the table config and schema for the table from ZK, then construct the index loading config with them. */ - Pair<TableConfig, Schema> fetchTableConfigAndSchema(); + IndexLoadingConfig fetchIndexLoadingConfig(); /** - * Fetches the table config and schema for the table from ZK, then construct the index loading config with them. + * Returns the cached latest table config (with timestamp index applied) for the table. The cache is refreshed when + * invoking {@link #fetchIndexLoadingConfig()}. */ - default IndexLoadingConfig fetchIndexLoadingConfig() { - Pair<TableConfig, Schema> tableConfigSchemaPair = fetchTableConfigAndSchema(); - return getIndexLoadingConfig(tableConfigSchemaPair.getLeft(), tableConfigSchemaPair.getRight()); - } + TableConfig getTableConfig(); /** - * Constructs the index loading config for the table with the given table config and schema. + * Returns the cached latest schema (with timestamp index applied) for the table. The cache is refreshed when invoking + * {@link #fetchIndexLoadingConfig()}. */ - IndexLoadingConfig getIndexLoadingConfig(TableConfig tableConfig, Schema schema); + Schema getSchema(); /** * Interface to handle segment state transitions from CONSUMING to DROPPED @@ -330,9 +329,8 @@ public interface TableDataManager { /** * Return list of segment names that are stale along with reason. - * @param tableConfig Table Config of the table - * @param schema Schema of the table + * * @return List of {@link StaleSegment} with segment names and reason why it is stale */ - List<StaleSegment> getStaleSegments(TableConfig tableConfig, Schema schema); + List<StaleSegment> getStaleSegments(); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index f7038ed11f..3df7a4290e 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -99,11 +99,9 @@ import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl; import org.apache.pinot.server.access.AccessControlFactory; import org.apache.pinot.server.api.AdminApiApplication; import org.apache.pinot.server.starter.ServerInstance; -import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; -import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.stream.ConsumerPartitionState; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.JsonUtils; @@ -1188,8 +1186,7 @@ public class TablesResource { tableName = DatabaseUtils.translateTableName(tableName, headers); TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName); try { - Pair<TableConfig, Schema> tableConfigSchemaPair = tableDataManager.fetchTableConfigAndSchema(); - return tableDataManager.getStaleSegments(tableConfigSchemaPair.getLeft(), tableConfigSchemaPair.getRight()); + return tableDataManager.getStaleSegments(); } catch (Exception e) { throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index b79a759e76..b3ed24fa81 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -63,6 +63,7 @@ import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderRegistry; import org.apache.pinot.server.realtime.ServerSegmentCompletionProtocolHandler; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.plugin.PluginManager; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -294,9 +295,11 @@ public class HelixInstanceDataManager implements InstanceDataManager { tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", tableNameWithType); } + Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType); + Preconditions.checkState(schema != null, "Failed to find schema for table: %s", tableNameWithType); TableDataManager tableDataManager = - _tableDataManagerProvider.getTableDataManager(tableConfig, _segmentReloadSemaphore, _segmentReloadExecutor, - _segmentPreloadExecutor, _errorCache, _isServerReadyToServeQueries); + _tableDataManagerProvider.getTableDataManager(tableConfig, schema, _segmentReloadSemaphore, + _segmentReloadExecutor, _segmentPreloadExecutor, _errorCache, _isServerReadyToServeQueries); tableDataManager.start(); LOGGER.info("Created table data manager for table: {}", tableNameWithType); return tableDataManager; diff --git a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java index 1cc0b05b83..89cc6ef292 100644 --- a/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java +++ b/pinot-server/src/test/java/org/apache/pinot/server/api/BaseResourceTest.java @@ -53,6 +53,7 @@ import org.apache.pinot.server.starter.ServerInstance; import org.apache.pinot.spi.config.instance.InstanceDataManagerConfig; import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.NetUtils; @@ -204,7 +205,7 @@ public abstract class BaseResourceTest { // table config. TableDataManager tableDataManager = new OfflineTableDataManager(); tableDataManager.init(instanceDataManagerConfig, mock(HelixManager.class), new SegmentLocks(), tableConfig, - new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, null); + mock(Schema.class), new SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(), null, null, null); tableDataManager.start(); _tableDataManagerMap.put(tableNameWithType, tableDataManager); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org