This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 37f2e28  Segment reset API (#6336)
37f2e28 is described below

commit 37f2e28a37df53f14daf4ce9205a71f9318dce1e
Author: Neha Pawar <neha.pawa...@gmail.com>
AuthorDate: Wed Dec 30 13:47:40 2020 -0800

    Segment reset API (#6336)
    
    Adding a reset API. This API will disable and then enable the segment.
    This API will be useful in case of resetting consumers which are stuck,
    as reported in #6308.
    * If the segment is in ERROR state, invoking this API will send state
      transitions first to OFFLINE, wait for EV to stabilize, and then back
      to ONLINE/CONSUMING.
    * If the segment is ONLINE/CONSUMING, invoking this API will send state
      transitions first to OFFLINE, wait for EV to stabilize, and then back
      to ONLINE/CONSUMING.
---
 .../api/resources/PinotSegmentRestletResource.java |  65 ++++++++-
 .../helix/core/PinotHelixResourceManager.java      | 155 +++++++++++++++++++++
 2 files changed, 218 insertions(+), 2 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index b62ce61..0078c54 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -20,6 +20,7 @@ package org.apache.pinot.controller.api.resources;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.base.Preconditions;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import io.swagger.annotations.ApiParam;
@@ -32,11 +33,11 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executor;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
-import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
 import javax.ws.rs.GET;
 import javax.ws.rs.POST;
@@ -63,7 +64,6 @@ import 
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
 import org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
 import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
 import org.apache.pinot.controller.util.TableMetadataReader;
-import org.apache.pinot.controller.util.TableSizeReader;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -355,6 +355,67 @@ public class PinotSegmentRestletResource {
     }
   }
 
+  /**
+   * Resets a single segment of the table by disabling it and then enabling it again.
+   *
+   * <p>The segment is first taken to OFFLINE state; after the External View has stabilized it is brought back to
+   * ONLINE/CONSUMING state. This is effective in resetting segments or consumers in error states.
+   *
+   * @param tableNameWithType table name including the table type
+   * @param segmentName URL-encoded segment name; it is decoded before use
+   * @param maxWaitTimeMs maximum time in milliseconds to wait for the reset to complete; when not positive, the
+   *                      configured server admin request timeout (in seconds, converted to milliseconds) is used
+   * @return a success response naming the segment and table that were reset
+   * @throws ControllerApplicationException with NOT_FOUND when an {@link IllegalStateException} is raised (for
+   *         example the table name carries no type), and with INTERNAL_SERVER_ERROR for any other failure
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/{segmentName}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(
+      value = "Resets a segment by first disabling it, waiting for external view to stabilize, and finally enabling "
+          + "it again",
+      notes = "Resets a segment by disabling and then enabling the segment")
+  public SuccessResponse resetSegment(
+      @ApiParam(value = "Name of the table with type", required = true)
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Name of the segment", required = true)
+      @PathParam("segmentName") @Encoded String segmentName,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. "
+          + "By default, uses serverAdminRequestTimeout")
+      @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    segmentName = URIUtils.decode(segmentName);
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      // Multiply as long so that a large configured timeout cannot overflow int arithmetic
+      long waitTimeMs =
+          maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000L;
+      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName, waitTimeMs);
+      return new SuccessResponse(
+          String.format("Successfully reset segment: %s of table: %s", segmentName, tableNameWithType));
+    } catch (IllegalStateException e) {
+      // Name the segment in the error so the caller can tell which reset failed
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType,
+              e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Preserve the interrupt status for the request thread instead of silently swallowing it
+        Thread.currentThread().interrupt();
+      }
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segment: %s of table: %s. %s", segmentName, tableNameWithType,
+              e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Resets all segments of the given table by disabling and then enabling them again.
+   *
+   * <p>The segments are first taken to OFFLINE state; after the External View has stabilized they are brought back
+   * to ONLINE/CONSUMING state. This is effective in resetting segments or consumers in error states.
+   *
+   * @param tableNameWithType table name including the table type
+   * @param maxWaitTimeMs maximum time in milliseconds to wait for the reset to complete; when not positive, the
+   *                      configured server admin request timeout (in seconds, converted to milliseconds) is used
+   * @return a success response naming the table whose segments were reset
+   * @throws ControllerApplicationException with NOT_FOUND when an {@link IllegalStateException} is raised (for
+   *         example the table name carries no type), and with INTERNAL_SERVER_ERROR for any other failure
+   */
+  @POST
+  @Path("segments/{tableNameWithType}/reset")
+  @Produces(MediaType.APPLICATION_JSON)
+  @ApiOperation(
+      value = "Resets all segments of the table, by first disabling them, waiting for external view to stabilize, "
+          + "and finally enabling the segments",
+      notes = "Resets all segments of the table by disabling and then enabling them")
+  public SuccessResponse resetAllSegments(
+      @ApiParam(value = "Name of the table with type", required = true)
+      @PathParam("tableNameWithType") String tableNameWithType,
+      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. "
+          + "By default, uses serverAdminRequestTimeout")
+      @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) {
+    TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+    try {
+      Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType);
+      // Multiply as long so that a large configured timeout cannot overflow int arithmetic
+      long waitTimeMs =
+          maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000L;
+      _pinotHelixResourceManager.resetAllSegments(tableNameWithType, waitTimeMs);
+      return new SuccessResponse(String.format("Successfully reset all segments of table: %s", tableNameWithType));
+    } catch (IllegalStateException e) {
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.NOT_FOUND);
+    } catch (Exception e) {
+      if (e instanceof InterruptedException) {
+        // Preserve the interrupt status for the request thread instead of silently swallowing it
+        Thread.currentThread().interrupt();
+      }
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Failed to reset segments in table: %s. %s", tableNameWithType, e.getMessage()),
+          Status.INTERNAL_SERVER_ERROR);
+    }
+  }
+
   @Deprecated
   @POST
   @Path("tables/{tableName}/segments/{segmentName}/reload")
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 6b5168f..6eb6ebe 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -25,6 +25,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
+import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -45,6 +46,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.AccessOption;
@@ -1778,6 +1780,159 @@ public class PinotHelixResourceManager {
   }
 
   /**
+   * Resets a segment. The steps involved are
+   *  1. If segment is in ERROR state in the External View, invoke resetPartition, else invoke disablePartition
+   *  2. Wait for the external view to stabilize. Step 1 should turn the segment to OFFLINE state
+   *  3. Invoke enablePartition on the segment
+   *
+   * @param tableNameWithType table name including the table type
+   * @param segmentName name of the segment to reset
+   * @param externalViewWaitTimeMs maximum time in milliseconds to wait for every instance of the segment to show
+   *                               OFFLINE in the external view
+   * @throws IllegalStateException if the ideal state or external view of the table cannot be found, or the segment
+   *         has no instances in the ideal state
+   * @throws InterruptedException if the thread is interrupted while sleeping between external view checks
+   * @throws TimeoutException if some instance has not reached OFFLINE within the wait time; the segment is then
+   *         NOT re-enabled by this call
+   */
+  public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs)
+      throws InterruptedException, TimeoutException {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    // Pass the template arguments so the message actually names the missing segment and table
+    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
+        "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType);
+    Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName);
+
+    // First, disable or reset the segment
+    for (String instance : instanceSet) {
+      if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Disabling segment: {} of table: {}", segmentName, tableNameWithType);
+        // enablePartition(false, ...) takes a segment which is NOT in ERROR state, to OFFLINE state
+        _helixAdmin
+            .enablePartition(false, _helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName));
+      } else {
+        LOGGER.info("Resetting segment: {} of table: {}", segmentName, tableNameWithType);
+        // resetPartition takes a segment which is in ERROR state, to OFFLINE state
+        _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName));
+      }
+    }
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segment: {} of table: {}",
+        externalViewWaitTimeMs, segmentName, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    // instanceSet is non-empty here, so there is always at least one instance to wait for
+    Set<String> instancesToCheck = new HashSet<>(instanceSet);
+    while (System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName);
+      if (newExternalViewStateMap != null) {
+        instancesToCheck
+            .removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
+        if (instancesToCheck.isEmpty()) {
+          // Every instance is OFFLINE; no need to sleep one more interval
+          break;
+        }
+      }
+      // Always pause between polls, including when the segment has no state map yet, to avoid busy-polling
+      Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+    }
+    if (!instancesToCheck.isEmpty()) {
+      throw new TimeoutException(String.format(
+          "Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. "
+              + "Disable/reset might complete in the background, but skipping enable of segment.", segmentName,
+          tableNameWithType));
+    }
+
+    // Lastly, enable segment
+    LOGGER.info("Enabling segment: {} of table: {}", segmentName, tableNameWithType);
+    for (String instance : instanceSet) {
+      _helixAdmin
+          .enablePartition(true, _helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName));
+    }
+  }
+
+  /**
+   * Resets every segment listed in the ideal state of a table. The steps involved are
+   * 1. For each (segment, instance) pair: if the External View shows ERROR, invoke resetPartition, otherwise
+   *    disable the partition
+   * 2. Poll the external view until all instances of all segments report OFFLINE, or the wait time elapses
+   * 3. Invoke enablePartition on all the segments
+   *
+   * @param tableNameWithType table name including the table type
+   * @param externalViewWaitTimeMs maximum time in milliseconds to wait for the external view to show OFFLINE
+   * @throws IllegalStateException if the ideal state or external view of the table cannot be found
+   * @throws InterruptedException if the thread is interrupted while sleeping between external view checks
+   * @throws TimeoutException if some segment has not reached OFFLINE within the wait time; the segments are then
+   *         NOT re-enabled by this call
+   */
+  public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs)
+      throws InterruptedException, TimeoutException {
+    IdealState idealState = getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType);
+    ExternalView externalView = getTableExternalView(tableNameWithType);
+    Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType);
+
+    // instance -> segments currently in ERROR on that instance (need resetPartition)
+    Map<String, Set<String>> segmentsToResetByInstance = new HashMap<>();
+    // instance -> segments not in ERROR on that instance (need to be disabled)
+    Map<String, Set<String>> segmentsToDisableByInstance = new HashMap<>();
+    // segment -> instances that must report OFFLINE before the segment is considered stable
+    Map<String, Set<String>> pendingInstancesBySegment = new HashMap<>();
+
+    for (String segment : idealState.getPartitionSet()) {
+      Set<String> assignedInstances = idealState.getInstanceSet(segment);
+      Map<String, String> currentStates = externalView.getStateMap(segment);
+      for (String instance : assignedInstances) {
+        boolean inError = currentStates != null && SegmentStateModel.ERROR.equals(currentStates.get(instance));
+        Map<String, Set<String>> target = inError ? segmentsToResetByInstance : segmentsToDisableByInstance;
+        target.computeIfAbsent(instance, k -> new HashSet<>()).add(segment);
+      }
+      pendingInstancesBySegment.put(segment, new HashSet<>(assignedInstances));
+    }
+
+    // First, disable/reset the segments
+    LOGGER.info("Disabling/resetting segments of table: {}", tableNameWithType);
+    // resetPartition takes a segment which is in ERROR state, to OFFLINE state
+    segmentsToResetByInstance.forEach((instance, segments) -> _helixAdmin
+        .resetPartition(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segments)));
+    // enablePartition(false, ...) takes a segment which is NOT in ERROR state, to OFFLINE state
+    segmentsToDisableByInstance.forEach((instance, segments) -> _helixAdmin
+        .enablePartition(false, _helixClusterName, instance, tableNameWithType, Lists.newArrayList(segments)));
+
+    // Wait for external view to stabilize
+    LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segments of table: {}",
+        externalViewWaitTimeMs, tableNameWithType);
+    long startTime = System.currentTimeMillis();
+    while (!pendingInstancesBySegment.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) {
+      ExternalView newExternalView = getTableExternalView(tableNameWithType);
+      Preconditions
+          .checkState(newExternalView != null, "Could not find external view for table: %s", tableNameWithType);
+      // Drop every segment whose instances all report OFFLINE; segments without a state map stay pending
+      pendingInstancesBySegment.entrySet().removeIf(pending -> {
+        Map<String, String> latestStates = newExternalView.getStateMap(pending.getKey());
+        return latestStates != null && pending.getValue().stream()
+            .allMatch(instance -> SegmentStateModel.OFFLINE.equals(latestStates.get(instance)));
+      });
+      Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+    }
+    if (!pendingInstancesBySegment.isEmpty()) {
+      throw new TimeoutException(String.format(
+          "Timed out waiting for external view to stabilize after call to disable/reset segments. "
+              + "Disable/reset might complete in the background, but skipping enable of segments of table: %s",
+          tableNameWithType));
+    }
+
+    // Lastly, enable segments
+    LOGGER.info("Enabling segments of table: {}", tableNameWithType);
+    segmentsToResetByInstance.forEach((instance, segments) -> _helixAdmin
+        .enablePartition(true, _helixClusterName, instance, tableNameWithType, Lists.newArrayList(segments)));
+    segmentsToDisableByInstance.forEach((instance, segments) -> _helixAdmin
+        .enablePartition(true, _helixClusterName, instance, tableNameWithType, Lists.newArrayList(segments)));
+  }
+
+  /**
    * Sends a segment refresh message to:
    * <ul>
    *   <li>Server: Refresh (replace) the segment by downloading a new one 
based on the segment ZK metadata</li>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to