npawar commented on code in PR #8828: URL: https://github.com/apache/pinot/pull/8828#discussion_r899689040
########## pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java: ########## @@ -55,6 +55,7 @@ private ZKMetadataProvider() { private static final Logger LOGGER = LoggerFactory.getLogger(ZKMetadataProvider.class); private static final String CLUSTER_TENANT_ISOLATION_ENABLED_KEY = "tenantIsolationEnabled"; + private static final String PROPERTYSTORE_TASKS_PREFIX = "/TASKS"; Review Comment: could we name this something other than TASKS? Task is already a concept in helix task framework and as a result in minion, so I can see this getting very confusing. ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2( return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr); } + @GET + @Path("tables/{tableName}/segmentReloadStatus/{taskId}") Review Comment: s/tableName/tableNameWithType ? ########## pinot-common/src/main/java/org/apache/pinot/common/metadata/task/TaskType.java: ########## @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.common.metadata.task; + +public enum TaskType { Review Comment: same here, there's already a strong association between TaskType and minion tasks like MergeRollup, R2O etc. ########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2( return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr); } + @GET + @Path("tables/{tableName}/segmentReloadStatus/{taskId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted reload operation", + notes = "Get status for a submitted reload operation") + public ServerReloadTaskStatusResponse getReloadTaskStatus( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId) + throws Exception { + Map<String, String> taskZKMetadata = null; + + // Call all servers to get status, collate and return + List<String> tableNamesWithType = + ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER); + + Set<String> instances = new HashSet<>(); + for (String tableNameWithType : tableNamesWithType) { + Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); Review Comment: in case of a hybrid table, these 2 lines would run twice and always be redundant for one of the tables. Curious why you chose not to just have tableNameWithType in the params? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java: ########## @@ -543,6 +559,78 @@ public SuccessResponse reloadSegmentDeprecated2( return reloadSegmentDeprecated1(tableName, segmentName, tableTypeStr); } + @GET + @Path("tables/{tableName}/segmentReloadStatus/{taskId}") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Get status for a submitted reload operation", + notes = "Get status for a submitted reload operation") + public ServerReloadTaskStatusResponse getReloadTaskStatus( + @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, + @ApiParam(value = "Reload task id", required = true) @PathParam("taskId") String reloadTaskId) + throws Exception { + Map<String, String> taskZKMetadata = null; + + // Call all servers to get status, collate and return + List<String> tableNamesWithType = + ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, null, LOGGER); + + Set<String> instances = new HashSet<>(); + for (String tableNameWithType : tableNamesWithType) { + Map<String, List<String>> serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); + instances.addAll(serverToSegments.keySet()); + + if (taskZKMetadata == null) { + taskZKMetadata = _pinotHelixResourceManager.getTaskZKMetadata(tableNameWithType, reloadTaskId); + } + } + + BiMap<String, String> serverEndPoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(instances); + CompletionServiceHelper completionServiceHelper = + new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints); + + List<String> serverUrls = new ArrayList<>(); + BiMap<String, String> endpointsToServers = serverEndPoints.inverse(); + for (String endpoint : endpointsToServers.keySet()) { + String reloadTaskStatusEndpoint = endpoint + "/task/status/" + reloadTaskId; + serverUrls.add(reloadTaskStatusEndpoint); + } + + 
CompletionServiceHelper.CompletionServiceResponse serviceResponse = Review Comment: this will also happen for an extra set of servers in hybrid case? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java: ########## @@ -1975,6 +1977,105 @@ private Set<String> getAllInstancesForTable(String tableNameWithType) { return instanceSet; } + public Map<String, String> getTaskZKMetadata(String tableNameWithType, String taskId) { + String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType); + if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) { + ZNRecord taskResourceZnRecord = _propertyStore.get(taskResourcePath, null, -1); + return taskResourceZnRecord.getMapFields().get(taskId); + } else { + return null; + } + } + + public Map<String, Map<String, String>> getAllTasksForTable(String tableNameWithType) { + String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType); + if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) { + ZNRecord tableTaskRecord = _propertyStore.get(taskResourcePath, null, -1); + return tableTaskRecord.getMapFields(); + } else { + return Collections.emptyMap(); + } + } + + public void addNewReloadSegmentTask(String tableNameWithType, String segmentName, String taskId, + int numberOfMessagesSent) { + Map<String, String> taskMetadata = new HashMap<>(); + taskMetadata.put(CommonConstants.Task.TASK_ID, taskId); + taskMetadata.put(CommonConstants.Task.TASK_TYPE, TaskType.RELOAD_SEGMENT.toString()); + taskMetadata.put(CommonConstants.Task.TASK_SUBMISSION_TIME, Long.toString(System.currentTimeMillis())); + taskMetadata.put(CommonConstants.Task.TASK_MESSAGE_COUNT, Integer.toString(numberOfMessagesSent)); + taskMetadata.put(CommonConstants.Task.SEGMENT_RELOAD_TASK_SEGMENT_NAME, segmentName); + + String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType); + 
ZNRecord tableTaskZnRecord; + + if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) { + tableTaskZnRecord = _propertyStore.get(taskResourcePath, null, -1); + Map<String, Map<String, String>> tasks = tableTaskZnRecord.getMapFields(); + tasks.put(taskId, taskMetadata); + if (tasks.size() > CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK) { + tasks = tasks. + entrySet() + .stream() + .sorted(new Comparator<Map.Entry<String, Map<String, String>>>() { + @Override + public int compare(Map.Entry<String, Map<String, String>> v1, Map.Entry<String, Map<String, String>> v2) { + return Long.compare(Long.parseLong(v2.getValue().get(CommonConstants.Task.TASK_SUBMISSION_TIME)), + Long.parseLong(v1.getValue().get(CommonConstants.Task.TASK_SUBMISSION_TIME))); + } + }) + .collect(Collectors.toList()) + .subList(0, CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK) + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + tableTaskZnRecord.setMapFields(tasks); + } else { + tableTaskZnRecord = new ZNRecord(taskResourcePath); + tableTaskZnRecord.setMapField(taskId, taskMetadata); + } + + _propertyStore.set(taskResourcePath, tableTaskZnRecord, AccessOption.PERSISTENT); + } + + public void addNewReloadAllSegmentsTask(String tableNameWithType, String taskId, int numberOfMessagesSent) { + Map<String, String> taskMetadata = new HashMap<>(); + taskMetadata.put(CommonConstants.Task.TASK_ID, taskId); + taskMetadata.put(CommonConstants.Task.TASK_TYPE, TaskType.RELOAD_ALL_SEGMENTS.toString()); + taskMetadata.put(CommonConstants.Task.TASK_SUBMISSION_TIME, Long.toString(System.currentTimeMillis())); + taskMetadata.put(CommonConstants.Task.TASK_MESSAGE_COUNT, Integer.toString(numberOfMessagesSent)); + + String taskResourcePath = ZKMetadataProvider.constructPropertyStorePathForTask(tableNameWithType); + ZNRecord tableTaskZnRecord; + + if (_propertyStore.exists(taskResourcePath, AccessOption.PERSISTENT)) { + tableTaskZnRecord = 
_propertyStore.get(taskResourcePath, null, -1); + Map<String, Map<String, String>> tasks = tableTaskZnRecord.getMapFields(); + tasks.put(taskId, taskMetadata); + if (tasks.size() > CommonConstants.Task.MAXIMUM_RELOAD_TASKS_IN_ZK) { + tasks = tasks. Review Comment: extract this block into a common util to share across this and the above method? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org