This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new d6e37012d6 Enhance OfflineClusterIntegrationTest to check index size and fix index removal for default column (#15740)
d6e37012d6 is described below

commit d6e37012d65be7569986834467e752627029023f
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu May 8 12:38:35 2025 -0600

    Enhance OfflineClusterIntegrationTest to check index size and fix index removal for default column (#15740)
---
 .../tests/BaseClusterIntegrationTest.java          |   5 +
 .../MultiNodesOfflineClusterIntegrationTest.java   |  42 +--
 .../tests/OfflineClusterIntegrationTest.java       | 381 ++++++++++-----------
 .../defaultcolumn/BaseDefaultColumnHandler.java    |  12 +-
 .../segment/local/segment/store/IndexEntry.java    |   4 -
 .../segment/local/segment/store/IndexKey.java      |   4 -
 6 files changed, 197 insertions(+), 251 deletions(-)

diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 2c84a33c4b..ed36424506 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -191,6 +191,10 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     return new ArrayList<>(DEFAULT_INVERTED_INDEX_COLUMNS);
   }
 
+  protected boolean isCreateInvertedIndexDuringSegmentGeneration() {
+    return false;
+  }
+
   @Nullable
   protected List<String> getNoDictionaryColumns() {
     return new ArrayList<>(DEFAULT_NO_DICTIONARY_COLUMNS);
@@ -305,6 +309,7 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
         .setTimeColumnName(getTimeColumnName())
         .setSortedColumn(getSortedColumn())
         .setInvertedIndexColumns(getInvertedIndexColumns())
+        .setCreateInvertedIndexDuringSegmentGeneration(isCreateInvertedIndexDuringSegmentGeneration())
         .setNoDictionaryColumns(getNoDictionaryColumns())
         .setRangeIndexColumns(getRangeIndexColumns())
         .setBloomFilterColumns(getBloomFilterColumns())
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index fcd505194b..e73f47fe0b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -288,47 +288,7 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
     assertEquals(row.get(1).doubleValue(), 725560.0 / 444);
   }
 
-  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
-  @Test(enabled = false)
-  public void testStarTreeTriggering() {
-    // Ignored
-  }
-
-  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
-  @Test(enabled = false)
-  @Override
-  public void testDefaultColumns(boolean useMultiStageQueryEngine) {
-    // Ignored
-  }
-
-  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
-  @Test(enabled = false)
-  @Override
-  public void testForwardIndexTriggering() {
-    // Ignored
-  }
-
-  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
-  @Test(enabled = false)
-  public void testBloomFilterTriggering() {
-    // Ignored
-  }
-
-  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
-  @Test(enabled = false)
-  @Override
-  public void testRangeIndexTriggering(boolean useMultiStageQueryEngine)
-      throws Exception {
-    // Ignored
-  }
-
-  // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
-  @Test(enabled = false)
-  @Override
-  public void testInvertedIndexTriggering() {
-    // Ignored
-  }
-
+  // Disabled because segments might not be server partitioned with multiple servers
   @Test(enabled = false)
   @Override
   public void testHardcodedServerPartitionedSqlQueries() {
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index ca35934586..a9341eef1a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -91,6 +91,7 @@ import org.apache.pinot.util.TestUtils;
 import org.intellij.lang.annotations.Language;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -156,7 +157,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks =
       new ArrayList<>(getNumBrokers() + getNumServers());
 
-  private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
+  private TableConfig _tableConfig;
+  private Schema _schema;
 
   // Store the table size. Table size is platform dependent because of the native library used by the ChunkCompressor.
   // Once this value is set, assert that table size always gets back to this value after removing the added indices.
@@ -170,16 +173,20 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     return NUM_SERVERS;
   }
 
+  /// Create inverted index when generating the segment to ensure the index ordering won't change when re-assembling
+  /// them, so that the table size is consistent.
   @Override
-  protected String getSchemaFileName() {
-    return _schemaFileName;
+  protected boolean isCreateInvertedIndexDuringSegmentGeneration() {
+    return true;
   }
 
   @Override
   protected List<FieldConfig> getFieldConfigs() {
-    return List.of(
+    List<FieldConfig> fieldConfigs = new ArrayList<>();
+    fieldConfigs.add(
         new FieldConfig("DivAirports", FieldConfig.EncodingType.DICTIONARY, List.of(), CompressionCodec.MV_ENTRY_DICT,
             null));
+    return fieldConfigs;
   }
 
@@ -221,17 +228,17 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     startServers();
 
     // Create and upload the schema and table config
-    Schema schema = createSchema();
-    addSchema(schema);
-    TableConfig tableConfig = createOfflineTableConfig();
-    addTableConfig(tableConfig);
+    _schema = createSchema();
+    addSchema(_schema);
+    _tableConfig = createOfflineTableConfig();
+    addTableConfig(_tableConfig);
 
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
     // Create and upload segments. For exhaustive testing, concurrently upload multiple segments with the same name
     // and validate correctness with parallel push protection enabled.
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, _tableConfig, _schema, 0, _segmentDir, _tarDir);
     // Create a copy of _tarDir to create multiple segments with the same name.
     File tarDir2 = new File(_tempDir, "tarDir2");
     FileUtils.copyDirectory(_tarDir, tarDir2);
@@ -271,25 +278,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     _tableSize = getTableSize(getTableName());
   }
 
-  private void reloadAllSegments(String testQuery, boolean forceDownload, long numTotalDocs)
-      throws IOException {
-    // Try to refresh all the segments again with force download from the controller URI.
-    String reloadJob = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, forceDownload);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        JsonNode queryResponse = postQuery(testQuery);
-        if (!queryResponse.get("exceptions").isEmpty()) {
-          return false;
-        }
-        // Total docs should not change during reload
-        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(reloadJob);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to reload table with force download");
-  }
-
   protected void startBrokers()
       throws Exception {
     startBrokers(getNumBrokers());
@@ -326,11 +314,37 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
   }
 
