This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new f4c85e9596  Fix the flaky UpsertTableSegmentUploadIntegrationTest (#8675)
f4c85e9596 is described below

commit f4c85e95969b0bc7276ec18ae118051af93e9f2c
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue May 10 14:25:13 2022 -0700

    Fix the flaky UpsertTableSegmentUploadIntegrationTest (#8675)
---
 .../pinot/integration/tests/ClusterTest.java       |  95 ++++-----
 .../tests/OfflineClusterIntegrationTest.java       |  10 +-
 .../UpsertTableSegmentUploadIntegrationTest.java   | 218 +++++++++------------
 3 files changed, 134 insertions(+), 189 deletions(-)

diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 35132c3bb8..e940fd0f06 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -67,6 +67,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Minion;
@@ -299,28 +300,47 @@ public abstract class ClusterTest extends ControllerTest {
   /**
    * Upload all segments inside the given directory to the cluster.
-   *
-   * @param tarDir Segment directory
    */
   protected void uploadSegments(String tableName, File tarDir)
       throws Exception {
-    File[] segmentTarFiles = tarDir.listFiles();
-    assertNotNull(segmentTarFiles);
-    int numSegments = segmentTarFiles.length;
+    uploadSegments(tableName, TableType.OFFLINE, tarDir);
+  }
+
+  /**
+   * Upload all segments inside the given directory to the cluster.
+   */
+  protected void uploadSegments(String tableName, TableType tableType, File tarDir)
+      throws Exception {
+    uploadSegments(tableName, tableType, Collections.singletonList(tarDir));
+  }
+
+  /**
+   * Upload all segments inside the given directories to the cluster.
+   */
+  protected void uploadSegments(String tableName, TableType tableType, List<File> tarDirs)
+      throws Exception {
+    List<File> segmentTarFiles = new ArrayList<>();
+    for (File tarDir : tarDirs) {
+      File[] tarFiles = tarDir.listFiles();
+      assertNotNull(tarFiles);
+      Collections.addAll(segmentTarFiles, tarFiles);
+    }
+    int numSegments = segmentTarFiles.size();
     assertTrue(numSegments > 0);
-    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+    URI uploadSegmentHttpURI =
+        FileUploadDownloadClient.getUploadSegmentURI(CommonConstants.HTTP_PROTOCOL, LOCAL_HOST, _controllerPort);
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
       if (numSegments == 1) {
-        File segmentTarFile = segmentTarFiles[0];
+        File segmentTarFile = segmentTarFiles.get(0);
         if (System.currentTimeMillis() % 2 == 0) {
           assertEquals(
               fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
-                  tableName).getStatusCode(), HttpStatus.SC_OK);
+                  tableName, tableType).getStatusCode(), HttpStatus.SC_OK);
         } else {
           assertEquals(
-              uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile),
-              HttpStatus.SC_OK);
+              uploadSegmentWithOnlyMetadata(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient,
+                  segmentTarFile), HttpStatus.SC_OK);
         }
       } else {
         // Upload all segments in parallel
@@ -330,9 +350,9 @@ public abstract class ClusterTest extends ControllerTest {
           futures.add(executorService.submit(() -> {
             if (System.currentTimeMillis() % 2 == 0) {
               return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
-                  segmentTarFile, tableName).getStatusCode();
+                  segmentTarFile, tableName, tableType).getStatusCode();
             } else {
-              return uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient,
+              return uploadSegmentWithOnlyMetadata(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient,
                   segmentTarFile);
             }
           }));
@@ -345,60 +365,19 @@ public abstract class ClusterTest extends ControllerTest {
       }
     }
   }
 
