This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push: new 37f2e28 Segment reset API (#6336) 37f2e28 is described below commit 37f2e28a37df53f14daf4ce9205a71f9318dce1e Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Wed Dec 30 13:47:40 2020 -0800 Segment reset API (#6336) Adding a reset API. This API will disable and then enable the segment. This API will be useful in case of resetting consumers which are stuck as reported in #6308. * If the segment is in ERROR state, invoking this API will send state transitions first to OFFLINE, wait for EV to stabilize, and then back to ONLINE/CONSUMING. * If segment is ONLINE/CONSUMING, invoking this API will send state transitions, first to OFFLINE, wait for EV to stabilize, and then back to ONLINE/CONSUMING. --- .../api/resources/PinotSegmentRestletResource.java | 65 ++++++++- .../helix/core/PinotHelixResourceManager.java | 155 +++++++++++++++++++++ 2 files changed, 218 insertions(+), 2 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index b62ce61..0078c54 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.resources; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.google.common.base.Preconditions; import io.swagger.annotations.Api; import io.swagger.annotations.ApiOperation; import io.swagger.annotations.ApiParam; @@ -32,11 +33,11 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; +import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; import javax.ws.rs.Encoded; import javax.ws.rs.GET; import javax.ws.rs.POST; @@ -63,7 +64,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse; import org.apache.pinot.controller.util.ConsumingSegmentInfoReader; import org.apache.pinot.controller.util.TableMetadataReader; -import org.apache.pinot.controller.util.TableSizeReader; import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; @@ -355,6 +355,67 @@ public class PinotSegmentRestletResource { } } + /** + * Resets the segment of the table, by disabling and then enabling it. + * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state, + * thus effective in resetting segments or consumers in error states. + */ + @POST + @Path("segments/{tableNameWithType}/{segmentName}/reset") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling it again", notes = "Resets a segment by disabling and then enabling the segment") + public SuccessResponse resetSegment( + @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType, + @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, + @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) { + segmentName = URIUtils.decode(segmentName); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + try { + Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType); + _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName, + maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + return new SuccessResponse( + String.format("Successfully reset segment: %s of table: %s", segmentName, tableNameWithType)); + } catch (IllegalStateException e) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()), + Status.NOT_FOUND); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType, e.getMessage()), + Status.INTERNAL_SERVER_ERROR); + } + } + + /** + * Resets all segments of the given table + * This API will take segments to OFFLINE state, wait for External View to stabilize, and then back to ONLINE/CONSUMING state, + * thus effective in resetting segments or consumers in error states. + */ + @POST + @Path("segments/{tableNameWithType}/reset") + @Produces(MediaType.APPLICATION_JSON) + @ApiOperation(value = "Resets all segments of the table, by first disabling them, waiting for external view to stabilize, and finally enabling the segments", notes = "Resets a segment by disabling and then enabling a segment") + public SuccessResponse resetAllSegments( + @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType, + @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) { + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + try { + Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType); + _pinotHelixResourceManager.resetAllSegments(tableNameWithType, + maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + return new SuccessResponse(String.format("Successfully reset all segments of table: %s", tableNameWithType)); + } catch (IllegalStateException e) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()), + Status.NOT_FOUND); + } catch (Exception e) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()), + Status.INTERNAL_SERVER_ERROR); + } + } + @Deprecated @POST @Path("tables/{tableName}/segments/{segmentName}/reload") diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 6b5168f..6eb6ebe 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -25,6 +25,7 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; +import com.google.common.collect.Lists; import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; @@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import javax.annotation.Nullable; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.configuration.Configuration; import org.apache.commons.lang3.StringUtils; import org.apache.helix.AccessOption; @@ -1778,6 +1780,159 @@ public class PinotHelixResourceManager { } /** + * Resets a segment. The steps involved are + * 1. If segment is in ERROR state in the External View, invoke resetPartition, else invoke disablePartition + * 2. Wait for the external view to stabilize. Step 1 should turn the segment to OFFLINE state + * 3. Invoke enablePartition on the segment + */ + public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) + throws InterruptedException, TimeoutException { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Preconditions + .checkState(CollectionUtils.isNotEmpty(instanceSet), "Could not find segment: %s in ideal state for table: %s"); + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + + // First, disable or reset the segment + for (String instance : instanceSet) { + if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Disabling segment: {} of table: {}", segmentName, tableNameWithType); + // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state + _helixAdmin + .enablePartition(false, _helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); + } else { + LOGGER.info("Resetting segment: {} of table: {}", segmentName, tableNameWithType); + // resetPartition takes a segment which is in ERROR state, to OFFLINE state + _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); + } + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segment: {} of table: {}", + externalViewWaitTimeMs, segmentName, tableNameWithType); + long startTime = System.currentTimeMillis(); + Set<String> instancesToCheck = new HashSet<>(instanceSet); + while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions + .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType); + Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); + if (newExternalViewStateMap == null) { + continue; + } + instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); + Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); + } + if (!instancesToCheck.isEmpty()) { + throw new TimeoutException(String.format( + "Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. " + + "Disable/reset might complete in the background, but skipping enable of segment.", segmentName, + tableNameWithType)); + } + + // Lastly, enable segment + LOGGER.info("Enabling segment: {} of table: {}", segmentName, tableNameWithType); + for (String instance : instanceSet) { + _helixAdmin + .enablePartition(true, _helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); + } + } + + /** + * Resets all segments of a table. The steps involved are + * 1. If segment is in ERROR state in the External View, invoke resetPartition, else invoke disablePartition + * 2. Wait for the external view to stabilize. Step 1 should turn all segments to OFFLINE state + * 3. Invoke enablePartition on the segments + */ + public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) + throws InterruptedException, TimeoutException { + IdealState idealState = getTableIdealState(tableNameWithType); + Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); + ExternalView externalView = getTableExternalView(tableNameWithType); + Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); + + Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>(); + Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>(); + Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>(); + + for (String segmentName : idealState.getPartitionSet()) { + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); + for (String instance : instanceSet) { + if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { + instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); + } else { + instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); + } + } + segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet)); + } + + // First, disable/reset the segments + LOGGER.info("Disabling/resetting segments of table: {}", tableNameWithType); + for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) { + // resetPartition takes a segment which is in ERROR state, to OFFLINE state + _helixAdmin + .resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, Lists.newArrayList(entry.getValue())); + } for (Map.Entry<String, Set<String>> entry : instanceToDisableSegmentsMap.entrySet()) { + // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state + _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, + Lists.newArrayList(entry.getValue())); + } + + // Wait for external view to stabilize + LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segments of table: {}", + externalViewWaitTimeMs, tableNameWithType); + long startTime = System.currentTimeMillis(); + while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { + ExternalView newExternalView = getTableExternalView(tableNameWithType); + Preconditions + .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType); + Iterator<Map.Entry<String, Set<String>>> iterator = segmentInstancesToCheck.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry<String, Set<String>> entryToCheck = iterator.next(); + String segmentToCheck = entryToCheck.getKey(); + Set<String> instancesToCheck = entryToCheck.getValue(); + Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentToCheck); + if (newExternalViewStateMap == null) { + continue; + } + boolean allOffline = true; + for (String instance : instancesToCheck) { + if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) { + allOffline = false; + break; + } + } + if (allOffline) { + iterator.remove(); + } + } + Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); + } + if (!segmentInstancesToCheck.isEmpty()) { + throw new TimeoutException(String.format( + "Timed out waiting for external view to stabilize after call to disable/reset segments. " + + "Disable/reset might complete in the background, but skipping enable of segments of table: %s", + tableNameWithType)); + } + + // Lastly, enable segments + LOGGER.info("Enabling segments of table: {}", tableNameWithType); + for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) { + _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, + Lists.newArrayList(entry.getValue())); + } + for (Map.Entry<String, Set<String>> entry : instanceToDisableSegmentsMap.entrySet()) { + _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, + Lists.newArrayList(entry.getValue())); + } + } + + /** * Sends a segment refresh message to: * <ul> * <li>Server: Refresh (replace) the segment by downloading a new one based on the segment ZK metadata</li> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org