This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 95b075b527 Batch reload API to specify which segments to reload on which servers, for more flexibility (#14544)
95b075b527 is described below

commit 95b075b52765088e1a27cc420f49a468bcd789bb
Author: Xiaobing <61892277+klsi...@users.noreply.github.com>
AuthorDate: Thu Dec 5 06:57:25 2024 -0800

    Batch reload API to specify which segments to reload on which servers, for more flexibility (#14544)
    
    * Extend the existing reload-all-segments API to be more flexible by taking a map that reloads different batches of segments on different instances
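
(Editor's illustration, not part of the commit: a minimal client-side sketch of how the new instanceToSegmentsMap query parameter might be invoked. The controller address, table name, instance names, and segment names below are hypothetical placeholders, and authentication headers are omitted.)

    // Hypothetical usage sketch of POST /segments/{tableName}/reload?instanceToSegmentsMap=...
    import java.net.URI;
    import java.net.URLEncoder;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.nio.charset.StandardCharsets;

    public class BatchReloadExample {
      public static void main(String[] args) throws Exception {
        // Ask Server_a to reload seg01/seg02 and Server_b to reload seg03 in a single call.
        String instanceToSegmentsMap = "{\"Server_a\":[\"seg01\",\"seg02\"],\"Server_b\":[\"seg03\"]}";
        String url = "http://localhost:9000/segments/myTable_OFFLINE/reload?instanceToSegmentsMap="
            + URLEncoder.encode(instanceToSegmentsMap, StandardCharsets.UTF_8);
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
            .POST(HttpRequest.BodyPublishers.noBody())
            .build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        // The controller replies with per-table, per-instance message counts and reload job ids.
        System.out.println(response.statusCode() + ": " + response.body());
      }
    }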
---
 .../api/resources/PinotSegmentRestletResource.java | 129 +++++++++++++++++----
 .../helix/core/PinotHelixResourceManager.java      |  93 +++++++++++----
 .../resources/PinotSegmentRestletResourceTest.java |  88 ++++++++++++++
 .../segment/spi/creator/name/SegmentNameUtils.java |   2 +
 .../api/resources/ControllerJobStatusResource.java |  53 ++++-----
 .../apache/pinot/spi/utils/CommonConstants.java    |   1 +
 6 files changed, 290 insertions(+), 76 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 7499098780..37e365bc7f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -18,8 +18,9 @@
  */
 package org.apache.pinot.controller.api.resources;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.common.collect.BiMap;
@@ -39,6 +40,7 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Executor;
 import java.util.stream.Collectors;
@@ -87,6 +89,7 @@ import org.apache.pinot.controller.util.TableTierReader;
 import org.apache.pinot.core.auth.Actions;
 import org.apache.pinot.core.auth.Authorize;
 import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -403,8 +406,8 @@ public class PinotSegmentRestletResource {
     int numReloadMsgSent = msgInfo.getLeft();
     if (numReloadMsgSent > 0) {
       try {
-        if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(),
-            startTimeMs, numReloadMsgSent)) {
+        if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, targetInstance,
+            msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
           zkJobMetaWriteSuccess = true;
         } else {
           LOGGER.error("Failed to add reload segment job meta into zookeeper 
for table: {}, segment: {}",
@@ -533,20 +536,11 @@ public class PinotSegmentRestletResource {
     }
 
     String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
-    Map<String, List<String>> serverToSegments;
-
-    String singleSegmentName =
+    String segmentNames =
         controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
-    if (singleSegmentName != null) {
-      // No need to query servers where this segment is not supposed to be hosted
-      serverToSegments = new TreeMap<>();
-      List<String> segmentList = Collections.singletonList(singleSegmentName);
-      _pinotHelixResourceManager.getServers(tableNameWithType, singleSegmentName).forEach(server -> {
-        serverToSegments.put(server, segmentList);
-      });
-    } else {
-      serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
-    }
+    String instanceName =
+        controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
+    Map<String, List<String>> serverToSegments = getServerToSegments(tableNameWithType, segmentNames, instanceName);
 
     BiMap<String, String> serverEndPoints =
         _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
@@ -554,13 +548,16 @@ public class PinotSegmentRestletResource {
         new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints);
 
     List<String> serverUrls = new ArrayList<>();
-    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
-    for (String endpoint : endpointsToServers.keySet()) {
+    for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+      String server = entry.getKey();
+      String endpoint = entry.getValue();
       String reloadTaskStatusEndpoint =
           endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp="
               + controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
-      if (singleSegmentName != null) {
-        reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName;
+      if (segmentNames != null) {
+        List<String> targetSegments = serverToSegments.get(server);
+        reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + StringUtils.join(targetSegments,
+            SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
       }
       serverUrls.add(reloadTaskStatusEndpoint);
     }
@@ -615,6 +612,31 @@ public class PinotSegmentRestletResource {
     return serverReloadControllerJobStatusResponse;
   }
 
+  @VisibleForTesting
+  Map<String, List<String>> getServerToSegments(String tableNameWithType, @Nullable String segmentNames,
+      @Nullable String instanceName) {
+    if (segmentNames == null) {
+      // instanceName may be null or non-null; the method below handles both cases.
+      return _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, instanceName);
+    }
+    // Skip servers and segments not involved in the segment reloading job.
+    List<String> segmentNameList = new ArrayList<>();
+    Collections.addAll(segmentNameList, StringUtils.split(segmentNames, SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+    if (instanceName != null) {
+      return Map.of(instanceName, segmentNameList);
+    }
+    // If instanceName is null, then either one or all segments are being reloaded via the current segment reload
+    // REST APIs. The if-check at the beginning of this method has already handled reloading all segments, so here
+    // we expect exactly one segment name.
+    Preconditions.checkState(segmentNameList.size() == 1, "Only one segment is expected but got: %s", segmentNameList);
+    Map<String, List<String>> serverToSegments = new HashMap<>();
+    Set<String> servers = _pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
+    for (String server : servers) {
+      serverToSegments.put(server, Collections.singletonList(segmentNames));
+    }
+    return serverToSegments;
+  }
+
   @POST
   @Path("segments/{tableName}/reload")
   @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RELOAD_SEGMENT)
@@ -627,10 +649,11 @@
       @ApiParam(value = "Whether to force server to download segment") @QueryParam("forceDownload")
       @DefaultValue("false") boolean forceDownload,
       @ApiParam(value = "Name of the target instance to reload") @QueryParam("targetInstance") @Nullable
-      String targetInstance, @Context HttpHeaders headers)
-      throws JsonProcessingException {
+      String targetInstance,
+      @ApiParam(value = "Map from instances to segments to reload. This param takes precedence over targetInstance")
+      @QueryParam("instanceToSegmentsMap") @Nullable String instanceToSegmentsMapInJson, @Context HttpHeaders headers)
+      throws IOException {
     tableName = DatabaseUtils.translateTableName(tableName, headers);
-    long startTimeMs = System.currentTimeMillis();
     TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
     TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
     // When rawTableName is provided but w/o table type, Pinot tries to reload both OFFLINE
@@ -644,6 +667,20 @@ public class PinotSegmentRestletResource {
     List<String> tableNamesWithType =
         ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest,
             LOGGER);
+    if (instanceToSegmentsMapInJson != null) {
+      Map<String, List<String>> instanceToSegmentsMap =
+          JsonUtils.stringToObject(instanceToSegmentsMapInJson, new TypeReference<>() {
+          });
+      Map<String, Map<String, Map<String, String>>> tableInstanceMsgData =
+          reloadSegments(tableNamesWithType, forceDownload, instanceToSegmentsMap);
+      if (tableInstanceMsgData.isEmpty()) {
+        throw new ControllerApplicationException(LOGGER,
+            String.format("Failed to find any segments in table: %s with instanceToSegmentsMap: %s", tableName,
+                instanceToSegmentsMap), Status.NOT_FOUND);
+      }
+      return new SuccessResponse(JsonUtils.objectToString(tableInstanceMsgData));
+    }
+    long startTimeMs = System.currentTimeMillis();
     Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
     for (String tableNameWithType : tableNamesWithType) {
       Pair<Integer, String> msgInfo =
@@ -658,8 +695,8 @@ public class PinotSegmentRestletResource {
       perTableMsgData.put(tableNameWithType, tableReloadMeta);
       // Store in ZK
       try {
-        if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(), startTimeMs,
-            numReloadMsgSent)) {
+        if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, targetInstance, msgInfo.getRight(),
+            startTimeMs, numReloadMsgSent)) {
           tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
         } else {
           tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
@@ -678,6 +715,48 @@ public class PinotSegmentRestletResource {
     return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
   }
 
+  private Map<String, Map<String, Map<String, String>>> reloadSegments(List<String> tableNamesWithType,
+      boolean forceDownload, Map<String, List<String>> instanceToSegmentsMap) {
+    long startTimeMs = System.currentTimeMillis();
+    Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = new LinkedHashMap<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, Pair<Integer, String>> instanceMsgInfoMap =
+          _pinotHelixResourceManager.reloadSegments(tableNameWithType, forceDownload, instanceToSegmentsMap);
+      Map<String, Map<String, String>> instanceMsgData =
+          tableInstanceMsgData.computeIfAbsent(tableNameWithType, t -> new HashMap<>());
+      for (Map.Entry<String, Pair<Integer, String>> instanceMsgInfo : instanceMsgInfoMap.entrySet()) {
+        String instance = instanceMsgInfo.getKey();
+        Pair<Integer, String> msgInfo = instanceMsgInfo.getValue();
+        int numReloadMsgSent = msgInfo.getLeft();
+        if (numReloadMsgSent <= 0) {
+          continue;
+        }
+        Map<String, String> tableReloadMeta = new HashMap<>();
+        tableReloadMeta.put("numMessagesSent", String.valueOf(numReloadMsgSent));
+        tableReloadMeta.put("reloadJobId", msgInfo.getRight());
+        instanceMsgData.put(instance, tableReloadMeta);
+        // Store in ZK
+        try {
+          String segmentNames =
+              StringUtils.join(instanceToSegmentsMap.get(instance), SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
+          if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentNames, instance,
+              msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
+            tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
+          } else {
+            tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
+            LOGGER.error("Failed to add batch reload job meta into zookeeper for table: {} targeted instance: {}",
+                tableNameWithType, instance);
+          }
+        } catch (Exception e) {
+          tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
+          LOGGER.error("Failed to add batch reload job meta into zookeeper for table: {} targeted instance: {}",
+              tableNameWithType, instance, e);
+              tableNameWithType, instance, e);
+        }
+      }
+    }
+    return tableInstanceMsgData;
+  }
+
   @DELETE
   @Produces(MediaType.APPLICATION_JSON)
   @Path("/segments/{tableName}/{segmentName}")
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1f1e95877f..42bd8c4ac4 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2193,56 +2193,65 @@ public class PinotHelixResourceManager {
   /**
    * Adds a new reload segment job metadata into ZK
    * @param tableNameWithType Table for which job is to be added
-   * @param segmentName Name of the segment being reloaded
+   * @param segmentNames Names of the segments being reloaded, joined by SegmentNameUtils.SEGMENT_NAME_SEPARATOR
+   * @param instanceName Name of the instance doing the segment reloading, optional.
    * @param jobId job's UUID
    * @param jobSubmissionTimeMs time at which the job was submitted
    * @param numMessagesSent number of messages that were sent to servers. Saved as metadata
    * @return boolean representing success / failure of the ZK write step
    */
-  public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId,
-      long jobSubmissionTimeMs, int numMessagesSent) {
+  public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentNames, @Nullable String instanceName,
+      String jobId, long jobSubmissionTimeMs, int numMessagesSent) {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent));
-    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName);
+    jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentNames);
+    if (instanceName != null) {
+      jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME, instanceName);
+    }
     return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT);
   }
 
-  public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs,
-      Set<String> consumingSegmentsCommitted)
-      throws JsonProcessingException {
-    Map<String, String> jobMetadata = new HashMap<>();
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
-    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
-    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT);
-    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
-    jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
-        JsonUtils.objectToString(consumingSegmentsCommitted));
-    return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT);
-  }
-
   /**
    * Adds a new reload segment job metadata into ZK
    * @param tableNameWithType Table for which job is to be added
+   * @param instanceName Name of the instance doing the segment reloading, optional.
    * @param jobId job's UUID
    * @param jobSubmissionTimeMs time at which the job was submitted
    * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata
    * @return boolean representing success / failure of the ZK write step
    */
-  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs,
-      int numberOfMessagesSent) {
+  public boolean addNewReloadAllSegmentsJob(String tableNameWithType, @Nullable String instanceName, String jobId,
+      long jobSubmissionTimeMs, int numberOfMessagesSent) {
     Map<String, String> jobMetadata = new HashMap<>();
     jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
     jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
     jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT);
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent));
+    if (instanceName != null) {
+      jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME, instanceName);
+    }
     return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT);
   }
 
+
+  public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs,
+      Set<String> consumingSegmentsCommitted)
+      throws JsonProcessingException {
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT);
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
+    jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
+        JsonUtils.objectToString(consumingSegmentsCommitted));
+    return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT);
+  }
+
   /**
    * Adds a new job metadata for controller job like table rebalance or reload into ZK
    * @param jobId job's UUID
@@ -2605,6 +2614,42 @@ public class PinotHelixResourceManager {
     sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
   }
 
+  public Map<String, Pair<Integer, String>> reloadSegments(String tableNameWithType, boolean forceDownload,
+      Map<String, List<String>> instanceToSegmentsMap) {
+    LOGGER.info("Sending reload messages for table: {} with forceDownload: {}, and instanceToSegmentsMap: {}",
+        tableNameWithType, forceDownload, instanceToSegmentsMap);
+
+    if (forceDownload) {
+      TableType tt = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+      // TODO: support force-downloading immutable segments from REALTIME tables.
+      Preconditions.checkArgument(tt == TableType.OFFLINE,
+          "Table: %s is not an OFFLINE table, which is required to force downloading segments", tableNameWithType);
+    }
+    // Infinite timeout on the recipient
+    int timeoutMs = -1;
+    Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>();
+    for (Map.Entry<String, List<String>> entry : instanceToSegmentsMap.entrySet()) {
+      String targetInstance = entry.getKey();
+      List<String> segments = entry.getValue();
+      Criteria recipientCriteria = new Criteria();
+      recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+      recipientCriteria.setInstanceName(targetInstance);
+      recipientCriteria.setResource(tableNameWithType);
+      recipientCriteria.setSessionSpecific(true);
+      SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, segments, forceDownload);
+      ClusterMessagingService messagingService = _helixZkManager.getMessagingService();
+      int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs);
+      if (numMessagesSent > 0) {
+        LOGGER.info("Sent {} reload messages to instance: {} for table: {}", numMessagesSent, targetInstance,
+            tableNameWithType);
+      } else {
+        LOGGER.warn("No reload message sent to instance: {} for table: {}", targetInstance, tableNameWithType);
+      }
+      instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent, segmentReloadMessage.getMsgId()));
+    }
+    return instanceMsgInfoMap;
+  }
+
   public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean forceDownload,
       @Nullable String targetInstance) {
     LOGGER.info("Sending reload message for table: {} with forceDownload: {}, and target: {}", tableNameWithType,
@@ -2985,6 +3030,10 @@ public class PinotHelixResourceManager {
    * the ideal state because they are not supposed to be served.
    */
   public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType) {
+    return getServerToSegmentsMap(tableNameWithType, null);
+  }
+
+  public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType, @Nullable String targetServer) {
     Map<String, List<String>> serverToSegmentsMap = new TreeMap<>();
     IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
     if (idealState == null) {
@@ -2993,8 +3042,12 @@ public class PinotHelixResourceManager {
     for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
       String segmentName = entry.getKey();
       for (Map.Entry<String, String> instanceStateEntry : entry.getValue().entrySet()) {
+        String server = instanceStateEntry.getKey();
+        if (targetServer != null && !server.equals(targetServer)) {
+          continue;
+        }
         if (!instanceStateEntry.getValue().equals(SegmentStateModel.OFFLINE)) {
-          serverToSegmentsMap.computeIfAbsent(instanceStateEntry.getKey(), key -> new ArrayList<>()).add(segmentName);
+          serverToSegmentsMap.computeIfAbsent(server, key -> new ArrayList<>()).add(segmentName);
         }
       }
     }
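
(Editor's note, not from the patch: the new targetServer parameter in getServerToSegmentsMap walks the ideal state's segment-to-instance map, keeps only entries for the requested server, and skips OFFLINE replicas. A simplified standalone sketch of that filtering, using plain maps and made-up server/segment names instead of Pinot's Helix types:)

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    public class ServerToSegmentsFilterExample {
      // Walk segment -> (server -> state) entries, keep only the requested server, skip OFFLINE replicas.
      static Map<String, List<String>> serverToSegments(Map<String, Map<String, String>> idealState,
          String targetServer) {
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<String, Map<String, String>> entry : idealState.entrySet()) {
          String segment = entry.getKey();
          for (Map.Entry<String, String> instanceState : entry.getValue().entrySet()) {
            String server = instanceState.getKey();
            if (targetServer != null && !server.equals(targetServer)) {
              continue; // not the server we were asked about
            }
            if (!"OFFLINE".equals(instanceState.getValue())) {
              result.computeIfAbsent(server, k -> new ArrayList<>()).add(segment);
            }
          }
        }
        return result;
      }

      public static void main(String[] args) {
        Map<String, Map<String, String>> idealState = Map.of(
            "seg01", Map.of("Server_a", "ONLINE", "Server_b", "ONLINE"),
            "seg02", Map.of("Server_b", "ONLINE", "Server_c", "OFFLINE"));
        System.out.println(serverToSegments(idealState, null));       // all servers
        System.out.println(serverToSegments(idealState, "Server_b")); // only Server_b
      }
    }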
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
new file mode 100644
index 0000000000..392fc05bd8
--- /dev/null
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class PinotSegmentRestletResourceTest {
+  @Mock
+  PinotHelixResourceManager _pinotHelixResourceManager;
+
+  @InjectMocks
+  PinotSegmentRestletResource _pinotSegmentRestletResource;
+
+  @BeforeMethod
+  public void setup() {
+    MockitoAnnotations.openMocks(this);
+  }
+
+  @Test
+  public void testGetServerToSegments() {
+    String tableName = "testTable";
+    Map<String, List<String>> fullServerToSegmentsMap = new HashMap<>();
+    fullServerToSegmentsMap.put("svr01", new ArrayList<>(List.of("seg01", "seg02")));
+    fullServerToSegmentsMap.put("svr02", new ArrayList<>(List.of("seg02", "seg03")));
+    fullServerToSegmentsMap.put("svr03", new ArrayList<>(List.of("seg03", "seg01")));
+    when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName, null)).thenReturn(fullServerToSegmentsMap);
+    when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName, "svr02")).thenReturn(
+        Map.of("svr02", new ArrayList<>(List.of("seg02", "seg03"))));
+    when(_pinotHelixResourceManager.getServers(tableName, "seg01")).thenReturn(Set.of("svr01", "svr03"));
+
+    // Get all servers and all their segments.
+    Map<String, List<String>> serverToSegmentsMap =
+        _pinotSegmentRestletResource.getServerToSegments(tableName, null, null);
+    assertEquals(serverToSegmentsMap, fullServerToSegmentsMap);
+
+    // Get all segments on svr02.
+    serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, null, "svr02");
+    assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg02", "seg03")));
+
+    // Get all servers with seg01.
+    serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, "seg01", null);
+    assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01"), "svr03", List.of("seg01")));
+
+    // Simply map the provided server to the provided segments.
+    serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, "seg01", "svr01");
+    assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01")));
+    serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, "anySegment", "anyServer");
+    assertEquals(serverToSegmentsMap, Map.of("anyServer", List.of("anySegment")));
+    serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, "seg01|seg02", "svr02");
+    assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg01", "seg02")));
+    try {
+      _pinotSegmentRestletResource.getServerToSegments(tableName, "seg01,seg02", null);
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Only one segment is expected but got: [seg01, seg02]"));
+    }
+  }
+}
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java
index f3000c994a..fcd2ec55da 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java
@@ -25,6 +25,8 @@ import java.util.regex.Pattern;
  * Utils for segment names.
  */
 public class SegmentNameUtils {
+  // Per the invalid-name pattern below, '|' cannot appear in a segment name, so it is a safer separator than ','.
+  public static final char SEGMENT_NAME_SEPARATOR = '|';
   private static final Pattern INVALID_SEGMENT_NAME_REGEX = Pattern.compile(".*[\\\\/:\\*?\"<>|].*");
 
   private SegmentNameUtils() {
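
(Editor's note, not from the patch: the reason '|' is a safe joiner is that the invalid-name pattern above already rejects any segment name containing '|', so a joined list always splits back unambiguously. A standalone sketch with made-up segment names:)

    import java.util.Arrays;
    import java.util.regex.Pattern;

    public class SeparatorExample {
      // Same pattern as SegmentNameUtils: any of \ / : * ? " < > | makes a name invalid.
      private static final Pattern INVALID_SEGMENT_NAME_REGEX = Pattern.compile(".*[\\\\/:\\*?\"<>|].*");

      public static void main(String[] args) {
        String seg1 = "myTable_16071_16101_0";
        String seg2 = "myTable_16102_16132_1";
        // Valid segment names never contain '|'.
        System.out.println(INVALID_SEGMENT_NAME_REGEX.matcher(seg1).matches());        // false
        System.out.println(INVALID_SEGMENT_NAME_REGEX.matcher(seg1 + "|x").matches()); // true

        String joined = seg1 + '|' + seg2;
        // Split on the literal '|' (quoted because '|' is a regex metacharacter).
        System.out.println(Arrays.toString(joined.split(Pattern.quote("|"))));
      }
    }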
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
index 66ebdd88cc..9bd96c0da9 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
@@ -20,6 +20,8 @@ package org.apache.pinot.server.api.resources;
 
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
@@ -30,9 +32,11 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.utils.DatabaseUtils;
 import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
 import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
 import org.apache.pinot.server.starter.ServerInstance;
 import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -51,45 +55,32 @@ public class ControllerJobStatusResource {
   @ApiOperation(value = "Task status", notes = "Return the status of a given reload job")
   public String reloadJobStatus(@PathParam("tableNameWithType") String tableNameWithType,
       @QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp,
-      @QueryParam("segmentName") String segmentName,
-      @Context HttpHeaders headers)
+      @QueryParam("segmentName") String segmentName, @Context HttpHeaders headers)
       throws Exception {
     tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers);
     TableDataManager tableDataManager =
         ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType);
-
+    List<SegmentDataManager> segmentDataManagers;
+    long totalSegmentCount;
     if (segmentName == null) {
-      // All segments
-      List<SegmentDataManager> allSegments = tableDataManager.acquireAllSegments();
-      try {
-        long successCount = 0;
-        for (SegmentDataManager segmentDataManager : allSegments) {
-          if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) {
-            successCount++;
-          }
-        }
-        SegmentReloadStatusValue segmentReloadStatusValue =
-            new SegmentReloadStatusValue(allSegments.size(), successCount);
-        return JsonUtils.objectToString(segmentReloadStatusValue);
-      } finally {
-        for (SegmentDataManager segmentDataManager : allSegments) {
-          tableDataManager.releaseSegment(segmentDataManager);
-        }
-      }
+      segmentDataManagers = tableDataManager.acquireAllSegments();
+      totalSegmentCount = segmentDataManagers.size();
     } else {
-      SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName);
-      if (segmentDataManager == null) {
-        return JsonUtils.objectToString(new SegmentReloadStatusValue(0, 0));
-      }
-      try {
-        int successCount = 0;
+      List<String> targetSegments = new ArrayList<>();
+      Collections.addAll(targetSegments, StringUtils.split(segmentName, SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+      segmentDataManagers = tableDataManager.acquireSegments(targetSegments, new ArrayList<>());
+      totalSegmentCount = targetSegments.size();
+    }
+    try {
+      long successCount = 0;
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) {
-          successCount = 1;
+          successCount++;
         }
-        SegmentReloadStatusValue segmentReloadStatusValue =
-            new SegmentReloadStatusValue(1, successCount);
-        return JsonUtils.objectToString(segmentReloadStatusValue);
-      } finally {
+      }
+      return JsonUtils.objectToString(new SegmentReloadStatusValue(totalSegmentCount, successCount));
+    } finally {
+      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
         tableDataManager.releaseSegment(segmentDataManager);
       }
     }
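
(Editor's note, a simplified sketch under assumptions rather than Pinot's actual classes: the status returned here counts a segment as reloaded once its load time is at or after the reload job's submission timestamp, and with a '|'-joined segmentName list the total is the number of requested segments.)

    import java.util.List;
    import java.util.Map;

    public class ReloadStatusExample {
      public static void main(String[] args) {
        long reloadJobTimestamp = 1_700_000_000_000L;
        // Hypothetical segment -> last load time (ms) snapshot on one server.
        Map<String, Long> loadTimes = Map.of(
            "seg01", 1_700_000_050_000L,  // reloaded after the job was submitted
            "seg02", 1_699_999_000_000L); // not reloaded yet
        List<String> targetSegments = List.of("seg01", "seg02");

        long successCount = targetSegments.stream()
            .filter(s -> loadTimes.getOrDefault(s, 0L) >= reloadJobTimestamp)
            .count();
        System.out.println("totalSegmentCount=" + targetSegments.size() + ", successCount=" + successCount);
      }
    }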
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 89046dec81..06c7184f4e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -978,6 +978,7 @@ public class CommonConstants {
      * Segment reload job ZK props
      */
     public static final String SEGMENT_RELOAD_JOB_SEGMENT_NAME = "segmentName";
+    public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME = "instanceName";
     // Force commit job ZK props
     public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted";
   }

