This is an automated email from the ASF dual-hosted git repository. kharekartik pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d9c64f347a Add Integration test for Upsert Preload (#11160) d9c64f347a is described below commit d9c64f347af0d1c6e27c8b0c304033be9f463f69 Author: Kartik Khare <kharekar...@gmail.com> AuthorDate: Wed Jul 26 16:22:09 2023 +0530 Add Integration test for Upsert Preload (#11160) * Add Integration test for Upsert Preload * Fix tests * Reverting Base method changes as it breaks compatibility tests --- ... UpsertTableSegmentPreloadIntegrationTest.java} | 123 +++++++++++++++++++-- .../UpsertTableSegmentUploadIntegrationTest.java | 3 +- 2 files changed, 115 insertions(+), 11 deletions(-) diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java similarity index 60% copy from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java copy to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java index 7fddd5e43c..e662403347 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java @@ -18,18 +18,26 @@ */ package org.apache.pinot.integration.tests; +import com.fasterxml.jackson.databind.JsonNode; import java.io.File; import java.io.IOException; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import org.apache.commons.io.FileUtils; import org.apache.helix.model.IdealState; +import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.common.utils.helix.HelixHelper; +import org.apache.pinot.server.starter.helix.BaseServerStarter; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; @@ -38,10 +46,11 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; -public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet { - private static final int NUM_SERVERS = 2; +public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegrationTestSet { + private static final int NUM_SERVERS = 1; private static final String PRIMARY_KEY_COL = "clientId"; private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME); @@ -69,22 +78,31 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat // Start Kafka and push data into Kafka startKafka(); - pushAvroIntoKafka(avroFiles); // Create and upload schema and table config Schema schema = createSchema(); addSchema(schema); - TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions()); + TableConfig tableConfig = + createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions()); + tableConfig.getUpsertConfig().setEnablePreload(true); + tableConfig.getUpsertConfig().setEnableSnapshot(true); addTableConfig(tableConfig); // Create and upload segments ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); uploadSegments(getTableName(), TableType.REALTIME, _tarDir); + pushAvroIntoKafka(avroFiles); // Wait for all documents loaded waitForAllDocsLoaded(600_000L); } + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + serverConf.setProperty(CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".max.segment.preload.threads", + "1"); + } + @AfterClass public void tearDown() throws IOException { @@ -166,27 +184,78 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat @Test public void testSegmentAssignment() throws Exception { - verifyIdealState(); + verifyIdealState(5); // Run the real-time segment validation and check again _controllerStarter.getRealtimeSegmentValidationManager().run(); - verifyIdealState(); + verifyIdealState(5); assertEquals(getCurrentCountStarResult(), getCountStarResult()); assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert()); + Set<String> consumingSegments = getConsumingSegmentsFromIdealState(getTableName() + "_REALTIME"); + // trigger force commit for snapshots + String jobId = forceCommit(getTableName()); + + Set<String> finalConsumingSegments = consumingSegments; + + TestUtils.waitForCondition(aVoid -> { + try { + if (isForceCommitJobCompleted(jobId)) { + assertTrue(_controllerStarter.getHelixResourceManager() + .getOnlineSegmentsFromIdealState(getTableName() + "_REALTIME", false) + .containsAll(finalConsumingSegments)); + + int snapshotFileCount = 0; + for (BaseServerStarter serverStarter : _serverStarters) { + String segmentDir = + serverStarter.getConfig().getProperty(CommonConstants.Server.CONFIG_OF_INSTANCE_DATA_DIR); + File[] files = new File(segmentDir, getTableName() + "_REALTIME").listFiles(); + for (File file : files) { + if (file.getName().contains("tmp") || file.getName().contains("consumer")) { + continue; + } + if (file.isDirectory()) { + File segmentV3Dir = new File(file, "v3"); + File[] segmentFiles = segmentV3Dir.listFiles(); + for (File segmentFile : segmentFiles) { + if (segmentFile.getName().endsWith(".snapshot")) { + snapshotFileCount++; + } + } + } + } + } + return snapshotFileCount == 5; + } + return false; + } catch (Exception e) { + return false; + } + }, 60000L, "Error verifying force commit operation on table!"); + // Restart the servers and check again restartServers(); - verifyIdealState(); + verifyIdealState(7); waitForAllDocsLoaded(600_000L); } - private void verifyIdealState() { + private void verifyIdealState(int numSegmentsExpected) { IdealState idealState = HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME); Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields(); - assertEquals(segmentAssignment.size(), 5); + assertEquals(segmentAssignment.size(), numSegmentsExpected); String serverForPartition0 = null; String serverForPartition1 = null; + + int maxSequenceNumber = 0; + for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { + String segmentName = entry.getKey(); + if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) { + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + maxSequenceNumber = Math.max(maxSequenceNumber, llcSegmentName.getSequenceNumber()); + } + } + for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { String segmentName = entry.getKey(); Map<String, String> instanceStateMap = entry.getValue(); @@ -196,7 +265,12 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat Map.Entry<String, String> instanceIdAndState = instanceStateMap.entrySet().iterator().next(); String state = instanceIdAndState.getValue(); if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) { - assertEquals(state, SegmentStateModel.CONSUMING); + LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); + if (llcSegmentName.getSequenceNumber() < maxSequenceNumber) { + assertEquals(state, SegmentStateModel.ONLINE); + } else { + assertEquals(state, SegmentStateModel.CONSUMING); + } } else { assertEquals(state, SegmentStateModel.ONLINE); } @@ -221,6 +295,35 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat } } + public Set<String> getConsumingSegmentsFromIdealState(String tableNameWithType) { + IdealState tableIdealState = _controllerStarter.getHelixResourceManager().getTableIdealState(tableNameWithType); + Map<String, Map<String, String>> segmentAssignment = tableIdealState.getRecord().getMapFields(); + Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size())); + for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { + Map<String, String> instanceStateMap = entry.getValue(); + if (instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) { + matchingSegments.add(entry.getKey()); + } + } + return matchingSegments; + } + + public boolean isForceCommitJobCompleted(String forceCommitJobId) + throws Exception { + String jobStatusResponse = sendGetRequest(_controllerRequestURLBuilder.forForceCommitJobStatus(forceCommitJobId)); + JsonNode jobStatus = JsonUtils.stringToJsonNode(jobStatusResponse); + + assertEquals(jobStatus.get("jobId").asText(), forceCommitJobId); + assertEquals(jobStatus.get("jobType").asText(), "FORCE_COMMIT"); + return jobStatus.get("numberOfSegmentsYetToBeCommitted").asInt(-1) == 0; + } + + private String forceCommit(String tableName) + throws Exception { + String response = sendPostRequest(_controllerRequestURLBuilder.forTableForceCommit(tableName), null); + return JsonUtils.stringToJsonNode(response).get("forceCommitJobId").asText(); + } + private static int getSegmentPartitionId(String segmentName) { switch (segmentName) { case UPLOADED_SEGMENT_1: diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java index 7fddd5e43c..2b9f94ae10 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java @@ -74,7 +74,8 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat // Create and upload schema and table config Schema schema = createSchema(); addSchema(schema); - TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions()); + TableConfig tableConfig = + createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, null, getNumKafkaPartitions()); addTableConfig(tableConfig); // Create and upload segments --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org