This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7af3edbd52 Fix flakiness and cleanup SegmentLineageCleanupTest (#9256)
7af3edbd52 is described below

commit 7af3edbd52c80dec2fb46830fd84cedb985b6b82
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Fri Aug 19 16:27:41 2022 -0700

    Fix flakiness and cleanup SegmentLineageCleanupTest (#9256)
---
 .../core/retention/SegmentLineageCleanupTest.java  | 189 +++++++--------------
 1 file changed, 63 insertions(+), 126 deletions(-)

diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
index 66246b9c62..be1de7f92d 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/SegmentLineageCleanupTest.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.retention;
 
-import java.io.IOException;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.lineage.LineageEntry;
@@ -31,214 +29,153 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.LeadControllerManager;
 import org.apache.pinot.controller.helix.ControllerTest;
-import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.config.table.ingestion.BatchIngestionConfig;
 import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
-import org.apache.pinot.spi.config.tenant.Tenant;
-import org.apache.pinot.spi.config.tenant.TenantRole;
-import org.apache.pinot.spi.metrics.PinotMetricUtils;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
-import org.testng.Assert;
+import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.anyString;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 
 
 public class SegmentLineageCleanupTest {
   private static final ControllerTest TEST_INSTANCE = 
ControllerTest.getInstance();
-  private static final int NUM_INSTANCES = 1;
-  private static final long MAX_TIMEOUT_IN_MILLISECOND = 10_000L; // 10 seconds
-
   private static final String OFFLINE_TABLE_NAME = "segmentTable_OFFLINE";
   private static final String REFRESH_OFFLINE_TABLE_NAME = 
"refreshSegmentTable_OFFLINE";
-  private static final String BROKER_TENANT_NAME = "brokerTenant";
-  private static final String SERVER_TENANT_NAME = "serverTenant";
-  private static final String RETENTION_TIME_UNIT = "DAYS";
-  private static final String RETENTION_TIME_VALUE = "1";
 
+  private PinotHelixResourceManager _resourceManager;
   private RetentionManager _retentionManager;
 
   @BeforeClass
   public void setUp()
       throws Exception {
     TEST_INSTANCE.setupSharedStateAndValidate();
-
-    // Create server tenant
-    Tenant serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME, 
NUM_INSTANCES, NUM_INSTANCES, 0);
-    TEST_INSTANCE.getHelixResourceManager().createServerTenant(serverTenant);
-
-    // Create broker tenant
-    Tenant brokerTenant = new Tenant(TenantRole.BROKER, BROKER_TENANT_NAME, 
NUM_INSTANCES, 0, 0);
-    PinotResourceManagerResponse response =
-        
TEST_INSTANCE.getHelixResourceManager().createBrokerTenant(brokerTenant);
-    Assert.assertTrue(response.isSuccessful());
-
-    // Enable lead controller resource
-    TEST_INSTANCE.enableResourceConfigForLeadControllerResource(true);
+    _resourceManager = TEST_INSTANCE.getHelixResourceManager();
 
     // Create the retention manager
-    LeadControllerManager leadControllerManager = 
mock(LeadControllerManager.class);
-    when(leadControllerManager.isLeaderForTable(anyString())).thenReturn(true);
-    ControllerConf conf = new ControllerConf();
-    ControllerMetrics controllerMetrics = new 
ControllerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
-    conf.setRetentionControllerFrequencyInSeconds(0);
-    conf.setDeletedSegmentsRetentionInDays(0);
-    _retentionManager = new 
RetentionManager(TEST_INSTANCE.getHelixResourceManager(), 
leadControllerManager, conf,
-        controllerMetrics);
+    ControllerConf controllerConf = new ControllerConf();
+    controllerConf.setRetentionControllerFrequencyInSeconds(0);
+    controllerConf.setDeletedSegmentsRetentionInDays(0);
+    _retentionManager = new RetentionManager(_resourceManager, 
mock(LeadControllerManager.class), controllerConf,
+        mock(ControllerMetrics.class));
 
     // Update table config
     TableConfig tableConfig =
-        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setBrokerTenant(BROKER_TENANT_NAME)
-            
.setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1).setRetentionTimeUnit(RETENTION_TIME_UNIT)
-            .setRetentionTimeValue(RETENTION_TIME_VALUE).build();
-    TEST_INSTANCE.getHelixResourceManager().addTable(tableConfig);
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(OFFLINE_TABLE_NAME).setNumReplicas(1).build();
+    _resourceManager.addTable(tableConfig);
 
     IngestionConfig ingestionConfig = new IngestionConfig();
     ingestionConfig.setBatchIngestionConfig(new BatchIngestionConfig(null, 
"REFRESH", "DAILY"));
-    TableConfig refreshTableConfig = new 
TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME)
-        
.setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).setNumReplicas(1)
-        
.setRetentionTimeUnit(RETENTION_TIME_UNIT).setRetentionTimeValue(RETENTION_TIME_VALUE)
-        .setIngestionConfig(ingestionConfig).build();
-    TEST_INSTANCE.getHelixResourceManager().addTable(refreshTableConfig);
+    TableConfig refreshTableConfig =
+        new 
TableConfigBuilder(TableType.OFFLINE).setTableName(REFRESH_OFFLINE_TABLE_NAME).setNumReplicas(1)
+            .setIngestionConfig(ingestionConfig).build();
+    _resourceManager.addTable(refreshTableConfig);
   }
 
   @Test