-  /**
-   * tarDirPaths contains a list of directories that contain segment files. API uploads all segments inside the given
-   * list of directories to the cluster.
-   *
-   * @param tarDirPaths List of directories containing segments
-   */
-  protected void uploadSegments(String tableName, List<File> tarDirPaths, TableType tableType,
-      boolean enableParallelPushProtection)
-      throws Exception {
-    List<File> segmentTarFiles = new ArrayList<>();
-
-    for (File tarDir : tarDirPaths) {
-      Collections.addAll(segmentTarFiles, tarDir.listFiles());
-    }
-    assertNotNull(segmentTarFiles);
-    int numSegments = segmentTarFiles.size();
-    assertTrue(numSegments > 0);
-
-    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
-    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      if (numSegments == 1) {
-        File segmentTarFile = segmentTarFiles.get(0);
-        assertEquals(
-            fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
-                tableName, tableType.OFFLINE, enableParallelPushProtection, true).getStatusCode(), HttpStatus.SC_OK);
-      } else {
-        // Upload all segments in parallel
-        ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
-        List<Future<Integer>> futures = new ArrayList<>(numSegments);
-        for (File segmentTarFile : segmentTarFiles) {
-          futures.add(executorService.submit(() -> {
-            return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
-                segmentTarFile, tableName, tableType.OFFLINE, enableParallelPushProtection, true).getStatusCode();
-          }));
-        }
-        executorService.shutdown();
-        for (Future<Integer> future : futures) {
-          assertEquals((int) future.get(), HttpStatus.SC_OK);
-        }
-      }
-    }
-  }
-
-  private int uploadSegmentWithOnlyMetadata(String tableName, URI uploadSegmentHttpURI,
+  private int uploadSegmentWithOnlyMetadata(String tableName, TableType tableType, URI uploadSegmentHttpURI,
       FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
       throws IOException, HttpErrorStatusException {
     List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
         "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" + URLEncoder.encode(segmentTarFile.getName(),
             StandardCharsets.UTF_8.toString())),
         new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
             FileUploadDownloadClient.FileUploadType.METADATA.toString()));
-    // Add table name as a request parameter
+    // Add table name and table type as request parameters
     NameValuePair tableNameValuePair =
         new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
-    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+    NameValuePair tableTypeValuePair =
+        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, tableType.name());
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, tableTypeValuePair);
     return fileUploadDownloadClient.uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(),
         segmentTarFile, headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
   }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 9bb4853283..0669847093 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -184,13 +184,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     File tarDir2 = new File(_tempDir, "tarDir2");
     FileUtils.copyDirectory(_tarDir, tarDir2);
 
-    List<File> tarDirPaths = new ArrayList<>();
-    tarDirPaths.add(_tarDir);
-    tarDirPaths.add(tarDir2);
-
-    // TODO: Move this block to a separate method.
+    List<File> tarDirs = new ArrayList<>();
+    tarDirs.add(_tarDir);
+    tarDirs.add(tarDir2);
     try {
-      uploadSegments(getTableName(), tarDirPaths, TableType.OFFLINE, true);
+      uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
     } catch (Exception e) {
       // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one
       // of the two exception - 409 conflict of the second call enters ProcessExistingSegment ; segmentZkMetadata
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index 616f150d52..e6f5ff248f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -20,48 +20,36 @@ package org.apache.pinot.integration.tests;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import org.apache.commons.io.FileUtils;
-import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.http.HttpStatus;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 
 
 public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet {
-  private static final int NUM_BROKERS = 1;
   private static final int NUM_SERVERS = 2;
-  // Segment 1 contains records of pk value 100000
+  private static final String PRIMARY_KEY_COL = "clientId";
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
+
+  // Segment 1 contains records of pk value 100000 (partition 0)
   private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
-  // Segment 2 contains records of pk value 100001
+  // Segment 2 contains records of pk value 100001 (partition 1)
   private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
-  // Segment 3 contains records of pk value 100000
+  // Segment 3 contains records of pk value 100002 (partition 1)
   private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";
-  private static final String PRIMARY_KEY_COL = "clientId";
-  private static final String TABLE_NAME_WITH_TYPE = "mytable_REALTIME";
 
   @BeforeClass
   public void setUp()
@@ -72,27 +60,24 @@ public class UpsertTableSegmentUploadIntegrat
     startZk();
     // Start a customized controller with more frequent realtime segment validation
     startController();
-    startBrokers(getNumBrokers());
+    startBroker();
     startServers(NUM_SERVERS);
 
-    // Start Kafka
-    startKafka();
-
-    // Create and upload the schema.
-    Schema schema = createSchema();
-    addSchema(schema);
-
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    // Push data to Kafka
+    // Start Kafka and push data into Kafka
+    startKafka();
     pushAvroIntoKafka(avroFiles);
 
-    // Create and upload the table config
-    TableConfig upsertTableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
-    addTableConfig(upsertTableConfig);
+
+    // Create and upload schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
+    addTableConfig(tableConfig);
 
     // Create and upload segments
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, upsertTableConfig, schema, 0, _segmentDir, _tarDir);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
     uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
 
     // Wait for all documents loaded
@@ -131,8 +116,9 @@ public class UpsertTableSegmentUploadIntegrat
     return true;
   }
 