+  /// Ensure table is at the same state after the test.
+  @AfterMethod
+  public void checkTableSetup()
+      throws IOException {
+    assertEquals(getOfflineTableConfig(), _tableConfig);
+    assertEquals(getSchema(getTableName()), _schema);
+    assertEquals(getTableSize(getTableName()), _tableSize);
+  }
+
+  private void reloadAllSegments(String testQuery, boolean forceDownload, long numTotalDocs)
+      throws IOException {
+    // Try to refresh all the segments again with force download from the controller URI.
+    String reloadJob = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, forceDownload);
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResponse = postQuery(testQuery);
+        if (!queryResponse.get("exceptions").isEmpty()) {
+          return false;
+        }
+        // Total docs should not change during reload
+        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
+        return isReloadJobCompleted(reloadJob);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 600_000L, "Failed to reload table with force download");
+  }
+
   private void testQueryError(@Language("sql") String query, QueryErrorCode errorCode)
       throws Exception {
-    assertQuery(query)
-        .firstException()
-        .hasErrorCode(errorCode);
+    assertQuery(query).firstException().hasErrorCode(errorCode);
   }
 
   @Test
@@ -360,7 +374,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   public void testRefreshTableConfigAndQueryTimeout()
       throws Exception {
     // Set timeout as 5ms so that query will timeout
-    TableConfig tableConfig = getOfflineTableConfig();
+    TableConfig tableConfig = createOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(5L, null, null, null, null, null));
     updateTableConfig(tableConfig);
 
@@ -391,8 +405,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }, 60_000L, "Failed to refresh table config");
 
     // Remove timeout so that query will finish
-    tableConfig.setQueryConfig(null);
-    updateTableConfig(tableConfig);
+    updateTableConfig(_tableConfig);
 
     // Wait for at most 1 minute for broker to receive and process the table config refresh message
     TestUtils.waitForCondition(aVoid -> {
@@ -501,6 +514,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
     waitForNumOfSegmentsBecomeOnline(offlineTableName, 1);
     dropOfflineTable(SEGMENT_UPLOAD_TEST_TABLE);
+    deleteSchema(SEGMENT_UPLOAD_TEST_TABLE);
     waitForEVToDisappear(offlineTableName);
   }
 
@@ -518,7 +532,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
         + tableNameWithType + ")");
   }
 
-  @Test(dependsOnMethods = "testRangeIndexTriggering")
+  @Test
   public void testInvertedIndexTriggering()
       throws Exception {
     long numTotalDocs = getCountStarResult();
@@ -527,30 +541,27 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
     addInvertedIndex();
-    long tableSizeWithNewIndex = getTableSize(getTableName());
+    String rawTableName = getTableName();
+    long tableSizeWithNewIndex = getTableSize(rawTableName);
 
     // Update table config to remove the new inverted index, and check if the new inverted index is removed
-    TableConfig tableConfig = getOfflineTableConfig();
-    tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
-    updateTableConfig(tableConfig);
-    reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs);
+    updateTableConfig(_tableConfig);
+    reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, false, numTotalDocs);
     assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
-    assertEquals(getTableSize(getTableName()), _tableSize);
+    assertEquals(getTableSize(rawTableName), _tableSize);
 
     // Add the inverted index back to test index removal via force download.
     addInvertedIndex();
