This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7571f711b8 Fix segment size check in OfflineClusterIntegrationTest (#13389)
7571f711b8 is described below

commit 7571f711b804a7632a63541083f2d2bf958ee9ce
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Thu Jun 13 16:34:55 2024 -0700

    Fix segment size check in OfflineClusterIntegrationTest (#13389)
---
 .../MultiNodesOfflineClusterIntegrationTest.java   |   2 +-
 .../tests/OfflineClusterIntegrationTest.java       | 100 +++++++++------------
 2 files changed, 44 insertions(+), 58 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index 200c022523..3a4555d8e0 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -288,7 +288,7 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
 
   // Disabled because with multiple replicas, there is no guarantee that all replicas are reloaded
   @Test(enabled = false)
-  public void testStarTreeTriggering(boolean useMultiStageQueryEngine) {
+  public void testStarTreeTriggering() {
     // Ignored
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index e172438ced..49bf22b8e8 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -157,16 +157,15 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   private static final String COLUMN_LENGTH_MAP_KEY = "columnLengthMap";
   private static final String COLUMN_CARDINALITY_MAP_KEY = 
"columnCardinalityMap";
   private static final String MAX_NUM_MULTI_VALUES_MAP_KEY = 
"maxNumMultiValuesMap";
-  private static final int DISK_SIZE_IN_BYTES = 20286558;
   private static final int NUM_ROWS = 115545;
 
   private final List<ServiceStatus.ServiceStatusCallback> 
_serviceStatusCallbacks =
       new ArrayList<>(getNumBrokers() + getNumServers());
   private String _schemaFileName = DEFAULT_SCHEMA_FILE_NAME;
-  // Cache the table size after removing an index via reloading. Once this 
value
-  // is set, assert that table size always gets back to this value after 
removing
-  // any other kind of index.
-  private long _tableSizeAfterRemovingIndex;
+
+  // Store the table size. Table size is platform dependent because of the native library used by the ChunkCompressor.
+  // Once this value is set, assert that table size always gets back to this value after removing the added indices.
+  private long _tableSize;
 
   protected int getNumBrokers() {
     return NUM_BROKERS;
@@ -224,10 +223,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     } catch (Exception e) {
       // If enableParallelPushProtection is enabled and the same segment is 
uploaded concurrently, we could get one
       // of the three exception:
-      //   - 409 conflict of the second call enters ProcessExistingSegment ;
-      //   - segmentZkMetadata creation failure if both calls entered 
ProcessNewSegment.
-      //   - Failed to copy segment tar file to final location due to the same 
segment pushed twice concurrently.
-      // In such cases we upload all the segments again to ensure that the 
data is setup correctly.
+      //   - 409 conflict of the second call enters ProcessExistingSegment
+      //   - segmentZkMetadata creation failure if both calls entered 
ProcessNewSegment
+      //   - Failed to copy segment tar file to final location due to the same 
segment pushed twice concurrently
+      // In such cases we upload all the segments again to ensure that the 
data is set up correctly.
       assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage()
           .contains("Failed to create ZK metadata for segment") || e.getMessage()
           .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage());
@@ -248,27 +247,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
 
-    // Try to reload all the segments with force download from the controller 
URI.
-    reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, 
getCountStarResult());
-
-    // Try to upload all the segments again with force download from the 
controller URI.
-    try {
-      uploadSegments(getTableName(), tarDirs);
-    } catch (Exception e) {
-      // If enableParallelPushProtection is enabled and the same segment is 
uploaded concurrently, we could get one
-      // of the three exception:
-      //   - 409 conflict of the second call enters ProcessExistingSegment ;
-      //   - segmentZkMetadata update failure if both calls entered 
ProcessNewSegment.
-      //   - Failed to copy segment tar file to final location due to the same 
segment pushed twice concurrently.
-      // In such cases we upload all the segments again to ensure that the 
data is setup correctly.
-      assertTrue(e.getMessage().contains("Another segment upload is in 
progress for segment") || e.getMessage()
-          .contains("Failed to update ZK metadata for segment") || 
e.getMessage()
-          .contains("java.nio.file.FileAlreadyExistsException"), 
e.getMessage());
-      uploadSegments(getTableName(), _tarDir);
-    }
-
-    // Try to reload all the segments with force download from the controller 
URI.
-    reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, 
getCountStarResult());
+    _tableSize = getTableSize(getTableName());
   }
 
   private void reloadAllSegments(String testQuery, boolean forceDownload, long 
numTotalDocs)
@@ -542,7 +521,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     updateTableConfig(tableConfig);
     reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs);
     
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
 numTotalDocs);