-  protected int getNumBrokers() {
-    return NUM_BROKERS;
+  @Override
+  protected String getPartitionColumn() {
+    return PRIMARY_KEY_COL;
   }
 
   @Override
@@ -142,113 +128,95 @@ public class UpsertTableSegmentUploadIntegrat
   }
 
   @Override
-  protected String getPartitionColumn() {
-    return PRIMARY_KEY_COL;
+  protected void waitForAllDocsLoaded(long timeoutMs)
+      throws Exception {
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return getCurrentCountStarResultWithoutUpsert() == getCountStarResultWithoutUpsert();
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, timeoutMs, "Failed to load all documents");
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
   }
 
-  @Override
-  protected void startController()
-      throws Exception {
-    Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
-    // Perform realtime segment validation every second with 1 second initial delay.
-    controllerConfig
-        .put(ControllerConf.ControllerPeriodicTasksConf.DEPRECATED_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS, 1);
-    controllerConfig
-        .put(ControllerConf.ControllerPeriodicTasksConf.DEPRECATED_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, 1);
-    controllerConfig
-        .put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS, 1);
-    startController(controllerConfig);
+  private long getCurrentCountStarResultWithoutUpsert() {
+    return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)")
+        .getResultSet(0).getLong(0);
+  }
+
+  private long getCountStarResultWithoutUpsert() {
+    // 3 Avro files, each with 100 documents, one copy from streaming source, one copy from batch source
+    return 600;
   }
 
   @Test
   public void testSegmentAssignment()
       throws Exception {
-    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, TABLE_NAME_WITH_TYPE);
-    Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
-    verifyTableIdealStates(idealState);
-    // Wait 3 seconds to let the realtime validation thread to run.
-    Thread.sleep(3000);
-    // Verify the result again.
-    Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
-    verifyTableIdealStates(idealState);
-
-    // Restart the servers and check every segment is not in ERROR state.
+    verifyIdealState();
+
+    // Run the real-time segment validation and check again
+    _controllerStarter.getRealtimeSegmentValidationManager().run();
+    verifyIdealState();
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
+    assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());
+
+    // Restart the servers and check again
     restartServers();
-    verifyTableIdealStates(idealState);
-    ExternalView ev =
-        HelixHelper.getExternalViewForResource(_helixAdmin, this.getHelixClusterName(), TABLE_NAME_WITH_TYPE);
-    Set<String> segments = ev.getPartitionSet();
-    Assert.assertEquals(segments.size(), 5);
-    for (String segment : segments) {
-      Map<String, String> stateMap = ev.getStateMap(segment);
-      Assert.assertTrue(stateMap.size() > 0);
-      for (Map.Entry<String, String> server2state: stateMap.entrySet()) {
-        Assert.assertFalse("ERROR".equals(server2state.getValue()));
-      }
-    }
-    // Verify the result again.
-    Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
+    verifyIdealState();
+    waitForAllDocsLoaded(600_000L);
   }
 
