Jackie-Jiang commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r892882977


##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java:
##########
@@ -245,7 +245,7 @@ public void reloadSegment(String tableNameWithType, String 
segmentName, boolean
   }
 
   @Override
-  public void reloadAllSegments(String tableNameWithType, boolean 
forceDownload,
+  public void reloadAllSegments(String taskId, String tableNameWithType, 
boolean forceDownload,

Review Comment:
   Let's also add `taskId` to the single `reloadSegment()` method



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentReloadTaskStatusCache.java:
##########
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.starter.helix;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+import org.codehaus.jackson.annotate.JsonIgnore;
+
+
+public class SegmentReloadTaskStatusCache {
+
+  public static class SegmentReloadStatusValue {
+    private final long _totalSegmentCount;
+    private final AtomicLong _successCount;
+
+    SegmentReloadStatusValue(long totalSegmentCount) {
+      _totalSegmentCount = totalSegmentCount;
+      _successCount = new AtomicLong(0);
+    }
+
+    @JsonIgnore
+    public void incrementSuccess() {
+      _successCount.addAndGet(1);
+    }
+
+    public long getTotalSegmentCount() {
+      return _totalSegmentCount;
+    }
+
+    public long getSuccessCount() {
+      return _successCount.get();
+    }
+  }
+
+  private SegmentReloadTaskStatusCache() {
+  }
+
+  private static final Map<String, SegmentReloadStatusValue> 
SEGMENT_RELOAD_STATUS_MAP = new ConcurrentHashMap<>();

Review Comment:
   We can consider adding a size limit to the cache, and once the entry count 
reaches the limit, we remove the earliest entries



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TaskStatusResource.java:
##########
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.server.api.resources;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import javax.ws.rs.GET;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import org.apache.pinot.server.starter.helix.SegmentReloadTaskStatusCache;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+@Api(tags = "Tasks")
+@Path("/")
+public class TaskStatusResource {
+  @GET
+  @Path("/task/status/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Task status", notes = "Return status of the given 
task")
+  public String taskStatus(@PathParam("taskId") String taskId)
+      throws Exception {
+    SegmentReloadTaskStatusCache.SegmentReloadStatusValue 
segmentReloadStatusValue =
+        SegmentReloadTaskStatusCache.getStatus(taskId);
+
+    return JsonUtils.objectToString(segmentReloadStatusValue);

Review Comment:
   Will this handle `null` status properly?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = 
"Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) 
@PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      instances.addAll(serverToSegments.keySet());
+    }
+
+    BiMap<String, String> serverEndPoints = 
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(instances);
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/task/status/" + 
reloadTaskId;
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiGetRequest(serverUrls, null, true, 
1000);
+
+    ServerReloadTaskStatusResponse serverReloadTaskStatusResponse = new 
ServerReloadTaskStatusResponse();
+    serverReloadTaskStatusResponse.setSuccessCount(0);
+    serverReloadTaskStatusResponse.setTotalSegmentCount(0);
+    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
+      ServerReloadTaskStatusResponse response =
+          JsonUtils.stringToObject(streamResponse.getValue(), 
ServerReloadTaskStatusResponse.class);

Review Comment:
   `null` response need to be properly handled



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2159,7 +2159,7 @@ public void refreshSegment(String tableNameWithType, 
SegmentMetadata segmentMeta
     sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
   }
 
-  public int reloadAllSegments(String tableNameWithType, boolean 
forceDownload) {
+  public Pair<Integer, String> reloadAllSegments(String tableNameWithType, 
boolean forceDownload) {

Review Comment:
   Let's also track task id for single segment reload



##########
pinot-common/src/main/java/org/apache/pinot/common/messages/SegmentReloadMessage.java:
##########
@@ -35,6 +35,9 @@ public class SegmentReloadMessage extends Message {
 
   private static final String FORCE_DOWNLOAD_KEY = "forceDownload";
 
+  // todo (saurabh) : getMsgId() returns different id on the server side than 
the one being set at controller. Check

Review Comment:
   Interesting. The msgId should just be the ZNode id



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +548,54 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation", notes = 
"Get status for a submitted reload "
+      + "operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(

Review Comment:
   We should consider storing the task id and reload info (table name / segment 
name) into a ZK node, and add another API to read the reload tasks in the 
cluster in case user loses the response of the reload call. We can also put 
other useful info there, e.g. number of messages sent, which can be included in 
the status check



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to