This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9b39d9484a fixing the corner of missing segment file to download when refresh segment with metadata then segment push (#11720) 9b39d9484a is described below commit 9b39d9484a0cd21a69f6207c5e5a576d7444dd0c Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Mon Oct 2 12:35:12 2023 -0700 fixing the corner of missing segment file to download when refresh segment with metadata then segment push (#11720) --- .../pinot/controller/api/upload/ZKOperator.java | 13 +- .../pinot/integration/tests/ClusterTest.java | 10 +- .../tests/OfflineClusterIntegrationTest.java | 224 ++++++--------------- 3 files changed, 85 insertions(+), 162 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index 38cf2919e7..b0aee83d0d 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -212,9 +212,20 @@ public class ZKOperator { segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap()); } if (!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr)) { + // For offline ingestion, it is quite common that the download.uri would change but the crc would be the same. + // E.g. a user re-runs the job which process the same data and segments are stored/pushed from a different + // path from the Deepstore. Read more: https://github.com/apache/pinot/issues/11535 LOGGER.info("Updating segment download url from: {} to: {} even though crc is the same", - segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr); + segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr); segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr); + // When download URI changes, we also need to copy the segment to the final location if existed. + // This typically means users changed the push type from METADATA to SEGMENT or SEGMENT to METADATA. + // Note that switching push type from SEGMENT to METADATA may lead orphan segments in the controller + // managed directory. Read more: https://github.com/apache/pinot/pull/11720 + if (finalSegmentLocationURI != null) { + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, + finalSegmentLocationURI); + } } if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { throw new RuntimeException( diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index 9b41232f9b..d04b9efc6c 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -323,6 +323,14 @@ public abstract class ClusterTest extends ControllerTest { uploadSegments(tableName, TableType.OFFLINE, tarDir); } + /** + * Upload all segments inside the given directory to the cluster. + */ + protected void uploadSegments(String tableName, List<File> tarDirs) + throws Exception { + uploadSegments(tableName, TableType.OFFLINE, tarDirs); + } + /** * Upload all segments inside the given directory to the cluster. */ @@ -545,7 +553,7 @@ public abstract class ClusterTest extends ControllerTest { @DataProvider(name = "systemColumns") public Object[][] systemColumns() { - return new Object[][] { + return new Object[][]{ {"$docId"}, {"$hostName"}, {"$segmentName"} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index d9d645c065..c29fbe59f1 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -203,11 +203,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet uploadSegments(getTableName(), TableType.OFFLINE, tarDirs); } catch (Exception e) { // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one - // of the two exception - 409 conflict of the second call enters ProcessExistingSegment ; segmentZkMetadata - // creation failure if both calls entered ProcessNewSegment. In/such cases ensure that we upload all the - // segments again/to ensure that the data is setup correctly. + // of the three exception: + // - 409 conflict of the second call enters ProcessExistingSegment ; + // - segmentZkMetadata creation failure if both calls entered ProcessNewSegment. + // - Failed to copy segment tar file to final location due to the same segment pushed twice concurrently. + // In such cases we upload all the segments again to ensure that the data is setup correctly. assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage() - .contains("Failed to create ZK metadata for segment"), e.getMessage()); + .contains("Failed to update ZK metadata for segment") || e.getMessage() + .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage()); uploadSegments(getTableName(), _tarDir); } @@ -224,6 +227,47 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // Wait for all documents loaded waitForAllDocsLoaded(600_000L); + + // Try to reload all the segments with force download from the controller URI. + reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, getCountStarResult()); + + // Try to upload all the segments again with force download from the controller URI. + try { + uploadSegments(getTableName(), tarDirs); + } catch (Exception e) { + // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one + // of the three exception: + // - 409 conflict of the second call enters ProcessExistingSegment ; + // - segmentZkMetadata creation failure if both calls entered ProcessNewSegment. + // - Failed to copy segment tar file to final location due to the same segment pushed twice concurrently. + // In such cases we upload all the segments again to ensure that the data is setup correctly. + assertTrue(e.getMessage().contains("Another segment upload is in progress for segment") || e.getMessage() + .contains("Failed to update ZK metadata for segment") || e.getMessage() + .contains("java.nio.file.FileAlreadyExistsException"), e.getMessage()); + uploadSegments(getTableName(), _tarDir); + } + + // Try to reload all the segments with force download from the controller URI. + reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, getCountStarResult()); + } + + private void reloadAllSegments(String testQuery, boolean forceDownload, long numTotalDocs) + throws IOException { + // Try to refresh all the segments again with force download from the controller URI. + String reloadJob = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, forceDownload); + TestUtils.waitForCondition(aVoid -> { + try { + JsonNode queryResponse = postQuery(testQuery); + if (!queryResponse.get("exceptions").isEmpty()) { + return false; + } + // Total docs should not change during reload + assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); + return isReloadJobCompleted(reloadJob); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, 600_000L, "Failed to reload table with force download"); } @BeforeMethod @@ -482,16 +526,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet TableConfig tableConfig = getOfflineTableConfig(); tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns()); updateTableConfig(tableConfig); - String removeInvertedIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - // Total docs should not change during reload - assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(removeInvertedIndexJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to cleanup obsolete index"); + reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex); @@ -544,16 +579,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet tableConfig = getOfflineTableConfig(); tableConfig.getIndexingConfig().setInvertedIndexColumns(getInvertedIndexColumns()); updateTableConfig(tableConfig); - String forceDownloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, true); - TestUtils.waitForCondition(aVoid -> { - try { - // Total docs should not change during reload - assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(forceDownloadJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to cleanup obsolete index in table"); + reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, true, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); // With force download, the table size gets back to the initial value. assertEquals(getTableSize(getTableName()), DISK_SIZE_IN_BYTES); @@ -566,22 +592,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet TableConfig tableConfig = getOfflineTableConfig(); tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS); updateTableConfig(tableConfig); - String reloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); // It takes a while to reload multiple segments, thus we retry the query for some time. // After all segments are reloaded, the inverted index is added on DivActualElapsedTime. // It's expected to have numEntriesScannedInFilter equal to 0, i.e. no docs is scanned // at filtering stage when inverted index can answer the predicate directly. - long numTotalDocs = getCountStarResult(); - TestUtils.waitForCondition(aVoid -> { - try { - // Total docs should not change during reload - assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(reloadJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to generate inverted index"); + reloadAllSegments(TEST_UPDATED_INVERTED_INDEX_QUERY, false, getCountStarResult()); assertEquals(postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L); } @@ -1117,33 +1133,14 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // Update table config and trigger reload TableConfig tableConfig = getOfflineTableConfig(); tableConfig.getIndexingConfig().setRangeIndexColumns(UPDATED_RANGE_INDEX_COLUMNS); - updateTableConfig(tableConfig); - String addIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - // Total docs should not change during reload - assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(addIndexJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to generate range index"); + reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), 0L); // Update table config to remove the new range index, and check if the new range index is removed tableConfig = getOfflineTableConfig(); tableConfig.getIndexingConfig().setRangeIndexColumns(getRangeIndexColumns()); updateTableConfig(tableConfig); - String removeIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - // Total docs should not change during reload - assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(removeIndexJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to cleanup obsolete index"); + reloadAllSegments(TEST_UPDATED_RANGE_INDEX_QUERY, false, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_RANGE_INDEX_QUERY).get("numEntriesScannedInFilter").asLong(), numTotalDocs); assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex); } @@ -1158,16 +1155,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet TableConfig tableConfig = getOfflineTableConfig(); tableConfig.getIndexingConfig().setBloomFilterColumns(UPDATED_BLOOM_FILTER_COLUMNS); updateTableConfig(tableConfig); - String addIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - // Total docs should not change during reload - assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(addIndexJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to generate bloom filter"); + reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, false, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(), 0L); // Update table config to remove the new bloom filter, and @@ -1175,16 +1163,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet tableConfig = getOfflineTableConfig(); tableConfig.getIndexingConfig().setBloomFilterColumns(getBloomFilterColumns()); updateTableConfig(tableConfig); - String removeIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - // Total docs should not change during reload - assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(removeIndexJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to cleanup obsolete index"); + reloadAllSegments(TEST_UPDATED_BLOOM_FILTER_QUERY, false, numTotalDocs); assertEquals(postQuery(TEST_UPDATED_BLOOM_FILTER_QUERY).get("numSegmentsProcessed").asLong(), NUM_SEGMENTS); assertEquals(getTableSize(getTableName()), _tableSizeAfterRemovingIndex); } @@ -1223,24 +1202,12 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet indexingConfig.setStarTreeIndexConfigs(Collections.singletonList(STAR_TREE_INDEX_CONFIG_1)); indexingConfig.setEnableDynamicStarTreeCreation(true); updateTableConfig(tableConfig); - String addIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_1); - // Result should not change during reload - assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult); - // Total docs should not change during reload - assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(addIndexJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to add first star-tree index"); + reloadAllSegments(TEST_STAR_TREE_QUERY_1, false, numTotalDocs); // With star-tree, 'numDocsScanned' should be the same as number of segments (1 per segment) assertEquals(postQuery(TEST_STAR_TREE_QUERY_1).get("numDocsScanned").asLong(), NUM_SEGMENTS); // Reload again should have no effect - reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); + reloadAllSegments(TEST_STAR_TREE_QUERY_1, false, numTotalDocs); firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1); assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult); assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs); @@ -1268,19 +1235,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // Update table config with a different star-tree index config and trigger reload indexingConfig.setStarTreeIndexConfigs(Collections.singletonList(STAR_TREE_INDEX_CONFIG_2)); updateTableConfig(tableConfig); - String changeIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_2); - // Result should not change during reload - assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), secondQueryResult); - // Total docs should not change during reload - assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(changeIndexJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to change to second star-tree index"); + reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs); // With star-tree, 'numDocsScanned' should be the same as number of segments (1 per segment) assertEquals(postQuery(TEST_STAR_TREE_QUERY_2).get("numDocsScanned").asLong(), NUM_SEGMENTS); @@ -1289,7 +1244,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), firstQueryResult); // Reload again should have no effect - reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); + reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs); firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1); assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult); assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs); @@ -1314,19 +1269,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet // Remove the star-tree index config and trigger reload indexingConfig.setStarTreeIndexConfigs(null); updateTableConfig(tableConfig); - String removeIndexJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - JsonNode queryResponse = postQuery(TEST_STAR_TREE_QUERY_2); - // Result should not change during reload - assertEquals(queryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), secondQueryResult); - // Total docs should not change during reload - assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(removeIndexJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to remove star-tree index"); + reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs); // Without star-tree, 'numDocsScanned' should be the same as the 'COUNT(*)' result assertEquals(postQuery(TEST_STAR_TREE_QUERY_2).get("numDocsScanned").asLong(), secondQueryResult); assertEquals(getTableSize(getTableName()), tableSizeWithDefaultIndex); @@ -1336,7 +1279,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(firstQueryResponse.get("numDocsScanned").asInt(), firstQueryResult); // Reload again should have no effect - reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); + reloadAllSegments(TEST_STAR_TREE_QUERY_2, false, numTotalDocs); firstQueryResponse = postQuery(TEST_STAR_TREE_QUERY_1); assertEquals(firstQueryResponse.get("resultTable").get("rows").get(0).get(0).asInt(), firstQueryResult); assertEquals(firstQueryResponse.get("totalDocs").asLong(), numTotalDocs); @@ -1475,29 +1418,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet updateTableConfig(tableConfig); // Trigger reload - String reloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - JsonNode queryResponse = postQuery(TEST_EXTRA_COLUMNS_QUERY); - if (!queryResponse.get("exceptions").isEmpty()) { - // Schema is not refreshed on the broker side yet - return false; - } - // Total docs should not change during reload - assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(reloadJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to add default columns"); + reloadAllSegments(TEST_EXTRA_COLUMNS_QUERY, false, numTotalDocs); assertEquals(postQuery(TEST_EXTRA_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(), numTotalDocs); } private void reloadWithMissingColumns() throws Exception { - long numTotalDocs = getCountStarResult(); - // Remove columns from the table config first to pass the validation of the table config TableConfig tableConfig = getOfflineTableConfig(); tableConfig.setIngestionConfig(null); @@ -1513,16 +1440,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet addSchema(schema); // Trigger reload - String reloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - // Total docs should not change during reload - assertEquals(postQuery(SELECT_STAR_QUERY).get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(reloadJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to skip missing columns"); + reloadAllSegments(SELECT_STAR_QUERY, true, getCountStarResult()); JsonNode segmentsMetadata = JsonUtils.stringToJsonNode( sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(), "*"))); assertEquals(segmentsMetadata.size(), 12); @@ -1539,21 +1457,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet addSchema(createSchema()); // Trigger reload - String reloadJobId = reloadTableAndValidateResponse(getTableName(), TableType.OFFLINE, false); - TestUtils.waitForCondition(aVoid -> { - try { - JsonNode queryResponse = postQuery(TEST_REGULAR_COLUMNS_QUERY); - if (!queryResponse.get("exceptions").isEmpty()) { - // Schema is not refreshed on the broker side yet - return false; - } - // Total docs should not change during reload - assertEquals(queryResponse.get("totalDocs").asLong(), numTotalDocs); - return isReloadJobCompleted(reloadJobId); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, 600_000L, "Failed to reload regular columns"); + reloadAllSegments(SELECT_STAR_QUERY, true, numTotalDocs); assertEquals(postQuery(TEST_REGULAR_COLUMNS_QUERY).get("resultTable").get("rows").get(0).get(0).asLong(), numTotalDocs); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org