This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f4c85e9596 Fix the flaky UpsertTableSegmentUploadIntegrationTest (#8675)
f4c85e9596 is described below

commit f4c85e95969b0bc7276ec18ae118051af93e9f2c
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Tue May 10 14:25:13 2022 -0700

    Fix the flaky UpsertTableSegmentUploadIntegrationTest (#8675)
---
 .../pinot/integration/tests/ClusterTest.java       |  95 ++++-----
 .../tests/OfflineClusterIntegrationTest.java       |  10 +-
 .../UpsertTableSegmentUploadIntegrationTest.java   | 218 +++++++++------------
 3 files changed, 134 insertions(+), 189 deletions(-)
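
The core of the fix is visible in the third file below: the test no longer relies on timing (a controller configured to validate realtime segments every second plus a Thread.sleep) and instead triggers the validation task directly before re-checking its assertions. A minimal sketch of that pattern, assuming the names used in the diff below:

    // Deterministic replacement for sleeping until a periodic task fires:
    // run the realtime segment validation task explicitly, then re-verify.
    _controllerStarter.getRealtimeSegmentValidationManager().run();
    verifyIdealState();
    assertEquals(getCurrentCountStarResult(), getCountStarResult());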

diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 35132c3bb8..e940fd0f06 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -67,6 +67,7 @@ import org.apache.pinot.spi.data.readers.GenericRow;
 import org.apache.pinot.spi.data.readers.RecordExtractor;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.stream.StreamMessageDecoder;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Minion;
@@ -299,28 +300,47 @@ public abstract class ClusterTest extends ControllerTest {
 
   /**
    * Upload all segments inside the given directory to the cluster.
-   *
-   * @param tarDir Segment directory
    */
   protected void uploadSegments(String tableName, File tarDir)
       throws Exception {
-    File[] segmentTarFiles = tarDir.listFiles();
-    assertNotNull(segmentTarFiles);
-    int numSegments = segmentTarFiles.length;
+    uploadSegments(tableName, TableType.OFFLINE, tarDir);
+  }
+
+  /**
+   * Upload all segments inside the given directory to the cluster.
+   */
+  protected void uploadSegments(String tableName, TableType tableType, File tarDir)
+      throws Exception {
+    uploadSegments(tableName, tableType, Collections.singletonList(tarDir));
+  }
+
+  /**
+   * Upload all segments inside the given directories to the cluster.
+   */
+  protected void uploadSegments(String tableName, TableType tableType, List<File> tarDirs)
+      throws Exception {
+    List<File> segmentTarFiles = new ArrayList<>();
+    for (File tarDir : tarDirs) {
+      File[] tarFiles = tarDir.listFiles();
+      assertNotNull(tarFiles);
+      Collections.addAll(segmentTarFiles, tarFiles);
+    }
+    int numSegments = segmentTarFiles.size();
     assertTrue(numSegments > 0);
 
-    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
+    URI uploadSegmentHttpURI =
+        FileUploadDownloadClient.getUploadSegmentURI(CommonConstants.HTTP_PROTOCOL, LOCAL_HOST, _controllerPort);
     try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
       if (numSegments == 1) {
-        File segmentTarFile = segmentTarFiles[0];
+        File segmentTarFile = segmentTarFiles.get(0);
         if (System.currentTimeMillis() % 2 == 0) {
           assertEquals(
               fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
-                  tableName).getStatusCode(), HttpStatus.SC_OK);
+                  tableName, tableType).getStatusCode(), HttpStatus.SC_OK);
         } else {
           assertEquals(
-              uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient, segmentTarFile),
-              HttpStatus.SC_OK);
+              uploadSegmentWithOnlyMetadata(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient,
+                  segmentTarFile), HttpStatus.SC_OK);
         }
       } else {
         // Upload all segments in parallel
@@ -330,9 +350,9 @@ public abstract class ClusterTest extends ControllerTest {
           futures.add(executorService.submit(() -> {
             if (System.currentTimeMillis() % 2 == 0) {
               return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
-                  segmentTarFile, tableName).getStatusCode();
+                  segmentTarFile, tableName, tableType).getStatusCode();
             } else {
-              return uploadSegmentWithOnlyMetadata(tableName, uploadSegmentHttpURI, fileUploadDownloadClient,
+              return uploadSegmentWithOnlyMetadata(tableName, tableType, uploadSegmentHttpURI, fileUploadDownloadClient,
                   segmentTarFile);
             }
           }));
@@ -345,60 +365,19 @@ public abstract class ClusterTest extends ControllerTest {
     }
   }
 
-  /**
-   * tarDirPaths contains a list of directories that contain segment files. API uploads all segments inside the given
-   * list of directories to the cluster.
-   *
-   * @param tarDirPaths List of directories containing segments
-   */
-  protected void uploadSegments(String tableName, List<File> tarDirPaths, TableType tableType,
-      boolean enableParallelPushProtection)
-      throws Exception {
-    List<File> segmentTarFiles = new ArrayList<>();
-
-    for (File tarDir : tarDirPaths) {
-      Collections.addAll(segmentTarFiles, tarDir.listFiles());
-    }
-    assertNotNull(segmentTarFiles);
-    int numSegments = segmentTarFiles.size();
-    assertTrue(numSegments > 0);
-
-    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
-    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      if (numSegments == 1) {
-        File segmentTarFile = segmentTarFiles.get(0);
-        assertEquals(
-            fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile,
-                tableName, tableType.OFFLINE, enableParallelPushProtection, true).getStatusCode(), HttpStatus.SC_OK);
-      } else {
-        // Upload all segments in parallel
-        ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
-        List<Future<Integer>> futures = new ArrayList<>(numSegments);
-        for (File segmentTarFile : segmentTarFiles) {
-          futures.add(executorService.submit(() -> {
-            return fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(),
-                segmentTarFile, tableName, tableType.OFFLINE, enableParallelPushProtection, true).getStatusCode();
-          }));
-        }
-        executorService.shutdown();
-        for (Future<Integer> future : futures) {
-          assertEquals((int) future.get(), HttpStatus.SC_OK);
-        }
-      }
-    }
-  }
-
-  private int uploadSegmentWithOnlyMetadata(String tableName, URI uploadSegmentHttpURI,
+  private int uploadSegmentWithOnlyMetadata(String tableName, TableType tableType, URI uploadSegmentHttpURI,
       FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
       throws IOException, HttpErrorStatusException {
     List<Header> headers = ImmutableList.of(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
         "file://" + segmentTarFile.getParentFile().getAbsolutePath() + "/" + URLEncoder.encode(segmentTarFile.getName(),
             StandardCharsets.UTF_8.toString())), new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
         FileUploadDownloadClient.FileUploadType.METADATA.toString()));
-    // Add table name as a request parameter
+    // Add table name and table type as request parameters
     NameValuePair tableNameValuePair =
         new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, tableName);
-    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair);
+    NameValuePair tableTypeValuePair =
+        new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, tableType.name());
+    List<NameValuePair> parameters = Arrays.asList(tableNameValuePair, tableTypeValuePair);
     return fileUploadDownloadClient.uploadSegmentMetadata(uploadSegmentHttpURI, segmentTarFile.getName(),
         segmentTarFile, headers, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
   }
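
For reference, the three uploadSegments overloads above now chain into a single implementation. A minimal usage sketch from a hypothetical ClusterTest subclass (the table name and tar directories are illustrative placeholders, not taken from the diff):

    // Single directory; delegates to the OFFLINE overload:
    uploadSegments("myTable", tarDir);
    // Explicit table type, e.g. for an upsert (REALTIME) table:
    uploadSegments("myTable", TableType.REALTIME, tarDir);
    // Several directories uploaded in one call:
    uploadSegments("myTable", TableType.OFFLINE, Arrays.asList(tarDir, tarDir2));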
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 9bb4853283..0669847093 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -184,13 +184,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     File tarDir2 = new File(_tempDir, "tarDir2");
     FileUtils.copyDirectory(_tarDir, tarDir2);
 
-    List<File> tarDirPaths = new ArrayList<>();
-    tarDirPaths.add(_tarDir);
-    tarDirPaths.add(tarDir2);
-
-    // TODO: Move this block to a separate method.
+    List<File> tarDirs = new ArrayList<>();
+    tarDirs.add(_tarDir);
+    tarDirs.add(tarDir2);
     try {
-      uploadSegments(getTableName(), tarDirPaths, TableType.OFFLINE, true);
+      uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
     } catch (Exception e) {
       // If enableParallelPushProtection is enabled and the same segment is uploaded concurrently, we could get one
       // of the two exception - 409 conflict of the second call enters ProcessExistingSegment ; segmentZkMetadata
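
The catch block above tolerates a benign race: with parallel push protection enabled, two concurrent uploads of the same segment fail in one of two ways (a 409 Conflict when the second call enters ProcessExistingSegment, or a missing segment ZK metadata error). A hedged sketch of that calling pattern; the message fragments checked here are assumptions for illustration, not the test's exact assertions:

    try {
      uploadSegments(getTableName(), TableType.OFFLINE, tarDirs);
    } catch (Exception e) {
      // Acceptable only for the duplicate-push race described above
      // (hypothetical message fragments, for illustration only).
      String message = e.getMessage();
      assertTrue(message.contains("Conflict") || message.contains("ZK metadata"),
          "Unexpected upload failure: " + message);
    }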
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
index 616f150d52..e6f5ff248f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java
@@ -20,48 +20,36 @@ package org.apache.pinot.integration.tests;
 
 import java.io.File;
 import java.io.IOException;
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import org.apache.commons.io.FileUtils;
-import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
-import org.apache.http.HttpStatus;
-import org.apache.pinot.common.utils.FileUploadDownloadClient;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.common.utils.helix.HelixHelper;
-import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
 
 
 public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrationTestSet {
-  private static final int NUM_BROKERS = 1;
   private static final int NUM_SERVERS = 2;
-  // Segment 1 contains records of pk value 100000
+  private static final String PRIMARY_KEY_COL = "clientId";
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(DEFAULT_TABLE_NAME);
+
+  // Segment 1 contains records of pk value 100000 (partition 0)
   private static final String UPLOADED_SEGMENT_1 = "mytable_10027_19736_0 %";
-  // Segment 2 contains records of pk value 100001
+  // Segment 2 contains records of pk value 100001 (partition 1)
   private static final String UPLOADED_SEGMENT_2 = "mytable_10072_19919_1 %";
-  // Segment 3 contains records of pk value 100000
+  // Segment 3 contains records of pk value 100002 (partition 1)
   private static final String UPLOADED_SEGMENT_3 = "mytable_10158_19938_2 %";
-  private static final String PRIMARY_KEY_COL = "clientId";
-  private static final String TABLE_NAME_WITH_TYPE = "mytable_REALTIME";
 
   @BeforeClass
   public void setUp()
@@ -72,27 +60,24 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
     startZk();
     // Start a customized controller with more frequent realtime segment validation
     startController();
-    startBrokers(getNumBrokers());
+    startBroker();
     startServers(NUM_SERVERS);
 
-    // Start Kafka
-    startKafka();
-
-    // Create and upload the schema.
-    Schema schema = createSchema();
-    addSchema(schema);
-
     // Unpack the Avro files
     List<File> avroFiles = unpackAvroData(_tempDir);
 
-    // Push data to Kafka
+    // Start Kafka and push data into Kafka
+    startKafka();
     pushAvroIntoKafka(avroFiles);
-    // Create and upload the table config
-    TableConfig upsertTableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
-    addTableConfig(upsertTableConfig);
+
+    // Create and upload schema and table config
+    Schema schema = createSchema();
+    addSchema(schema);
+    TableConfig tableConfig = createUpsertTableConfig(avroFiles.get(0), PRIMARY_KEY_COL, getNumKafkaPartitions());
+    addTableConfig(tableConfig);
 
     // Create and upload segments
-    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, upsertTableConfig, schema, 0, _segmentDir, _tarDir);
+    ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir);
     uploadSegments(getTableName(), TableType.REALTIME, _tarDir);
 
     // Wait for all documents loaded
@@ -131,8 +116,9 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
     return true;
   }
 
-  protected int getNumBrokers() {
-    return NUM_BROKERS;
+  @Override
+  protected String getPartitionColumn() {
+    return PRIMARY_KEY_COL;
   }
 
   @Override
@@ -142,113 +128,95 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat
   }
 
   @Override
-  protected String getPartitionColumn() {
-    return PRIMARY_KEY_COL;
+  protected void waitForAllDocsLoaded(long timeoutMs)
+      throws Exception {
+    TestUtils.waitForCondition(aVoid -> {
+      try {
+        return getCurrentCountStarResultWithoutUpsert() == getCountStarResultWithoutUpsert();
+      } catch (Exception e) {
+        return null;
+      }
+    }, 100L, timeoutMs, "Failed to load all documents");
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
   }
 
-  @Override
-  protected void startController()
-      throws Exception {
-    Map<String, Object> controllerConfig = getDefaultControllerConfiguration();
-    // Perform realtime segment validation every second with 1 second initial delay.
-    controllerConfig
-        .put(ControllerConf.ControllerPeriodicTasksConf.DEPRECATED_REALTIME_SEGMENT_VALIDATION_FREQUENCY_IN_SECONDS, 1);
-    controllerConfig
-        .put(ControllerConf.ControllerPeriodicTasksConf.DEPRECATED_SEGMENT_LEVEL_VALIDATION_INTERVAL_IN_SECONDS, 1);
-    controllerConfig
-        .put(ControllerConf.ControllerPeriodicTasksConf.REALTIME_SEGMENT_VALIDATION_INITIAL_DELAY_IN_SECONDS, 1);
-    startController(controllerConfig);
+  private long getCurrentCountStarResultWithoutUpsert() {
+    return getPinotConnection().execute("SELECT COUNT(*) FROM " + getTableName() + " OPTION(skipUpsert=true)")
+        .getResultSet(0).getLong(0);
+  }
+
+  private long getCountStarResultWithoutUpsert() {
+    // 3 Avro files, each with 100 documents, one copy from streaming source, one copy from batch source
+    return 600;
   }
 
   @Test
   public void testSegmentAssignment()
       throws Exception {
-    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, TABLE_NAME_WITH_TYPE);
-    Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
-    verifyTableIdealStates(idealState);
-    // Wait 3 seconds to let the realtime validation thread to run.
-    Thread.sleep(3000);
-    // Verify the result again.
-    Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
-    verifyTableIdealStates(idealState);
-
-    // Restart the servers and check every segment is not in ERROR state.
+    verifyIdealState();
+
+    // Run the real-time segment validation and check again
+    _controllerStarter.getRealtimeSegmentValidationManager().run();
+    verifyIdealState();
+    assertEquals(getCurrentCountStarResult(), getCountStarResult());
+    assertEquals(getCurrentCountStarResultWithoutUpsert(), getCountStarResultWithoutUpsert());
+
+    // Restart the servers and check again
     restartServers();
-    verifyTableIdealStates(idealState);
-    ExternalView ev =
-        HelixHelper.getExternalViewForResource(_helixAdmin, this.getHelixClusterName(), TABLE_NAME_WITH_TYPE);
-    Set<String> segments = ev.getPartitionSet();
-    Assert.assertEquals(segments.size(), 5);
-    for (String segment : segments) {
-      Map<String, String> stateMap = ev.getStateMap(segment);
-      Assert.assertTrue(stateMap.size() > 0);
-      for (Map.Entry<String, String> server2state: stateMap.entrySet()) {
-        Assert.assertFalse("ERROR".equals(server2state.getValue()));
-      }
-    }
-    // Verify the result again.
-    Assert.assertEquals(getCurrentCountStarResult(), getCountStarResult());
+    verifyIdealState();
+    waitForAllDocsLoaded(600_000L);
   }
 
-  private void verifyTableIdealStates(IdealState idealState) {
-    // Verify various ideal state properties
-    Set<String> segments = idealState.getPartitionSet();
-    Assert.assertEquals(segments.size(), 5);
-    Map<String, Integer> segment2PartitionId = new HashMap<>();
-    segment2PartitionId.put(UPLOADED_SEGMENT_1, 0);
-    segment2PartitionId.put(UPLOADED_SEGMENT_2, 1);
-    segment2PartitionId.put(UPLOADED_SEGMENT_3, 1);
-
-    // Verify that all segments of the same partition are mapped to the same single server.
-    Map<Integer, Set<String>> segmentAssignment = new HashMap<>();
-    for (String segment : segments) {
-      Integer partitionId;
-      if (LLCSegmentName.isLowLevelConsumerSegmentName(segment)) {
-        partitionId = new LLCSegmentName(segment).getPartitionGroupId();
+  private void verifyIdealState() {
+    IdealState idealState = HelixHelper.getTableIdealState(_helixManager, REALTIME_TABLE_NAME);
+    Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
+    assertEquals(segmentAssignment.size(), 5);
+
+    String serverForPartition0 = null;
+    String serverForPartition1 = null;
+    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+      String segmentName = entry.getKey();
+      Map<String, String> instanceStateMap = entry.getValue();
+
+      // Verify that all segments have the correct state
+      assertEquals(instanceStateMap.size(), 1);
+      Map.Entry<String, String> instanceIdAndState = instanceStateMap.entrySet().iterator().next();
+      String state = instanceIdAndState.getValue();
+      if (LLCSegmentName.isLowLevelConsumerSegmentName(segmentName)) {
+        assertEquals(state, SegmentStateModel.CONSUMING);
       } else {
-        partitionId = segment2PartitionId.get(segment);
+        assertEquals(state, SegmentStateModel.ONLINE);
       }
-      Assert.assertNotNull(partitionId);
-      Set<String> instances = idealState.getInstanceSet(segment);
-      Assert.assertEquals(1, instances.size());
-      if (segmentAssignment.containsKey(partitionId)) {
-        Assert.assertEquals(instances, segmentAssignment.get(partitionId));
+
+      // Verify that all segments of the same partition are mapped to the same server
+      String instanceId = instanceIdAndState.getKey();
+      int partitionId = getSegmentPartitionId(segmentName);
+      if (partitionId == 0) {
+        if (serverForPartition0 == null) {
+          serverForPartition0 = instanceId;
+        } else {
+          assertEquals(instanceId, serverForPartition0);
+        }
       } else {
-        segmentAssignment.put(partitionId, instances);
+        assertEquals(partitionId, 1);
+        if (serverForPartition1 == null) {
+          serverForPartition1 = instanceId;
+        } else {
+          assertEquals(instanceId, serverForPartition1);
+        }
       }
     }
   }
 
-  private void uploadSegments(String tableName, TableType tableType, File tarDir)
-      throws Exception {
-    File[] segmentTarFiles = tarDir.listFiles();
-    assertNotNull(segmentTarFiles);
-    int numSegments = segmentTarFiles.length;
-    assertTrue(numSegments > 0);
-
-    URI uploadSegmentHttpURI = FileUploadDownloadClient.getUploadSegmentHttpURI(LOCAL_HOST, _controllerPort);
-    try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient()) {
-      if (numSegments == 1) {
-        File segmentTarFile = segmentTarFiles[0];
-        assertEquals(fileUploadDownloadClient
-            .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, tableType)
-            .getStatusCode(), HttpStatus.SC_OK);
-      } else {
-        // Upload all segments in parallel
-        ExecutorService executorService = Executors.newFixedThreadPool(numSegments);
-        List<Future<Integer>> futures = new ArrayList<>(numSegments);
-        for (File segmentTarFile : segmentTarFiles) {
-          futures.add(executorService.submit(() -> {
-            return fileUploadDownloadClient
-                .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, tableType)
-                .getStatusCode();
-          }));
-        }
-        executorService.shutdown();
-        for (Future<Integer> future : futures) {
-          assertEquals((int) future.get(), HttpStatus.SC_OK);
-        }
-      }
+  private static int getSegmentPartitionId(String segmentName) {
+    switch (segmentName) {
+      case UPLOADED_SEGMENT_1:
+        return 0;
+      case UPLOADED_SEGMENT_2:
+      case UPLOADED_SEGMENT_3:
+        return 1;
+      default:
+        return new LLCSegmentName(segmentName).getPartitionGroupId();
     }
   }
 }
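
To make the new count helpers concrete: each of the 3 Avro files carries 100 records, and every record is ingested twice (once from Kafka, once from the uploaded segments), so the raw count with skipUpsert is 3 x 100 x 2 = 600, while the default query reflects the upsert view with at most one row per primary key. A minimal sketch of the two queries, assuming a Pinot java-client Connection named connection and an illustrative table name:

    // Upsert view: at most one row per clientId survives deduplication.
    long upsertCount = connection.execute("SELECT COUNT(*) FROM mytable")
        .getResultSet(0).getLong(0);
    // Raw view: OPTION(skipUpsert=true) counts every ingested record (600 here).
    long rawCount = connection.execute("SELECT COUNT(*) FROM mytable OPTION(skipUpsert=true)")
        .getResultSet(0).getLong(0);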


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