-    assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
+    assertEquals(getTableSize(getTableName()), _tableSize);
 
     // Add the inverted index back to test index removal via force download.
     addInvertedIndex();
@@ -572,9 +551,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // As query behavior changed, the segment reload must have been done. The 
new table size should be like below,
     // with only one segment being reloaded with force download and dropping 
the inverted index.
     long tableSizeAfterReloadSegment = getTableSize(getTableName());
-    assertTrue(tableSizeAfterReloadSegment > DISK_SIZE_IN_BYTES && tableSizeAfterReloadSegment < tableSizeWithNewIndex,
+    assertTrue(tableSizeAfterReloadSegment > _tableSize && tableSizeAfterReloadSegment < tableSizeWithNewIndex,
         String.format("Table size: %d should be between %d and %d after dropping inverted index from segment: %s",
-            tableSizeAfterReloadSegment, DISK_SIZE_IN_BYTES, tableSizeWithNewIndex, segmentName));
+            tableSizeAfterReloadSegment, _tableSize, tableSizeWithNewIndex, segmentName));
 
     // Add inverted index back to check if reloading whole table with force 
download works.
     // Note that because we have force downloaded a segment above, it's 
important to reset the table state by adding
@@ -596,7 +575,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs);
     
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
 numTotalDocs);
     // With force download, the table size gets back to the initial value.
-    assertEquals(getTableSize(getTableName()), DISK_SIZE_IN_BYTES);
+    assertEquals(getTableSize(getTableName()), _tableSize);
   }
 
   private void addInvertedIndex(boolean shouldReload)
@@ -616,6 +595,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       
assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
 0L);
     }
   }
+
   private void addInvertedIndex()
       throws Exception {
     addInvertedIndex(true);
@@ -639,7 +619,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
   }
 
-  private void addRangeIndex() throws Exception {
+  private void addRangeIndex()
+      throws Exception {
     addRangeIndex(true);
   }
 
@@ -1314,7 +1295,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     updateTableConfig(tableConfig);
     reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, true, numTotalDocs);
     
assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
 numTotalDocs);
-    assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
+    assertEquals(getTableSize(getTableName()), _tableSize);
   }
 
   @Test(dependsOnMethods = "testDefaultColumns")
@@ -1337,7 +1318,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     updateTableConfig(tableConfig);
     reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, true, numTotalDocs);
     
assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(),
 NUM_SEGMENTS);