-  private void verifyTableIdealStates(IdealState idealState) {
-    // Verify various ideal state properties
-    Set<String> segments = idealState.getPartitionSet();
-    Assert.assertEquals(segments.size(), 5);
-    Map<String, Integer> segment2PartitionId = new HashMap<>();
-    segment2PartitionId.put(UPLOADED_SEGMENT_1, 0);
-    segment2PartitionId.put(UPLOADED_SEGMENT_2, 1);
-    segment2PartitionId.put(UPLOADED_SEGMENT_3, 1);
-
-    // Verify that all segments of the same partition are mapped to the same single server.
-    Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
-    for (String segment : segments) {
-      Integer partitionId;
-      if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
-        partitionId = new LLCSegmentName(segment).getPartitionGroupId();
+  private void verifyIdealState() {
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME);
+    Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
+    assertEquals(segmentAssignment.size(), 5);
+
+    String serverForPartition0 = null;
+    String serverForPartition1 = null;
+    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+
+      // Verify that all segments have the correct state
+      assertEquals(instanceStateMap.size(), 1);
+      Map.Entry<String, String> instanceIdAndState = instanceStateMap.entrySet().iterator().next();
+      String state = instanceIdAndState.getValue();
+      if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+        assertEquals(state, SegmentStateModel.CONSUMING);
       } else {
-        partitionId = segment2PartitionId.get(segment);
+        assertEquals(state, SegmentStateModel.ONLINE);
       }
-      Assert.assertNotNull(partitionId);
-      Set<String> instances = idealState.getInstanceSet(segment);
-      Assert.assertEquals(1, instances.size());
-      if (segmentAssignment.containsKey(partitionId)) {
-        Assert.assertEquals(instances, segmentAssignment.get(partitionId));
+
+      // Verify that all segments of the same partition are mapped to the same server
+      String instanceId = instanceIdAndState.getKey();
+      int partitionId = getSegmentPartitionId(segmentName);
+      if (partitionId == 0) {
+        if (serverForPartition0 == null) {
+          serverForPartition0 = instanceId;
+        } else {
+          assertEquals(instanceId, serverForPartition0);
+        }
       } else {
-        segmentAssignment.put(partitionId, instances);
+        assertEquals(partitionId, 1);
+        if (serverForPartition1 == null) {
+          serverForPartition1 = instanceId;
+        } else {
+          assertEquals(instanceId, serverForPartition1);
+        }
       }
     }
   }
 
-  private void uploadSegments(String tableName, TableType tableType, File tarDir)
-      throws Exception {
-    File[] segmentTarFiles = tarDir.listFiles();
-    assertNotNull(segmentTarFiles);
-    int numSegments = segmentTarFiles.length;
-    assertTrue(numSegments > 0);
-
-    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
-    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      if (numSegments == 1) {
-        File segmentTarFile = segmentTarFiles[0];
-        assertEquals(fileUploadDownloadClient
-            .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, tableType)
-            .getStatusCode(), HttpStatus.SC_OK);
-      } else {
-        // Upload all segments in parallel
-        ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
-        List<Future<Integer>> futures = new ArrayList<>(numSegments);
-        for (File segmentTarFile : segmentTarFiles) {
-          futures.add(executorService.submit(() -> {
-            return fileUploadDownloadClient
-                .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, tableType)
-                .getStatusCode();
-          }));
-        }
-        executorService.shutdown();
-        for (Future<Integer> future : futures) {
-          assertEquals((int) future.get(), HttpStatus.SC_OK);
-        }
-      }
-    }
-  }
-
+  private static int getSegmentPartitionId(String segmentName) {
+    switch (segmentName) {
+      case UPLOADED_SEGMENT_1:
+        return 0;
+      case UPLOADED_SEGMENT_2:
+      case UPLOADED_SEGMENT_3:
+        return 1;
+      default:
+        return new LLCSegmentName(segmentName).getPartitionGroupId();
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org