This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new fd448f44266 Misc fixes for virtual column support (#16121) fd448f44266 is described below commit fd448f44266cfbef87e545c9933c120a74f21d16 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Jun 17 14:16:56 2025 -0600 Misc fixes for virtual column support (#16121) --- .../helix/core/PinotHelixResourceManager.java | 56 ++++++++++++---------- .../core/data/manager/BaseTableDataManager.java | 5 +- .../core/operator/filter/MapFilterOperator.java | 14 ++++-- .../apache/pinot/core/startree/StarTreeUtils.java | 24 ++++++++-- .../tests/OfflineClusterIntegrationTest.java | 25 ++++++++++ .../segment/readers/PinotSegmentColumnReader.java | 1 - 6 files changed, 86 insertions(+), 39 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index d377ed190f8..1d01e23525b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1569,11 +1569,9 @@ public class PinotHelixResourceManager { reloadAllSegments(tableNameWithType, false, null); } } else { - // Send schema refresh message to all tables that use this schema + LOGGER.info("Refreshing schema for tables with name: {}", schemaName); for (String tableNameWithType : tableNamesWithType) { - LOGGER.info("Sending updated schema message for table: {}", tableNameWithType); - sendTableConfigSchemaRefreshMessage(tableNameWithType, getServerInstancesForTable(tableNameWithType, - TableNameBuilder.getTableTypeFromTableName(tableNameWithType))); + sendTableConfigSchemaRefreshMessage(tableNameWithType); } } } catch (TableNotFoundException e) { @@ -3155,6 +3153,7 @@ public class PinotHelixResourceManager { } } + /// Sends table config refresh message to brokers. private void sendTableConfigRefreshMessage(String tableNameWithType) { TableConfigRefreshMessage tableConfigRefreshMessage = new TableConfigRefreshMessage(tableNameWithType); @@ -3176,6 +3175,32 @@ public class PinotHelixResourceManager { } } + /// Sends table config and schema refresh message to servers. + private void sendTableConfigSchemaRefreshMessage(String tableNameWithType) { + TableConfigSchemaRefreshMessage refreshMessage = new TableConfigSchemaRefreshMessage(tableNameWithType); + + // Send table config and schema refresh message to servers + Criteria recipientCriteria = new Criteria(); + recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + recipientCriteria.setInstanceName("%"); + recipientCriteria.setResource(tableNameWithType); + recipientCriteria.setSessionSpecific(true); + + // Send message with no callback and infinite timeout on the recipient + try { + int numMessagesSent = _helixZkManager.getMessagingService().send(recipientCriteria, refreshMessage, null, -1); + if (numMessagesSent > 0) { + LOGGER.info("Sent {} table config and schema refresh messages for table: {}", numMessagesSent, + tableNameWithType); + } else { + LOGGER.warn("No table config and schema refresh message sent for table: {}", tableNameWithType); + } + } catch (Exception e) { + LOGGER.warn("Caught exception while sending table config and schema refresh message for table: {}", + tableNameWithType, e); + } + } + private void sendLogicalTableConfigRefreshMessage(String logicalTableName) { LogicalTableConfigRefreshMessage refreshMessage = new LogicalTableConfigRefreshMessage(logicalTableName); @@ -3239,7 +3264,7 @@ public class PinotHelixResourceManager { private void sendRoutingTableRebuildMessage(String tableNameWithType) { RoutingTableRebuildMessage routingTableRebuildMessage = new RoutingTableRebuildMessage(tableNameWithType); - // Send table config refresh message to brokers + // Send routing table rebuild message to brokers Criteria recipientCriteria = new Criteria(); recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); recipientCriteria.setInstanceName("%"); @@ -3258,27 +3283,6 @@ public class PinotHelixResourceManager { } } - private void sendTableConfigSchemaRefreshMessage(String tableNameWithType, List<String> instances) { - TableConfigSchemaRefreshMessage refreshMessage = new TableConfigSchemaRefreshMessage(tableNameWithType); - for (String instance : instances) { - // Send refresh message to servers - Criteria recipientCriteria = new Criteria(); - recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); - recipientCriteria.setInstanceName(instance); - recipientCriteria.setSessionSpecific(true); - ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); - // Send message with no callback and infinite timeout on the recipient - int numMessagesSent = messagingService.send(recipientCriteria, refreshMessage, null, -1); - if (numMessagesSent > 0) { - LOGGER.info("Sent {} schema refresh messages to servers for table: {} for instance: {}", numMessagesSent, - tableNameWithType, instance); - } else { - LOGGER.warn("No schema refresh message sent to servers for table: {} for instance: {}", tableNameWithType, - instance); - } - } - } - /** * Update the instance config given the broker instance id */ diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 6796b9bd02d..dd35dcf7c10 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -1382,10 +1382,9 @@ public abstract class BaseTableDataManager implements TableDataManager { for (String columnName : segmentPhysicalColumns) { ColumnMetadata columnMetadata = segmentMetadata.getColumnMetadataFor(columnName); - FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName); DataSource source = segment.getDataSource(columnName); - Preconditions.checkNotNull(columnMetadata); - Preconditions.checkNotNull(source); + assert columnMetadata != null && source != null; + FieldSpec fieldSpecInSchema = schema.getFieldSpecFor(columnName); // Column is deleted if (fieldSpecInSchema == null) { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java index ee413306b3b..bc4b19e52b5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/MapFilterOperator.java @@ -65,10 +65,14 @@ public class MapFilterOperator extends BaseFilterOperator { _columnName = arguments.get(0).getIdentifier(); _keyName = arguments.get(1).getLiteral().getStringValue(); - // Get JSON index and create operator - DataSource dataSource = indexSegment.getDataSource(_columnName); - JsonIndexReader jsonIndex = dataSource.getJsonIndex(); - if (jsonIndex != null && useJsonIndex(_predicate.getType())) { + JsonIndexReader jsonIndex = null; + if (canUseJsonIndex(_predicate.getType())) { + DataSource dataSource = indexSegment.getDataSourceNullable(_columnName); + if (dataSource != null) { + jsonIndex = dataSource.getJsonIndex(); + } + } + if (jsonIndex != null) { FilterContext filterContext = createFilterContext(); _jsonMatchOperator = new JsonMatchFilterOperator(jsonIndex, filterContext, numDocs); _expressionFilterOperator = null; @@ -201,7 +205,7 @@ public class MapFilterOperator extends BaseFilterOperator { * @param predicateType The type of predicate * @return true if the predicate type is supported for JSON index, false otherwise */ - private boolean useJsonIndex(Predicate.Type predicateType) { + private static boolean canUseJsonIndex(Predicate.Type predicateType) { switch (predicateType) { case EQ: case NOT_EQ: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java index d5180285955..c422a64a7ee 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/startree/StarTreeUtils.java @@ -324,7 +324,11 @@ public class StarTreeUtils { return null; } String column = lhs.getIdentifier(); - DataSource dataSource = indexSegment.getDataSource(column); + DataSource dataSource = indexSegment.getDataSourceNullable(column); + if (dataSource == null) { + // Star-tree does not support non-existent column + return null; + } Dictionary dictionary = dataSource.getDictionary(); if (dictionary == null) { // Star-tree does not support non-dictionary encoded dimension @@ -388,7 +392,11 @@ public class StarTreeUtils { } String column = aggregationFunctionColumnPair.getColumn(); - DataSource dataSource = indexSegment.getDataSource(column); + DataSource dataSource = indexSegment.getDataSourceNullable(column); + if (dataSource == null) { + LOGGER.debug("Cannot use star-tree index because aggregation column: '{}' does not exist", column); + return null; + } if (dataSource.getNullValueVector() != null && !dataSource.getNullValueVector().getNullBitmap().isEmpty()) { LOGGER.debug("Cannot use star-tree index because aggregation column: '{}' has null values", column); return null; @@ -396,7 +404,11 @@ public class StarTreeUtils { } for (String column : predicateEvaluatorsMap.keySet()) { - DataSource dataSource = indexSegment.getDataSource(column); + DataSource dataSource = indexSegment.getDataSourceNullable(column); + if (dataSource == null) { + LOGGER.debug("Cannot use star-tree index because filter column: '{}' does not exist", column); + return null; + } if (dataSource.getNullValueVector() != null && !dataSource.getNullValueVector().getNullBitmap().isEmpty()) { LOGGER.debug("Cannot use star-tree index because filter column: '{}' has null values", column); return null; @@ -410,7 +422,11 @@ public class StarTreeUtils { } } for (String column : groupByColumns) { - DataSource dataSource = indexSegment.getDataSource(column); + DataSource dataSource = indexSegment.getDataSourceNullable(column); + if (dataSource == null) { + LOGGER.debug("Cannot use star-tree index because group-by column: '{}' does not exist", column); + return null; + } if (dataSource.getNullValueVector() != null && !dataSource.getNullValueVector().getNullBitmap().isEmpty()) { LOGGER.debug("Cannot use star-tree index because group-by column: '{}' has null values", column); return null; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 7ce615a3b15..f40bfc8a5aa 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -1701,6 +1701,19 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet ingestionConfig.setTransformConfigs(transformConfigs); tableConfig.setIngestionConfig(ingestionConfig); updateTableConfig(tableConfig); + + // Query the new added columns without reload + TestUtils.waitForCondition(aVoid -> { + try { + JsonNode response = postQuery(TEST_STAR_TREE_QUERY_3); + return response.get("resultTable").get("rows").get(0).get(0).asInt() == 0; + } catch (Exception e) { + return false; + } + }, 60_000L, "Failed to query new added columns without reload"); + // Table size shouldn't change without reload + assertEquals(getTableSize(getTableName()), _tableSize); + reloadAllSegments(TEST_STAR_TREE_QUERY_3, false, numTotalDocs); int thirdQueryResult = postQuery(TEST_STAR_TREE_QUERY_3_REFERENCE).get("resultTable").get("rows").get(0).get(0).asInt(); @@ -1869,6 +1882,18 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet CompressionCodec.MV_ENTRY_DICT, null)); updateTableConfig(tableConfig); + // Query the new added columns without reload + TestUtils.waitForCondition(aVoid -> { + try { + JsonNode response = postQuery(SELECT_STAR_QUERY); + return response.get("resultTable").get("dataSchema").get("columnNames").size() == 104; + } catch (Exception e) { + return false; + } + }, 60_000L, "Failed to query new added columns without reload"); + // Table size shouldn't change without reload + assertEquals(getTableSize(getTableName()), _tableSize); + // Trigger reload and verify column count reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs); JsonNode segmentsMetadata = JsonUtils.stringToJsonNode( diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java index 01257777777..4a691e4fc42 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/PinotSegmentColumnReader.java @@ -42,7 +42,6 @@ public class PinotSegmentColumnReader implements Closeable { public PinotSegmentColumnReader(IndexSegment indexSegment, String column) { DataSource dataSource = indexSegment.getDataSource(column); - Preconditions.checkArgument(dataSource != null, "Failed to find data source for column: %s", column); _forwardIndexReader = dataSource.getForwardIndex(); Preconditions.checkArgument(_forwardIndexReader != null, "Forward index disabled for column: %s", column); _forwardIndexReaderContext = _forwardIndexReader.createContext(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org