This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
     new b69f438a79 Make Preload Integration test more extensible (#11195)
b69f438a79 is described below

commit b69f438a79716179c3a7c8161a67d675bdd9909e
Author: Kartik Khare <kharekar...@gmail.com>
AuthorDate: Fri Jul 28 22:59:01 2023 +0530

    Make Preload Integration test more extensible (#11195)

    * Make Preload Integration test more extensible

    * Make snapshot method protected as well

    ---------

    Co-authored-by: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
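With these hooks widened to protected, a subclass can reuse the base test's
cluster setup while swapping in its own data population or snapshot checks.
A minimal sketch of such a subclass (the class name and override bodies are
hypothetical, for illustration only; they are not part of this patch):

    package org.apache.pinot.integration.tests;

    // Hypothetical subclass, for illustration only; not part of this commit.
    public class MyUpsertPreloadIntegrationTest extends UpsertTableSegmentPreloadIntegrationTest {

      @Override
      protected void populateTables()
          throws Exception {
        // A subclass could set up a different schema or table config here
        // before (or instead of) delegating to the base implementation.
        super.populateTables();
      }

      @Override
      protected void waitForSnapshotCreation()
          throws Exception {
        // Extra assertions around snapshot creation could go here.
        super.waitForSnapshotCreation();
      }
    }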
---
 .../UpsertTableSegmentPreloadIntegrationTest.java  | 38 ++++++++++++++--------
 1 file changed, 24 insertions(+), 14 deletions(-)

diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
index e662403347..0bd5a84af6 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java
@@ -73,12 +73,17 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     startBroker();
     startServers(NUM_SERVERS);
 
-    // Unpack the Avro files
-    List<File> avroFiles = unpackAvroData(_tempDir);
-
     // Start Kafka and push data into Kafka
     startKafka();
 
+    populateTables();
+  }
+
+  protected void populateTables()
+      throws Exception {
+    // Unpack the Avro files
+    List<File> avroFiles = unpackAvroData(_tempDir);
+
     // Create and upload schema and table config
     Schema schema = createSchema();
     addSchema(schema);
@@ -192,6 +197,16 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     assertEquals(getCurrentCountStarResult(), getCountStarResult());
     assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());
 
+    waitForSnapshotCreation();
+
+    // Restart the servers and check again
+    restartServers();
+    verifyIdealState(7);
+    waitForAllDocsLoaded(600_000L);
+  }
+
+  protected void waitForSnapshotCreation()
+      throws Exception {
     Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME");
     // trigger force commit for snapshots
     String jobId = forceCommit(getTableName());
@@ -211,7 +226,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
             serverStarter.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR);
         File[] files = new File(segmentDir, getTableName() + "_REALTIME").listFiles();
         for (File file : files) {
-          if (file.getName().contains("tmp") || file.getName().contains("consumer")) {
+          if (!file.getName().startsWith(getTableName())) {
             continue;
           }
           if (file.isDirectory()) {
@@ -231,15 +246,10 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
       } catch (Exception e) {
         return false;
       }
-    }, 60000L, "Error verifying force commit operation on table!");
-
-    // Restart the servers and check again
-    restartServers();
-    verifyIdealState(7);
-    waitForAllDocsLoaded(600_000L);
+    }, 120000L, "Error verifying force commit operation on table!");
   }
 
-  private void verifyIdealState(int numSegmentsExpected) {
+  protected void verifyIdealState(int numSegmentsExpected) {
     IdealState idealState = HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME);
     Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
     assertEquals(segmentAssignment.size(), numSegmentsExpected);
@@ -295,7 +305,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     }
   }
 
-  public Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
+  protected Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) {
     IdealState tableIdealState = _controllerStarter.getHelixResourceManager().getTableIdealState(tableNameWithType);
     Map<String, Map<String, String>> segmentAssignment = tableIdealState.getRecord().getMapFields();
     Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
@@ -308,7 +318,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     return matchingSegments;
   }
 
-  public boolean isForceCommitJobCompleted(String forceCommitJobId)
+  protected boolean isForceCommitJobCompleted(String forceCommitJobId)
       throws Exception {
     String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forForceCommitJobStatus(forceCommitJobId));
     JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse);
@@ -318,7 +328,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra
     return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0;
   }
 
-  private String forceCommit(String tableName)
+  protected String forceCommit(String tableName)
       throws Exception {
     String response = sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName), null);
     return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText();

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org