-  public void testSegmentLineageCleanup()
-      throws IOException, InterruptedException {
+  public void testSegmentLineageCleanup() {
     // Create metadata for original segments.
     for (int i = 0; i < 5; i++) {
-      TEST_INSTANCE.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
+      _resourceManager.addNewSegment(OFFLINE_TABLE_NAME,
           SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"segment_" + i), "downloadUrl");
     }
 
     // Create metadata for merged segments.
     for (int i = 0; i < 2; i++) {
-      TEST_INSTANCE.getHelixResourceManager().addNewSegment(OFFLINE_TABLE_NAME,
+      _resourceManager.addNewSegment(OFFLINE_TABLE_NAME,
           SegmentMetadataMockUtils.mockSegmentMetadata(OFFLINE_TABLE_NAME, 
"merged_" + i), "downloadUrl");
     }
-    
Assert.assertEquals(TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME,
 false).size(),
-        7);
-    long currentTimeInMillis = System.currentTimeMillis();
+
+    assertEquals(getNumSegments(OFFLINE_TABLE_NAME), 7);
 
     // Validate the case when the lineage entry state is 'IN_PROGRESS'.
+    long currentTimeMs = System.currentTimeMillis();
     SegmentLineage segmentLineage = new SegmentLineage(OFFLINE_TABLE_NAME);
     segmentLineage.addLineageEntry("0",
         new LineageEntry(Arrays.asList("segment_0", "segment_1"), 
Arrays.asList("merged_0"),
-            LineageEntryState.IN_PROGRESS, currentTimeInMillis));
-    SegmentLineageAccessHelper
-        
.writeSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(),
 segmentLineage, -1);
+            LineageEntryState.IN_PROGRESS, currentTimeMs));
+    
SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(),
 segmentLineage, -1);
     _retentionManager.processTable(OFFLINE_TABLE_NAME);
-    waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 7);
-    List<String> segmentsForTable =
-        
TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, 
false);
-    Assert.assertEquals(segmentsForTable.size(), 7);
+    verifySegmentsDeleted(7);
 
     // Validate the case when the lineage entry state is 'COMPLETED'.
     segmentLineage.updateLineageEntry("0",
         new LineageEntry(Arrays.asList("segment_0", "segment_1"), 
Arrays.asList("merged_0"),
-            LineageEntryState.COMPLETED, currentTimeInMillis));
-    SegmentLineageAccessHelper
-        
.writeSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(),
 segmentLineage, -1);
+            LineageEntryState.COMPLETED, currentTimeMs));
+    
SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(),
 segmentLineage, -1);
     _retentionManager.processTable(OFFLINE_TABLE_NAME);