-    assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
+    assertEquals(getTableSize(getTableName()), _tableSize);
   }
 
   /**
@@ -1530,8 +1511,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     queryResponse = postQuery(SELECT_STAR_QUERY);
     assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
     
assertEquals(queryResponse.get("resultTable").get("dataSchema").get("columnNames").size(),
 79);
-
-    _tableSizeAfterRemovingIndex = getTableSize(getTableName());
   }
 
   @Test
@@ -2932,9 +2911,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // case with different number of documents in the segment.
     response1 = response1.replaceAll("docs:[0-9]+", "docs:*");
 
-    assertEquals(response1, 
"{\"dataSchema\":{\"columnNames\":[\"SQL\",\"PLAN\"],\"columnDataTypes\":[\"STRING\","
-        + "\"STRING\"]},\"rows\":[[\"EXPLAIN PLAN FOR SELECT count(*) AS 
count, Carrier AS name FROM mytable "
-        + "GROUP BY name ORDER BY 1\",\"Execution Plan\\n"
+    //@formatter:off
+    assertEquals(response1, "{"
+        + "\"dataSchema\":{\"columnNames\":[\"SQL\",\"PLAN\"],\"columnDataTypes\":[\"STRING\",\"STRING\"]},"
+        + "\"rows\":[["
+        + "\"EXPLAIN PLAN FOR SELECT count(*) AS count, Carrier AS name FROM mytable GROUP BY name ORDER BY 1\","
+        + "\"Execution Plan\\n"
         + "LogicalSort(sort0=[$0], dir0=[ASC])\\n"
         + "  PinotLogicalSortExchange("
         + "distribution=[hash], collation=[[0]], isSortOnSender=[false], 
isSortOnReceiver=[true])\\n"
@@ -2944,20 +2926,23 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
         + "          PinotLogicalAggregate(group=[{17}], agg#0=[COUNT()])\\n"
         + "            LogicalTableScan(table=[[default, mytable]])\\n"
         + "\"]]}");
+    //@formatter:on
 
     // In the query below, FlightNum column has an inverted index and there is 
no data satisfying the predicate
     // "FlightNum < 0". Hence, all segments are pruned out before query 
execution on the server side.
     String query2 = "EXPLAIN PLAN FOR SELECT * FROM mytable WHERE FlightNum < 0";
     String response2 = postQuery(query2).get("resultTable").toString();
 
-    Pattern pattern = 
Pattern.compile("\\{\"dataSchema\":\\{\"columnNames\":\\[\"SQL\",\"PLAN\"],"
-        + "\"columnDataTypes\":\\[\"STRING\",\"STRING\"]},"
+    //@formatter:off
+    Pattern pattern = Pattern.compile("\\{"
+        + 
"\"dataSchema\":\\{\"columnNames\":\\[\"SQL\",\"PLAN\"],\"columnDataTypes\":\\[\"STRING\",\"STRING\"]},"
         + "\"rows\":\\[\\[\"EXPLAIN PLAN FOR SELECT \\* FROM mytable WHERE FlightNum < 0\","
         + "\"Execution Plan.."
         + "LogicalProject\\(.*\\).."
         + "  LogicalFilter\\(condition=\\[<\\(.*, 0\\)]\\).."
         + "    LogicalTableScan\\(table=\\[\\[default, mytable]]\\)..\""
         + "]]}");
+    //@formatter:on
     boolean found = pattern.matcher(response2).find();
     assertTrue(found, "Pattern " + pattern + " not found in " + response2);
   }
@@ -3226,7 +3211,9 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   private void validateMetadataResponse(JsonNode response, int numTotalColumn, 
int numMVColumn) {
-    assertEquals(response.get(DISK_SIZE_IN_BYTES_KEY).asInt(), 
DISK_SIZE_IN_BYTES);
+    if (getNumServers() == 1) {
+      assertEquals(response.get(DISK_SIZE_IN_BYTES_KEY).asInt(), _tableSize);
+    }
     assertEquals(response.get(NUM_SEGMENTS_KEY).asInt(), NUM_SEGMENTS);
     assertEquals(response.get(NUM_ROWS_KEY).asInt(), NUM_ROWS);
     assertEquals(response.get(COLUMN_LENGTH_MAP_KEY).size(), numTotalColumn);
@@ -3335,10 +3322,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // disallow use of range index on DivActualElapsedTime, inverted should be 
unaffected
     String skipIndexes = buildSkipIndexesOption("DivActualElapsedTime=range");
-    assertEquals(postQuery(
-        skipIndexes + 
TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 
0L);
-    assertEquals(postQuery(
-        skipIndexes + 
TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 
numTotalDocs);
+    assertEquals(postQuery(skipIndexes + 
TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
+        0L);
+    assertEquals(postQuery(skipIndexes + 
TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
+        numTotalDocs);
 
     // disallow use of inverted index on DivActualElapsedTime, range should be 
unaffected
     skipIndexes = buildSkipIndexesOption("DivActualElapsedTime=inverted");
@@ -3349,17 +3336,16 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // EQ predicate type allows for using range index if one exists, even if 
inverted index is skipped. That is why
     // we still see no docs scanned even though we skip the inverted index. 
This is a good test to show that using
     // the skipIndexes can allow fine-grained experimentation of index usage 
at query time.
-    assertEquals(postQuery(
-        skipIndexes + 
TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 
0L);
-    assertEquals(postQuery(
-        skipIndexes + 
TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
+    assertEquals(postQuery(skipIndexes + 
TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
+        0L);
+    assertEquals(postQuery(skipIndexes + 
TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
 
     // disallow use of both range and inverted indexes on 
DivActualElapsedTime, neither should be used at query time
     skipIndexes = 
buildSkipIndexesOption("DivActualElapsedTime=inverted,range");
-    assertEquals(postQuery(
-        skipIndexes + 
TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 
numTotalDocs);
-    assertEquals(postQuery(
-        skipIndexes + 
TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 
numTotalDocs);
+    assertEquals(postQuery(skipIndexes + 
TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
+        numTotalDocs);
+    assertEquals(postQuery(skipIndexes + 
TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(),
+        numTotalDocs);
 
     // Update table config to remove the new indexes, and check if the new 
indexes are removed
     TableConfig tableConfig = getOfflineTableConfig();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to