-    assertEquals(getTableSize(getTableName()), tableSizeWithNewIndex);
+    assertEquals(getTableSize(rawTableName), tableSizeWithNewIndex);
 
     // Update table config to remove the new inverted index.
-    tableConfig = getOfflineTableConfig();
-    tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
-    updateTableConfig(tableConfig);
+    updateTableConfig(_tableConfig);
 
     // Force to download a single segment, and disk usage should drop a bit.
     SegmentZKMetadata segmentZKMetadata =
-        _helixResourceManager.getSegmentsZKMetadata(TableNameBuilder.OFFLINE.tableNameWithType(getTableName())).get(0);
+        _helixResourceManager.getSegmentsZKMetadata(TableNameBuilder.OFFLINE.tableNameWithType(rawTableName)).get(0);
     String segmentName = segmentZKMetadata.getSegmentName();
-    reloadOfflineSegment(getTableName(), segmentName, true);
+    reloadOfflineSegment(rawTableName, segmentName, true);
     TestUtils.waitForCondition(aVoid -> {
       try {
         JsonNode queryResponse = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY);
@@ -564,7 +575,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }, 600_000L, "Failed to clean up obsolete index in segment");
     // As query behavior changed, the segment reload must have been done. The new table size should be like below,
     // with only one segment being reloaded with force download and dropping the inverted index.
-    long tableSizeAfterReloadSegment = getTableSize(getTableName());
+    long tableSizeAfterReloadSegment = getTableSize(rawTableName);
     assertTrue(tableSizeAfterReloadSegment > _tableSize && tableSizeAfterReloadSegment < tableSizeWithNewIndex,
         "Table size: " + tableSizeAfterReloadSegment + " should be between " + _tableSize + " and "
             + tableSizeWithNewIndex + " after dropping inverted index from segment: " + segmentName);
@@ -579,24 +590,20 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // download, the original segment dir is deleted and then replaced with the newly downloaded segment, leaving a
     // small chance of race condition between getting table size check and replacing the segment dir, i.e. flaky test.
     addInvertedIndex();
-    assertEquals(getTableSize(getTableName()), tableSizeWithNewIndex);
+    assertEquals(getTableSize(rawTableName), tableSizeWithNewIndex);
 
     // Force to download the whole table and use the original table config, so the disk usage should get back to
     // initial value.
-    tableConfig = getOfflineTableConfig();
-    tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
-    updateTableConfig(tableConfig);
+    updateTableConfig(_tableConfig);
     reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs);
     assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
-    // With force download, the table size gets back to the initial value.
-    assertEquals(getTableSize(getTableName()), _tableSize);
   }
 
   private void addInvertedIndex(boolean shouldReload)
       throws Exception {
     // Update table config to add inverted index on DivActualElapsedTime column, and
     // reload the table to get config change into effect and add the inverted index.
-    TableConfig tableConfig = getOfflineTableConfig();
+    TableConfig tableConfig = createOfflineTableConfig();
     tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
     updateTableConfig(tableConfig);
 
@@ -619,7 +626,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       throws Exception {
     // Update table config to add Range index on DivActualElapsedTime column, and
     // reload the table to get config change into effect and add the Range index.
-    TableConfig tableConfig = getOfflineTableConfig();
+    TableConfig tableConfig = createOfflineTableConfig();
     tableConfig.getIndexingConfig().setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS);
     updateTableConfig(tableConfig);
 
@@ -680,7 +687,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Test
   public void testMaxQueryResponseSizeTableConfig()
       throws Exception {
-    TableConfig tableConfig = getOfflineTableConfig();
+    TableConfig tableConfig = createOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000L, null));
     updateTableConfig(tableConfig);
 
@@ -696,8 +703,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       }
     }, 60_000L, "Failed to execute query");
 
-    tableConfig.setQueryConfig(null);
-    updateTableConfig(tableConfig);
+    updateTableConfig(_tableConfig);
 
     TestUtils.waitForCondition(aVoid -> {
       try {
@@ -713,7 +719,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Test
   public void testMaxServerResponseSizeTableConfig()
       throws Exception {
-    TableConfig tableConfig = getOfflineTableConfig();
+    TableConfig tableConfig = createOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, null, 1000L));
     updateTableConfig(tableConfig);
 
@@ -729,8 +735,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
      }
     }, 60_000L, "Failed to execute query");
 
