This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new fd448f44266 Misc fixes for virtual column support (#16121)
fd448f44266 is described below

commit fd448f44266cfbef87e545c9933c120a74f21d16
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue Jun 17 14:16:56 2025 -0600

    Misc fixes for virtual column support (#16121)
---
 .../helix/core/PinotHelixResourceManager.java      | 56 ++++++++++++----------
 .../core/data/manager/BaseTableDataManager.java    |  5 +-
 .../core/operator/filter/MapFilterOperator.java    | 14 ++++--
 .../apache/pinot/core/startree/StarTreeUtils.java  | 24 ++++++++--
 .../tests/OfflineClusterIntegrationTest.java       | 25 ++++++++++
 .../segment/readers/PinotSegmentColumnReader.java  |  1 -
 6 files changed, 86 insertions(+), 39 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index d377ed190f8..1d01e23525b 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -1569,11 +1569,9 @@ public class PinotHelixResourceManager {
           reloadAllSegments(tableNameWithType, false, null);
         }
       } else {
-        // Send schema refresh message to all tables that use this schema
+        LOGGER.info("Refreshing schema for tables with name: {}", schemaName);
         for (String tableNameWithType : tableNamesWithType) {
-          LOGGER.info("Sending updated schema message for table: {}", 
tableNameWithType);
-          sendTableConfigSchemaRefreshMessage(tableNameWithType, 
getServerInstancesForTable(tableNameWithType,
-              TableNameBuilder.getTableTypeFromTableName(tableNameWithType)));
+          sendTableConfigSchemaRefreshMessage(tableNameWithType);
         }
       }
     } catch (TableNotFoundException e) {
@@ -3155,6 +3153,7 @@ public class PinotHelixResourceManager {
     }
   }
 
+  /// Sends table config refresh message to brokers.
   private void sendTableConfigRefreshMessage(String tableNameWithType) {
     TableConfigRefreshMessage tableConfigRefreshMessage = new 
TableConfigRefreshMessage(tableNameWithType);
 
@@ -3176,6 +3175,32 @@ public class PinotHelixResourceManager {
     }
   }
 
