praveenc7 commented on code in PR #15350:
URL: https://github.com/apache/pinot/pull/15350#discussion_r2093419556


##########
pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableConfigAndSchemaCache.java:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config.provider;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+public class TableConfigAndSchemaCache {

Review Comment:
   Done



##########
pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableConfigAndSchemaCache.java:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.config.provider;
+
+import com.google.common.base.Preconditions;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+
+public class TableConfigAndSchemaCache {
+
+    private static TableConfigAndSchemaCache _instance;
+
+    private final ConcurrentHashMap<String, TableConfig> _tableConfigCache;
+    private final ConcurrentHashMap<String, Schema> _tableSchemaCache;
+
+    private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
+
+    private TableConfigAndSchemaCache(ZkHelixPropertyStore<ZNRecord> 
propertyStore) {
+        _propertyStore = propertyStore;
+        _tableConfigCache = new ConcurrentHashMap<>();
+        _tableSchemaCache = new ConcurrentHashMap<>();
+    }
+
+    public static void init(ZkHelixPropertyStore<ZNRecord> propertyStore) {
+        _instance = new TableConfigAndSchemaCache(propertyStore);
+    }
+
+    public static TableConfigAndSchemaCache getInstance() {
+        if (_instance == null) {
+            // This is a fallback for cases where init() is not called used in 
tests
+            _instance = new TableConfigAndSchemaCache(null);
+        }
+        return _instance;
+    }
+
+    /**
+     * Get the cached table config if available, otherwise fetch the latest 
table config from ZK and cache it.
+     *
+     * @param tableNameWithType Table name with type suffix
+     * @return TableConfig
+     */
+    public TableConfig getTableConfig(String tableNameWithType) {
+        return _tableConfigCache.computeIfAbsent(tableNameWithType, 
this::getLatestTableConfig);
+    }
+
+    /**
+     * Get the cached schema if available, otherwise fetch the latest schema 
from ZK and cache it.
+     *
+     * @param tableName - Table name without or without type suffix
+     * @return Schema - Schema for the table
+     */
+    public Schema getSchema(String tableName) {
+        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+        return _tableSchemaCache.computeIfAbsent(rawTableName, 
this::getLatestSchema);
+    }
+
+    /**
+     * Get the latest table config for the given table name.
+     *
+     * @param tableNameWithType Table name with type suffix
+     * @return TableConfig - Latest table config
+     */
+    public TableConfig getLatestTableConfig(String tableNameWithType) {
+        TableConfig tableConfig = 
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
+        Preconditions.checkState(tableConfig != null, "Failed to find table 
config for table: %s", tableNameWithType);
+        _tableConfigCache.put(tableNameWithType, tableConfig);
+        return tableConfig;
+    }
+
+    /**
+     * Get the latest schema for the given table name.
+     *
+     * @param tableName - Table name without or without type suffix
+     * @return Schema - Latest schema
+     */
+    public Schema getLatestSchema(String tableName) {
+        String rawTableName = TableNameBuilder.extractRawTableName(tableName);
+        Schema schema = ZKMetadataProvider.getSchema(_propertyStore, 
rawTableName);
+        Preconditions.checkState(schema != null, "Failed to find schema for 
table: %s", rawTableName);

Review Comment:
   This check was introduced as part of 
https://github.com/apache/pinot/pull/15443, and with 
https://github.com/apache/pinot/pull/15333 we don't consider this valid 
behaviour.



##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreloadUtils.java:
##########
@@ -191,4 +202,20 @@ private static Map<String, SegmentZKMetadata> 
getSegmentsZKMetadata(String table
         .forEach(m -> segmentMetadataMap.put(m.getSegmentName(), m));
     return segmentMetadataMap;
   }
+
+  public static DataSource getVirtualDataSource(String schemaName, String 
column, int totalDocCount) {
+    Schema schema = TABLE_CONFIG_AND_SCHEMA_CACHE.getSchema(schemaName);

Review Comment:
   Right, this is called when the data source is missing. But since we have a 
`datasource` for each segment, we want to avoid loading the config again if it 
was already fetched by a different segment.



##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java:
##########
@@ -4013,4 +4015,100 @@ public void testResponseWithClientRequestId(boolean 
useMultiStageQueryEngine)
 
     assertEquals(result.get("clientRequestId").asText(), clientRequestId);
   }
+
+  @Test
+  public void testQueryOnNewColumnAdditionWithPartialReload() throws Exception 
{
+    String testColumn = "TestColumn";
+    Schema oldSchema = getSchema(DEFAULT_SCHEMA_NAME);
+    // Pick any existing INT column name for the “valid” cases
+    String validColumnName = oldSchema.getAllFieldSpecs().stream()
+            .filter(fs -> fs.getDataType() == DataType.INT)
+            .findFirst()
+            .get()
+            .getName();
+
+    // Pair each query with whether it should succeed (true) or fail (false) 
before full reload
+    List<String> queries = List.of(
+      SELECT_STAR_QUERY,
+      SELECT_STAR_QUERY + " WHERE " + validColumnName + " > 0 LIMIT 10000",
+      SELECT_STAR_QUERY + " ORDER BY " + validColumnName + " LIMIT 10000",
+      SELECT_STAR_QUERY + " WHERE " + testColumn + " > 0 LIMIT 10000",
+      SELECT_STAR_QUERY + " ORDER BY " + testColumn + " LIMIT 10000",
+      "SELECT " + testColumn + " FROM " + DEFAULT_TABLE_NAME + "_OFFLINE"
+    );
+
+    for (String query: queries) {
+      try {
+        // Build new schema with the extra column
+        Schema newSchema = new Schema();
+        newSchema.setSchemaName(oldSchema.getSchemaName());
+        for (FieldSpec fs : oldSchema.getAllFieldSpecs()) {
+          newSchema.addField(fs);
+        }
+        newSchema.addField(new DimensionFieldSpec(testColumn, 
FieldSpec.DataType.INT, true));
+        updateSchema(newSchema);
+
+        // Partially reload one segment
+        reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE",
+                listSegments(DEFAULT_TABLE_NAME + "_OFFLINE").get(0));
+        // Column still not present until full reload
+        runQueryAndAssert(query, testColumn);
+        // Now do a full reload and assert the column shows up
+        reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE", null);
+        runQueryAndAssert(query, testColumn);
+      } finally {
+        // Reset back to the original schema for the next iteration
+        forceUpdateSchema(oldSchema);
+        reloadAndWait(DEFAULT_TABLE_NAME + "_OFFLINE", null);
+      }
+    }
+  }
+
+  private void reloadAndWait(String tableNameWithType, @Nullable String 
segmentName) throws Exception {
+    String response = (segmentName != null)
+        ? reloadOfflineSegment(tableNameWithType, segmentName, true)
+        : reloadOfflineTable(tableNameWithType, true);
+    JsonNode responseJson = JsonUtils.stringToJsonNode(response);
+    String jobId;
+    if (segmentName != null) {
+      // Single segment reload response: status is a string, parse manually
+      String statusString = responseJson.get("status").asText();
+      assertTrue(statusString.contains("SUCCESS"), "Segment reload failed: " + 
statusString);
+      int startIdx = statusString.indexOf("reload job id:") + "reload job 
id:".length();
+      int endIdx = statusString.indexOf(',', startIdx);
+      jobId = statusString.substring(startIdx, endIdx).trim();
+    } else {
+      // Full table reload response: structured JSON
+      JsonNode tableLevelDetails
+              = 
JsonUtils.stringToJsonNode(responseJson.get("status").asText()).get(tableNameWithType);
+      
assertEquals(tableLevelDetails.get("reloadJobMetaZKStorageStatus").asText(), 
"SUCCESS");
+      jobId = tableLevelDetails.get("reloadJobId").asText();
+    }
+    String finalJobId = jobId;
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return isReloadJobCompleted(finalJobId);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 600_000L, "Reload job did not complete in 10 minutes");
+  }
+
+  private void runQueryAndAssert(String query, String newAddedColumn) throws 
Exception {
+    JsonNode response = postQuery(query);
+    assertNoError(response);
+    JsonNode rows = response.get("resultTable").get("rows");
+    JsonNode jsonSchema = response.get("resultTable").get("dataSchema");
+    DataSchema resultSchema = JsonUtils.jsonNodeToObject(jsonSchema, 
DataSchema.class);
+
+    assert !rows.isEmpty();
+    boolean columnPresent = false;
+    for (String columnName : resultSchema.getColumnNames()) {
+      if (columnName.equals(newAddedColumn)) {
+          columnPresent = true;
+          break;
+      }
+    }
+    assertTrue(columnPresent, "Column " + newAddedColumn + " not present in 
result set");

Review Comment:
   I can add those tests.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to