This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7571f711b8 Fix segment size check in OfflineClusterIntegrationTest (#13389) 7571f711b8 is described below commit 7571f711b804a7632a63541083f2d2bf958ee9ce Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Thu Jun 13 16:34:55 2024 -0700 Fix segment size check in OfflineClusterIntegrationTest (#13389) --- .../MultiNodesOfflineClusterIntegrationTest.java | 2 +- .../tests/OfflineClusterIntegrationTest.java | 100 +++++++++------------ 2 files changed, 44 insertions(+), 58 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java index 200c022523..3a4555d8e0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java @@ -288,7 +288,7 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded @Test(enabled = false) - public void testStarTreeTriggering(boolean useMultiStageQueryEngine) { + public void testStarTreeTriggering() { // Ignored } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index e172438ced..49bf22b8e8 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -157,16 +157,15 
@@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet private static final String COLUMN_LENGTH_MAP_KEY = "columnLengthMap"; private static final String COLUMN_CARDINALITY_MAP_KEY = "columnCardinalityMap"; private static final String MAX_NUM_MULTI_VALUES_MAP_KEY = "maxNumMultiValuesMap"; - private static final int DISK_SIZE_IN_BYTES = 20286558; private static final int NUM_ROWS = 115545; private final List<ServiceStatus.ServiceStatusCallback> _serviceStatusCallbacks = new ArrayList<>(getNumBrokers() + getNumServers()); private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME; - // Cache the table size after removing an index via reloading. Once this value - // is set, assert that table size always gets back to this value after removing - // any other kind of index. - private long _tableSizeAfterRemovingIndex; + + // Store the table size. Table size is platform dependent because of the native library used by the ChunkCompressor. + // Once this value is set, assert that table size always gets back to this value after removing the added indices. + private long _tableSize; protected int getNumBrokers() { return NUM_BROKERS; @@ -224,10 +223,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet } catch (Exception e) { // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one // of the three exception: - // - 409 conflict of the second call enters ProcessExistingSegment ; - // - segmentZkMetadata creation failure if both calls entered ProcessNewSegment. - // - Failed to copy segment tar file to final location due to the same segment pushed twice concurrently. - // In such cases we upload all the segments again to ensure that the data is setup correctly. 
+ // - 409 conflict of the second call enters ProcessExistingSegment + // - segmentZkMetadata creation failure if both calls entered ProcessNewSegment + // - Failed to copy segment tar file to final location due to the same segment pushed twice concurrently + // In such cases we upload all the segments again to ensure that the data is set up correctly. assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage() .contains("Failed to create ZK metadata for segment") || e.getMessage() .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage()); @@ -248,27 +247,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // Wait for all documents loaded waitForAllDocsLoaded(600_000L); - // Try to reload all the segments with force download from the controller URI. - reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, getCountStarResult()); - - // Try to upload all the segments again with force download from the controller URI. - try { - uploadSegments(getTableName(), tarDirs); - } catch (Exception e) { - // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one - // of the three exception: - // - 409 conflict of the second call enters ProcessExistingSegment ; - // - segmentZkMetadata update failure if both calls entered ProcessNewSegment. - // - Failed to copy segment tar file to final location due to the same segment pushed twice concurrently. - // In such cases we upload all the segments again to ensure that the data is setup correctly. - assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage() - .contains("Failed to update ZK metadata for segment") || e.getMessage() - .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage()); - uploadSegments(getTableName(), _tarDir); - } - - // Try to reload all the segments with force download from the controller URI. 
- reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, getCountStarResult()); + _tableSize = getTableSize(getTableName()); } private void reloadAllSegments(String testQuery, boolean forceDownload, long numTotalDocs) @@ -542,7 +521,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet updateTableConfig(tableConfig); reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); - assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex); + assertEquals(getTableSize(getTableName()), _tableSize); // Add the inverted index back to test index removal via force download. addInvertedIndex(); @@ -572,9 +551,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // As query behavior changed, the segment reload must have been done. The new table size should be like below, // with only one segment being reloaded with force download and dropping the inverted index. long tableSizeAfterReloadSegment = getTableSize(getTableName()); - assertTrue(tableSizeAfterReloadSegment > DISK_SIZE_IN_BYTES && tableSizeAfterReloadSegment < tableSizeWithNewIndex, + assertTrue(tableSizeAfterReloadSegment > _tableSize && tableSizeAfterReloadSegment < tableSizeWithNewIndex, String.format("Table size: %d should be between %d and %d after dropping inverted index from segment: %s", - tableSizeAfterReloadSegment, DISK_SIZE_IN_BYTES, tableSizeWithNewIndex, segmentName)); + tableSizeAfterReloadSegment, _tableSize, tableSizeWithNewIndex, segmentName)); // Add inverted index back to check if reloading whole table with force download works. 
// Note that because we have force downloaded a segment above, it's important to reset the table state by adding @@ -596,7 +575,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); // With force download, the table size gets back to the initial value. - assertEquals(getTableSize(getTableName()), DISK_SIZE_IN_BYTES); + assertEquals(getTableSize(getTableName()), _tableSize); } private void addInvertedIndex(boolean shouldReload) @@ -616,6 +595,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L); } } + private void addInvertedIndex() throws Exception { addInvertedIndex(true); @@ -639,7 +619,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet } } - private void addRangeIndex() throws Exception { + private void addRangeIndex() + throws Exception { addRangeIndex(true); } @@ -1314,7 +1295,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet updateTableConfig(tableConfig); reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); - assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex); + assertEquals(getTableSize(getTableName()), _tableSize); } @Test(dependsOnMethods = "testDefaultColumns") @@ -1337,7 +1318,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet updateTableConfig(tableConfig); reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, true, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(), NUM_SEGMENTS); - 
assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex); + assertEquals(getTableSize(getTableName()), _tableSize); } /** @@ -1530,8 +1511,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet queryResponse = postQuery(SELECT_STAR_QUERY); assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(), 79); - - _tableSizeAfterRemovingIndex = getTableSize(getTableName()); } @Test @@ -2932,9 +2911,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // case with different number of documents in the segment. response1 = response1.replaceAll("docs:[0-9]+", "docs:*"); - assertEquals(response1, "{\"dataSchema\":{\"columnNames\":[\"SQL\",\"PLAN\"],\"columnDataTypes\":[\"STRING\"," - + "\"STRING\"]},\"rows\":[[\"EXPLAIN PLAN FOR SELECT count(*) AS count, Carrier AS name FROM mytable " - + "GROUP BY name ORDER BY 1\",\"Execution Plan\\n" + //@formatter:off + assertEquals(response1, "{" + + "\"dataSchema\":{\"columnNames\":[\"SQL\",\"PLAN\"],\"columnDataTypes\":[\"STRING\",\"STRING\"]}," + + "\"rows\":[[" + + "\"EXPLAIN PLAN FOR SELECT count(*) AS count, Carrier AS name FROM mytable GROUP BY name ORDER BY 1\"," + + "\"Execution Plan\\n" + "LogicalSort(sort0=[$0], dir0=[ASC])\\n" + " PinotLogicalSortExchange(" + "distribution=[hash], collation=[[0]], isSortOnSender=[false], isSortOnReceiver=[true])\\n" @@ -2944,20 +2926,23 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet + " PinotLogicalAggregate(group=[{17}], agg#0=[COUNT()])\\n" + " LogicalTableScan(table=[[default, mytable]])\\n" + "\"]]}"); + //@formatter:on // In the query below, FlightNum column has an inverted index and there is no data satisfying the predicate // "FlightNum < 0". Hence, all segments are pruned out before query execution on the server side. 
String query2 = "EXPLAIN PLAN FOR SELECT * FROM mytable WHERE FlightNum < 0"; String response2 = postQuery(query2).get("resultTable").toString(); - Pattern pattern = Pattern.compile("\\{\"dataSchema\":\\{\"columnNames\":\\[\"SQL\",\"PLAN\"]," - + "\"columnDataTypes\":\\[\"STRING\",\"STRING\"]}," + //@formatter:off + Pattern pattern = Pattern.compile("\\{" + + "\"dataSchema\":\\{\"columnNames\":\\[\"SQL\",\"PLAN\"],\"columnDataTypes\":\\[\"STRING\",\"STRING\"]}," + "\"rows\":\\[\\[\"EXPLAIN PLAN FOR SELECT \\* FROM mytable WHERE FlightNum < 0\"," + "\"Execution Plan.." + "LogicalProject\\(.*\\).." + " LogicalFilter\\(condition=\\[<\\(.*, 0\\)]\\).." + " LogicalTableScan\\(table=\\[\\[default, mytable]]\\)..\"" + "]]}"); + //@formatter:on boolean found = pattern.matcher(response2).find(); assertTrue(found, "Pattern " + pattern + " not found in " + response2); } @@ -3226,7 +3211,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet } private void validateMetadataResponse(JsonNode response, int numTotalColumn, int numMVColumn) { - assertEquals(response.get(DISK_SIZE_IN_BYTES_KEY).asInt(), DISK_SIZE_IN_BYTES); + if (getNumServers() == 1) { + assertEquals(response.get(DISK_SIZE_IN_BYTES_KEY).asInt(), _tableSize); + } assertEquals(response.get(NUM_SEGMENTS_KEY).asInt(), NUM_SEGMENTS); assertEquals(response.get(NUM_ROWS_KEY).asInt(), NUM_ROWS); assertEquals(response.get(COLUMN_LENGTH_MAP_KEY).size(), numTotalColumn); @@ -3335,10 +3322,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // disallow use of range index on DivActualElapsedTime, inverted should be unaffected String skipIndexes = buildSkipIndexesOption("DivActualElapsedTime=range"); - assertEquals(postQuery( - skipIndexes + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L); - assertEquals(postQuery( - skipIndexes + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); + 
assertEquals(postQuery(skipIndexes + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), + 0L); + assertEquals(postQuery(skipIndexes + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), + numTotalDocs); // disallow use of inverted index on DivActualElapsedTime, range should be unaffected skipIndexes = buildSkipIndexesOption("DivActualElapsedTime=inverted"); @@ -3349,17 +3336,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // EQ predicate type allows for using range index if one exists, even if inverted index is skipped. That is why // we still see no docs scanned even though we skip the inverted index. This is a good test to show that using // the skipIndexes can allow fine-grained experimentation of index usage at query time. - assertEquals(postQuery( - skipIndexes + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L); - assertEquals(postQuery( - skipIndexes + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L); + assertEquals(postQuery(skipIndexes + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), + 0L); + assertEquals(postQuery(skipIndexes + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L); // disallow use of both range and inverted indexes on DivActualElapsedTime, neither should be used at query time skipIndexes = buildSkipIndexesOption("DivActualElapsedTime=inverted,range"); - assertEquals(postQuery( - skipIndexes + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); - assertEquals(postQuery( - skipIndexes + TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); + assertEquals(postQuery(skipIndexes + TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), + numTotalDocs); + assertEquals(postQuery(skipIndexes + 
TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
+        numTotalDocs);

     // Update table config to remove the new indexes, and check if the new indexes are removed
     TableConfig tableConfig = getOfflineTableConfig();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org