This is an automated email from the ASF dual-hosted git repository. xbli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 95b075b527 Batch reload api to specify what segments to be reloaded on what servers to be more flexible (#14544) 95b075b527 is described below commit 95b075b52765088e1a27cc420f49a468bcd789bb Author: Xiaobing <61892277+klsi...@users.noreply.github.com> AuthorDate: Thu Dec 5 06:57:25 2024 -0800 Batch reload api to specify what segments to be reloaded on what servers to be more flexible (#14544) * extend existing reload all segments API to make it more flexible, by taking a map to reload different batch of segments on different instances --- .../api/resources/PinotSegmentRestletResource.java | 129 +++++++++++++++++---- .../helix/core/PinotHelixResourceManager.java | 93 +++++++++++---- .../resources/PinotSegmentRestletResourceTest.java | 88 ++++++++++++++ .../segment/spi/creator/name/SegmentNameUtils.java | 2 + .../api/resources/ControllerJobStatusResource.java | 53 ++++----- .../apache/pinot/spi/utils/CommonConstants.java | 1 + 6 files changed, 290 insertions(+), 76 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 7499098780..37e365bc7f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -18,8 +18,9 @@ */ package org.apache.pinot.controller.api.resources; -import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.base.Strings; import com.google.common.collect.BiMap; @@ -39,6 +40,7 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.TreeMap; import java.util.concurrent.Executor; import java.util.stream.Collectors; @@ -87,6 +89,7 @@ import org.apache.pinot.controller.util.TableTierReader; import org.apache.pinot.core.auth.Actions; import org.apache.pinot.core.auth.Authorize; import org.apache.pinot.core.auth.TargetType; +import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DateTimeFieldSpec; @@ -403,8 +406,8 @@ public class PinotSegmentRestletResource { int numReloadMsgSent = msgInfo.getLeft(); if (numReloadMsgSent > 0) { try { - if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, msgInfo.getRight(), - startTimeMs, numReloadMsgSent)) { + if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentName, targetInstance, + msgInfo.getRight(), startTimeMs, numReloadMsgSent)) { zkJobMetaWriteSuccess = true; } else { LOGGER.error("Failed to add reload segment job meta into zookeeper for table: {}, segment: {}", @@ -533,20 +536,11 @@ public class PinotSegmentRestletResource { } String tableNameWithType = controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE); - Map<String, List<String>> serverToSegments; - - String singleSegmentName = + String segmentNames = controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME); - if (singleSegmentName != null) { - // No need to query servers where this segment is not supposed to be hosted - serverToSegments = new TreeMap<>(); - List<String> segmentList = Collections.singletonList(singleSegmentName); - _pinotHelixResourceManager.getServers(tableNameWithType, singleSegmentName).forEach(server -> { - serverToSegments.put(server, segmentList); - }); - } else { - serverToSegments = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType); - } + String instanceName = + controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME); + Map<String, List<String>> serverToSegments = getServerToSegments(tableNameWithType, segmentNames, instanceName); BiMap<String, String> serverEndPoints = _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet()); @@ -554,13 +548,16 @@ public class PinotSegmentRestletResource { new CompletionServiceHelper(_executor, _connectionManager, serverEndPoints); List<String> serverUrls = new ArrayList<>(); - BiMap<String, String> endpointsToServers = serverEndPoints.inverse(); - for (String endpoint : endpointsToServers.keySet()) { + for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) { + String server = entry.getKey(); + String endpoint = entry.getValue(); String reloadTaskStatusEndpoint = endpoint + "/controllerJob/reloadStatus/" + tableNameWithType + "?reloadJobTimestamp=" + controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS); - if (singleSegmentName != null) { - reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + singleSegmentName; + if (segmentNames != null) { + List<String> targetSegments = serverToSegments.get(server); + reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName=" + StringUtils.join(targetSegments, + SegmentNameUtils.SEGMENT_NAME_SEPARATOR); } serverUrls.add(reloadTaskStatusEndpoint); } @@ -615,6 +612,31 @@ public class PinotSegmentRestletResource { return serverReloadControllerJobStatusResponse; } + @VisibleForTesting + Map<String, List<String>> getServerToSegments(String tableNameWithType, @Nullable String segmentNames, + @Nullable String instanceName) { + if (segmentNames == null) { + // instanceName can be null or not null, and this method below can handle both cases. + return _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType, instanceName); + } + // Skip servers and segments not involved in the segment reloading job. + List<String> segmnetNameList = new ArrayList<>(); + Collections.addAll(segmnetNameList, StringUtils.split(segmentNames, SegmentNameUtils.SEGMENT_NAME_SEPARATOR)); + if (instanceName != null) { + return Map.of(instanceName, segmnetNameList); + } + // If instance is null, then either one or all segments are being reloaded via current segment reload restful APIs. + // And the if-check at the beginning of this method has handled the case of reloading all segments. So here we + // expect only one segment name. + Preconditions.checkState(segmnetNameList.size() == 1, "Only one segment is expected but got: %s", segmnetNameList); + Map<String, List<String>> serverToSegments = new HashMap<>(); + Set<String> servers = _pinotHelixResourceManager.getServers(tableNameWithType, segmentNames); + for (String server : servers) { + serverToSegments.put(server, Collections.singletonList(segmentNames)); + } + return serverToSegments; + } + @POST @Path("segments/{tableName}/reload") @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.RELOAD_SEGMENT) @@ -627,10 +649,11 @@ public class PinotSegmentRestletResource { @ApiParam(value = "Whether to force server to download segment") @QueryParam("forceDownload") @DefaultValue("false") boolean forceDownload, @ApiParam(value = "Name of the target instance to reload") @QueryParam("targetInstance") @Nullable - String targetInstance, @Context HttpHeaders headers) - throws JsonProcessingException { + String targetInstance, + @ApiParam(value = "Map from instances to segments to reload. This param takes precedence over targetInstance") + @QueryParam("instanceToSegmentsMap") @Nullable String instanceToSegmentsMapInJson, @Context HttpHeaders headers) + throws IOException { tableName = DatabaseUtils.translateTableName(tableName, headers); - long startTimeMs = System.currentTimeMillis(); TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName); TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr); // When rawTableName is provided but w/o table type, Pinot tries to reload both OFFLINE @@ -644,6 +667,20 @@ public class PinotSegmentRestletResource { List<String> tableNamesWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableTypeFromRequest, LOGGER); + if (instanceToSegmentsMapInJson != null) { + Map<String, List<String>> instanceToSegmentsMap = + JsonUtils.stringToObject(instanceToSegmentsMapInJson, new TypeReference<>() { + }); + Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = + reloadSegments(tableNamesWithType, forceDownload, instanceToSegmentsMap); + if (tableInstanceMsgData.isEmpty()) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to find any segments in table: %s with instanceToSegmentsMap: %s", tableName, + instanceToSegmentsMap), Status.NOT_FOUND); + } + return new SuccessResponse(JsonUtils.objectToString(tableInstanceMsgData)); + } + long startTimeMs = System.currentTimeMillis(); Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>(); for (String tableNameWithType : tableNamesWithType) { Pair<Integer, String> msgInfo = @@ -658,8 +695,8 @@ public class PinotSegmentRestletResource { perTableMsgData.put(tableNameWithType, tableReloadMeta); // Store in ZK try { - if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, msgInfo.getRight(), startTimeMs, - numReloadMsgSent)) { + if (_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType, targetInstance, msgInfo.getRight(), + startTimeMs, numReloadMsgSent)) { tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS"); } else { tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED"); @@ -678,6 +715,48 @@ public class PinotSegmentRestletResource { return new SuccessResponse(JsonUtils.objectToString(perTableMsgData)); } + private Map<String, Map<String, Map<String, String>>> reloadSegments(List<String> tableNamesWithType, + boolean forceDownload, Map<String, List<String>> instanceToSegmentsMap) { + long startTimeMs = System.currentTimeMillis(); + Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = new LinkedHashMap<>(); + for (String tableNameWithType : tableNamesWithType) { + Map<String, Pair<Integer, String>> instanceMsgInfoMap = + _pinotHelixResourceManager.reloadSegments(tableNameWithType, forceDownload, instanceToSegmentsMap); + Map<String, Map<String, String>> instanceMsgData = + tableInstanceMsgData.computeIfAbsent(tableNameWithType, t -> new HashMap<>()); + for (Map.Entry<String, Pair<Integer, String>> instanceMsgInfo : instanceMsgInfoMap.entrySet()) { + String instance = instanceMsgInfo.getKey(); + Pair<Integer, String> msgInfo = instanceMsgInfo.getValue(); + int numReloadMsgSent = msgInfo.getLeft(); + if (numReloadMsgSent <= 0) { + continue; + } + Map<String, String> tableReloadMeta = new HashMap<>(); + tableReloadMeta.put("numMessagesSent", String.valueOf(numReloadMsgSent)); + tableReloadMeta.put("reloadJobId", msgInfo.getRight()); + instanceMsgData.put(instance, tableReloadMeta); + // Store in ZK + try { + String segmentNames = + StringUtils.join(instanceToSegmentsMap.get(instance), SegmentNameUtils.SEGMENT_NAME_SEPARATOR); + if (_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType, segmentNames, instance, + msgInfo.getRight(), startTimeMs, numReloadMsgSent)) { + tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS"); + } else { + tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED"); + LOGGER.error("Failed to add batch reload job meta into zookeeper for table: {} targeted instance: {}", + tableNameWithType, instance); + } + } catch (Exception e) { + tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED"); + LOGGER.error("Failed to add batch reload job meta into zookeeper for table: {} targeted instance: {}", + tableNameWithType, instance, e); + } + } + } + return tableInstanceMsgData; + } + @DELETE @Produces(MediaType.APPLICATION_JSON) @Path("/segments/{tableName}/{segmentName}") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 1f1e95877f..42bd8c4ac4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2193,56 +2193,65 @@ public class PinotHelixResourceManager { /** * Adds a new reload segment job metadata into ZK * @param tableNameWithType Table for which job is to be added - * @param segmentName Name of the segment being reloaded + * @param segmentNames Name of the segments being reloaded, separated by comma + * @param instanceName Name of the instance done the segment reloading, optional. * @param jobId job's UUID * @param jobSubmissionTimeMs time at which the job was submitted * @param numMessagesSent number of messages that were sent to servers. Saved as metadata * @return boolean representing success / failure of the ZK write step */ - public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentName, String jobId, - long jobSubmissionTimeMs, int numMessagesSent) { + public boolean addNewReloadSegmentJob(String tableNameWithType, String segmentNames, @Nullable String instanceName, + String jobId, long jobSubmissionTimeMs, int numMessagesSent) { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numMessagesSent)); - jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentName); + jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME, segmentNames); + if (instanceName != null) { + jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME, instanceName); + } return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT); } - public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, - Set<String> consumingSegmentsCommitted) - throws JsonProcessingException { - Map<String, String> jobMetadata = new HashMap<>(); - jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); - jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); - jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT); - jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); - jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST, - JsonUtils.objectToString(consumingSegmentsCommitted)); - return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT); - } - /** * Adds a new reload segment job metadata into ZK * @param tableNameWithType Table for which job is to be added + * @param instanceName Name of the instance done the segment reloading, optional. * @param jobId job's UUID * @param jobSubmissionTimeMs time at which the job was submitted * @param numberOfMessagesSent number of messages that were sent to servers. Saved as metadata * @return boolean representing success / failure of the ZK write step */ - public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, - int numberOfMessagesSent) { + public boolean addNewReloadAllSegmentsJob(String tableNameWithType, @Nullable String instanceName, String jobId, + long jobSubmissionTimeMs, int numberOfMessagesSent) { Map<String, String> jobMetadata = new HashMap<>(); jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.RELOAD_SEGMENT); jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT, Integer.toString(numberOfMessagesSent)); + if (instanceName != null) { + jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME, instanceName); + } return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.RELOAD_SEGMENT); } + + public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long jobSubmissionTimeMs, + Set<String> consumingSegmentsCommitted) + throws JsonProcessingException { + Map<String, String> jobMetadata = new HashMap<>(); + jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId); + jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType); + jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.FORCE_COMMIT); + jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs)); + jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST, + JsonUtils.objectToString(consumingSegmentsCommitted)); + return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT); + } + /** * Adds a new job metadata for controller job like table rebalance or reload into ZK * @param jobId job's UUID @@ -2605,6 +2614,42 @@ public class PinotHelixResourceManager { sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true); } + public Map<String, Pair<Integer, String>> reloadSegments(String tableNameWithType, boolean forceDownload, + Map<String, List<String>> instanceToSegmentsMap) { + LOGGER.info("Sending reload messages for table: {} with forceDownload: {}, and instanceToSegmentsMap: {}", + tableNameWithType, forceDownload, instanceToSegmentsMap); + + if (forceDownload) { + TableType tt = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + // TODO: support to force download immutable segments from RealTime table. + Preconditions.checkArgument(tt == TableType.OFFLINE, + "Table: %s is not an OFFLINE table, which is required to force to download segments", tableNameWithType); + } + // Infinite timeout on the recipient + int timeoutMs = -1; + Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>(); + for (Map.Entry<String, List<String>> entry : instanceToSegmentsMap.entrySet()) { + String targetInstance = entry.getKey(); + List<String> segments = entry.getValue(); + Criteria recipientCriteria = new Criteria(); + recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + recipientCriteria.setInstanceName(targetInstance); + recipientCriteria.setResource(tableNameWithType); + recipientCriteria.setSessionSpecific(true); + SegmentReloadMessage segmentReloadMessage = new SegmentReloadMessage(tableNameWithType, segments, forceDownload); + ClusterMessagingService messagingService = _helixZkManager.getMessagingService(); + int numMessagesSent = messagingService.send(recipientCriteria, segmentReloadMessage, null, timeoutMs); + if (numMessagesSent > 0) { + LOGGER.info("Sent {} reload messages to instance: {} for table: {}", numMessagesSent, targetInstance, + tableNameWithType); + } else { + LOGGER.warn("No reload message sent to instance: {} for table: {}", targetInstance, tableNameWithType); + } + instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent, segmentReloadMessage.getMsgId())); + } + return instanceMsgInfoMap; + } + public Pair<Integer, String> reloadAllSegments(String tableNameWithType, boolean forceDownload, @Nullable String targetInstance) { LOGGER.info("Sending reload message for table: {} with forceDownload: {}, and target: {}", tableNameWithType, @@ -2985,6 +3030,10 @@ public class PinotHelixResourceManager { * the ideal state because they are not supposed to be served. */ public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType) { + return getServerToSegmentsMap(tableNameWithType, null); + } + + public Map<String, List<String>> getServerToSegmentsMap(String tableNameWithType, @Nullable String targetServer) { Map<String, List<String>> serverToSegmentsMap = new TreeMap<>(); IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType); if (idealState == null) { @@ -2993,8 +3042,12 @@ public class PinotHelixResourceManager { for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) { String segmentName = entry.getKey(); for (Map.Entry<String, String> instanceStateEntry : entry.getValue().entrySet()) { + String server = instanceStateEntry.getKey(); + if (targetServer != null && !server.equals(targetServer)) { + continue; + } if (!instanceStateEntry.getValue().equals(SegmentStateModel.OFFLINE)) { - serverToSegmentsMap.computeIfAbsent(instanceStateEntry.getKey(), key -> new ArrayList<>()).add(segmentName); + serverToSegmentsMap.computeIfAbsent(server, key -> new ArrayList<>()).add(segmentName); } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java new file mode 100644 index 0000000000..392fc05bd8 --- /dev/null +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + + +public class PinotSegmentRestletResourceTest { + @Mock + PinotHelixResourceManager _pinotHelixResourceManager; + + @InjectMocks + PinotSegmentRestletResource _pinotSegmentRestletResource; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + } + + @Test + public void testGetServerToSegments() { + String tableName = "testTable"; + Map<String, List<String>> fullServerToSegmentsMap = new HashMap<>(); + fullServerToSegmentsMap.put("svr01", new ArrayList<>(List.of("seg01", "seg02"))); + fullServerToSegmentsMap.put("svr02", new ArrayList<>(List.of("seg02", "seg03"))); + fullServerToSegmentsMap.put("svr03", new ArrayList<>(List.of("seg03", "seg01"))); + when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName, null)).thenReturn(fullServerToSegmentsMap); + when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName, "svr02")).thenReturn( + Map.of("svr02", new ArrayList<>(List.of("seg02", "seg03")))); + when(_pinotHelixResourceManager.getServers(tableName, "seg01")).thenReturn(Set.of("svr01", "svr03")); + + // Get all servers and all their segments. + Map<String, List<String>> serverToSegmentsMap = + _pinotSegmentRestletResource.getServerToSegments(tableName, null, null); + assertEquals(serverToSegmentsMap, fullServerToSegmentsMap); + + // Get all segments on svr02. + serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, null, "svr02"); + assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg02", "seg03"))); + + // Get all servers with seg01. + serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, "seg01", null); + assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01"), "svr03", List.of("seg01"))); + + // Simply map the provided server to the provided segments. + serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, "seg01", "svr01"); + assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01"))); + serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, "anySegment", "anyServer"); + assertEquals(serverToSegmentsMap, Map.of("anyServer", List.of("anySegment"))); + serverToSegmentsMap = _pinotSegmentRestletResource.getServerToSegments(tableName, "seg01|seg02", "svr02"); + assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg01", "seg02"))); + try { + _pinotSegmentRestletResource.getServerToSegments(tableName, "seg01,seg02", null); + } catch (Exception e) { + assertTrue(e.getMessage().contains("Only one segment is expected but got: [seg01, seg02]")); + } + } +} diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java index f3000c994a..fcd2ec55da 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java @@ -25,6 +25,8 @@ import java.util.regex.Pattern; * Utils for segment names. */ public class SegmentNameUtils { + // According to the invalid name pattern below, `|` is safer than `,` as the segment name separator. + public static final char SEGMENT_NAME_SEPARATOR = '|'; private static final Pattern INVALID_SEGMENT_NAME_REGEX = Pattern.compile(".*[\\\\/:\\*?\"<>|].*"); private SegmentNameUtils() { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java index 66ebdd88cc..9bd96c0da9 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java @@ -20,6 +20,8 @@ package org.apache.pinot.server.api.resources; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import javax.inject.Inject; import javax.ws.rs.GET; @@ -30,9 +32,11 @@ import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.utils.DatabaseUtils; import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils; import org.apache.pinot.server.starter.ServerInstance; import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue; import org.apache.pinot.spi.utils.JsonUtils; @@ -51,45 +55,32 @@ public class ControllerJobStatusResource { @ApiOperation(value = "Task status", notes = "Return the status of a given reload job") public String reloadJobStatus(@PathParam("tableNameWithType") String tableNameWithType, @QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp, - @QueryParam("segmentName") String segmentName, - @Context HttpHeaders headers) + @QueryParam("segmentName") String segmentName, @Context HttpHeaders headers) throws Exception { tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType, headers); TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableNameWithType); - + List<SegmentDataManager> segmentDataManagers; + long totalSegmentCount; if (segmentName == null) { - // All segments - List<SegmentDataManager> allSegments = tableDataManager.acquireAllSegments(); - try { - long successCount = 0; - for (SegmentDataManager segmentDataManager : allSegments) { - if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) { - successCount++; - } - } - SegmentReloadStatusValue segmentReloadStatusValue = - new SegmentReloadStatusValue(allSegments.size(), successCount); - return JsonUtils.objectToString(segmentReloadStatusValue); - } finally { - for (SegmentDataManager segmentDataManager : allSegments) { - tableDataManager.releaseSegment(segmentDataManager); - } - } + segmentDataManagers = tableDataManager.acquireAllSegments(); + totalSegmentCount = segmentDataManagers.size(); } else { - SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName); - if (segmentDataManager == null) { - return JsonUtils.objectToString(new SegmentReloadStatusValue(0, 0)); - } - try { - int successCount = 0; + List<String> targetSegments = new ArrayList<>(); + Collections.addAll(targetSegments, StringUtils.split(segmentName, SegmentNameUtils.SEGMENT_NAME_SEPARATOR)); + segmentDataManagers = tableDataManager.acquireSegments(targetSegments, new ArrayList<>()); + totalSegmentCount = targetSegments.size(); + } + try { + long successCount = 0; + for (SegmentDataManager segmentDataManager : segmentDataManagers) { if (segmentDataManager.getLoadTimeMs() >= reloadJobSubmissionTimestamp) { - successCount = 1; + successCount++; } - SegmentReloadStatusValue segmentReloadStatusValue = - new SegmentReloadStatusValue(1, successCount); - return JsonUtils.objectToString(segmentReloadStatusValue); - } finally { + } + return JsonUtils.objectToString(new SegmentReloadStatusValue(totalSegmentCount, successCount)); + } finally { + for (SegmentDataManager segmentDataManager : segmentDataManagers) { tableDataManager.releaseSegment(segmentDataManager); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 89046dec81..06c7184f4e 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -978,6 +978,7 @@ public class CommonConstants { * Segment reload job ZK props */ public static final String SEGMENT_RELOAD_JOB_SEGMENT_NAME = "segmentName"; + public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME = "instanceName"; // Force commit job ZK props public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST = "segmentsForceCommitted"; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org