This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new a7c99db6d9 Do not modify cached schema in TableDataManager (#15690)
a7c99db6d9 is described below

commit a7c99db6d9f4773d0a0d409b6ce437c8a52540c8
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri May 2 12:11:11 2025 -0600

    Do not modify cached schema in TableDataManager (#15690)
---
 .../core/data/manager/BaseTableDataManager.java    | 16 +++++--------
 .../manager/offline/DimensionTableDataManager.java | 17 ++++++--------
 .../manager/realtime/RealtimeTableDataManager.java | 27 +++++++++-------------
 .../plan/server/ServerPlanRequestUtils.java        |  8 ++++---
 .../testutils/MockInstanceDataManagerFactory.java  |  5 ++--
 .../local/data/manager/TableDataManager.java       |  9 +++++---
 .../recordtransformer/CompositeTransformer.java    |  6 +++++
 .../starter/helix/HelixInstanceDataManager.java    |  2 ++
 8 files changed, 45 insertions(+), 45 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 67088db6ab..1e283ca29b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -146,8 +146,8 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
   // Cache used for identifying segments which could not be acquired since 
they were recently deleted.
   protected Cache<String, String> _recentlyDeletedSegments;
 
-  // Caches the latest IndexLoadingConfig. The cached IndexLoadingConfig 
should not be modified.
-  protected volatile IndexLoadingConfig _indexLoadingConfig;
+  // Caches the latest TableConfig and Schema pair. The cache should not be 
modified.
+  protected volatile Pair<TableConfig, Schema> _cachedTableConfigAndSchema;
 
   protected volatile boolean _shutDown;
 
@@ -190,6 +190,7 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
         .maximumSize(instanceDataManagerConfig.getDeletedSegmentsCacheSize())
         
.expireAfterWrite(instanceDataManagerConfig.getDeletedSegmentsCacheTtlMinutes(),
 TimeUnit.MINUTES)
         .build();
+    _cachedTableConfigAndSchema = Pair.of(tableConfig, schema);
 
     _peerDownloadScheme = 
tableConfig.getValidationConfig().getPeerSegmentDownloadScheme();
     if (_peerDownloadScheme == null) {
@@ -226,7 +227,6 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
       _numSegmentsAcquiredDownloadSemaphore = null;
     }
     _logger = LoggerFactory.getLogger(_tableNameWithType + "-" + 
getClass().getSimpleName());
-    createAndCacheIndexLoadingConfig(tableConfig, schema);
 
     doInit();
 
@@ -387,19 +387,15 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
     Preconditions.checkState(tableConfig != null, "Failed to find table config 
for table: %s", _tableNameWithType);
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
_tableNameWithType);
     Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", _tableNameWithType);
-    return createAndCacheIndexLoadingConfig(tableConfig, schema);
-  }
-
-  private IndexLoadingConfig createAndCacheIndexLoadingConfig(TableConfig 
tableConfig, Schema schema) {
     IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(_instanceDataManagerConfig, tableConfig, schema);
     indexLoadingConfig.setTableDataDir(_tableDataDir);
-    _indexLoadingConfig = indexLoadingConfig;
+    _cachedTableConfigAndSchema = Pair.of(tableConfig, schema);
     return indexLoadingConfig;
   }
 
   @Override
-  public IndexLoadingConfig getIndexLoadingConfig() {
-    return _indexLoadingConfig;
+  public Pair<TableConfig, Schema> getCachedTableConfigAndSchema() {
+    return _cachedTableConfigAndSchema;
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
index 0ac31abc3b..884b2565b6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/DimensionTableDataManager.java
@@ -35,10 +35,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -110,15 +110,14 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
   protected void doInit() {
     super.doInit();
 
-    IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig;
-    Schema schema = indexLoadingConfig.getSchema();
-    assert schema != null;
+    Pair<TableConfig, Schema> tableConfigAndSchema = 
getCachedTableConfigAndSchema();
+    TableConfig tableConfig = tableConfigAndSchema.getLeft();
+    Schema schema = tableConfigAndSchema.getRight();
+
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
 
-    TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
-    assert tableConfig != null;
     DimensionTableConfig dimensionTableConfig = 
tableConfig.getDimensionTableConfig();
     if (dimensionTableConfig != null) {
       _disablePreload = dimensionTableConfig.isDisablePreload();
@@ -207,8 +206,7 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     // loading is in progress.
     int token = _loadToken.incrementAndGet();
 
-    Schema schema = _indexLoadingConfig.getSchema();
-    assert schema != null;
+    Schema schema = getCachedTableConfigAndSchema().getRight();
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
@@ -283,8 +281,7 @@ public class DimensionTableDataManager extends 
OfflineTableDataManager {
     // loading is in progress.
     int token = _loadToken.incrementAndGet();
 
-    Schema schema = _indexLoadingConfig.getSchema();
-    assert schema != null;
+    Schema schema = getCachedTableConfigAndSchema().getRight();
     List<String> primaryKeyColumns = schema.getPrimaryKeyColumns();
     Preconditions.checkState(CollectionUtils.isNotEmpty(primaryKeyColumns),
         "Primary key columns must be configured for dimension table: %s", 
_tableNameWithType);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 375e518f1b..cf7f3e883a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -42,6 +42,7 @@ import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ServerGauge;
@@ -86,6 +87,7 @@ import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
 import org.apache.pinot.spi.utils.retry.RetriableOperationException;
@@ -199,22 +201,17 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     // Set up dedup/upsert metadata manager
     // NOTE: Dedup/upsert has to be set up when starting the server. Changing 
the table config without restarting the
     //       server won't enable/disable them on the fly.
-    IndexLoadingConfig indexLoadingConfig = _indexLoadingConfig;
-    TableConfig tableConfig = indexLoadingConfig.getTableConfig();
-    assert tableConfig != null;
+    Pair<TableConfig, Schema> tableConfigAndSchema = 
getCachedTableConfigAndSchema();
+    TableConfig tableConfig = tableConfigAndSchema.getLeft();
+    Schema schema = tableConfigAndSchema.getRight();
     if (tableConfig.isDedupEnabled()) {
-      Schema schema = indexLoadingConfig.getSchema();
-      assert schema != null;
       _tableDedupMetadataManager =
           
TableDedupMetadataManagerFactory.create(_instanceDataManagerConfig.getDedupConfig(),
 tableConfig, schema,
               this);
     }
-
     if (tableConfig.isUpsertEnabled()) {
       Preconditions.checkState(_tableDedupMetadataManager == null,
           "Dedup and upsert cannot be both enabled for table: %s", 
_tableNameWithType);
-      Schema schema = indexLoadingConfig.getSchema();
-      assert schema != null;
       _tableUpsertMetadataManager =
           
TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(),
 tableConfig, schema,
               this);
@@ -492,7 +489,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
   @Override
   public void addConsumingSegment(String segmentName)
-      throws AttemptsExceededException, RetriableOperationException {
+      throws Exception {
     Preconditions.checkState(!_shutDown,
         "Table data manager is already shut down, cannot add CONSUMING 
segment: %s to table: %s", segmentName,
         _tableNameWithType);
@@ -511,7 +508,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   }
 
   private void doAddConsumingSegment(String segmentName)
-      throws AttemptsExceededException, RetriableOperationException {
+      throws Exception {
     SegmentZKMetadata zkMetadata = fetchZKMetadata(segmentName);
     if (zkMetadata.getStatus().isCompleted()) {
       // NOTE:
@@ -542,6 +539,8 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     TableConfig tableConfig = indexLoadingConfig.getTableConfig();
     Schema schema = indexLoadingConfig.getSchema();
     assert tableConfig != null && schema != null;
+    // Clone a schema to avoid modifying the cached one
+    schema = JsonUtils.jsonNodeToObject(schema.toJsonObject(), Schema.class);
     validate(tableConfig, schema);
     
VirtualColumnProviderFactory.addBuiltInVirtualColumnsToSegmentSchema(schema, 
segmentName);
     setDefaultTimeValueIfInvalid(tableConfig, schema, zkMetadata);
@@ -587,9 +586,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     String segmentName = zkMetadata.getSegmentName();
     Preconditions.checkState(status == Status.COMMITTING, "Invalid status: %s 
for segment: %s to be downloaded", status,
         segmentName);
-    TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
-    assert tableConfig != null;
-    long downloadTimeoutMs = getDownloadTimeoutMs(tableConfig);
+    long downloadTimeoutMs = 
getDownloadTimeoutMs(getCachedTableConfigAndSchema().getLeft());
     long deadlineMs = System.currentTimeMillis() + downloadTimeoutMs;
     while (System.currentTimeMillis() < deadlineMs) {
       // ZK Metadata may change during segment download process; fetch it on 
every retry.
@@ -864,9 +861,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
   @Nullable
   public StreamIngestionConfig getStreamIngestionConfig() {
-    TableConfig tableConfig = _indexLoadingConfig.getTableConfig();
-    assert tableConfig != null;
-    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    IngestionConfig ingestionConfig = 
getCachedTableConfigAndSchema().getLeft().getIngestionConfig();
     return ingestionConfig != null ? 
ingestionConfig.getStreamIngestionConfig() : null;
   }
 
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
index adcae1f30b..8bd40d51c0 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/server/ServerPlanRequestUtils.java
@@ -29,6 +29,7 @@ import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.function.BiConsumer;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.Expression;
@@ -51,9 +52,10 @@ import org.apache.pinot.query.runtime.operator.OpChain;
 import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;
 import org.apache.pinot.query.runtime.plan.PlanNodeToOpChain;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.query.QueryThreadContext;
 import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.CommonConstants;
@@ -212,8 +214,8 @@ public class ServerPlanRequestUtils {
     for (QueryRewriter queryRewriter : QUERY_REWRITERS) {
       pinotQuery = queryRewriter.rewrite(pinotQuery);
     }
-    IndexLoadingConfig indexLoadingConfig = 
tableDataManager.getIndexLoadingConfig();
-    QUERY_OPTIMIZER.optimize(pinotQuery, indexLoadingConfig.getTableConfig(), 
indexLoadingConfig.getSchema());
+    Pair<TableConfig, Schema> tableConfigAndSchema = 
tableDataManager.getCachedTableConfigAndSchema();
+    QUERY_OPTIMIZER.optimize(pinotQuery, tableConfigAndSchema.getLeft(), 
tableConfigAndSchema.getRight());
 
     // 2. Update query options according to requestMetadataMap
     updateQueryOptions(pinotQuery, executionContext);
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index 36cbdb53f6..3e75dfb100 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -26,13 +26,13 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
 import org.apache.pinot.core.data.manager.offline.ImmutableSegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import 
org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import 
org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.IndexSegment;
@@ -155,8 +155,7 @@ public class MockInstanceDataManagerFactory {
     when(tableDataManager.getTableName()).thenReturn(tableNameWithType);
     TableConfig tableConfig = createTableConfig(tableNameWithType);
     Schema schema = 
_schemaMap.get(TableNameBuilder.extractRawTableName(tableNameWithType));
-    IndexLoadingConfig indexLoadingConfig = new 
IndexLoadingConfig(tableConfig, schema);
-    
when(tableDataManager.getIndexLoadingConfig()).thenReturn(indexLoadingConfig);
+    
when(tableDataManager.getCachedTableConfigAndSchema()).thenReturn(Pair.of(tableConfig,
 schema));
 
     Map<String, SegmentDataManager> segmentDataManagerMap =
         
segmentList.stream().collect(Collectors.toMap(IndexSegment::getSegmentName, 
ImmutableSegmentDataManager::new));
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 727dcf6c92..5e64fd953b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -312,14 +312,17 @@ public interface TableDataManager {
 
   /**
    * Fetches the table config and schema for the table from ZK, then construct 
the index loading config with them.
+   * The fetched table config and schema are then cached in the table data 
manager, and should not be modified. The
+   * cache is used mainly for query time access of table config and schema 
without accessing ZK.
    */
   IndexLoadingConfig fetchIndexLoadingConfig();
 
   /**
-   * Returns the cached latest {@link IndexLoadingConfig} for the table. The 
cache is refreshed when invoking
-   * {@link #fetchIndexLoadingConfig()}.
+   * Returns the cached latest {@link TableConfig} and {@link Schema} pair for 
the table. The cache is refreshed when
+   * invoking {@link #fetchIndexLoadingConfig()}, and should not be modified. 
We cache them as a pair to ensure they are
+   * updated at once to avoid race conditions.
    */
-  IndexLoadingConfig getIndexLoadingConfig();
+  Pair<TableConfig, Schema> getCachedTableConfigAndSchema();
 
   /**
    * Interface to handle segment state transitions from CONSUMING to DROPPED
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
index c639eac2d4..8643dbd43c 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/CompositeTransformer.java
@@ -24,6 +24,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.segment.local.utils.IngestionUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.ingestion.EnrichmentConfig;
@@ -145,6 +146,11 @@ public class CompositeTransformer implements 
RecordTransformer {
     }
   }
 
+  public static CompositeTransformer getDefaultTransformer(TableConfig 
tableConfig, Schema schema, @Nullable
+      SegmentZKMetadata segmentZKMetadata) {
+    return new CompositeTransformer(getDefaultTransformers(tableConfig, 
schema));
+  }
+
   public static CompositeTransformer getDefaultTransformer(TableConfig 
tableConfig, Schema schema) {
     return new CompositeTransformer(getDefaultTransformers(tableConfig, 
schema));
   }
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index b3ed24fa81..98da1ee05d 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -66,6 +66,7 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.plugin.PluginManager;
+import org.apache.pinot.spi.utils.TimestampIndexUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
@@ -297,6 +298,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     }
     Schema schema = ZKMetadataProvider.getTableSchema(_propertyStore, 
tableNameWithType);
     Preconditions.checkState(schema != null, "Failed to find schema for table: 
%s", tableNameWithType);
+    TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
     TableDataManager tableDataManager =
         _tableDataManagerProvider.getTableDataManager(tableConfig, schema, 
_segmentReloadSemaphore,
             _segmentReloadExecutor, _segmentPreloadExecutor, _errorCache, 
_isServerReadyToServeQueries);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to