This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new a7c99db6d9 Do not modify cached schema in TableDataManager (#15690) a7c99db6d9 is described below commit a7c99db6d9f4773d0a0d409b6ce437c8a52540c8 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri May 2 12:11:11 2025 -0600 Do not modify cached schema in TableDataManager (#15690) --- .../core/data/manager/BaseTableDataManager.java | 16 +++++-------- .../manager/offline/DimensionTableDataManager.java | 17 ++++++-------- .../manager/realtime/RealtimeTableDataManager.java | 27 +++++++++------------- .../plan/server/ServerPlanRequestUtils.java | 8 ++++--- .../testutils/MockInstanceDataManagerFactory.java | 5 ++-- .../local/data/manager/TableDataManager.java | 9 +++++--- .../recordtransformer/CompositeTransformer.java | 6 +++++ .../starter/helix/HelixInstanceDataManager.java | 2 ++ 8 files changed, 45 insertions(+), 45 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 67088db6ab..1e283ca29b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -146,8 +146,8 @@ public abstract class BaseTableDataManager implements TableDataManager { // Cache used for identifying segments which could not be acquired since they were recently deleted. protected Cache<String, String> _recentlyDeletedSegments; - // Caches the latest IndexLoadingConfig. The cached IndexLoadingConfig should not be modified. - protected volatile IndexLoadingConfig _indexLoadingConfig; + // Caches the latest TableConfig and Schema pair. The cache should not be modified. + protected volatile Pair<TableConfig, Schema> _cachedTableConfigAndSchema; protected volatile boolean _shutDown; @@ -190,6 +190,7 @@ public abstract class BaseTableDataManager implements TableDataManager { .maximumSize(instanceDataManagerConfig.getDeletedSegmentsCacheSize()) .expireAfterWrite(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes(), TimeUnit.MINUTES) .build(); + _cachedTableConfigAndSchema = Pair.of(tableConfig, schema); _peerDownloadScheme = tableConfig.getValidationConfig().getPeerSegmentDownloadScheme(); if (_peerDownloadScheme == null) { @@ -226,7 +227,6 @@ public abstract class BaseTableDataManager implements TableDataManager { _numSegmentsAcquiredDownloadSemaphore = null; } _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + getClass().getSimpleName()); - createAndCacheIndexLoadingConfig(tableConfig, schema); doInit(); @@ -387,19 +387,15 @@ public abstract class BaseTableDataManager implements TableDataManager { Preconditions.checkState(tableConfig != null, "Failed to find table config for table: %s", _tableNameWithType); Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, _tableNameWithType); Preconditions.checkState(schema != null, "Failed to find schema for table: %s", _tableNameWithType); - return createAndCacheIndexLoadingConfig(tableConfig, schema); - } - - private IndexLoadingConfig createAndCacheIndexLoadingConfig(TableConfig tableConfig, Schema schema) { IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema); indexLoadingConfig.setTableDataDir(_tableDataDir); - _indexLoadingConfig = indexLoadingConfig; + _cachedTableConfigAndSchema = Pair.of(tableConfig, schema); return indexLoadingConfig; } @Override - public IndexLoadingConfig getIndexLoadingConfig() { - return _indexLoadingConfig; + public Pair<TableConfig, Schema> getCachedTableConfigAndSchema() { + return _cachedTableConfigAndSchema; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java index 0ac31abc3b..884b2565b6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java @@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; -import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; @@ -110,15 +110,14 @@ public class DimensionTableDataManager extends OfflineTableDataManager { protected void doInit() { super.doInit(); - IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig; - Schema schema = indexLoadingConfig.getSchema(); - assert schema != null; + Pair<TableConfig, Schema> tableConfigAndSchema = getCachedTableConfigAndSchema(); + TableConfig tableConfig = tableConfigAndSchema.getLeft(); + Schema schema = tableConfigAndSchema.getRight(); + List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); - TableConfig tableConfig = _indexLoadingConfig.getTableConfig(); - assert tableConfig != null; DimensionTableConfig dimensionTableConfig = tableConfig.getDimensionTableConfig(); if (dimensionTableConfig != null) { _disablePreload = dimensionTableConfig.isDisablePreload(); @@ -207,8 +206,7 @@ public class DimensionTableDataManager extends OfflineTableDataManager { // loading is in progress. int token = _loadToken.incrementAndGet(); - Schema schema = _indexLoadingConfig.getSchema(); - assert schema != null; + Schema schema = getCachedTableConfigAndSchema().getRight(); List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); @@ -283,8 +281,7 @@ public class DimensionTableDataManager extends OfflineTableDataManager { // loading is in progress. int token = _loadToken.incrementAndGet(); - Schema schema = _indexLoadingConfig.getSchema(); - assert schema != null; + Schema schema = getCachedTableConfigAndSchema().getRight(); List<String> primaryKeyColumns = schema.getPrimaryKeyColumns(); Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns), "Primary key columns must be configured for dimension table: %s", _tableNameWithType); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 375e518f1b..cf7f3e883a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -42,6 +42,7 @@ import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.Utils; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerGauge; @@ -86,6 +87,7 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; import org.apache.pinot.spi.utils.IngestionConfigUtils; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.TimeUtils; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.apache.pinot.spi.utils.retry.RetriableOperationException; @@ -199,22 +201,17 @@ public class RealtimeTableDataManager extends BaseTableDataManager { // Set up dedup/upsert metadata manager // NOTE: Dedup/upsert has to be set up when starting the server. Changing the table config without restarting the // server won't enable/disable them on the fly. - IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig; - TableConfig tableConfig = indexLoadingConfig.getTableConfig(); - assert tableConfig != null; + Pair<TableConfig, Schema> tableConfigAndSchema = getCachedTableConfigAndSchema(); + TableConfig tableConfig = tableConfigAndSchema.getLeft(); + Schema schema = tableConfigAndSchema.getRight(); if (tableConfig.isDedupEnabled()) { - Schema schema = indexLoadingConfig.getSchema(); - assert schema != null; _tableDedupMetadataManager = TableDedupMetadataManagerFactory.create(_instanceDataManagerConfig.getDedupConfig(), tableConfig, schema, this); } - if (tableConfig.isUpsertEnabled()) { Preconditions.checkState(_tableDedupMetadataManager == null, "Dedup and upsert cannot be both enabled for table: %s", _tableNameWithType); - Schema schema = indexLoadingConfig.getSchema(); - assert schema != null; _tableUpsertMetadataManager = TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(), tableConfig, schema, this); @@ -492,7 +489,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { @Override public void addConsumingSegment(String segmentName) - throws AttemptsExceededException, RetriableOperationException { + throws Exception { Preconditions.checkState(!_shutDown, "Table data manager is already shut down, cannot add CONSUMING segment: %s to table: %s", segmentName, _tableNameWithType); @@ -511,7 +508,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } private void doAddConsumingSegment(String segmentName) - throws AttemptsExceededException, RetriableOperationException { + throws Exception { SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName); if (zkMetadata.getStatus().isCompleted()) { // NOTE: @@ -542,6 +539,8 @@ public class RealtimeTableDataManager extends BaseTableDataManager { TableConfig tableConfig = indexLoadingConfig.getTableConfig(); Schema schema = indexLoadingConfig.getSchema(); assert tableConfig != null && schema != null; + // Clone a schema to avoid modifying the cached one + schema = JsonUtils.jsonNodeToObject(schema.toJsonObject(), Schema.class); validate(tableConfig, schema); VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, segmentName); setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata); @@ -587,9 +586,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { String segmentName = zkMetadata.getSegmentName(); Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s for segment: %s to be downloaded", status, segmentName); - TableConfig tableConfig = _indexLoadingConfig.getTableConfig(); - assert tableConfig != null; - long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig); + long downloadTimeoutMs = getDownloadTimeoutMs(getCachedTableConfigAndSchema().getLeft()); long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs; while (System.currentTimeMillis() < deadlineMs) { // ZK Metadata may change during segment download process; fetch it on every retry. @@ -864,9 +861,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { @Nullable public StreamIngestionConfig getStreamIngestionConfig() { - TableConfig tableConfig = _indexLoadingConfig.getTableConfig(); - assert tableConfig != null; - IngestionConfig ingestionConfig = tableConfig.getIngestionConfig(); + IngestionConfig ingestionConfig = getCachedTableConfigAndSchema().getLeft().getIngestionConfig(); return ingestionConfig != null ? ingestionConfig.getStreamIngestionConfig() : null; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java index adcae1f30b..8bd40d51c0 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java @@ -29,6 +29,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.function.BiConsumer; import javax.annotation.Nullable; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.request.BrokerRequest; import org.apache.pinot.common.request.Expression; @@ -51,9 +52,10 @@ import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain; import org.apache.pinot.segment.local.data.manager.TableDataManager; -import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; +import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.query.QueryThreadContext; import org.apache.pinot.spi.utils.ByteArray; import org.apache.pinot.spi.utils.CommonConstants; @@ -212,8 +214,8 @@ public class ServerPlanRequestUtils { for (QueryRewriter queryRewriter : QUERY_REWRITERS) { pinotQuery = queryRewriter.rewrite(pinotQuery); } - IndexLoadingConfig indexLoadingConfig = tableDataManager.getIndexLoadingConfig(); - QUERY_OPTIMIZER.optimize(pinotQuery, indexLoadingConfig.getTableConfig(), indexLoadingConfig.getSchema()); + Pair<TableConfig, Schema> tableConfigAndSchema = tableDataManager.getCachedTableConfigAndSchema(); + QUERY_OPTIMIZER.optimize(pinotQuery, tableConfigAndSchema.getLeft(), tableConfigAndSchema.getRight()); // 2. Update query options according to requestMetadataMap updateQueryOptions(pinotQuery, executionContext); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java index 36cbdb53f6..3e75dfb100 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java @@ -26,13 +26,13 @@ import java.util.Map; import java.util.UUID; import java.util.stream.Collectors; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.data.manager.InstanceDataManager; import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader; import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl; -import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader; import org.apache.pinot.segment.spi.ImmutableSegment; import org.apache.pinot.segment.spi.IndexSegment; @@ -155,8 +155,7 @@ public class MockInstanceDataManagerFactory { when(tableDataManager.getTableName()).thenReturn(tableNameWithType); TableConfig tableConfig = createTableConfig(tableNameWithType); Schema schema = _schemaMap.get(TableNameBuilder.extractRawTableName(tableNameWithType)); - IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig(tableConfig, schema); - when(tableDataManager.getIndexLoadingConfig()).thenReturn(indexLoadingConfig); + when(tableDataManager.getCachedTableConfigAndSchema()).thenReturn(Pair.of(tableConfig, schema)); Map<String, SegmentDataManager> segmentDataManagerMap = segmentList.stream().collect(Collectors.toMap(IndexSegment::getSegmentName, ImmutableSegmentDataManager::new)); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index 727dcf6c92..5e64fd953b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -312,14 +312,17 @@ public interface TableDataManager { /** * Fetches the table config and schema for the table from ZK, then construct the index loading config with them. + * The fetched table config and schema are then cached in the table data manager, and should not be modified. The + * cache is used mainly for query time access of table config and schema without accessing ZK. */ IndexLoadingConfig fetchIndexLoadingConfig(); /** - * Returns the cached latest {@link IndexLoadingConfig} for the table. The cache is refreshed when invoking - * {@link #fetchIndexLoadingConfig()}. + * Returns the cached latest {@link TableConfig} and {@link Schema} pair for the table. The cache is refreshed when + * invoking {@link #fetchIndexLoadingConfig()}, and should not be modified. We cache them as a pair to ensure they are + * updated at once to avoid race conditions. */ - IndexLoadingConfig getIndexLoadingConfig(); + Pair<TableConfig, Schema> getCachedTableConfigAndSchema(); /** * Interface to handle segment state transitions from CONSUMING to DROPPED diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java index c639eac2d4..8643dbd43c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java @@ -24,6 +24,7 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import javax.annotation.Nullable; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.segment.local.utils.IngestionUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig; @@ -145,6 +146,11 @@ public class CompositeTransformer implements RecordTransformer { } } + public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema, @Nullable + SegmentZKMetadata segmentZKMetadata) { + return new CompositeTransformer(getDefaultTransformers(tableConfig, schema)); + } + public static CompositeTransformer getDefaultTransformer(TableConfig tableConfig, Schema schema) { return new CompositeTransformer(getDefaultTransformers(tableConfig, schema)); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index b3ed24fa81..98da1ee05d 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -66,6 +66,7 @@ import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.plugin.PluginManager; +import org.apache.pinot.spi.utils.TimestampIndexUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; @@ -297,6 +298,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { } Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, tableNameWithType); Preconditions.checkState(schema != null, "Failed to find schema for table: %s", tableNameWithType); + TimestampIndexUtils.applyTimestampIndex(tableConfig, schema); TableDataManager tableDataManager = _tableDataManagerProvider.getTableDataManager(tableConfig, schema, _segmentReloadSemaphore, _segmentReloadExecutor, _segmentPreloadExecutor, _errorCache, _isServerReadyToServeQueries); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org