-    tableConfig.setQueryConfig(null);
-    updateTableConfig(tableConfig);
+    updateTableConfig(_tableConfig);
 
     TestUtils.waitForCondition(aVoid -> {
       try {
@@ -746,7 +751,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Test
   public void testMaxResponseSizeTableConfigOrdering()
       throws Exception {
-    TableConfig tableConfig = getOfflineTableConfig();
+    TableConfig tableConfig = createOfflineTableConfig();
     tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, 1000000L, 1000L));
     updateTableConfig(tableConfig);
 
@@ -762,8 +767,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       }
     }, 60_000L, "Failed to execute query");
 
-    tableConfig.setQueryConfig(null);
-    updateTableConfig(tableConfig);
+    updateTableConfig(_tableConfig);
 
     TestUtils.waitForCondition(aVoid -> {
       try {
@@ -1097,7 +1101,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
 
     // string literal
-    String sqlQuery = "SELECT toBase64(toUtf8('hello!')), " + "fromUtf8(fromBase64('aGVsbG8h')) FROM mytable";
+    String sqlQuery = "SELECT toBase64(toUtf8('hello!')), fromUtf8(fromBase64('aGVsbG8h')) FROM mytable";
     JsonNode response = postQuery(sqlQuery);
     JsonNode resultTable = response.get("resultTable");
     JsonNode dataSchema = resultTable.get("dataSchema");
@@ -1421,7 +1425,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(results.get(10).asText(), "hello!");
   }
 
-  @Test(dependsOnMethods = "testBloomFilterTriggering", dataProvider = "useBothQueryEngines")
"testBloomFilterTriggering", dataProvider = "useBothQueryEngines") + @Test(dataProvider = "useBothQueryEngines") public void testRangeIndexTriggering(boolean useMultiStageQueryEngine) throws Exception { setUseMultiStageQueryEngine(useMultiStageQueryEngine); @@ -1432,38 +1436,32 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet addRangeIndex(); // Update table config to remove the new range index, and check if the new range index is removed - TableConfig tableConfig = getOfflineTableConfig(); - tableConfig.getIndexingConfig().setRangeIndexColumns(getRangeIndexColumns()); - updateTableConfig(tableConfig); - reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs); + updateTableConfig(_tableConfig); + reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); - assertEquals(getTableSize(getTableName()), _tableSize); } - @Test(dependsOnMethods = "testForwardIndexTriggering") + @Test public void testBloomFilterTriggering() throws Exception { long numTotalDocs = getCountStarResult(); assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(), NUM_SEGMENTS); // Update table config and trigger reload - TableConfig tableConfig = getOfflineTableConfig(); - tableConfig.getIndexingConfig().setBloomFilterColumns(UPDATED_BLOOM_FILTER_COLUMNS); + TableConfig tableConfig = createOfflineTableConfig(); + IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); + indexingConfig.setBloomFilterColumns(UPDATED_BLOOM_FILTER_COLUMNS); updateTableConfig(tableConfig); reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, false, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(), 0L); - // Update table config to remove the new bloom filter, and - // reload table to clean the new bloom filter physically. - tableConfig = getOfflineTableConfig(); - tableConfig.getIndexingConfig().setBloomFilterColumns(getBloomFilterColumns()); - updateTableConfig(tableConfig); - reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, true, numTotalDocs); + // Update table config to remove the new bloom filter, and reload table to clean the new bloom filter physically. 
+    updateTableConfig(_tableConfig);
+    reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, false, numTotalDocs);
     assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(), NUM_SEGMENTS);
-    assertEquals(getTableSize(getTableName()), _tableSize);
   }
 
