This is an automated email from the ASF dual-hosted git repository. snlee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 9d55a46047 Instance retag validation check api (#11077) 9d55a46047 is described below commit 9d55a460470d76937b55ac8c4b814a6873e5239f Author: Shounak kulkarni <shounakmk...@gmail.com> AuthorDate: Wed Jul 26 03:13:27 2023 +0530 Instance retag validation check api (#11077) * API to validate safety of retag operations * code cleanup and proper error handling * checkstyle fixes * handle realtime and offline server tags separately * get all table configs at once * logic refactor * fixes * test cases * broker tenant min requirement behaviour fix * Added docs and comments to improve readability --- .../pinot/common/utils/config/TagNameUtils.java | 7 + .../api/resources/InstanceTagUpdateRequest.java | 50 +++++++ .../api/resources/OperationValidationResponse.java | 16 ++- .../resources/PinotInstanceRestletResource.java | 149 +++++++++++++++++++++ .../helix/core/PinotHelixResourceManager.java | 36 ++++- .../api/PinotInstanceRestletResourceTest.java | 125 +++++++++++++++-- .../utils/builder/ControllerRequestURLBuilder.java | 4 + 7 files changed, 373 insertions(+), 14 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java index 15ed490dc8..28a2e30d47 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/TagNameUtils.java @@ -102,6 +102,13 @@ public class TagNameUtils { return getTagForTenant(tenantName, REALTIME_SERVER_TAG_SUFFIX); } + /** + * Returns the server tag name for the given tenant and the given table type. + */ + public static String getServerTagForTenant(@Nullable String tenantName, TableType type) { + return getTagForTenant(tenantName, String.format("_%s", type)); + } + private static String getTagForTenant(@Nullable String tenantName, String tagSuffix) { if (tenantName == null) { return DEFAULT_TENANT_NAME + tagSuffix; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/InstanceTagUpdateRequest.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/InstanceTagUpdateRequest.java new file mode 100644 index 0000000000..c320387a4d --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/InstanceTagUpdateRequest.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.resources; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.swagger.annotations.ApiModel; +import io.swagger.annotations.ApiModelProperty; +import java.util.List; + + +@ApiModel +public class InstanceTagUpdateRequest { + @JsonProperty("instanceName") + @ApiModelProperty(example = "Server_a.b.com_20000") + private String _instanceName; + @JsonProperty("newTags") + private List<String> _newTags; + + public String getInstanceName() { + return _instanceName; + } + + public void setInstanceName(String instanceName) { + _instanceName = instanceName; + } + + public List<String> getNewTags() { + return _newTags; + } + + public void setNewTags(List<String> newTags) { + _newTags = newTags; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java index 43897ccdc0..6b27e38e63 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/OperationValidationResponse.java @@ -58,14 +58,24 @@ public class OperationValidationResponse { return this; } + public OperationValidationResponse putAllIssues(List<ErrorWrapper> issues) { + _issues.addAll(issues); + return this; + } + public String getIssueMessage(int index) { return _issues.get(index).getMessage(); } public static class ErrorWrapper { + @JsonProperty("code") ErrorCode _code; + @JsonProperty("message") String _message; + public ErrorWrapper() { + } + public ErrorWrapper(ErrorCode code, String... args) { _code = code; _message = String.format(code._description, args); @@ -82,7 +92,11 @@ public class OperationValidationResponse { public enum ErrorCode { IS_ALIVE("Instance %s is still live"), - CONTAINS_RESOURCE("Instance %s exists in ideal state for %s"); + CONTAINS_RESOURCE("Instance %s exists in ideal state for %s"), + MINIMUM_INSTANCE_UNSATISFIED( + "Tenant '%s' will not satisfy minimum '%s' requirement if tag '%s' is removed from %s instance '%s'."), + ALREADY_DEFICIENT_TENANT("Tenant '%s' is low on '%s' instances by %s even after allocating instance %s"), + UNRECOGNISED_TAG_TYPE("The tag '%s' does not follow the suffix convention of either broker or server"); public final String _description; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java index dacd4a5e51..06ca9ec92f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceRestletResource.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.resources; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.common.collect.Sets; import io.swagger.annotations.Api; import io.swagger.annotations.ApiKeyAuthDefinition; import io.swagger.annotations.ApiOperation; @@ -29,8 +30,12 @@ import io.swagger.annotations.ApiResponses; import io.swagger.annotations.Authorization; import io.swagger.annotations.SecurityDefinition; import io.swagger.annotations.SwaggerDefinition; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.stream.Collectors; import javax.inject.Inject; import javax.ws.rs.ClientErrorException; @@ -49,6 +54,7 @@ import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import org.apache.helix.model.InstanceConfig; import org.apache.pinot.common.utils.config.InstanceUtils; +import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.api.exception.ControllerApplicationException; @@ -458,4 +464,147 @@ public class PinotInstanceRestletResource { Response.Status.INTERNAL_SERVER_ERROR, e); } } + + /** + * Endpoint to validate the safety of instance tag update requests. + * This is to ensure that any instance tag update operation that user wants to perform is safe and does not create any + * side effect on the cluster and disturb the cluster consistency. + * This operation does not perform any changes to the cluster, but surfaces the possible issues which might occur upon + * applying the intended changes. + * @param requests list if instance tag update requests + * @return list of {@link OperationValidationResponse} which denotes the validity of each request along with listing + * the issues if any. + */ + @POST + @Path("/instances/updateTags/validate") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Check if it's safe to update the tags of the given instances. If not list all the reasons.") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Success"), + @ApiResponse(code = 500, message = "Internal error") + }) + public List<OperationValidationResponse> instanceTagUpdateSafetyCheck(List<InstanceTagUpdateRequest> requests) { + LOGGER.info("Performing safety check on tag update request received for instances: {}", + requests.stream().map(InstanceTagUpdateRequest::getInstanceName).collect(Collectors.toList())); + Map<String, Integer> tagMinServerMap = _pinotHelixResourceManager.minimumInstancesRequiredForTags(); + Map<String, Integer> tagToInstanceCountMap = getUpdatedTagToInstanceCountMap(requests); + Map<String, Integer> tagDeficiency = computeTagDeficiency(tagToInstanceCountMap, tagMinServerMap); + + Map<String, List<OperationValidationResponse.ErrorWrapper>> responseMap = new HashMap<>(requests.size()); + List<OperationValidationResponse.ErrorWrapper> tenantIssues = new ArrayList<>(); + requests.forEach(request -> responseMap.put(request.getInstanceName(), new ArrayList<>())); + for (InstanceTagUpdateRequest request : requests) { + String name = request.getInstanceName(); + Set<String> oldTags; + try { + oldTags = new HashSet<>(_pinotHelixResourceManager.getTagsForInstance(name)); + } catch (NullPointerException exception) { + throw new ControllerApplicationException(LOGGER, + String.format("Instance %s is not a valid instance name.", name), Response.Status.PRECONDITION_FAILED); + } + Set<String> newTags = new HashSet<>(request.getNewTags()); + // tags removed from instance + for (String tag : Sets.difference(oldTags, newTags)) { + Integer deficiency = tagDeficiency.get(tag); + if (deficiency != null && deficiency > 0) { + String tenant = TagNameUtils.getTenantFromTag(tag); + String tagType = getInstanceTypeFromTag(tag); + responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper( + OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED, tenant, tagType, tag, tagType, name)); + tagDeficiency.put(tag, deficiency - 1); + } + } + // newly added tags to instance + for (String tag : newTags) { + String tagType = getInstanceTypeFromTag(tag); + if (tagType == null && (name.startsWith(CommonConstants.Helix.PREFIX_OF_BROKER_INSTANCE) + || name.startsWith(CommonConstants.Helix.PREFIX_OF_SERVER_INSTANCE))) { + responseMap.get(name).add(new OperationValidationResponse.ErrorWrapper( + OperationValidationResponse.ErrorCode.UNRECOGNISED_TAG_TYPE, tag)); + continue; + } + Integer deficiency = tagDeficiency.get(tag); + if (deficiency != null && deficiency > 0) { + tenantIssues.add(new OperationValidationResponse.ErrorWrapper( + OperationValidationResponse.ErrorCode.ALREADY_DEFICIENT_TENANT, TagNameUtils.getTenantFromTag(tag), + tagType, deficiency.toString(), name)); + } + } + } + + // consolidate all the issues based on instances + List<OperationValidationResponse> response = new ArrayList<>(requests.size()); + responseMap.forEach((instance, issueList) -> response.add(issueList.isEmpty() + ? new OperationValidationResponse().setInstanceName(instance).setSafe(true) + : new OperationValidationResponse().putAllIssues(issueList).setInstanceName(instance).setSafe(false))); + // separate entry to group all the deficient tenant issues as it's not related to any instance + if (!tenantIssues.isEmpty()) { + response.add(new OperationValidationResponse().putAllIssues(tenantIssues).setSafe(false)); + } + return response; + } + + private String getInstanceTypeFromTag(String tag) { + if (TagNameUtils.isServerTag(tag)) { + return "server"; + } else if (TagNameUtils.isBrokerTag(tag)) { + return "broker"; + } else { + return null; + } + } + + /** + * Compute the number of deficient instances for each tag. + * The utility accepts two maps + * - map of tags and count of their intended tagged instances + * - map of tags and their minimum number of instance requirements + * And then compares these two maps to return a map of tags and the number of their deficient instances. + * + * @param tagToInstanceCountMap tags and count of their intended tagged instances + * @param tagToMinInstanceCountMap tags and their minimum number of instance requirements + * @return tags and the number of their deficient instances + */ + private Map<String, Integer> computeTagDeficiency(Map<String, Integer> tagToInstanceCountMap, + Map<String, Integer> tagToMinInstanceCountMap) { + Map<String, Integer> tagDeficiency = new HashMap<>(); + Map<String, Integer> tagToInstanceCountMapCopy = new HashMap<>(tagToInstanceCountMap); + // compute deficiency for each of the minimum instance requirement entry + tagToMinInstanceCountMap.forEach((tag, minInstances) -> { + Integer updatedInstances = tagToInstanceCountMapCopy.remove(tag); + // if tag is not present in the provided map its considered as if tag is not assigned to any instance + // hence deficiency = minimum instance requirement. + tagDeficiency.put(tag, minInstances - (updatedInstances != null ? updatedInstances : 0)); + }); + // tags for which minimum instance requirement is not specified are assumed to have no deficiency (deficiency = 0) + tagToInstanceCountMapCopy.forEach((tag, updatedInstances) -> tagDeficiency.put(tag, 0)); + return tagDeficiency; + } + + /** + * Utility to fetch the existing tags and count of their respective tagged instances and then apply the changes based + * on the provided list of {@link InstanceTagUpdateRequest} to get the updated map of tags and count of their + * respective tagged instances + * @param requests list of {@link InstanceTagUpdateRequest} + * @return map of tags and updated count of their respective tagged instances + */ + private Map<String, Integer> getUpdatedTagToInstanceCountMap(List<InstanceTagUpdateRequest> requests) { + Map<String, Integer> updatedTagInstanceMap = new HashMap<>(); + Set<String> visitedInstances = new HashSet<>(); + // build the map of tags and their respective instance counts from the given tag update request list + requests.forEach(instance -> { + instance.getNewTags().forEach(tag -> + updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1)); + visitedInstances.add(instance.getInstanceName()); + }); + // add the instance counts to tags for the rest of the instances apart from the ones mentioned in requests + _pinotHelixResourceManager.getAllInstances().forEach(instance -> { + if (!visitedInstances.contains(instance)) { + _pinotHelixResourceManager.getTagsForInstance(instance).forEach(tag -> + updatedTagInstanceMap.put(tag, updatedTagInstanceMap.getOrDefault(tag, 0) + 1)); + visitedInstances.add(instance); + } + }); + return updatedTagInstanceMap; + } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index fdbca20433..346a19a1d6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1225,7 +1225,7 @@ public class PinotHelixResourceManager { return tenantSet; } - private List<String> getTagsForInstance(String instanceName) { + public List<String> getTagsForInstance(String instanceName) { InstanceConfig config = _helixDataAccessor.getProperty(_keyBuilder.instanceConfig(instanceName)); return config.getTags(); } @@ -3023,6 +3023,15 @@ public class PinotHelixResourceManager { return ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType); } + /** + * Get all table configs. + * + * @return List of table configs. Empty list in case of tables configs does not exist. + */ + public List<TableConfig> getAllTableConfigs() { + return ZKMetadataProvider.getAllTableConfigs(_propertyStore); + } + /** * Get the offline table config for the given table name. * @@ -4128,6 +4137,31 @@ public class PinotHelixResourceManager { return new PeriodicTaskInvocationResponse(periodicTaskRequestId, messageCount > 0); } + /** + * Construct a map of all the tags and their respective minimum instance requirements. + * The minimum instance requirement is computed by + * - for BROKER tenant tag set it to 1 if it hosts any table else set it to 0 + * - for SERVER tenant tag iterate over all the tables of that tenant and find the maximum table replication. + * - for rest of the tags just set it to 0 + * @return map of tags and their minimum instance requirements + */ + public Map<String, Integer> minimumInstancesRequiredForTags() { + Map<String, Integer> tagMinInstanceMap = new HashMap<>(); + for (InstanceConfig instanceConfig : getAllHelixInstanceConfigs()) { + for (String tag : instanceConfig.getTags()) { + tagMinInstanceMap.put(tag, 0); + } + } + for (TableConfig tableConfig : getAllTableConfigs()) { + String tag = TagNameUtils.getServerTagForTenant(tableConfig.getTenantConfig().getServer(), + tableConfig.getTableType()); + tagMinInstanceMap.put(tag, Math.max(tagMinInstanceMap.getOrDefault(tag, 0), tableConfig.getReplication())); + String brokerTag = TagNameUtils.getBrokerTagForTenant(tableConfig.getTenantConfig().getBroker()); + tagMinInstanceMap.put(brokerTag, 1); + } + return tagMinInstanceMap; + } + /* * Uncomment and use for testing on a real cluster public static void main(String[] args) throws Exception { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java index 9b84c134eb..3bf0e2c70d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceRestletResourceTest.java @@ -19,24 +19,34 @@ package org.apache.pinot.controller.api; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.Lists; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.TreeMap; import javax.annotation.Nullable; +import org.apache.pinot.controller.api.resources.InstanceTagUpdateRequest; +import org.apache.pinot.controller.api.resources.OperationValidationResponse; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.spi.config.instance.Instance; import org.apache.pinot.spi.config.instance.InstanceType; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.CommonConstants.Helix; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; @@ -47,22 +57,24 @@ import static org.testng.Assert.assertTrue; */ public class PinotInstanceRestletResourceTest extends ControllerTest { + private ControllerRequestURLBuilder _urlBuilder = null; + @BeforeClass public void setUp() throws Exception { DEFAULT_INSTANCE.setupSharedStateAndValidate(); + _urlBuilder = DEFAULT_INSTANCE.getControllerRequestURLBuilder(); } @Test public void testInstanceListingAndCreation() throws Exception { - ControllerRequestURLBuilder requestURLBuilder = DEFAULT_INSTANCE.getControllerRequestURLBuilder(); - String listInstancesUrl = requestURLBuilder.forInstanceList(); + String listInstancesUrl = _urlBuilder.forInstanceList(); int expectedNumInstances = 1 + DEFAULT_NUM_BROKER_INSTANCES + DEFAULT_NUM_SERVER_INSTANCES; checkNumInstances(listInstancesUrl, expectedNumInstances); // Create untagged broker and server instances - String createInstanceUrl = requestURLBuilder.forInstanceCreate(); + String createInstanceUrl = _urlBuilder.forInstanceCreate(); Instance brokerInstance1 = new Instance("1.2.3.4", 1234, InstanceType.BROKER, null, null, 0, 0, 0, 0, false); sendPostRequest(createInstanceUrl, brokerInstance1.toJsonString()); Instance serverInstance1 = @@ -110,14 +122,14 @@ public class PinotInstanceRestletResourceTest extends ControllerTest { new Instance("1.2.3.4", 1234, InstanceType.BROKER, Collections.singletonList(newBrokerTag), null, 0, 0, 0, 0, false); String brokerInstanceId = "Broker_1.2.3.4_1234"; - String brokerInstanceUrl = requestURLBuilder.forInstance(brokerInstanceId); + String brokerInstanceUrl = _urlBuilder.forInstance(brokerInstanceId); sendPutRequest(brokerInstanceUrl, newBrokerInstance.toJsonString()); String newServerTag = "new-server-tag"; Instance newServerInstance = new Instance("1.2.3.4", 2345, InstanceType.SERVER, Collections.singletonList(newServerTag), null, 28090, 28091, 28092, 28093, true); String serverInstanceId = "Server_1.2.3.4_2345"; - String serverInstanceUrl = requestURLBuilder.forInstance(serverInstanceId); + String serverInstanceUrl = _urlBuilder.forInstance(serverInstanceId); sendPutRequest(serverInstanceUrl, newServerInstance.toJsonString()); checkInstanceInfo(brokerInstanceId, "1.2.3.4", 1234, new String[]{newBrokerTag}, null, -1, -1, -1, -1, false); @@ -126,9 +138,9 @@ public class PinotInstanceRestletResourceTest extends ControllerTest { // Test Instance updateTags API String brokerInstanceUpdateTagsUrl = - requestURLBuilder.forInstanceUpdateTags(brokerInstanceId, Lists.newArrayList("tag_BROKER", "newTag_BROKER")); + _urlBuilder.forInstanceUpdateTags(brokerInstanceId, Lists.newArrayList("tag_BROKER", "newTag_BROKER")); sendPutRequest(brokerInstanceUpdateTagsUrl); - String serverInstanceUpdateTagsUrl = requestURLBuilder.forInstanceUpdateTags(serverInstanceId, + String serverInstanceUpdateTagsUrl = _urlBuilder.forInstanceUpdateTags(serverInstanceId, Lists.newArrayList("tag_REALTIME", "newTag_OFFLINE", "newTag_REALTIME")); sendPutRequest(serverInstanceUpdateTagsUrl); checkInstanceInfo(brokerInstanceId, "1.2.3.4", 1234, new String[]{"tag_BROKER", "newTag_BROKER"}, null, -1, -1, -1, @@ -137,10 +149,10 @@ public class PinotInstanceRestletResourceTest extends ControllerTest { new String[]{"tag_REALTIME", "newTag_OFFLINE", "newTag_REALTIME"}, null, 28090, 28091, 28092, 28093, true); // Test DELETE instance API - sendDeleteRequest(requestURLBuilder.forInstance("Broker_1.2.3.4_1234")); - sendDeleteRequest(requestURLBuilder.forInstance("Server_1.2.3.4_2345")); - sendDeleteRequest(requestURLBuilder.forInstance("Broker_2.3.4.5_1234")); - sendDeleteRequest(requestURLBuilder.forInstance("Server_2.3.4.5_2345")); + sendDeleteRequest(_urlBuilder.forInstance("Broker_1.2.3.4_1234")); + sendDeleteRequest(_urlBuilder.forInstance("Server_1.2.3.4_2345")); + sendDeleteRequest(_urlBuilder.forInstance("Broker_2.3.4.5_1234")); + sendDeleteRequest(_urlBuilder.forInstance("Server_2.3.4.5_2345")); checkNumInstances(listInstancesUrl, expectedNumInstances); } @@ -163,7 +175,7 @@ public class PinotInstanceRestletResourceTest extends ControllerTest { boolean queriesDisabled) throws Exception { JsonNode response = JsonUtils.stringToJsonNode( - ControllerTest.sendGetRequest(DEFAULT_INSTANCE.getControllerRequestURLBuilder().forInstance(instanceName))); + ControllerTest.sendGetRequest(_urlBuilder.forInstance(instanceName))); assertEquals(response.get("instanceName").asText(), instanceName); assertEquals(response.get("hostName").asText(), hostName); assertTrue(response.get("enabled").asBoolean()); @@ -193,6 +205,95 @@ public class PinotInstanceRestletResourceTest extends ControllerTest { } } + @Test + public void instanceRetagHappyPathTest() + throws IOException { + Map<String, List<String>> currentInstanceTagsMap = getCurrentInstanceTagsMap(); + List<InstanceTagUpdateRequest> request = new ArrayList<>(); + currentInstanceTagsMap.forEach((instance, tags) -> { + if (instance.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE) + || instance.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) { + InstanceTagUpdateRequest payload = new InstanceTagUpdateRequest(); + payload.setInstanceName(instance); + payload.setNewTags(tags); + request.add(payload); + } + }); + List<OperationValidationResponse> response = Arrays.asList(new ObjectMapper().readValue( + sendPostRequest(_urlBuilder.forUpdateTagsValidation(), JsonUtils.objectToString(request)), + OperationValidationResponse[].class)); + assertNotNull(response); + response.forEach(item -> assertTrue(item.isSafe())); + } + + @Test + public void instanceRetagServerDeficiencyTest() + throws Exception { + String tableName = "testTable"; + TableConfig tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(tableName) + .setNumReplicas(2).build(); + // create table with replication as 2 so that DefaultTenant has a minimum server requirement as 2. + DEFAULT_INSTANCE.addTableConfig(tableConfig); + Map<String, List<String>> currentInstanceTagsMap = getCurrentInstanceTagsMap(); + List<InstanceTagUpdateRequest> request = new ArrayList<>(); + currentInstanceTagsMap.forEach((instance, tags) -> { + if (instance.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE) + || instance.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) { + InstanceTagUpdateRequest payload = new InstanceTagUpdateRequest(); + payload.setInstanceName(instance); + payload.setNewTags(Lists.newArrayList()); + request.add(payload); + } + }); + List<OperationValidationResponse> response = Arrays.asList(new ObjectMapper().readValue( + sendPostRequest(_urlBuilder.forUpdateTagsValidation(), JsonUtils.objectToString(request)), + OperationValidationResponse[].class)); + assertNotNull(response); + + int deficientServers = 2; + int deficientBrokers = 1; + for (OperationValidationResponse item : response) { + String instanceName = item.getInstanceName(); + boolean validity = item.isSafe(); + if (!validity) { + List<OperationValidationResponse.ErrorWrapper> issues = item.getIssues(); + assertEquals(issues.size(), 1); + assertEquals(issues.get(0).getCode(), OperationValidationResponse.ErrorCode.MINIMUM_INSTANCE_UNSATISFIED); + if (instanceName.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) { + deficientServers--; + } else if (instanceName.startsWith(Helix.PREFIX_OF_BROKER_INSTANCE)) { + deficientBrokers--; + } + } + } + assertEquals(deficientServers, 0); + assertEquals(deficientBrokers, 0); + DEFAULT_INSTANCE.dropOfflineTable(tableName); + } + + private Map<String, List<String>> getCurrentInstanceTagsMap() + throws IOException { + String listInstancesUrl = _urlBuilder.forInstanceList(); + JsonNode response = JsonUtils.stringToJsonNode(sendGetRequest(listInstancesUrl)); + JsonNode instances = response.get("instances"); + Map<String, List<String>> map = new HashMap<>(instances.size()); + for (int i = 0; i < instances.size(); i++) { + String instance = instances.get(i).asText(); + map.put(instance, getInstanceTags(instance)); + } + return map; + } + + private List<String> getInstanceTags(String instance) + throws IOException { + String getInstancesUrl = _urlBuilder.forInstance(instance); + List<String> tags = new ArrayList<>(); + for (JsonNode tag : JsonUtils.stringToJsonNode(sendGetRequest(getInstancesUrl)).get("tags")) { + tags.add(tag.asText()); + } + return tags; + } + @AfterClass public void tearDown() throws Exception { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 394d11e47e..fe04bc3e19 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -542,6 +542,10 @@ public class ControllerRequestURLBuilder { return StringUtil.join("/", _baseUrl, "tables", tableName, "pauseStatus"); } + public String forUpdateTagsValidation() { + return String.format("%s/instances/updateTags/validate", _baseUrl); + } + private static String encode(String s) { try { return URLEncoder.encode(s, "UTF-8"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org