This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b39d9484a fixing the corner case of a missing segment file to download when refreshing a segment with a metadata push followed by a segment push (#11720)
9b39d9484a is described below

commit 9b39d9484a0cd21a69f6207c5e5a576d7444dd0c
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Mon Oct 2 12:35:12 2023 -0700

    fixing the corner case of a missing segment file to download when refreshing a segment with a metadata push followed by a segment push (#11720)
---
 .../pinot/controller/api/upload/ZKOperator.java    |  13 +-
 .../pinot/integration/tests/ClusterTest.java       |  10 +-
 .../tests/OfflineClusterIntegrationTest.java       | 224 ++++++---------------
 3 files changed, 85 insertions(+), 162 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 38cf2919e7..b0aee83d0d 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -212,9 +212,20 @@ public class ZKOperator {
           segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap());
         }
         if (!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr)) {
+          // For offline ingestion, it is quite common that the download URI changes while the crc stays the same.
+          // E.g. a user re-runs a job that processes the same data, and the segments are stored/pushed from a
+          // different path in the deep store. Read more: https://github.com/apache/pinot/issues/11535
           LOGGER.info("Updating segment download url from: {} to: {} even 
though crc is the same",
-                  segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr);
+              segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr);
           segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr);
+          // When the download URI changes, we also need to copy the segment to the final location if one exists.
+          // This typically means the user changed the push type from METADATA to SEGMENT or from SEGMENT to METADATA.
+          // Note that switching the push type from SEGMENT to METADATA may leave orphan segments in the controller
+          // managed directory. Read more: https://github.com/apache/pinot/pull/11720
+          if (finalSegmentLocationURI != null) {
+            copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr,
+                finalSegmentLocationURI);
+          }
         }
        if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
           throw new RuntimeException(
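
Side note on the hunk above: the snippet below is a minimal, self-contained sketch of the patched control flow, for readers who do not want to trace ZKOperator. The types SegmentMetaSketch and RefreshSketch and the helper copyToFinalLocation are hypothetical stand-ins, not Pinot classes; only the branch structure (same crc, changed download URI, optional copy to the final location) mirrors the actual patch.

    import java.net.URI;

    // Hypothetical stand-in for the segment ZK metadata; not the Pinot class.
    final class SegmentMetaSketch {
      String downloadUrl;
    }

    final class RefreshSketch {
      // Mirrors the patched branch: when the crc matches but the download URI differs,
      // the URI is always updated, and the segment tar is copied only when a final
      // location was resolved (null means a metadata-only push with no file to copy).
      static void onRefreshWithSameCrc(SegmentMetaSketch zkMeta, String newDownloadUri, URI finalLocation) {
        if (!zkMeta.downloadUrl.equals(newDownloadUri)) {
          zkMeta.downloadUrl = newDownloadUri;
          if (finalLocation != null) {
            copyToFinalLocation(finalLocation); // stand-in for copySegmentToDeepStore(...)
          }
        }
      }

      static void copyToFinalLocation(URI finalLocation) {
        System.out.println("copying segment tar to " + finalLocation);
      }
    }
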
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 9b41232f9b..d04b9efc6c 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -323,6 +323,14 @@ public abstract class ClusterTest extends ControllerTest {
     uploadSegments(tableName, TableType.OFFLINE, tarDir);
   }
 
+  /**
+   * Upload all segments inside the given directories to the cluster.
+   */
+  protected void uploadSegments(String tableName, List<File> tarDirs)
+      throws Exception {
+    uploadSegments(tableName, TableType.OFFLINE, tarDirs);
+  }
+
   /**
    * Upload all segments inside the given directory to the cluster.
    */
@@ -545,7 +553,7 @@ public abstract class ClusterTest extends ControllerTest {
 
   @DataProvider(name = "systemColumns")
   public Object[][] systemColumns() {
-    return new Object[][] {
+    return new Object[][]{
         {"$docId"},
         {"$hostName"},
         {"$segmentName"}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index d9d645c065..c29fbe59f1 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -203,11 +203,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
     } catch (Exception e) {
       // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one
-      // of the two exception - 409 conflict of the second call enters ProcessExistingSegment ; segmentZkMetadata
-      // creation failure if both calls entered ProcessNewSegment. In such cases ensure that we upload all the
-      // segments again to ensure that the data is setup correctly.
+      // of the three exceptions:
+      //   - 409 conflict if the second call enters ProcessExistingSegment;
+      //   - segmentZkMetadata creation failure if both calls entered ProcessNewSegment;
+      //   - failure to copy the segment tar file to the final location because the same segment was pushed twice
+      //     concurrently.
+      // In such cases we upload all the segments again to ensure that the data is set up correctly.
       assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage()
-          .contains("Failed to create ZK metadata for segment"), e.getMessage());
+          .contains("Failed to update ZK metadata for segment") || e.getMessage()
+          .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage());
       uploadSegments(getTableName(), _tarDir);
     }
 
@@ -224,6 +227,47 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
 
     // Wait for all documents loaded
     waitForAllDocsLoaded(600_000L);
+
+    // Try to reload all the segments with force download from the controller URI.
+    reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, getCountStarResult());
+
+    // Try to upload all the segments again.
+    try {
+      uploadSegments(getTableName(), tarDirs);
+    } catch (Exception e) {
+      // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one
+      // of the three exceptions:
+      //   - 409 conflict if the second call enters ProcessExistingSegment;
+      //   - segmentZkMetadata creation failure if both calls entered ProcessNewSegment;
+      //   - failure to copy the segment tar file to the final location because the same segment was pushed twice
+      //     concurrently.
+      // In such cases we upload all the segments again to ensure that the data is set up correctly.
+      assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage()
+          .contains("Failed to update ZK metadata for segment") || e.getMessage()
+          .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage());
+      uploadSegments(getTableName(), _tarDir);
+    }
+
+    // Try to reload all the segments with force download from the controller URI.
+    reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, getCountStarResult());
+  }
+
+  private void reloadAllSegments(String testQuery, boolean forceDownload, long numTotalDocs)
+      throws IOException {
+    // Reload all the segments, optionally forcing a download from the controller URI.
+    String reloadJob = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, forceDownload);
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        JsonNode queryResponse = postQuery(testQuery);
+        if (!queryResponse.get("exceptions").isEmpty()) {
+          return false;
+        }
+        // Total docs should not change during reload
+        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
+        return isReloadJobCompleted(reloadJob);
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }, 600_000L, "Failed to reload table with force download");
   }
 
   @BeforeMethod
@@ -482,16 +526,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
     updateTableConfig(tableConfig);
-    String removeInvertedIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Total docs should not change during reload
-        assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(removeInvertedIndexJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to cleanup obsolete index");
+    reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs);
     assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
     assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
 
@@ -544,16 +579,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     tableConfig = getOfflineTableConfig();
     tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns());
     updateTableConfig(tableConfig);
-    String forceDownloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, true);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Total docs should not change during reload
-        assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(forceDownloadJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to cleanup obsolete index in table");
+    reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs);
     assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
     // With force download, the table size gets back to the initial value.
     assertEquals(getTableSize(getTableName()), DISK_SIZE_IN_BYTES);
@@ -566,22 +592,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
     updateTableConfig(tableConfig);
-    String reloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
 
     // It takes a while to reload multiple segments, thus we retry the query for some time.
     // After all segments are reloaded, the inverted index is added on DivActualElapsedTime.
     // It's expected to have numEntriesScannedInFilter equal to 0, i.e. no docs are scanned
     // at the filtering stage when the inverted index can answer the predicate directly.
-    long numTotalDocs = getCountStarResult();
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Total docs should not change during reload
-        assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(reloadJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to generate inverted index");
+    reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, false, getCountStarResult());
     assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
   }
 
@@ -1117,33 +1133,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Update table config and trigger reload
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.getIndexingConfig().setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS);
-    updateTableConfig(tableConfig);
-    String addIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Total docs should not change during reload
-        assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(addIndexJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to generate range index");
+    reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs);
     assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L);
 
     // Update table config to remove the new range index, and check if the new range index is removed
     tableConfig = getOfflineTableConfig();
     tableConfig.getIndexingConfig().setRangeIndexColumns(getRangeIndexColumns());
     updateTableConfig(tableConfig);
-    String removeIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Total docs should not change during reload
-        assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(removeIndexJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to cleanup obsolete index");
+    reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs);
     assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs);
     assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
   }
@@ -1158,16 +1155,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.getIndexingConfig().setBloomFilterColumns(UPDATED_BLOOM_FILTER_COLUMNS);
     updateTableConfig(tableConfig);
-    String addIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Total docs should not change during reload
-        assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(addIndexJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to generate bloom filter");
+    reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, false, numTotalDocs);
     assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(), 0L);
 
     // Update table config to remove the new bloom filter, and
@@ -1175,16 +1163,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     tableConfig = getOfflineTableConfig();
     tableConfig.getIndexingConfig().setBloomFilterColumns(getBloomFilterColumns());
     updateTableConfig(tableConfig);
-    String removeIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Total docs should not change during reload
-        assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(removeIndexJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to cleanup obsolete index");
+    reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, false, numTotalDocs);
     assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(), NUM_SEGMENTS);
     assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex);
   }
@@ -1223,24 +1202,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     indexingConfig.setStarTreeIndexConfigs(Collections.singletonList(STAR_TREE_INDEX_CONFIG_1));
     indexingConfig.setEnableDynamicStarTreeCreation(true);
     updateTableConfig(tableConfig);
-    String addIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
-        // Result should not change during reload
-        assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult);
-        // Total docs should not change during reload
-        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(addIndexJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to add first star-tree index");
+    reloadAllSegments(TEST_STAR_TREE_QUERY_1, false, numTotalDocs);
     // With star-tree, 'numDocsScanned' should be the same as number of segments (1 per segment)
     assertEquals(postQuery(TEST_STAR_TREE_QUERY_1).get("numDocsScanned").asLong(), NUM_SEGMENTS);
 
     // Reload again should have no effect
-    reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
+    reloadAllSegments(TEST_STAR_TREE_QUERY_1, false, numTotalDocs);
     firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
     assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult);
     assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
@@ -1268,19 +1235,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Update table config with a different star-tree index config and trigger reload
     indexingConfig.setStarTreeIndexConfigs(Collections.singletonList(STAR_TREE_INDEX_CONFIG_2));
     updateTableConfig(tableConfig);
-    String changeIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
-        // Result should not change during reload
-        assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), secondQueryResult);
-        // Total docs should not change during reload
-        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(changeIndexJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to change to second star-tree index");
+    reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
     // With star-tree, 'numDocsScanned' should be the same as number of segments (1 per segment)
     assertEquals(postQuery(TEST_STAR_TREE_QUERY_2).get("numDocsScanned").asLong(), NUM_SEGMENTS);
 
@@ -1289,7 +1244,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), 
firstQueryResult);
 
     // Reload again should have no effect
-    reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
+    reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
     firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
     assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult);
     assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
@@ -1314,19 +1269,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // Remove the star-tree index config and trigger reload
     indexingConfig.setStarTreeIndexConfigs(null);
     updateTableConfig(tableConfig);
-    String removeIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_2);
-        // Result should not change during reload
-        assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), secondQueryResult);
-        // Total docs should not change during reload
-        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(removeIndexJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to remove star-tree index");
+    reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
     // Without star-tree, 'numDocsScanned' should be the same as the 'COUNT(*)' result
     assertEquals(postQuery(TEST_STAR_TREE_QUERY_2).get("numDocsScanned").asLong(), secondQueryResult);
     assertEquals(getTableSize(getTableName()), tableSizeWithDefaultIndex);
@@ -1336,7 +1279,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), 
firstQueryResult);
 
     // Reload again should have no effect
-    reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
+    reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs);
     firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1);
     assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult);
     assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs);
@@ -1475,29 +1418,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     updateTableConfig(tableConfig);
 
     // Trigger reload
-    String reloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        JsonNode queryResponse = postQuery(TEST_EXTRA_COLUMNS_QUERY);
-        if (!queryResponse.get("exceptions").isEmpty()) {
-          // Schema is not refreshed on the broker side yet
-          return false;
-        }
-        // Total docs should not change during reload
-        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(reloadJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to add default columns");
+    reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs);
     assertEquals(postQuery(TEST_EXTRA_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(),
         numTotalDocs);
   }
 
   private void reloadWithMissingColumns()
       throws Exception {
-    long numTotalDocs = getCountStarResult();
-
     // Remove columns from the table config first to pass the validation of the table config
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.setIngestionConfig(null);
@@ -1513,16 +1440,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     addSchema(schema);
 
     // Trigger reload
-    String reloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        // Total docs should not change during reload
-        assertEquals(postQuery(SELECT_STAR_QUERY).get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(reloadJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to skip missing columns");
+    reloadAllSegments(SELECT_STAR_QUERY, true, getCountStarResult());
     JsonNode segmentsMetadata = JsonUtils.stringToJsonNode(
         sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(), "*")));
     assertEquals(segmentsMetadata.size(), 12);
@@ -1539,21 +1457,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     addSchema(createSchema());
 
     // Trigger reload
-    String reloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false);
-    TestUtils.waitForCondition(aVoid -> {
-      try {
-        JsonNode queryResponse = postQuery(TEST_REGULAR_COLUMNS_QUERY);
-        if (!queryResponse.get("exceptions").isEmpty()) {
-          // Schema is not refreshed on the broker side yet
-          return false;
-        }
-        // Total docs should not change during reload
-        assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs);
-        return isReloadJobCompleted(reloadJobId);
-      } catch (Exception e) {
-        throw new RuntimeException(e);
-      }
-    }, 600_000L, "Failed to reload regular columns");
+    reloadAllSegments(SELECT_STAR_QUERY, true, numTotalDocs);
     assertEquals(postQuery(TEST_REGULAR_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(),
         numTotalDocs);
   }
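
A closing note on the test refactoring: every removed waitForCondition block above followed the same poll-until-reload-completes pattern, which the new reloadAllSegments helper centralizes. For readers unfamiliar with TestUtils.waitForCondition, here is a minimal stand-in showing the semantics the tests rely on; the class name and back-off interval are illustrative, not Pinot code.

    import java.util.function.BooleanSupplier;

    final class PollSketch {
      // Poll until the condition holds or the timeout elapses, then fail with the message.
      static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message)
          throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
          if (condition.getAsBoolean()) {
            return;
          }
          Thread.sleep(100L); // back off between polls
        }
        throw new AssertionError(message);
      }
    }
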


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