-  @Test(dependsOnMethods = "testDefaultColumns")
+  @Test
   public void testForwardIndexTriggering()
       throws Exception {
     String column = "DestCityName";
@@ -1474,7 +1472,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     double forwardIndexSize = columnIndexSize.get(StandardIndexes.FORWARD_ID).asDouble();
 
     // Convert 'DestCityName' to raw index
-    TableConfig tableConfig = getOfflineTableConfig();
+    TableConfig tableConfig = createOfflineTableConfig();
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     List<String> noDictionaryColumns = indexingConfig.getNoDictionaryColumns();
     assertNotNull(noDictionaryColumns);
@@ -1553,8 +1551,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private void resetForwardIndex(double expectedDictionarySize, double expectedForwardIndexSize)
       throws Exception {
-    TableConfig tableConfig = createOfflineTableConfig();
-    updateTableConfig(tableConfig);
+    updateTableConfig(_tableConfig);
     reloadAllSegments(SELECT_STAR_QUERY, false, getCountStarResult());
     JsonNode columnIndexSize = getColumnIndexSize("DestCityName");
     assertEquals(columnIndexSize.get(StandardIndexes.DICTIONARY_ID).asDouble(), expectedDictionarySize);
@@ -1582,7 +1579,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   public void testStarTreeTriggering()
       throws Exception {
     int numTotalDocs = (int) getCountStarResult();
-    long tableSizeWithDefaultIndex = getTableSize(getTableName());
 
     // Test the first query
     JsonNode firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
@@ -1594,7 +1590,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
         firstQueryResult);
 
     // Update table config without enabling dynamic star-tree creation and trigger reload, should have no effect
-    TableConfig tableConfig = getOfflineTableConfig();
+    TableConfig tableConfig = createOfflineTableConfig();
     IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
     indexingConfig.setStarTreeIndexConfigs(List.of(STAR_TREE_INDEX_CONFIG_1));
     indexingConfig.setEnableDynamicStarTreeCreation(false);
     updateTableConfig(tableConfig);
@@ -1671,7 +1667,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
     // Without star-tree, 'numDocsScanned' should be the same as the 'COUNT(*)' result
     verifySingleValueResponse(postQuery(TEST_STAR_TREE_QUERY_2), secondQueryResult, numTotalDocs, secondQueryResult);
-    assertEquals(getTableSize(getTableName()), tableSizeWithDefaultIndex);
 
     // First query should not be able to use the star-tree
     verifySingleValueResponse(postQuery(TEST_STAR_TREE_QUERY_1), firstQueryResult, numTotalDocs, firstQueryResult);
@@ -1680,6 +1675,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
     verifySingleValueResponse(postQuery(TEST_STAR_TREE_QUERY_1), firstQueryResult, numTotalDocs, firstQueryResult);
     verifySingleValueResponse(postQuery(TEST_STAR_TREE_QUERY_2), secondQueryResult, numTotalDocs, secondQueryResult);
+
+    // Reset the table config and reload, should have no effect
+    updateTableConfig(_tableConfig);
+    reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
+    verifySingleValueResponse(postQuery(TEST_STAR_TREE_QUERY_1), firstQueryResult, numTotalDocs, firstQueryResult);
+    verifySingleValueResponse(postQuery(TEST_STAR_TREE_QUERY_2), secondQueryResult, numTotalDocs, secondQueryResult);
   }
 
   private void verifySingleValueResponse(JsonNode queryResponse, int expectedResult, int expectedTotalDocs,
@@ -1721,75 +1722,17 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
    * <li>"NewAddedDerivedNullString", DIMENSION, STRING, single-value, caseWhen(true, null, null)</li>
    * </ul>
    */
-  @Test(dependsOnMethods = "testAggregateMetadataAPI", dataProvider = "useBothQueryEngines")
+  @Test(dataProvider = "useBothQueryEngines")
   public void testDefaultColumns(boolean useMultiStageQueryEngine)
       throws Exception {
     setUseMultiStageQueryEngine(useMultiStageQueryEngine);
     long numTotalDocs = getCountStarResult();
+    TableConfig tableConfig = createOfflineTableConfig();
+    Schema schema = createSchema();
 
-    reloadWithExtraColumns();
-    JsonNode queryResponse = postQuery(SELECT_STAR_QUERY);
-    assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 104);
-
-    testNewAddedColumns();
-
-    // The multi-stage query engine doesn't support expression overrides currently
-    if (!useMultiStageQueryEngine()) {
-      testExpressionOverride();
-    }
-
-    reloadWithMissingColumns();
-    queryResponse = postQuery(SELECT_STAR_QUERY);
-    assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 75);
-
-    reloadWithRegularColumns();
-    queryResponse = postQuery(SELECT_STAR_QUERY);
-    assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-    assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 79);
-  }
-
-  @Test
-  public void testDisableGroovyQueryTableConfigOverride()
-      throws Exception {
-    String groovyQuery = "SELECT GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', "
-        + "'arg0 + arg1', FlightNum, Origin) FROM mytable";
-    TableConfig tableConfig = getOfflineTableConfig();
-    tableConfig.setQueryConfig(new QueryConfig(null, false, null, null, null, null));
-    updateTableConfig(tableConfig);
-
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Query should not throw exception
-        postQuery(groovyQuery);
-        return true;
-      } catch (Exception e) {
-        return false;
-      }
-    }, 60_000L, "Failed to accept Groovy query with table override");
-
-    // Remove query config
-    tableConfig.setQueryConfig(null);
-    updateTableConfig(tableConfig);
-
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        postQuery(groovyQuery);
-        return false;
-      } catch (Exception e) {
-        // expected
-        return true;
-      }
-    }, 60_000L, "Failed to reject Groovy query without query table config override");
-  }
-
-  private void reloadWithExtraColumns()
-      throws Exception {
-    long numTotalDocs = getCountStarResult();
+    // TEST EXTRA COLUMNS
 
     // Add columns to the schema first to pass the validation of the table config
-    Schema schema = createSchema();
     schema.addField(new MetricFieldSpec("NewAddedIntMetric", DataType.INT, 1));
     schema.addField(new MetricFieldSpec("NewAddedLongMetric", DataType.LONG, 1));
     schema.addField(new MetricFieldSpec("NewAddedFloatMetric", DataType.FLOAT));
@@ -1817,9 +1760,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     schema.addField(new DimensionFieldSpec("NewAddedDerivedMVDoubleDimension", DataType.DOUBLE, false));
DimensionFieldSpec("NewAddedDerivedMVDoubleDimension", DataType.DOUBLE, false)); schema.addField(new DimensionFieldSpec("NewAddedDerivedNullString", DataType.STRING, true, "nil")); schema.setEnableColumnBasedNullHandling(true); - addSchema(schema); + updateSchema(schema); - TableConfig tableConfig = getOfflineTableConfig(); List<TransformConfig> transformConfigs = List.of( new TransformConfig("NewAddedDerivedHoursSinceEpoch", "DaysSinceEpoch * 24"), new TransformConfig("NewAddedDerivedTimestamp", "DaysSinceEpoch * 24 * 3600 * 1000"), @@ -1835,9 +1777,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setTransformConfigs(transformConfigs); tableConfig.setIngestionConfig(ingestionConfig); + // Ensure that we can reload segments with a new raw derived column - tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedStringDimension"); - tableConfig.getIndexingConfig().getNoDictionaryColumns().add("NewAddedRawDerivedMVIntDimension"); + List<String> noDictionaryColumns = tableConfig.getIndexingConfig().getNoDictionaryColumns(); + assertNotNull(noDictionaryColumns); + noDictionaryColumns.add("NewAddedRawDerivedStringDimension"); + noDictionaryColumns.add("NewAddedRawDerivedMVIntDimension"); List<FieldConfig> fieldConfigList = tableConfig.getFieldConfigList(); assertNotNull(fieldConfigList); fieldConfigList.add( @@ -1848,10 +1793,15 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet CompressionCodec.MV_ENTRY_DICT, null)); updateTableConfig(tableConfig); - // Trigger reload + // Trigger reload and verify column count reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs); - assertEquals(postQuery(TEST_EXTRA_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(), - numTotalDocs); + JsonNode segmentsMetadata = JsonUtils.stringToJsonNode( + sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(), List.of("*")))); + assertEquals(segmentsMetadata.size(), 12); + for (JsonNode segmentMetadata : segmentsMetadata) { + assertEquals(segmentMetadata.get("columns").size(), 104); + } + assertEquals(postQuery(SELECT_STAR_QUERY).get("resultTable").get("dataSchema").get("columnNames").size(), 104); // Verify the index sizes JsonNode columnIndexSizeMap = JsonUtils.stringToJsonNode(sendGetRequest( @@ -1889,47 +1839,44 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertFalse(derivedRawMVIntColumnIndex.has(StandardIndexes.DICTIONARY_ID)); assertTrue(derivedNullStringColumnIndex.has(StandardIndexes.NULL_VALUE_VECTOR_ID)); - } - private void reloadWithMissingColumns() - throws Exception { - // Remove columns from the table config first to pass the validation of the table config - TableConfig tableConfig = getOfflineTableConfig(); - tableConfig.setIngestionConfig(null); - tableConfig.setFieldConfigList(getFieldConfigs()); - tableConfig.getIndexingConfig().getNoDictionaryColumns().remove("NewAddedRawDerivedStringDimension"); - tableConfig.getIndexingConfig().getNoDictionaryColumns().remove("NewAddedRawDerivedMVIntDimension"); - updateTableConfig(tableConfig); + testNewAddedColumns(); + + // The multi-stage query engine doesn't support expression overrides currently + if (!useMultiStageQueryEngine()) { + testExpressionOverride(); + } + + // TEST MISSING COLUMNS + + // Reset the table config first to pass the validation + updateTableConfig(_tableConfig); // Need to 
-    Schema schema = createSchema();
+    schema = createSchema();
     schema.removeField("AirlineID");
     schema.removeField("ArrTime");
     schema.removeField("AirTime");
     schema.removeField("ArrDel15");
     forceUpdateSchema(schema);
 
-    // Trigger reload
-    reloadAllSegments(SELECT_STAR_QUERY, true, getCountStarResult());
-    JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
+    // Trigger reload and verify column count
+    reloadAllSegments(SELECT_STAR_QUERY, false, numTotalDocs);
+    segmentsMetadata = JsonUtils.stringToJsonNode(
         sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(), List.of("*"))));
     assertEquals(segmentsMetadata.size(), 12);
     for (JsonNode segmentMetadata : segmentsMetadata) {
       assertEquals(segmentMetadata.get("columns").size(), 75);
     }
