This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8b2fb03d85 [Flaky-test] Fix flaky PinotResourceManagerTest and also clean it up (#10941) 8b2fb03d85 is described below commit 8b2fb03d8532afd092c0211990e61f156fb45b73 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Jun 16 23:02:39 2023 -0700 [Flaky-test] Fix flaky PinotResourceManagerTest and also clean it up (#10941) --- .../controller/helix/PinotResourceManagerTest.java | 231 ++++++++++----------- .../controller/utils/SegmentMetadataMockUtils.java | 4 +- 2 files changed, 111 insertions(+), 124 deletions(-) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java index 445e43a871..6f4c87de04 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotResourceManagerTest.java @@ -23,10 +23,12 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Future; import org.apache.helix.model.IdealState; +import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.utils.LLCSegmentName; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.ReplicaGroupStrategyConfig; @@ -36,65 +38,64 @@ import org.apache.pinot.spi.config.table.UpsertConfig; import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.testng.Assert.*; + public class PinotResourceManagerTest { - private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance(); - private static final String OFFLINE_TABLE_NAME = "offlineResourceManagerTestTable_OFFLINE"; - private static final String REALTIME_TABLE_NAME = "realtimeResourceManagerTestTable_REALTIME"; - private static final String NUM_REPLICAS_STRING = "2"; - private static final String PARTITION_COLUMN = "Partition_Column"; + private static final String RAW_TABLE_NAME = "testTable"; + private static final String OFFLINE_TABLE_NAME = TableNameBuilder.OFFLINE.tableNameWithType(RAW_TABLE_NAME); + private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); + private static final int NUM_REPLICAS = 2; + private static final String PARTITION_COLUMN = "partitionColumn"; + + private final ControllerTest _testInstance = ControllerTest.getInstance(); + private PinotHelixResourceManager _resourceManager; @BeforeClass public void setUp() throws Exception { - TEST_INSTANCE.setupSharedStateAndValidate(); + _testInstance.setupSharedStateAndValidate(); + _resourceManager = _testInstance.getHelixResourceManager(); // Adding an offline table - TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).build(); - TEST_INSTANCE.getHelixResourceManager().addTable(offlineTableConfig); + TableConfig offlineTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build(); + _resourceManager.addTable(offlineTableConfig); // Adding an upsert enabled realtime table which consumes from a stream with 2 partitions - Schema dummySchema = TEST_INSTANCE.createDummySchema(REALTIME_TABLE_NAME); - TEST_INSTANCE.addSchema(dummySchema); + Schema dummySchema = ControllerTest.createDummySchema(RAW_TABLE_NAME); + _testInstance.addSchema(dummySchema); Map<String, String> streamConfigs = FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap(); TableConfig realtimeTableConfig = - new TableConfigBuilder(TableType.REALTIME).setStreamConfigs(streamConfigs).setTableName(REALTIME_TABLE_NAME) - .setSchemaName(dummySchema.getSchemaName()).build(); - realtimeTableConfig.getValidationConfig().setReplicasPerPartition(NUM_REPLICAS_STRING); - realtimeTableConfig.getValidationConfig() - .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)); - realtimeTableConfig.setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)); - TEST_INSTANCE.getHelixResourceManager().addTable(realtimeTableConfig); + new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS) + .setStreamConfigs(streamConfigs) + .setReplicaGroupStrategyConfig(new ReplicaGroupStrategyConfig(PARTITION_COLUMN, 1)) + .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL)).build(); + _resourceManager.addTable(realtimeTableConfig); } @Test public void testTableCleanupAfterRealtimeClusterException() throws Exception { - String invalidRealtimeTable = "invalidTable_REALTIME"; - Schema dummySchema = TEST_INSTANCE.createDummySchema(invalidRealtimeTable); - TEST_INSTANCE.addSchema(dummySchema); + String invalidRawTableName = "invalidTable"; + Schema dummySchema = ControllerTest.createDummySchema(invalidRawTableName); + _testInstance.addSchema(dummySchema); - // Missing replicasPerPartition + // Missing stream config TableConfig invalidRealtimeTableConfig = - new TableConfigBuilder(TableType.REALTIME).setTableName(invalidRealtimeTable) - .setSchemaName(dummySchema.getSchemaName()).build(); - + new TableConfigBuilder(TableType.REALTIME).setTableName(invalidRawTableName).build(); try { - TEST_INSTANCE.getHelixResourceManager().addTable(invalidRealtimeTableConfig); - Assert.fail( - "Table creation should have thrown exception due to missing stream config and replicasPerPartition in " - + "validation config"); + _resourceManager.addTable(invalidRealtimeTableConfig); + fail("Table creation should have thrown exception due to missing stream config in validation config"); } catch (Exception e) { // expected } // Verify invalid table config is cleaned up - Assert.assertNull(TEST_INSTANCE.getHelixResourceManager().getTableConfig(invalidRealtimeTable)); + assertNull(_resourceManager.getTableConfig(invalidRealtimeTableConfig.getTableName())); } @Test @@ -102,144 +103,130 @@ public class PinotResourceManagerTest { SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata("testSegment"); // Segment ZK metadata does not exist - Assert.assertFalse(TEST_INSTANCE.getHelixResourceManager() - .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0)); + assertFalse(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata, 0)); // Set segment ZK metadata - Assert.assertTrue(TEST_INSTANCE.getHelixResourceManager() - .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata)); + assertTrue(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata)); // Update ZK metadata - Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager() - .getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 0); - Assert.assertTrue(TEST_INSTANCE.getHelixResourceManager() - .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0)); - Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager() - .getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME + "_OFFLINE", "testSegment").getVersion(), 1); - Assert.assertFalse(TEST_INSTANCE.getHelixResourceManager() - .updateZkMetadata(OFFLINE_TABLE_NAME + "_OFFLINE", segmentZKMetadata, 0)); + ZNRecord segmentMetadataZnRecord = _resourceManager.getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME, "testSegment"); + assertNotNull(segmentMetadataZnRecord); + assertEquals(segmentMetadataZnRecord.getVersion(), 0); + assertTrue(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata, 0)); + segmentMetadataZnRecord = _resourceManager.getSegmentMetadataZnRecord(OFFLINE_TABLE_NAME, "testSegment"); + assertNotNull(segmentMetadataZnRecord); + assertEquals(segmentMetadataZnRecord.getVersion(), 1); + assertFalse(_resourceManager.updateZkMetadata(OFFLINE_TABLE_NAME, segmentZKMetadata, 0)); } /** * First tests basic segment adding/deleting. - * Then creates 3 threads that concurrently try to add 10 segments each, and asserts that we have - * 100 segments in the end. Then launches 5 threads again that concurrently try to delete all segments, - * and makes sure that we have zero segments left in the end. - * @throws Exception + * Then creates 3 threads that concurrently try to add 10 segments each, and asserts that we have 30 segments in the + * end. Then launches 3 threads again that concurrently try to delete all segments, and makes sure that we have 0 + * segments left in the end. */ - @Test public void testBasicAndConcurrentAddingAndDeletingSegments() throws Exception { - final String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(OFFLINE_TABLE_NAME); + PinotHelixResourceManager resourceManager = _resourceManager; - // Basic add/delete case + // Basic add/delete for (int i = 1; i <= 2; i++) { - TEST_INSTANCE.getHelixResourceManager() - .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), - "downloadUrl"); + resourceManager.addNewSegment(OFFLINE_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), "downloadUrl"); } - IdealState idealState = TEST_INSTANCE.getHelixAdmin() - .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName); + IdealState idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME); + assertNotNull(idealState); Set<String> segments = idealState.getPartitionSet(); - Assert.assertEquals(segments.size(), 2); + assertEquals(segments.size(), 2); for (String segmentName : segments) { - TEST_INSTANCE.getHelixResourceManager().deleteSegment(offlineTableName, segmentName); + resourceManager.deleteSegment(OFFLINE_TABLE_NAME, segmentName); } - idealState = TEST_INSTANCE.getHelixAdmin() - .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName); - Assert.assertEquals(idealState.getPartitionSet().size(), 0); + idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME); + assertNotNull(idealState); + assertEquals(idealState.getNumPartitions(), 0); - // Concurrent segment deletion - ExecutorService addSegmentExecutor = Executors.newFixedThreadPool(3); + // Concurrent add/deletion + ExecutorService executor = Executors.newFixedThreadPool(3); + Future<?>[] futures = new Future[3]; for (int i = 0; i < 3; i++) { - addSegmentExecutor.execute(new Runnable() { - @Override - public void run() { - for (int i = 0; i < 10; i++) { - TEST_INSTANCE.getHelixResourceManager() - .addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), - "downloadUrl"); - } + futures[i] = executor.submit(() -> { + for (int i1 = 0; i1 < 10; i1++) { + resourceManager.addNewSegment(OFFLINE_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME), "downloadUrl"); } }); } - addSegmentExecutor.shutdown(); - addSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES); - - idealState = TEST_INSTANCE.getHelixAdmin() - .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName); - Assert.assertEquals(idealState.getPartitionSet().size(), 30); - - ExecutorService deleteSegmentExecutor = Executors.newFixedThreadPool(3); - for (final String segmentName : idealState.getPartitionSet()) { - deleteSegmentExecutor.execute(new Runnable() { - @Override - public void run() { - TEST_INSTANCE.getHelixResourceManager().deleteSegment(offlineTableName, segmentName); - } - }); + for (int i = 0; i < 3; i++) { + futures[i].get(); + } + idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME); + assertNotNull(idealState); + segments = idealState.getPartitionSet(); + assertEquals(segments.size(), 30); + + futures = new Future[30]; + int index = 0; + for (String segment : segments) { + futures[index++] = executor.submit(() -> resourceManager.deleteSegment(OFFLINE_TABLE_NAME, segment)); + } + for (int i = 0; i < 30; i++) { + futures[i].get(); } - deleteSegmentExecutor.shutdown(); - deleteSegmentExecutor.awaitTermination(1, TimeUnit.MINUTES); + idealState = resourceManager.getTableIdealState(OFFLINE_TABLE_NAME); + assertNotNull(idealState); + assertEquals(idealState.getNumPartitions(), 0); - idealState = TEST_INSTANCE.getHelixAdmin() - .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), offlineTableName); - Assert.assertEquals(idealState.getPartitionSet().size(), 0); + executor.shutdown(); } @Test public void testAddingRealtimeTableSegmentsWithPartitionIdInZkMetadata() { // Add three segments: two from partition 0 and 1 from partition 1; - String partition0Segment0 = "realtimeResourceManagerTestTable__aa"; - String partition0Segment1 = "realtimeResourceManagerTestTable__bb"; - String partition1Segment1 = "realtimeResourceManagerTestTable__cc"; - TEST_INSTANCE.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils - .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition0Segment0, PARTITION_COLUMN, 0), - "downloadUrl"); - TEST_INSTANCE.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils - .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition0Segment1, PARTITION_COLUMN, 0), - "downloadUrl"); - TEST_INSTANCE.getHelixResourceManager().addNewSegment(REALTIME_TABLE_NAME, SegmentMetadataMockUtils - .mockSegmentMetadataWithPartitionInfo(REALTIME_TABLE_NAME, partition1Segment1, PARTITION_COLUMN, 1), - "downloadUrl"); - Map<String, Integer> segment2PartitionId = new HashMap<>(); - segment2PartitionId.put(partition0Segment0, 0); - segment2PartitionId.put(partition0Segment1, 0); - segment2PartitionId.put(partition1Segment1, 1); - - IdealState idealState = TEST_INSTANCE.getHelixAdmin() - .getResourceIdealState(TEST_INSTANCE.getHelixClusterName(), - TableNameBuilder.REALTIME.tableNameWithType(REALTIME_TABLE_NAME)); + String partition0Segment0 = "p0s0"; + String partition0Segment1 = "p0s1"; + String partition1Segment0 = "p1s0"; + _resourceManager.addNewSegment(REALTIME_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, partition0Segment0, + PARTITION_COLUMN, 0), "downloadUrl"); + _resourceManager.addNewSegment(REALTIME_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, partition0Segment1, + PARTITION_COLUMN, 0), "downloadUrl"); + _resourceManager.addNewSegment(REALTIME_TABLE_NAME, + SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME, partition1Segment0, + PARTITION_COLUMN, 1), "downloadUrl"); + + IdealState idealState = _resourceManager.getTableIdealState(REALTIME_TABLE_NAME); + assertNotNull(idealState); Set<String> segments = idealState.getPartitionSet(); - Assert.assertEquals(segments.size(), 5); - Assert.assertTrue(segments.contains(partition0Segment0)); - Assert.assertTrue(segments.contains(partition0Segment1)); - Assert.assertTrue(segments.contains(partition1Segment1)); + // 2 consuming segments, 3 uploaded segments + assertEquals(segments.size(), 5); + assertTrue(segments.contains(partition0Segment0)); + assertTrue(segments.contains(partition0Segment1)); + assertTrue(segments.contains(partition1Segment0)); // Check the segments of the same partition is assigned to the same set of servers. - Map<Integer, Set<String>> segmentAssignment = new HashMap<>(); + Map<Integer, Set<String>> partitionIdToServersMap = new HashMap<>(); for (String segment : segments) { - Integer partitionId; + int partitionId; LLCSegmentName llcSegmentName = LLCSegmentName.of(segment); if (llcSegmentName != null) { partitionId = llcSegmentName.getPartitionGroupId(); } else { - partitionId = segment2PartitionId.get(segment); + partitionId = Integer.parseInt(segment.substring(1, 2)); } - Assert.assertNotNull(partitionId); Set<String> instances = idealState.getInstanceSet(segment); - if (segmentAssignment.containsKey(partitionId)) { - Assert.assertEquals(instances, segmentAssignment.get(partitionId)); + if (partitionIdToServersMap.containsKey(partitionId)) { + assertEquals(instances, partitionIdToServersMap.get(partitionId)); } else { - segmentAssignment.put(partitionId, instances); + partitionIdToServersMap.put(partitionId, instances); } } } @AfterClass public void tearDown() { - TEST_INSTANCE.cleanup(); + _testInstance.cleanup(); } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java index 4a47d56512..f411be0daa 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/utils/SegmentMetadataMockUtils.java @@ -71,7 +71,7 @@ public class SegmentMetadataMockUtils { return segmentZKMetadata; } - public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String tableName, String segmentName, + public static SegmentMetadata mockSegmentMetadataWithPartitionInfo(String rawTableName, String segmentName, String columnName, int partitionNumber) { ColumnMetadata columnMetadata = mock(ColumnMetadata.class); Set<Integer> partitions = Collections.singleton(partitionNumber); @@ -82,7 +82,7 @@ public class SegmentMetadataMockUtils { if (columnName != null) { when(segmentMetadata.getColumnMetadataFor(columnName)).thenReturn(columnMetadata); } - when(segmentMetadata.getTableName()).thenReturn(tableName); + when(segmentMetadata.getTableName()).thenReturn(rawTableName); when(segmentMetadata.getName()).thenReturn(segmentName); when(segmentMetadata.getCrc()).thenReturn("0"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org