This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 7af3edbd52 Fix flakiness and cleanup SegmentLineageCleanupTest (#9256) 7af3edbd52 is described below commit 7af3edbd52c80dec2fb46830fd84cedb985b6b82 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Fri Aug 19 16:27:41 2022 -0700 Fix flakiness and cleanup SegmentLineageCleanupTest (#9256) --- .../core/retention/SegmentLineageCleanupTest.java | 189 +++++++-------------- 1 file changed, 63 insertions(+), 126 deletions(-) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java index 66246b9c62..be1de7f92d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java @@ -18,9 +18,7 @@ */ package org.apache.pinot.controller.helix.core.retention; -import java.io.IOException; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.lineage.LineageEntry; @@ -31,214 +29,153 @@ import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.ControllerTest; -import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.utils.SegmentMetadataMockUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig; import 
org.apache.pinot.spi.config.table.ingestion.IngestionConfig; -import org.apache.pinot.spi.config.tenant.Tenant; -import org.apache.pinot.spi.config.tenant.TenantRole; -import org.apache.pinot.spi.metrics.PinotMetricUtils; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; -import org.testng.Assert; +import org.apache.pinot.util.TestUtils; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.mockito.Mockito.anyString; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; public class SegmentLineageCleanupTest { private static final ControllerTest TEST_INSTANCE = ControllerTest.getInstance(); - private static final int NUM_INSTANCES = 1; - private static final long MAX_TIMEOUT_IN_MILLISECOND = 10_000L; // 10 seconds - private static final String OFFLINE_TABLE_NAME = "segmentTable_OFFLINE"; private static final String REFRESH_OFFLINE_TABLE_NAME = "refreshSegmentTable_OFFLINE"; - private static final String BROKER_TENANT_NAME = "brokerTenant"; - private static final String SERVER_TENANT_NAME = "serverTenant"; - private static final String RETENTION_TIME_UNIT = "DAYS"; - private static final String RETENTION_TIME_VALUE = "1"; + private PinotHelixResourceManager _resourceManager; private RetentionManager _retentionManager; @BeforeClass public void setUp() throws Exception { TEST_INSTANCE.setupSharedStateAndValidate(); - - // Create server tenant - Tenant serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, NUM_INSTANCES, NUM_INSTANCES, 0); - TEST_INSTANCE.getHelixResourceManager().createServerTenant(serverTenant); - - // Create broker tenant - Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, NUM_INSTANCES, 0, 0); - PinotResourceManagerResponse response = - TEST_INSTANCE.getHelixResourceManager().createBrokerTenant(brokerTenant); - 
Assert.assertTrue(response.isSuccessful()); - - // Enable lead controller resource - TEST_INSTANCE.enableResourceConfigForLeadControllerResource(true); + _resourceManager = TEST_INSTANCE.getHelixResourceManager(); // Create the retention manager - LeadControllerManager leadControllerManager = mock(LeadControllerManager.class); - when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true); - ControllerConf conf = new ControllerConf(); - ControllerMetrics controllerMetrics = new ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); - conf.setRetentionControllerFrequencyInSeconds(0); - conf.setDeletedSegmentsRetentionInDays(0); - _retentionManager = new RetentionManager(TEST_INSTANCE.getHelixResourceManager(), leadControllerManager, conf, - controllerMetrics); + ControllerConf controllerConf = new ControllerConf(); + controllerConf.setRetentionControllerFrequencyInSeconds(0); + controllerConf.setDeletedSegmentsRetentionInDays(0); + _retentionManager = new RetentionManager(_resourceManager, mock(LeadControllerManager.class), controllerConf, + mock(ControllerMetrics.class)); // Update table config TableConfig tableConfig = - new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME) - .setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1).setRetentionTimeUnit(RETENTION_TIME_UNIT) - .setRetentionTimeValue(RETENTION_TIME_VALUE).build(); - TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig); + new TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1).build(); + _resourceManager.addTable(tableConfig); IngestionConfig ingestionConfig = new IngestionConfig(); ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, "REFRESH", "DAILY")); - TableConfig refreshTableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME) - .setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1) 
- .setRetentionTimeUnit(RETENTION_TIME_UNIT).setRetentionTimeValue(RETENTION_TIME_VALUE) - .setIngestionConfig(ingestionConfig).build(); - TEST_INSTANCE.getHelixResourceManager().addTable(refreshTableConfig); + TableConfig refreshTableConfig = + new TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME).setNumReplicas(1) + .setIngestionConfig(ingestionConfig).build(); + _resourceManager.addTable(refreshTableConfig); } @Test - public void testSegmentLineageCleanup() - throws IOException, InterruptedException { + public void testSegmentLineageCleanup() { // Create metadata for original segments. for (int i = 0; i < 5; i++) { - TEST_INSTANCE.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME, + _resourceManager.addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "segment_" + i), "downloadUrl"); } // Create metadata for merged segments. for (int i = 0; i < 2; i++) { - TEST_INSTANCE.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME, + _resourceManager.addNewSegment(OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, "merged_" + i), "downloadUrl"); } - Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false).size(), - 7); - long currentTimeInMillis = System.currentTimeMillis(); + + assertEquals(getNumSegments(OFFLINE_TABLE_NAME), 7); // Validate the case when the lineage entry state is 'IN_PROGRESS'. 
+ long currentTimeMs = System.currentTimeMillis(); SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME); segmentLineage.addLineageEntry("0", new LineageEntry(Arrays.asList("segment_0", "segment_1"), Arrays.asList("merged_0"), - LineageEntryState.IN_PROGRESS, currentTimeInMillis)); - SegmentLineageAccessHelper - .writeSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(), segmentLineage, -1); + LineageEntryState.IN_PROGRESS, currentTimeMs)); + SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(), segmentLineage, -1); _retentionManager.processTable(OFFLINE_TABLE_NAME); - waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 7); - List<String> segmentsForTable = - TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false); - Assert.assertEquals(segmentsForTable.size(), 7); + verifySegmentsDeleted(7); // Validate the case when the lineage entry state is 'COMPLETED'. segmentLineage.updateLineageEntry("0", new LineageEntry(Arrays.asList("segment_0", "segment_1"), Arrays.asList("merged_0"), - LineageEntryState.COMPLETED, currentTimeInMillis)); - SegmentLineageAccessHelper - .writeSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(), segmentLineage, -1); + LineageEntryState.COMPLETED, currentTimeMs)); + SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(), segmentLineage, -1); _retentionManager.processTable(OFFLINE_TABLE_NAME); - waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 5); - segmentsForTable = TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false); - Assert.assertEquals(segmentsForTable.size(), 5); - Assert.assertTrue(Collections.disjoint(segmentsForTable, Arrays.asList("segment_0", "segment_1"))); + verifySegmentsDeleted(5); // Validate the case when the lineage entry state is 'COMPLETED' and all segments are deleted. 
- TEST_INSTANCE.getHelixResourceManager().deleteSegment(OFFLINE_TABLE_NAME, "merged_0"); - waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4); + _resourceManager.deleteSegment(OFFLINE_TABLE_NAME, "merged_0"); + verifySegmentsDeleted(4); _retentionManager.processTable(OFFLINE_TABLE_NAME); - waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4); - segmentsForTable = TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false); - Assert.assertEquals(segmentsForTable.size(), 4); - Assert.assertTrue(Collections.disjoint(segmentsForTable, Arrays.asList("segment_0", "segment_1", "merged_0"))); - segmentLineage = SegmentLineageAccessHelper - .getSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(), OFFLINE_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 0); + verifySegmentsDeleted(4); + assertEquals(getSegments(OFFLINE_TABLE_NAME), Arrays.asList("merged_1", "segment_2", "segment_3", "segment_4")); + segmentLineage = + SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(), OFFLINE_TABLE_NAME); + assertEquals(segmentLineage.getLineageEntryIds().size(), 0); // Validate the case when the lineage entry state is 'IN_PROGRESS' and timestamp is old. 
LineageEntry lineageEntry = new LineageEntry(Arrays.asList("segment_2", "segment_3"), Arrays.asList("merged_1", "merged_2"), - LineageEntryState.IN_PROGRESS, currentTimeInMillis - TimeUnit.DAYS.toMillis(2L)); + LineageEntryState.IN_PROGRESS, currentTimeMs - TimeUnit.DAYS.toMillis(2L)); segmentLineage.addLineageEntry("1", lineageEntry); - SegmentLineageAccessHelper - .writeSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(), segmentLineage, -1); + SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(), segmentLineage, -1); _retentionManager.processTable(OFFLINE_TABLE_NAME); - waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 3); - segmentsForTable = TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, false); - Assert.assertEquals(segmentsForTable.size(), 3); - Assert.assertTrue(Collections.disjoint(segmentsForTable, Arrays.asList("merged_1", "merged_2"))); - segmentLineage = SegmentLineageAccessHelper - .getSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(), OFFLINE_TABLE_NAME); - Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1); + verifySegmentsDeleted(3); + assertEquals(getSegments(OFFLINE_TABLE_NAME), Arrays.asList("segment_2", "segment_3", "segment_4")); + segmentLineage = + SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(), OFFLINE_TABLE_NAME); + assertEquals(segmentLineage.getLineageEntryIds().size(), 1); + } + + private void verifySegmentsDeleted(int expectedNumRemainingSegments) { + // Segment deletion happens asynchronously + TestUtils.waitForCondition(aVoid -> getNumSegments(OFFLINE_TABLE_NAME) == expectedNumRemainingSegments, 60_000L, + "Failed to delete the segments"); + } + + private List<String> getSegments(String tableNameWithType) { + return _resourceManager.getSegmentsFor(tableNameWithType, false); + } + + private int getNumSegments(String tableNameWithType) { + return getSegments(tableNameWithType).size(); } 
@Test - public void testRefreshTableCleanup() - throws InterruptedException { + public void testRefreshTableCleanup() { // Create metadata for original segments for (int i = 0; i < 3; i++) { - TEST_INSTANCE.getHelixResourceManager().addNewSegment(REFRESH_OFFLINE_TABLE_NAME, + _resourceManager.addNewSegment(REFRESH_OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(REFRESH_OFFLINE_TABLE_NAME, "segment1_" + i), "downloadUrl"); } // Create metadata for new segments. for (int i = 0; i < 3; i++) { - TEST_INSTANCE.getHelixResourceManager().addNewSegment(REFRESH_OFFLINE_TABLE_NAME, + _resourceManager.addNewSegment(REFRESH_OFFLINE_TABLE_NAME, SegmentMetadataMockUtils.mockSegmentMetadata(REFRESH_OFFLINE_TABLE_NAME, "segment2_" + i), "downloadUrl"); } - Assert.assertEquals( - TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, false).size(), 6); + assertEquals(getNumSegments(REFRESH_OFFLINE_TABLE_NAME), 6); // Validate the case when the lineage entry state is 'IN_PROGRESS' SegmentLineage segmentLineage = new SegmentLineage(REFRESH_OFFLINE_TABLE_NAME); segmentLineage.addLineageEntry("0", new LineageEntry(Arrays.asList("segment1_0", "segment1_1", "segment1_2"), Arrays.asList("segment2_0", "segment2_1", "segment2_2"), LineageEntryState.IN_PROGRESS, System.currentTimeMillis())); - SegmentLineageAccessHelper - .writeSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(), segmentLineage, -1); + SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(), segmentLineage, -1); _retentionManager.processTable(REFRESH_OFFLINE_TABLE_NAME); - try { - waitForSegmentsToDelete(REFRESH_OFFLINE_TABLE_NAME, 3, 1000L); - Assert.fail(); - } catch (Exception e) { - // expected since the original segments are not supposed to be immediately erased by the retention manager. 
-    }
-    List<String> segmentsForTable =
-        TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, false);
-    Assert.assertEquals(segmentsForTable.size(), 6);
-
-    segmentsForTable = TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, true);
-    Assert.assertEquals(segmentsForTable.size(), 3);
-  }
-
-  private void waitForSegmentsToDelete(String tableNameWithType, int expectedNumSegmentsAfterDelete)
-      throws InterruptedException {
-    waitForSegmentsToDelete(tableNameWithType, expectedNumSegmentsAfterDelete, MAX_TIMEOUT_IN_MILLISECOND);
-  }
-
-  private void waitForSegmentsToDelete(String tableNameWithType, int expectedNumSegmentsAfterDelete,
-      long timeOutInMillis)
-      throws InterruptedException {
-    long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
-    do {
-      if (TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(tableNameWithType, false).size()
-          == expectedNumSegmentsAfterDelete) {
-        return;
-      } else {
-        Thread.sleep(500L);
-      }
-    } while (System.currentTimeMillis() < endTimeMs);
-    throw new RuntimeException("Timeout while waiting for segments to be deleted");
+    assertEquals(getNumSegments(REFRESH_OFFLINE_TABLE_NAME), 6);
+    assertEquals(_resourceManager.getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, true).size(), 3);
   }
 
   @AfterClass

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org