-  }
+    assertEquals(postQuery(SELECT_STAR_QUERY).get("resultTable").get("dataSchema").get("columnNames").size(), 75);
 
-  private void reloadWithRegularColumns()
-      throws Exception {
-    long numTotalDocs = getCountStarResult();
+    // TEST REGULAR COLUMNS
 
-    _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
-    addSchema(createSchema());
+    updateSchema(_schema);
 
-    // Trigger reload
-    reloadAllSegments(SELECT_STAR_QUERY, true, numTotalDocs);
-    assertEquals(postQuery(TEST_REGULAR_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(),
-        numTotalDocs);
+    // Trigger reload and verify column count
+    reloadAllSegments(TEST_REGULAR_COLUMNS_QUERY, false, numTotalDocs);
+    assertEquals(postQuery(SELECT_STAR_QUERY).get("resultTable").get("dataSchema").get("columnNames").size(), 79);
   }
 
   private void testNewAddedColumns()
@@ -2175,7 +2122,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
 
     // Add expression override
-    TableConfig tableConfig = getOfflineTableConfig();
+    TableConfig tableConfig = createOfflineTableConfig();
     tableConfig.setQueryConfig(
         new QueryConfig(null, null, null, Map.of("DaysSinceEpoch * 24", "NewAddedDerivedHoursSinceEpoch"), null,
             null));
     updateTableConfig(tableConfig);
@@ -2191,8 +2138,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }, 60_000L, "Failed to add expression override");
 
     // Remove expression override