-    waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 5);
-    segmentsForTable = 
TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, 
false);
-    Assert.assertEquals(segmentsForTable.size(), 5);
-    Assert.assertTrue(Collections.disjoint(segmentsForTable, 
Arrays.asList("segment_0", "segment_1")));
+    verifySegmentsDeleted(5);
 
     // Validate the case when the lineage entry state is 'COMPLETED' and all 
segments are deleted.
-    TEST_INSTANCE.getHelixResourceManager().deleteSegment(OFFLINE_TABLE_NAME, 
"merged_0");
-    waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4);
+    _resourceManager.deleteSegment(OFFLINE_TABLE_NAME, "merged_0");
+    verifySegmentsDeleted(4);
     _retentionManager.processTable(OFFLINE_TABLE_NAME);
-    waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 4);
-    segmentsForTable = 
TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, 
false);
-    Assert.assertEquals(segmentsForTable.size(), 4);
-    Assert.assertTrue(Collections.disjoint(segmentsForTable, 
Arrays.asList("segment_0", "segment_1", "merged_0")));
-    segmentLineage = SegmentLineageAccessHelper
-        
.getSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(), 
OFFLINE_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 0);
+    verifySegmentsDeleted(4);
+    assertEquals(getSegments(OFFLINE_TABLE_NAME), Arrays.asList("merged_1", 
"segment_2", "segment_3", "segment_4"));
+    segmentLineage =
+        
SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(),
 OFFLINE_TABLE_NAME);
+    assertEquals(segmentLineage.getLineageEntryIds().size(), 0);
 
     // Validate the case when the lineage entry state is 'IN_PROGRESS' and 
timestamp is old.
     LineageEntry lineageEntry =
         new LineageEntry(Arrays.asList("segment_2", "segment_3"), 
Arrays.asList("merged_1", "merged_2"),
-            LineageEntryState.IN_PROGRESS, currentTimeInMillis - 
TimeUnit.DAYS.toMillis(2L));
+            LineageEntryState.IN_PROGRESS, currentTimeMs - 
TimeUnit.DAYS.toMillis(2L));
     segmentLineage.addLineageEntry("1", lineageEntry);
-    SegmentLineageAccessHelper
-        
.writeSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(),
 segmentLineage, -1);
+    
SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(),
 segmentLineage, -1);
     _retentionManager.processTable(OFFLINE_TABLE_NAME);
-    waitForSegmentsToDelete(OFFLINE_TABLE_NAME, 3);
-    segmentsForTable = 
TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(OFFLINE_TABLE_NAME, 
false);
-    Assert.assertEquals(segmentsForTable.size(), 3);
-    Assert.assertTrue(Collections.disjoint(segmentsForTable, 
Arrays.asList("merged_1", "merged_2")));
-    segmentLineage = SegmentLineageAccessHelper
-        
.getSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(), 
OFFLINE_TABLE_NAME);
-    Assert.assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+    verifySegmentsDeleted(3);
+    assertEquals(getSegments(OFFLINE_TABLE_NAME), Arrays.asList("segment_2", 
"segment_3", "segment_4"));
+    segmentLineage =
+        
SegmentLineageAccessHelper.getSegmentLineage(_resourceManager.getPropertyStore(),
 OFFLINE_TABLE_NAME);
+    assertEquals(segmentLineage.getLineageEntryIds().size(), 1);
+  }
+
+  private void verifySegmentsDeleted(int expectedNumRemainingSegments) {
+    // Segment deletion happens asynchronously
+    TestUtils.waitForCondition(aVoid -> getNumSegments(OFFLINE_TABLE_NAME) == 
expectedNumRemainingSegments, 60_000L,
+        "Failed to delete the segments");
+  }
+
+  private List<String> getSegments(String tableNameWithType) {
+    return _resourceManager.getSegmentsFor(tableNameWithType, false);
+  }
+
+  private int getNumSegments(String tableNameWithType) {
+    return getSegments(tableNameWithType).size();
   }
 
   @Test
