This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 6fddffe66d make reset table async (#9203)
6fddffe66d is described below

commit 6fddffe66d587f45deefaf817ea1d6be45220e02
Author: Rong Rong <walterddr.walter...@gmail.com>
AuthorDate: Tue Aug 16 13:30:41 2022 -0700

    make reset table async (#9203)
    
    * make reset table async
    * change to use external view resync to check status
    
    Co-authored-by: Rong Rong <ro...@startree.ai>
---
 .../api/resources/PinotSegmentRestletResource.java |  16 +-
 .../controller/helix/ControllerRequestClient.java  |  20 ++
 .../helix/core/PinotHelixResourceManager.java      | 232 +++++++++++----------
 .../tests/BaseClusterIntegrationTest.java          |  10 +
 .../tests/BaseClusterIntegrationTestSet.java       |  42 ++++
 .../tests/LLCRealtimeClusterIntegrationTest.java   |   6 +
 .../tests/OfflineClusterIntegrationTest.java       |   5 +
 .../utils/builder/ControllerRequestURLBuilder.java |  10 +
 8 files changed, 217 insertions(+), 124 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index b378e8306d..9fa65836da 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -507,14 +507,13 @@ public class PinotSegmentRestletResource {
       @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType")
           String tableNameWithType,
       @ApiParam(value = "Name of the segment", required = true) 
@PathParam("segmentName") @Encoded String segmentName,
-      @ApiParam(value = "Maximum time in milliseconds to wait for reset to be 
completed. By default, uses "
-          + "serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long 
maxWaitTimeMs) {
+      @ApiParam(value = "Name of the target instance to reset") 
@QueryParam("targetInstance") @Nullable
+          String targetInstance) {
     segmentName = URIUtils.decode(segmentName);
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     try {
       Preconditions.checkState(tableType != null, "Must provide table name 
with type: %s", tableNameWithType);
-      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName,
-          maxWaitTimeMs > 0 ? maxWaitTimeMs : 
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName, 
targetInstance);
       return new SuccessResponse(
           String.format("Successfully reset segment: %s of table: %s", 
segmentName, tableNameWithType));
     } catch (IllegalStateException e) {
@@ -543,14 +542,13 @@ public class PinotSegmentRestletResource {
           + " finally enabling the segments", notes = "Resets a segment by 
disabling and then enabling a segment")
   public SuccessResponse resetAllSegments(
       @ApiParam(value = "Name of the table with type", required = true) 
@PathParam("tableNameWithType")
-          String tableNameWithType, @ApiParam(
-      value = "Maximum time in milliseconds to wait for reset to be completed. 
By default, uses "
-          + "serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long 
maxWaitTimeMs) {
+          String tableNameWithType,
+      @ApiParam(value = "Name of the target instance to reset") 
@QueryParam("targetInstance") @Nullable
+          String targetInstance) {
     TableType tableType = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
     try {
       Preconditions.checkState(tableType != null, "Must provide table name 
with type: %s", tableNameWithType);
-      _pinotHelixResourceManager.resetAllSegments(tableNameWithType,
-          maxWaitTimeMs > 0 ? maxWaitTimeMs : 
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+      _pinotHelixResourceManager.resetAllSegments(tableNameWithType, 
targetInstance);
       return new SuccessResponse(String.format("Successfully reset all 
segments of table: %s", tableNameWithType));
     } catch (IllegalStateException e) {
       throw new ControllerApplicationException(LOGGER,
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index a5ee4cbc4e..fb4978d1dd 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -140,6 +140,26 @@ public class ControllerRequestClient {
     }
   }
 
+  public void resetTable(String tableNameWithType, String targetInstance)
+      throws IOException {
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new 
URL(
+          _controllerRequestURLBuilder.forTableReset(tableNameWithType, 
targetInstance)).toURI(), null));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
+  public void resetSegment(String tableNameWithType, String segmentName, 
String targetInstance)
+      throws IOException {
+    try {
+      HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new 
URL(
+          _controllerRequestURLBuilder.forSegmentReset(tableNameWithType, 
segmentName, targetInstance)).toURI(), null));
+    } catch (HttpErrorStatusException | URISyntaxException e) {
+      throw new IOException(e);
+    }
+  }
+
   public void reloadTable(String tableName, TableType tableType, boolean 
forceDownload)
       throws IOException {
     try {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 74320f1127..4eba9a8d5b 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -25,8 +25,9 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Lists;
 import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -68,13 +69,16 @@ import org.apache.helix.HelixManager;
 import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.ParticipantHistory;
+import org.apache.helix.model.StateModelDefinition;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
@@ -2365,75 +2369,32 @@ public class PinotHelixResourceManager {
   }
 
   /**
-   * Resets a segment. The steps involved are
-   *  1. If segment is in ERROR state in the External View, invoke 
resetPartition, else invoke disablePartition
-   *  2. Wait for the external view to stabilize. Step 1 should turn the 
segment to OFFLINE state
-   *  3. Invoke enablePartition on the segment
+   * Resets a segment. This operation invoke resetPartition via state 
transition message.
    */
-  public void resetSegment(String tableNameWithType, String segmentName, long 
externalViewWaitTimeMs)
+  public void resetSegment(String tableNameWithType, String segmentName, 
@Nullable String targetInstance)
       throws InterruptedException, TimeoutException {
     IdealState idealState = getTableIdealState(tableNameWithType);
     Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
     ExternalView externalView = getTableExternalView(tableNameWithType);
     Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
-    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
-    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
-        "Could not find segment: %s in ideal state for table: %s", 
segmentName, tableNameWithType);
+    Set<String> instanceSet = parseInstanceSet(idealState, segmentName, 
targetInstance);
     Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
 
-    // First, disable or reset the segment
     for (String instance : instanceSet) {
-      if (externalViewStateMap == null || 
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
-        LOGGER.info("Disabling segment: {} of table: {}", segmentName, 
tableNameWithType);
-        // enablePartition takes a segment which is NOT in ERROR state, to 
OFFLINE state
-        // TODO: If the controller fails to re-enable the partition, it will 
be left in disabled state
-        _helixAdmin.enablePartition(false, _helixClusterName, instance, 
tableNameWithType,
-            Lists.newArrayList(segmentName));
+      if (externalViewStateMap == null || 
SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
+        LOGGER.info("Skipping reset for segment: {} of table: {} on instance: 
{}", segmentName, tableNameWithType,
+            instance);
       } else {
-        LOGGER.info("Resetting segment: {} of table: {}", segmentName, 
tableNameWithType);
-        // resetPartition takes a segment which is in ERROR state, to OFFLINE 
state
-        _helixAdmin.resetPartition(_helixClusterName, instance, 
tableNameWithType, Lists.newArrayList(segmentName));
+        LOGGER.info("Resetting segment: {} of table: {} on instance: {}", 
segmentName, tableNameWithType, instance);
+        resetPartitionAllState(instance, tableNameWithType, 
Collections.singleton(segmentName));
       }
     }
-
-    // Wait for external view to stabilize
-    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of segment: {} of table: {}",
-        externalViewWaitTimeMs, segmentName, tableNameWithType);
-    long startTime = System.currentTimeMillis();
-    Set<String> instancesToCheck = new HashSet<>(instanceSet);
-    while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
-      ExternalView newExternalView = getTableExternalView(tableNameWithType);
-      Preconditions.checkState(newExternalView != null, "Could not find 
external view for table: %s",
-          tableNameWithType);
-      Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentName);
-      if (newExternalViewStateMap == null) {
-        continue;
-      }
-      instancesToCheck.removeIf(instance -> 
SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance)));
-      Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
-    }
-    if (!instancesToCheck.isEmpty()) {
-      throw new TimeoutException(String.format(
-          "Timed out waiting for external view to stabilize after call to 
disable/reset segment: %s of table: %s. "
-              + "Disable/reset might complete in the background, but skipping 
enable of segment.", segmentName,
-          tableNameWithType));
-    }
-
-    // Lastly, enable segment
-    LOGGER.info("Enabling segment: {} of table: {}", segmentName, 
tableNameWithType);
-    for (String instance : instanceSet) {
-      _helixAdmin.enablePartition(true, _helixClusterName, instance, 
tableNameWithType,
-          Lists.newArrayList(segmentName));
-    }
   }
 
   /**
-   * Resets all segments of a table. The steps involved are
-   * 1. If segment is in ERROR state in the External View, invoke 
resetPartition, else invoke disablePartition
-   * 2. Wait for the external view to stabilize. Step 1 should turn all 
segments to OFFLINE state
-   * 3. Invoke enablePartition on the segments
+   * Resets all segments of a table. This operation invoke resetPartition via 
state transition message.
    */
-  public void resetAllSegments(String tableNameWithType, long 
externalViewWaitTimeMs)
+  public void resetAllSegments(String tableNameWithType, @Nullable String 
targetInstance)
       throws InterruptedException, TimeoutException {
     IdealState idealState = getTableIdealState(tableNameWithType);
     Preconditions.checkState(idealState != null, "Could not find ideal state 
for table: %s", tableNameWithType);
@@ -2441,83 +2402,124 @@ public class PinotHelixResourceManager {
     Preconditions.checkState(externalView != null, "Could not find external 
view for table: %s", tableNameWithType);
 
     Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>();
-    Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>();
-    Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>();
+    Map<String, Set<String>> instanceToSkippedSegmentsMap = new HashMap<>();
 
     for (String segmentName : idealState.getPartitionSet()) {
-      Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+      Set<String> instanceSet = parseInstanceSet(idealState, segmentName, 
targetInstance);
       Map<String, String> externalViewStateMap = 
externalView.getStateMap(segmentName);
       for (String instance : instanceSet) {
-        if (externalViewStateMap == null || 
!SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) {
-          instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(segmentName);
+        if (externalViewStateMap == null || 
SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) {
+          instanceToSkippedSegmentsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(segmentName);
         } else {
           instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new 
HashSet<>()).add(segmentName);
         }
       }
-      segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet));
     }
 
-    // First, disable/reset the segments
-    LOGGER.info("Disabling/resetting segments of table: {}", 
tableNameWithType);
+    LOGGER.info("Resetting all segments of table: {}", tableNameWithType);
     for (Map.Entry<String, Set<String>> entry : 
instanceToResetSegmentsMap.entrySet()) {
-      // resetPartition takes a segment which is in ERROR state, to OFFLINE 
state
-      _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), 
tableNameWithType,
-          Lists.newArrayList(entry.getValue()));
-    }
-    for (Map.Entry<String, Set<String>> entry : 
instanceToDisableSegmentsMap.entrySet()) {
-      // enablePartition takes a segment which is NOT in ERROR state, to 
OFFLINE state
-      // TODO: If the controller fails to re-enable the partition, it will be 
left in disabled state
-      _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), 
tableNameWithType,
-          Lists.newArrayList(entry.getValue()));
-    }
-
-    // Wait for external view to stabilize
-    LOGGER.info("Waiting {} ms for external view to stabilize after 
disable/reset of segments of table: {}",
-        externalViewWaitTimeMs, tableNameWithType);
-    long startTime = System.currentTimeMillis();
-    while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() - 
startTime < externalViewWaitTimeMs) {
-      ExternalView newExternalView = getTableExternalView(tableNameWithType);
-      Preconditions.checkState(newExternalView != null, "Could not find 
external view for table: %s",
-          tableNameWithType);
-      Iterator<Map.Entry<String, Set<String>>> iterator = 
segmentInstancesToCheck.entrySet().iterator();
-      while (iterator.hasNext()) {
-        Map.Entry<String, Set<String>> entryToCheck = iterator.next();
-        String segmentToCheck = entryToCheck.getKey();
-        Set<String> instancesToCheck = entryToCheck.getValue();
-        Map<String, String> newExternalViewStateMap = 
newExternalView.getStateMap(segmentToCheck);
-        if (newExternalViewStateMap == null) {
-          continue;
-        }
-        boolean allOffline = true;
-        for (String instance : instancesToCheck) {
-          if 
(!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) {
-            allOffline = false;
-            break;
-          }
-        }
-        if (allOffline) {
-          iterator.remove();
-        }
-      }
-      Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
+      resetPartitionAllState(entry.getKey(), tableNameWithType, 
entry.getValue());
+    }
+
+    LOGGER.info("Reset segments for table {} finished. With the following 
segments skipped: {}", tableNameWithType,
+        instanceToSkippedSegmentsMap);
+  }
+
+  private static Set<String> parseInstanceSet(IdealState idealState, String 
segmentName,
+      @Nullable String targetInstance) {
+    Set<String> instanceSet = idealState.getInstanceSet(segmentName);
+    Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet),
+        "Could not find segment: %s in ideal state", segmentName);
+    if (targetInstance != null) {
+      return instanceSet.contains(targetInstance) ? 
Collections.singleton(targetInstance) : Collections.emptySet();
+    } else {
+      return instanceSet;
     }
-    if (!segmentInstancesToCheck.isEmpty()) {
-      throw new TimeoutException(String.format(
-          "Timed out waiting for external view to stabilize after call to 
disable/reset segments. "
-              + "Disable/reset might complete in the background, but skipping 
enable of segments of table: %s",
-          tableNameWithType));
+  }
+
+  /**
+   * This util is similar to {@link HelixAdmin#resetPartition(String, String, 
String, List)}.
+   * However instead of resetting only the ERROR state to its initial state. 
we reset all state regardless.
+   */
+  private void resetPartitionAllState(String instanceName, String resourceName,
+      Set<String> resetPartitionNames) {
+    LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster 
{}.",
+        resetPartitionNames == null ? "NULL" : resetPartitionNames, 
resourceName,
+        instanceName, _helixClusterName);
+    HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor();
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    // check the instance is alive
+    LiveInstance liveInstance = 
accessor.getProperty(keyBuilder.liveInstance(instanceName));
+    if (liveInstance == null) {
+      // check if the instance exists in the cluster
+      String instanceConfigPath = 
PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName);
+      throw new RuntimeException(String.format("Can't find instance: %s on 
%s", instanceName, instanceConfigPath));
     }
 
-    // Lastly, enable segments
-    LOGGER.info("Enabling segments of table: {}", tableNameWithType);
-    for (Map.Entry<String, Set<String>> entry : 
instanceToResetSegmentsMap.entrySet()) {
-      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), 
tableNameWithType,
-          Lists.newArrayList(entry.getValue()));
+    // gather metadata for sending state transition message.
+    // we skip through the sanity checks normally done on Helix because in 
Pinot these are guaranteed to be safe.
+    // TODO: these are static in Pinot's resource reset (for each resource 
type).
+    IdealState idealState = 
accessor.getProperty(keyBuilder.idealStates(resourceName));
+    String stateModelDef = idealState.getStateModelDefRef();
+    StateModelDefinition stateModel = 
accessor.getProperty(keyBuilder.stateModelDef(stateModelDef));
+
+    // get current state.
+    String sessionId = liveInstance.getEphemeralOwner();
+    CurrentState curState =
+        accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, 
resourceName));
+
+    // check there is no pending messages for the partitions exist
+    List<Message> messages = 
accessor.getChildValues(keyBuilder.messages(instanceName), true);
+    for (Message message : messages) {
+      if 
(!Message.MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType())
 || !sessionId
+          .equals(message.getTgtSessionId()) || 
!resourceName.equals(message.getResourceName())
+          || !resetPartitionNames.contains(message.getPartitionName())) {
+        continue;
+      }
+      throw new RuntimeException(String.format("Can't reset state for %s.%s on 
%s, "
+              + "because a pending message %s exists for resource %s", 
resourceName, resetPartitionNames, instanceName,
+          message.toString(), message.getResourceName()));
     }