+  /// Sends table config and schema refresh message to servers.
+  private void sendTableConfigSchemaRefreshMessage(String tableNameWithType) {
+    TableConfigSchemaRefreshMessage refreshMessage = new 
TableConfigSchemaRefreshMessage(tableNameWithType);
+
+    // Send table config and schema refresh message to servers
+    Criteria recipientCriteria = new Criteria();
+    recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+    recipientCriteria.setInstanceName("%");
+    recipientCriteria.setResource(tableNameWithType);
+    recipientCriteria.setSessionSpecific(true);
+
+    // Send message with no callback and infinite timeout on the recipient
+    try {
+      int numMessagesSent = 
_helixZkManager.getMessagingService().send(recipientCriteria, refreshMessage, 
null, -1);
+      if (numMessagesSent > 0) {
+        LOGGER.info("Sent {} table config and schema refresh messages for 
table: {}", numMessagesSent,
+            tableNameWithType);
+      } else {
+        LOGGER.warn("No table config and schema refresh message sent for 
table: {}", tableNameWithType);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while sending table config and schema 
refresh message for table: {}",
+          tableNameWithType, e);
+    }
+  }
+
   private void sendLogicalTableConfigRefreshMessage(String logicalTableName) {
     LogicalTableConfigRefreshMessage refreshMessage = new 
LogicalTableConfigRefreshMessage(logicalTableName);
 
@@ -3239,7 +3264,7 @@ public class PinotHelixResourceManager {
   private void sendRoutingTableRebuildMessage(String tableNameWithType) {
     RoutingTableRebuildMessage routingTableRebuildMessage = new 
RoutingTableRebuildMessage(tableNameWithType);
 
-    // Send table config refresh message to brokers
+    // Send routing table rebuild message to brokers
     Criteria recipientCriteria = new Criteria();
     recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
     recipientCriteria.setInstanceName("%");
@@ -3258,27 +3283,6 @@ public class PinotHelixResourceManager {
     }
   }
 
-  private void sendTableConfigSchemaRefreshMessage(String tableNameWithType, 
List<String> instances) {
-    TableConfigSchemaRefreshMessage refreshMessage = new 
TableConfigSchemaRefreshMessage(tableNameWithType);
-    for (String instance : instances) {
-      // Send refresh message to servers
-      Criteria recipientCriteria = new Criteria();
-      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
-      recipientCriteria.setInstanceName(instance);
-      recipientCriteria.setSessionSpecific(true);
-      ClusterMessagingService messagingService = 
_helixZkManager.getMessagingService();
-      // Send message with no callback and infinite timeout on the recipient
-      int numMessagesSent = messagingService.send(recipientCriteria, 
refreshMessage, null, -1);
-      if (numMessagesSent > 0) {
-        LOGGER.info("Sent {} schema refresh messages to servers for table: {} 
for instance: {}", numMessagesSent,
-            tableNameWithType, instance);
-      } else {
-        LOGGER.warn("No schema refresh message sent to servers for table: {} 
for instance: {}", tableNameWithType,
-            instance);
-      }
-    }
-  }
-
   /**
    * Update the instance config given the broker instance id
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 6796b9bd02d..dd35dcf7c10 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -1382,10 +1382,9 @@ public abstract class BaseTableDataManager implements 
TableDataManager {
 
     for (String columnName : segmentPhysicalColumns) {
       ColumnMetadata columnMetadata = 
segmentMetadata.getColumnMetadataFor(columnName);
-      FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
       DataSource source = segment.getDataSource(columnName);
-      Preconditions.checkNotNull(columnMetadata);
-      Preconditions.checkNotNull(source);
+      assert columnMetadata != null && source != null;
+      FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName);
 
       // Column is deleted
       if (fieldSpecInSchema == null) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
index ee413306b3b..bc4b19e52b5 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java
@@ -65,10 +65,14 @@ public class MapFilterOperator extends BaseFilterOperator {
     _columnName = arguments.get(0).getIdentifier();
     _keyName = arguments.get(1).getLiteral().getStringValue();
 
-    // Get JSON index and create operator
-    DataSource dataSource = indexSegment.getDataSource(_columnName);
-    JsonIndexReader jsonIndex = dataSource.getJsonIndex();
-    if (jsonIndex != null && useJsonIndex(_predicate.getType())) {
+    JsonIndexReader jsonIndex = null;
+    if (canUseJsonIndex(_predicate.getType())) {
+      DataSource dataSource = indexSegment.getDataSourceNullable(_columnName);
+      if (dataSource != null) {
+        jsonIndex = dataSource.getJsonIndex();
+      }
+    }
+    if (jsonIndex != null) {
       FilterContext filterContext = createFilterContext();
       _jsonMatchOperator = new JsonMatchFilterOperator(jsonIndex, 
filterContext, numDocs);
       _expressionFilterOperator = null;
@@ -201,7 +205,7 @@ public class MapFilterOperator extends BaseFilterOperator {
    * @param predicateType The type of predicate
    * @return true if the predicate type is supported for JSON index, false 
otherwise
    */
-  private boolean useJsonIndex(Predicate.Type predicateType) {
+  private static boolean canUseJsonIndex(Predicate.Type predicateType) {
     switch (predicateType) {
       case EQ:
       case NOT_EQ:
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java 
b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
index d5180285955..c422a64a7ee 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java
@@ -324,7 +324,11 @@ public class StarTreeUtils {
       return null;
     }
     String column = lhs.getIdentifier();
-    DataSource dataSource = indexSegment.getDataSource(column);
+    DataSource dataSource = indexSegment.getDataSourceNullable(column);
+    if (dataSource == null) {
+      // Star-tree does not support non-existent column
+      return null;
+    }
     Dictionary dictionary = dataSource.getDictionary();
     if (dictionary == null) {
       // Star-tree does not support non-dictionary encoded dimension
@@ -388,7 +392,11 @@ public class StarTreeUtils {
         }
 
         String column = aggregationFunctionColumnPair.getColumn();
-        DataSource dataSource = indexSegment.getDataSource(column);
+        DataSource dataSource = indexSegment.getDataSourceNullable(column);
+        if (dataSource == null) {
+          LOGGER.debug("Cannot use star-tree index because aggregation column: 
'{}' does not exist", column);
+          return null;
+        }
         if (dataSource.getNullValueVector() != null && 
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
           LOGGER.debug("Cannot use star-tree index because aggregation column: 
'{}' has null values", column);
           return null;
@@ -396,7 +404,11 @@ public class StarTreeUtils {
       }
 
       for (String column : predicateEvaluatorsMap.keySet()) {
-        DataSource dataSource = indexSegment.getDataSource(column);
+        DataSource dataSource = indexSegment.getDataSourceNullable(column);
+        if (dataSource == null) {
+          LOGGER.debug("Cannot use star-tree index because filter column: '{}' 
does not exist", column);
+          return null;
+        }
         if (dataSource.getNullValueVector() != null && 
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
           LOGGER.debug("Cannot use star-tree index because filter column: '{}' 
has null values", column);
           return null;
@@ -410,7 +422,11 @@ public class StarTreeUtils {
         }
       }
       for (String column : groupByColumns) {
-        DataSource dataSource = indexSegment.getDataSource(column);
+        DataSource dataSource = indexSegment.getDataSourceNullable(column);
+        if (dataSource == null) {
+          LOGGER.debug("Cannot use star-tree index because group-by column: 
'{}' does not exist", column);
+          return null;
+        }
         if (dataSource.getNullValueVector() != null && 
!dataSource.getNullValueVector().getNullBitmap().isEmpty()) {
           LOGGER.debug("Cannot use star-tree index because group-by column: 
'{}' has null values", column);
           return null;
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 7ce615a3b15..f40bfc8a5aa 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1701,6 +1701,19 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     ingestionConfig.setTransformConfigs(transformConfigs);
     tableConfig.setIngestionConfig(ingestionConfig);
     updateTableConfig(tableConfig);
+
+    // Query the new added columns without reload
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode response = postQuery(TEST_STAR_TREE_QUERY_3);
+        return response.get("resultTable").get("rows").get(0).get(0).asInt() 
== 0;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to query new added columns without reload");
+    // Table size shouldn't change without reload
+    assertEquals(getTableSize(getTableName()), _tableSize);
+
     reloadAllSegments(TEST_STAR_TREE_QUERY_3, false, numTotalDocs);
     int thirdQueryResult =
         
postQuery(TEST_STAR_TREE_QUERY_3_REFERENCE).get("resultTable").get("rows").get(0).get(0).asInt();
@@ -1869,6 +1882,18 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
             CompressionCodec.MV_ENTRY_DICT, null));
     updateTableConfig(tableConfig);
 
+    // Query the new added columns without reload
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode response = postQuery(SELECT_STAR_QUERY);
+        return 
response.get("resultTable").get("dataSchema").get("columnNames").size() == 104;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to query new added columns without reload");
+    // Table size shouldn't change without reload
+    assertEquals(getTableSize(getTableName()), _tableSize);
+
     // Trigger reload and verify column count
     reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs);
     JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
index 01257777777..4a691e4fc42 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java
@@ -42,7 +42,6 @@ public class PinotSegmentColumnReader implements Closeable {
 
   public PinotSegmentColumnReader(IndexSegment indexSegment, String column) {
     DataSource dataSource = indexSegment.getDataSource(column);
-    Preconditions.checkArgument(dataSource != null, "Failed to find data 
source for column: %s", column);
     _forwardIndexReader = dataSource.getForwardIndex();
     Preconditions.checkArgument(_forwardIndexReader != null, "Forward index 
disabled for column: %s", column);
     _forwardIndexReaderContext = _forwardIndexReader.createContext();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to