-    tableConfig.setQueryConfig(null);
-    updateTableConfig(tableConfig);
+    updateTableConfig(_tableConfig);
 
     TestUtils.waitForCondition(aVoid -> {
       try {
@@ -2205,6 +2151,42 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }, 60_000L, "Failed to remove expression override");
   }
 
+  @Test
+  public void testDisableGroovyQueryTableConfigOverride()
+      throws Exception {
+    String groovyQuery = "SELECT GROOVY('{\"returnType\":\"STRING\",\"isSingleValue\":true}', "
+        + "'arg0 + arg1', FlightNum, Origin) FROM mytable";
+
+    // Query should not throw exception
+    postQuery(groovyQuery);
+
+    // Remove query config
+    TableConfig tableConfig = createOfflineTableConfig();
+    tableConfig.setQueryConfig(null);
+    updateTableConfig(tableConfig);
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        postQuery(groovyQuery);
+        return false;
+      } catch (Exception e) {
+        // expected
+        return true;
+      }
+    }, 60_000L, "Failed to reject Groovy query without query table config override");
+
+    // Reset query config
+    updateTableConfig(_tableConfig);
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        // Query should not throw exception
+        postQuery(groovyQuery);
+        return true;
+      } catch (Exception e) {
+        return false;
+      }
+    }, 60_000L, "Failed to accept Groovy query with table override");
table override"); + } + @Test(dataProvider = "useBothQueryEngines") public void testBrokerResponseMetadata(boolean useMultiStageQueryEngine) throws Exception { @@ -2704,9 +2686,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet public void testCaseStatementInSelectionWithTransformFunctionInThen(boolean useMultiStageQueryEngine) throws Exception { setUseMultiStageQueryEngine(useMultiStageQueryEngine); - String sqlQuery = - "SELECT ArrDelay, CASE WHEN ArrDelay > 0 THEN ArrDelay WHEN ArrDelay < 0 THEN ArrDelay * -1 ELSE 0 END AS " - + "ArrTimeDiff FROM mytable LIMIT 1000"; + String sqlQuery = "SELECT ArrDelay, " + + "CASE WHEN ArrDelay > 0 THEN ArrDelay WHEN ArrDelay < 0 THEN ArrDelay * -1 ELSE 0 END AS ArrTimeDiff " + + "FROM mytable LIMIT 1000"; JsonNode response = postQuery(sqlQuery); JsonNode rows = response.get("resultTable").get("rows"); assertTrue(response.get("exceptions").isEmpty()); @@ -2725,8 +2707,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet public void testCaseStatementWithLogicalTransformFunction(boolean useMultiStageQueryEngine) throws Exception { setUseMultiStageQueryEngine(useMultiStageQueryEngine); - String sqlQuery = "SELECT ArrDelay" + ", CASE WHEN ArrDelay > 50 OR ArrDelay < 10 THEN 10 ELSE 0 END" - + ", CASE WHEN ArrDelay < 50 AND ArrDelay >= 10 THEN 10 ELSE 0 END" + " FROM mytable LIMIT 1000"; + String sqlQuery = "SELECT ArrDelay, " + + "CASE WHEN ArrDelay > 50 OR ArrDelay < 10 THEN 10 ELSE 0 END, " + + "CASE WHEN ArrDelay < 50 AND ArrDelay >= 10 THEN 10 ELSE 0 END " + + "FROM mytable LIMIT 1000"; JsonNode response = postQuery(sqlQuery); JsonNode rows = response.get("resultTable").get("rows"); assertTrue(response.get("exceptions").isEmpty()); @@ -3845,8 +3829,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); // Update table config to add range and inverted index, and trigger reload - addRangeIndex(false); // skip segment reload and instead reload after also adding inverted index - addInvertedIndex(); + TableConfig tableConfig = createOfflineTableConfig(); + IndexingConfig indexingConfig = tableConfig.getIndexingConfig(); + indexingConfig.setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS); + indexingConfig.setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS); + updateTableConfig(tableConfig); + reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs); + assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L); // Ensure inv index is operational assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L); @@ -3879,11 +3868,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet numTotalDocs); // Update table config to remove the new indexes, and check if the new indexes are removed - TableConfig tableConfig = getOfflineTableConfig(); - tableConfig.getIndexingConfig().setRangeIndexColumns(getRangeIndexColumns()); - tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns()); - updateTableConfig(tableConfig); - reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs); + updateTableConfig(_tableConfig); + reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs); + assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 
   }
 
   @Test(dataProvider = "useBothQueryEngines")
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
index 0c025341f8..844b886ea3 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/loader/defaultcolumn/BaseDefaultColumnHandler.java
@@ -63,6 +63,8 @@ import org.apache.pinot.segment.spi.creator.StatsCollectorConfig;
 import org.apache.pinot.segment.spi.index.DictionaryIndexConfig;
 import org.apache.pinot.segment.spi.index.FieldIndexConfigs;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