-    for (Map.Entry<String, Set<String>> entry : 
instanceToDisableSegmentsMap.entrySet()) {
-      _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), 
tableNameWithType,
-          Lists.newArrayList(entry.getValue()));
+
+    String adminName = null;
+    try {
+      adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN";
+    } catch (UnknownHostException e) {
+      // can ignore it
+      LOGGER.info("Unable to get host name. Will set it to UNKNOWN, mostly 
ignorable", e);
+      adminName = "UNKNOWN";
+    }
+
+    List<Message> resetMessages = new ArrayList<Message>();
+    List<PropertyKey> messageKeys = new ArrayList<PropertyKey>();
+    for (String partitionName : resetPartitionNames) {
+      // send currentState to initialState message
+      String msgId = UUID.randomUUID().toString();
+      Message message = new Message(Message.MessageType.STATE_TRANSITION, 
msgId);
+      message.setSrcName(adminName);
+      message.setTgtName(instanceName);
+      message.setMsgState(Message.MessageState.NEW);
+      message.setPartitionName(partitionName);
+      message.setResourceName(resourceName);
+      message.setTgtSessionId(sessionId);
+      message.setStateModelDef(stateModelDef);
+      message.setFromState(curState.getState(partitionName));
+      message.setToState(stateModel.getInitialState());
+      message.setStateModelFactoryName(idealState.getStateModelFactoryName());
+
+      if (idealState.getResourceGroupName() != null) {
+        message.setResourceGroupName(idealState.getResourceGroupName());
+      }
+      if (idealState.getInstanceGroupTag() != null) {
+        message.setResourceTag(idealState.getInstanceGroupTag());
+      }
+
+      resetMessages.add(message);
+      messageKeys.add(keyBuilder.message(instanceName, message.getId()));
     }
+
+    accessor.setChildren(messageKeys, resetMessages);
   }
 
   /**
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 21535e5ebe..71d2a60c97 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -55,6 +55,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamDataServerStartable;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.tools.utils.KafkaStarterUtils;
 import org.apache.pinot.util.TestUtils;
 import org.testng.Assert;
@@ -594,6 +595,15 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest {
     }, 100L, timeoutMs, "Failed to load " + countStarResult + " documents", 
raiseError);
   }
 
+  /**
+   * Reset table utils.
+   */
+  protected void resetTable(String tableName, TableType tableType, @Nullable 
String targetInstance)
+      throws IOException {
+    
getControllerRequestClient().resetTable(TableNameBuilder.forType(tableType).tableNameWithType(tableName),
+        targetInstance);
+  }
+
   /**
    * Run equivalent Pinot and H2 query and compare the results.
    */
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 5e1eeac0e7..6d13e4a6bd 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -23,14 +23,19 @@ import java.io.BufferedReader;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
 import org.apache.pinot.client.ResultSet;
 import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
 import org.apache.pinot.server.starter.helix.BaseServerStarter;
