npawar commented on code in PR #8828:
URL: https://github.com/apache/pinot/pull/8828#discussion_r899689040


##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java:
##########
@@ -55,6 +55,7 @@ private ZKMetadataProvider() {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ZKMetadataProvider.class);
   private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = 
"tenantIsolationEnabled";
+  private static final String PROPERTYSTORE_TASKS_PREFIX = "/TASKS";

Review Comment:
   could we name this something other than TASKS? Task is already a concept in 
helix task framework and as a result in minion, so I can see this getting very 
confusing.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")

Review Comment:
   s/tableName/tableNameWithType ?



##########
pinot-common/src/main/java/org/apache/pinot/common/metadata/task/TaskType.java:
##########
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.metadata.task;
+
+public enum TaskType {

Review Comment:
   same here, there's already a string association between TaskType and minion 
tasks like MergeRollup, R2O etc.



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) 
@PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    Map<String, String> taskZKMetadata = null;
+
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);

Review Comment:
   in case of a hybrid table, these 2 lines would run twice and always be 
redundant for one of the tables. Curious, why you chose to not just have 
tableNameWithType in the params?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java:
##########
@@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2(
     return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr);
   }
 
+  @GET
+  @Path("tables/{tableName}/segmentReloadStatus/{taskId}")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(value = "Get status for a submitted reload operation",
+      notes = "Get status for a submitted reload operation")
+  public ServerReloadTaskStatusResponse getReloadTaskStatus(
+      @ApiParam(value = "Name of the table", required = true) 
@PathParam("tableName") String tableName,
+      @ApiParam(value = "Reload task id", required = true) 
@PathParam("taskId") String reloadTaskId)
+      throws Exception {
+    Map<String, String> taskZKMetadata = null;
+
+    // Call all servers to get status, collate and return
+    List<String> tableNamesWithType =
+        
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, 
tableName, null, LOGGER);
+
+    Set<String> instances = new HashSet<>();
+    for (String tableNameWithType : tableNamesWithType) {
+      Map<String, List<String>> serverToSegments = 
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+      instances.addAll(serverToSegments.keySet());
+
+      if (taskZKMetadata == null) {
+        taskZKMetadata = 
_pinotHelixResourceManager.getTaskZKMetadata(tableNameWithType, reloadTaskId);
+      }
+    }
+
+    BiMap<String, String> serverEndPoints = 
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(instances);
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverEndPoints);
+
+    List<String> serverUrls = new ArrayList<>();
+    BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
+    for (String endpoint : endpointsToServers.keySet()) {
+      String reloadTaskStatusEndpoint = endpoint + "/task/status/" + 
reloadTaskId;
+      serverUrls.add(reloadTaskStatusEndpoint);
+    }
+
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =

Review Comment:
   this will also happen for an extra set of servers in hybrid case?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1975,6 +1977,105 @@ private Set<String> getAllInstancesForTable(String 
tableNameWithType) {
     return instanceSet;
   }
 
+  public Map<String, String> getTaskZKMetadata(String tableNameWithType, 
String taskId) {
+    String taskResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord taskResourceZnRecord = _propertyStore.get(taskResourcePath, 
null, -1);
+      return taskResourceZnRecord.getMapFields().get(taskId);
+    } else {
+      return null;
+    }
+  }
+
+  public Map<String, Map<String, String>> getAllTasksForTable(String 
tableNameWithType) {
+    String taskResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      ZNRecord tableTaskRecord = _propertyStore.get(taskResourcePath, null, 
-1);
+      return tableTaskRecord.getMapFields();
+    } else {
+      return Collections.emptyMap();
+    }
+  }
+
+  public void addNewReloadSegmentTask(String tableNameWithType, String 
segmentName, String taskId,
+      int numberOfMessagesSent) {
+    Map<String, String> taskMetadata = new HashMap<>();
+    taskMetadata.put(CommonConstants.Task.TASK_ID, taskId);
+    taskMetadata.put(CommonConstants.Task.TASK_TYPE, 
TaskType.RELOAD_SEGMENT.toString());
+    taskMetadata.put(CommonConstants.Task.TASK_SUBMISSION_TIME, 
Long.toString(System.currentTimeMillis()));
+    taskMetadata.put(CommonConstants.Task.TASK_MESSAGE_COUNT, 
Integer.toString(numberOfMessagesSent));
+    taskMetadata.put(CommonConstants.Task.SEGMENT_RELOAD_TASK_SEGMENT_NAME, 
segmentName);
+
+    String taskResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    ZNRecord tableTaskZnRecord;
+
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      tableTaskZnRecord = _propertyStore.get(taskResourcePath, null, -1);
+      Map<String, Map<String, String>> tasks = 
tableTaskZnRecord.getMapFields();
+      tasks.put(taskId, taskMetadata);
+      if (tasks.size() > CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK) {
+        tasks = tasks.
+            entrySet()
+            .stream()
+            .sorted(new Comparator<Map.Entry<String, Map<String, String>>>() {
+          @Override
+          public int compare(Map.Entry<String, Map<String, String>> v1, 
Map.Entry<String, Map<String, String>> v2) {
+            return 
Long.compare(Long.parseLong(v2.getValue().get(CommonConstants.Task.TASK_SUBMISSION_TIME)),
+                
Long.parseLong(v1.getValue().get(CommonConstants.Task.TASK_SUBMISSION_TIME)));
+          }
+        })
+            .collect(Collectors.toList())
+            .subList(0, CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK)
+            .stream()
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+      }
+      tableTaskZnRecord.setMapFields(tasks);
+    } else {
+      tableTaskZnRecord = new ZNRecord(taskResourcePath);
+      tableTaskZnRecord.setMapField(taskId, taskMetadata);
+    }
+
+    _propertyStore.set(taskResourcePath, tableTaskZnRecord, 
AccessOption.PERSISTENT);
+  }
+
+  public void addNewReloadAllSegmentsTask(String tableNameWithType, String 
taskId, int numberOfMessagesSent) {
+    Map<String, String> taskMetadata = new HashMap<>();
+    taskMetadata.put(CommonConstants.Task.TASK_ID, taskId);
+    taskMetadata.put(CommonConstants.Task.TASK_TYPE, 
TaskType.RELOAD_ALL_SEGMENTS.toString());
+    taskMetadata.put(CommonConstants.Task.TASK_SUBMISSION_TIME, 
Long.toString(System.currentTimeMillis()));
+    taskMetadata.put(CommonConstants.Task.TASK_MESSAGE_COUNT, 
Integer.toString(numberOfMessagesSent));
+
+    String taskResourcePath = 
ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType);
+    ZNRecord tableTaskZnRecord;
+
+    if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) {
+      tableTaskZnRecord = _propertyStore.get(taskResourcePath, null, -1);
+      Map<String, Map<String, String>> tasks = 
tableTaskZnRecord.getMapFields();
+      tasks.put(taskId, taskMetadata);
+      if (tasks.size() > CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK) {
+        tasks = tasks.

Review Comment:
   extract this block into a common util to share across this and the above 
method?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to