+import org.apache.pinot.segment.spi.index.IndexService;
+import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.creator.DictionaryBasedInvertedIndexCreator;
 import org.apache.pinot.segment.spi.index.creator.ForwardIndexCreator;
@@ -360,13 +362,13 @@ public abstract class BaseDefaultColumnHandler implements DefaultColumnHandler {
    *
    * @param column column name.
    */
-  protected void removeColumnIndices(String column)
-      throws IOException {
+  protected void removeColumnIndices(String column) {
     String segmentName = _segmentMetadata.getName();
     LOGGER.info("Removing default column: {} from segment: {}", column, segmentName);
-    // Delete existing dictionary and forward index
-    _segmentWriter.removeIndex(column, StandardIndexes.dictionary());
-    _segmentWriter.removeIndex(column, StandardIndexes.forward());
+    // Remove indexes
+    for (IndexType<?, ?, ?> indexType : IndexService.getInstance().getAllIndexes()) {
+      _segmentWriter.removeIndex(column, indexType);
+    }
     // Remove the column metadata
     SegmentColumnarIndexCreator.removeColumnMetadataInfo(_segmentProperties, column);
     LOGGER.info("Removed default column: {} from segment: {}", column, segmentName);
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
index bf20cf94a8..fc1c35584b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexEntry.java
@@ -19,14 +19,10 @@
 package org.apache.pinot.segment.local.segment.store;
 
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /* package-private */
 class IndexEntry {
-  private static final Logger LOGGER = LoggerFactory.getLogger(IndexEntry.class);
-
   final IndexKey _key;
   long _startOffset = -1;
   long _size = -1;
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java
index 90bf9b820d..d4eb36fb28 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/IndexKey.java
@@ -20,16 +20,12 @@
 package org.apache.pinot.segment.local.segment.store;
 
 import org.apache.pinot.segment.spi.index.IndexService;
 import org.apache.pinot.segment.spi.index.IndexType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
  * Class representing index name and type
  */
 public class IndexKey implements Comparable<IndexKey> {
-  private static final Logger LOGGER = LoggerFactory.getLogger(IndexKey.class);
-
   final String _name;
   final IndexType<?, ?, ?> _type;

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org