+import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -38,6 +43,7 @@ import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.InstanceTypeUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.util.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -531,6 +537,42 @@ public abstract class BaseClusterIntegrationTestSet 
extends BaseClusterIntegrati
     }, 60_000L, errorMessage);
   }
 
+  public void testReset(TableType tableType)
+      throws Exception {
+    String rawTableName = getTableName();
+
+    // reset the table.
+    resetTable(rawTableName, tableType, null);
+
+    // wait for all live messages clear the queue.
+    List<String> instances = 
_helixResourceManager.getServerInstancesForTable(rawTableName, tableType);
+    PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder();
+    TestUtils.waitForCondition(aVoid -> {
+      int liveMessageCount = 0;
+      for (String instanceName : instances) {
+        List<Message> messages = 
_helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true);
+        liveMessageCount += messages.size();
+      }
+      return liveMessageCount == 0;
+    }, 30_000L, "Failed to wait for all segment reset messages clear helix 
state transition!");
+
+    // Check that all segment states come back to ONLINE.
+    TestUtils.waitForCondition(aVoid -> {
+      // check external view and wait for everything to come back online
+      ExternalView externalView = 
_helixAdmin.getResourceExternalView(getHelixClusterName(),
+          TableNameBuilder.forType(tableType).tableNameWithType(rawTableName));
+      for (Map<String, String> externalViewStateMap : 
externalView.getRecord().getMapFields().values()) {
+        for (String state : externalViewStateMap.values()) {
+          if 
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)
+              && 
!CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) {
+            return false;
+          }
+        }
+      }
+      return true;
+    }, 30_000L, "Failed to wait for all segments come back online");
+  }
+
   /**
    * TODO: Support removing new added columns for MutableSegment and remove 
the new added columns before running the
    *       next test. Use this to replace {@link 
OfflineClusterIntegrationTest#testDefaultColumns()}.
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 7fb120cfb7..c0272f8954 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -277,6 +277,12 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
     testReload(false);
   }
 
+  @Test
+  public void testReset()
+      throws Exception {
+    super.testReset(TableType.REALTIME);
+  }
+
   @Test
   @Override
   public void testHardcodedServerPartitionedSqlQueries()
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 053af685ef..0e0a79c4a9 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2663,6 +2663,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(response.get(MAX_NUM_MULTI_VALUES_MAP_KEY).size(), 
numMVColumn);
   }
 
+  @Test
+  public void testReset()
+      throws Exception {
+    super.testReset(TableType.OFFLINE);
+  }
 
   @Test
   public void testJDBCClient()
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index 2df8f5ffa0..8ec5172e56 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -223,6 +223,11 @@ public class ControllerRequestURLBuilder {
     return StringUtil.join("/", _baseUrl, "segments", tableName, query);
   }
 
+  public String forTableReset(String tableNameWithType, @Nullable String 
targetInstance) {
+    String query = targetInstance == null ? "reset" : 
String.format("reset?targetInstance=%s", targetInstance);
+    return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, 
query);
+  }
+
   public String forControllerJobStatus(String jobId) {
     return StringUtil.join("/", _baseUrl, "segments", "segmentReloadStatus", 
jobId);
   }
@@ -316,6 +321,11 @@ public class ControllerRequestURLBuilder {
         "reload?forceDownload=" + forceDownload);
   }
 
+  public String forSegmentReset(String tableNameWithType, String segmentName, 
String targetInstance) {
+    String query = targetInstance == null ? "reset" : 
String.format("reset?targetInstance=%s", targetInstance);
+    return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, 
encode(segmentName), query);
+  }
+
   public String forSegmentDownload(String tableName, String segmentName) {
     return StringUtil.join("/", _baseUrl, "segments", tableName, 
encode(segmentName));
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to