-  public void testRefreshTableCleanup()
-      throws InterruptedException {
+  public void testRefreshTableCleanup() {
     // Create metadata for original segments
     for (int i = 0; i < 3; i++) {
-      
TEST_INSTANCE.getHelixResourceManager().addNewSegment(REFRESH_OFFLINE_TABLE_NAME,
+      _resourceManager.addNewSegment(REFRESH_OFFLINE_TABLE_NAME,
           
SegmentMetadataMockUtils.mockSegmentMetadata(REFRESH_OFFLINE_TABLE_NAME, 
"segment1_" + i), "downloadUrl");
     }
 
     // Create metadata for new segments.
     for (int i = 0; i < 3; i++) {
-      
TEST_INSTANCE.getHelixResourceManager().addNewSegment(REFRESH_OFFLINE_TABLE_NAME,
+      _resourceManager.addNewSegment(REFRESH_OFFLINE_TABLE_NAME,
           
SegmentMetadataMockUtils.mockSegmentMetadata(REFRESH_OFFLINE_TABLE_NAME, 
"segment2_" + i), "downloadUrl");
     }
 
-    Assert.assertEquals(
-        
TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME,
 false).size(), 6);
+    assertEquals(getNumSegments(REFRESH_OFFLINE_TABLE_NAME), 6);
 
     // Validate the case when the lineage entry state is 'IN_PROGRESS'
     SegmentLineage segmentLineage = new 
SegmentLineage(REFRESH_OFFLINE_TABLE_NAME);
     segmentLineage.addLineageEntry("0", new 
LineageEntry(Arrays.asList("segment1_0", "segment1_1", "segment1_2"),
         Arrays.asList("segment2_0", "segment2_1", "segment2_2"), 
LineageEntryState.IN_PROGRESS,
         System.currentTimeMillis()));
-    SegmentLineageAccessHelper
-        
.writeSegmentLineage(TEST_INSTANCE.getHelixResourceManager().getPropertyStore(),
 segmentLineage, -1);
+    
SegmentLineageAccessHelper.writeSegmentLineage(_resourceManager.getPropertyStore(),
 segmentLineage, -1);
     _retentionManager.processTable(REFRESH_OFFLINE_TABLE_NAME);
-    try {
-      waitForSegmentsToDelete(REFRESH_OFFLINE_TABLE_NAME, 3, 1000L);
-      Assert.fail();
-    } catch (Exception e) {
-      // expected since the original segments are not supposed to be 
immediately erased by the retention manager.
-    }
-    List<String> segmentsForTable =
-        
TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME,
 false);
-    Assert.assertEquals(segmentsForTable.size(), 6);
-
-    segmentsForTable = 
TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME,
 true);
-    Assert.assertEquals(segmentsForTable.size(), 3);
-  }
-
-  private void waitForSegmentsToDelete(String tableNameWithType, int 
expectedNumSegmentsAfterDelete)
-      throws InterruptedException {
-    waitForSegmentsToDelete(tableNameWithType, expectedNumSegmentsAfterDelete, 
MAX_TIMEOUT_IN_MILLISECOND);
-  }
-
-  private void waitForSegmentsToDelete(String tableNameWithType, int 
expectedNumSegmentsAfterDelete,
-      long timeOutInMillis)
-      throws InterruptedException {
-    long endTimeMs = System.currentTimeMillis() + timeOutInMillis;
-    do {
-      if 
(TEST_INSTANCE.getHelixResourceManager().getSegmentsFor(tableNameWithType, 
false).size()
-          == expectedNumSegmentsAfterDelete) {
-        return;
-      } else {
-        Thread.sleep(500L);
-      }
-    } while (System.currentTimeMillis() < endTimeMs);
-    throw new RuntimeException("Timeout while waiting for segments to be 
deleted");
+    assertEquals(getNumSegments(REFRESH_OFFLINE_TABLE_NAME), 6);
+    assertEquals(_resourceManager.getSegmentsFor(REFRESH_OFFLINE_TABLE_NAME, 
true).size(), 3);
   }
 
   @AfterClass


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to