This is an automated email from the ASF dual-hosted git repository. rongr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 6fddffe66d make reset table async (#9203) 6fddffe66d is described below commit 6fddffe66d587f45deefaf817ea1d6be45220e02 Author: Rong Rong <walterddr.walter...@gmail.com> AuthorDate: Tue Aug 16 13:30:41 2022 -0700 make reset table async (#9203) * make reset table async * change to use external view resync to check status Co-authored-by: Rong Rong <ro...@startree.ai> --- .../api/resources/PinotSegmentRestletResource.java | 16 +- .../controller/helix/ControllerRequestClient.java | 20 ++ .../helix/core/PinotHelixResourceManager.java | 232 +++++++++++---------- .../tests/BaseClusterIntegrationTest.java | 10 + .../tests/BaseClusterIntegrationTestSet.java | 42 ++++ .../tests/LLCRealtimeClusterIntegrationTest.java | 6 + .../tests/OfflineClusterIntegrationTest.java | 5 + .../utils/builder/ControllerRequestURLBuilder.java | 10 + 8 files changed, 217 insertions(+), 124 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index b378e8306d..9fa65836da 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -507,14 +507,13 @@ public class PinotSegmentRestletResource { @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") String tableNameWithType, @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, - @ApiParam(value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses " - + "serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) { + @ApiParam(value = "Name of the target instance to reset") @QueryParam("targetInstance") @Nullable + String targetInstance) { segmentName = URIUtils.decode(segmentName); TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); try { Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType); - _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName, - maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + _pinotHelixResourceManager.resetSegment(tableNameWithType, segmentName, targetInstance); return new SuccessResponse( String.format("Successfully reset segment: %s of table: %s", segmentName, tableNameWithType)); } catch (IllegalStateException e) { @@ -543,14 +542,13 @@ public class PinotSegmentRestletResource { + " finally enabling the segments", notes = "Resets a segment by disabling and then enabling a segment") public SuccessResponse resetAllSegments( @ApiParam(value = "Name of the table with type", required = true) @PathParam("tableNameWithType") - String tableNameWithType, @ApiParam( - value = "Maximum time in milliseconds to wait for reset to be completed. By default, uses " - + "serverAdminRequestTimeout") @QueryParam("maxWaitTimeMs") long maxWaitTimeMs) { + String tableNameWithType, + @ApiParam(value = "Name of the target instance to reset") @QueryParam("targetInstance") @Nullable + String targetInstance) { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); try { Preconditions.checkState(tableType != null, "Must provide table name with type: %s", tableNameWithType); - _pinotHelixResourceManager.resetAllSegments(tableNameWithType, - maxWaitTimeMs > 0 ? maxWaitTimeMs : _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000); + _pinotHelixResourceManager.resetAllSegments(tableNameWithType, targetInstance); return new SuccessResponse(String.format("Successfully reset all segments of table: %s", tableNameWithType)); } catch (IllegalStateException e) { throw new ControllerApplicationException(LOGGER, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java index a5ee4cbc4e..fb4978d1dd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java @@ -140,6 +140,26 @@ public class ControllerRequestClient { } } + public void resetTable(String tableNameWithType, String targetInstance) + throws IOException { + try { + HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL( + _controllerRequestURLBuilder.forTableReset(tableNameWithType, targetInstance)).toURI(), null)); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + + public void resetSegment(String tableNameWithType, String segmentName, String targetInstance) + throws IOException { + try { + HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(new URL( + _controllerRequestURLBuilder.forSegmentReset(tableNameWithType, segmentName, targetInstance)).toURI(), null)); + } catch (HttpErrorStatusException | URISyntaxException e) { + throw new IOException(e); + } + } + public void reloadTable(String tableName, TableType tableType, boolean forceDownload) throws IOException { try { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 74320f1127..4eba9a8d5b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -25,8 +25,9 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; -import com.google.common.collect.Lists; import java.io.IOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; @@ -68,13 +69,16 @@ import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.PropertyPathBuilder; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; import org.apache.helix.model.ParticipantHistory; +import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.HelixConfigScopeBuilder; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -2365,75 +2369,32 @@ public class PinotHelixResourceManager { } /** - * Resets a segment. The steps involved are - * 1. If segment is in ERROR state in the External View, invoke resetPartition, else invoke disablePartition - * 2. Wait for the external view to stabilize. Step 1 should turn the segment to OFFLINE state - * 3. Invoke enablePartition on the segment + * Resets a segment. This operation invoke resetPartition via state transition message. */ - public void resetSegment(String tableNameWithType, String segmentName, long externalViewWaitTimeMs) + public void resetSegment(String tableNameWithType, String segmentName, @Nullable String targetInstance) throws InterruptedException, TimeoutException { IdealState idealState = getTableIdealState(tableNameWithType); Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); ExternalView externalView = getTableExternalView(tableNameWithType); Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); - Set<String> instanceSet = idealState.getInstanceSet(segmentName); - Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet), - "Could not find segment: %s in ideal state for table: %s", segmentName, tableNameWithType); + Set<String> instanceSet = parseInstanceSet(idealState, segmentName, targetInstance); Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); - // First, disable or reset the segment for (String instance : instanceSet) { - if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { - LOGGER.info("Disabling segment: {} of table: {}", segmentName, tableNameWithType); - // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state - // TODO: If the controller fails to re-enable the partition, it will be left in disabled state - _helixAdmin.enablePartition(false, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); + if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { + LOGGER.info("Skipping reset for segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, + instance); } else { - LOGGER.info("Resetting segment: {} of table: {}", segmentName, tableNameWithType); - // resetPartition takes a segment which is in ERROR state, to OFFLINE state - _helixAdmin.resetPartition(_helixClusterName, instance, tableNameWithType, Lists.newArrayList(segmentName)); + LOGGER.info("Resetting segment: {} of table: {} on instance: {}", segmentName, tableNameWithType, instance); + resetPartitionAllState(instance, tableNameWithType, Collections.singleton(segmentName)); } } - - // Wait for external view to stabilize - LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segment: {} of table: {}", - externalViewWaitTimeMs, segmentName, tableNameWithType); - long startTime = System.currentTimeMillis(); - Set<String> instancesToCheck = new HashSet<>(instanceSet); - while (!instancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { - ExternalView newExternalView = getTableExternalView(tableNameWithType); - Preconditions.checkState(newExternalView != null, "Could not find external view for table: %s", - tableNameWithType); - Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentName); - if (newExternalViewStateMap == null) { - continue; - } - instancesToCheck.removeIf(instance -> SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))); - Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); - } - if (!instancesToCheck.isEmpty()) { - throw new TimeoutException(String.format( - "Timed out waiting for external view to stabilize after call to disable/reset segment: %s of table: %s. " - + "Disable/reset might complete in the background, but skipping enable of segment.", segmentName, - tableNameWithType)); - } - - // Lastly, enable segment - LOGGER.info("Enabling segment: {} of table: {}", segmentName, tableNameWithType); - for (String instance : instanceSet) { - _helixAdmin.enablePartition(true, _helixClusterName, instance, tableNameWithType, - Lists.newArrayList(segmentName)); - } } /** - * Resets all segments of a table. The steps involved are - * 1. If segment is in ERROR state in the External View, invoke resetPartition, else invoke disablePartition - * 2. Wait for the external view to stabilize. Step 1 should turn all segments to OFFLINE state - * 3. Invoke enablePartition on the segments + * Resets all segments of a table. This operation invoke resetPartition via state transition message. */ - public void resetAllSegments(String tableNameWithType, long externalViewWaitTimeMs) + public void resetAllSegments(String tableNameWithType, @Nullable String targetInstance) throws InterruptedException, TimeoutException { IdealState idealState = getTableIdealState(tableNameWithType); Preconditions.checkState(idealState != null, "Could not find ideal state for table: %s", tableNameWithType); @@ -2441,83 +2402,124 @@ public class PinotHelixResourceManager { Preconditions.checkState(externalView != null, "Could not find external view for table: %s", tableNameWithType); Map<String, Set<String>> instanceToResetSegmentsMap = new HashMap<>(); - Map<String, Set<String>> instanceToDisableSegmentsMap = new HashMap<>(); - Map<String, Set<String>> segmentInstancesToCheck = new HashMap<>(); + Map<String, Set<String>> instanceToSkippedSegmentsMap = new HashMap<>(); for (String segmentName : idealState.getPartitionSet()) { - Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Set<String> instanceSet = parseInstanceSet(idealState, segmentName, targetInstance); Map<String, String> externalViewStateMap = externalView.getStateMap(segmentName); for (String instance : instanceSet) { - if (externalViewStateMap == null || !SegmentStateModel.ERROR.equals(externalViewStateMap.get(instance))) { - instanceToDisableSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); + if (externalViewStateMap == null || SegmentStateModel.OFFLINE.equals(externalViewStateMap.get(instance))) { + instanceToSkippedSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); } else { instanceToResetSegmentsMap.computeIfAbsent(instance, i -> new HashSet<>()).add(segmentName); } } - segmentInstancesToCheck.put(segmentName, new HashSet<>(instanceSet)); } - // First, disable/reset the segments - LOGGER.info("Disabling/resetting segments of table: {}", tableNameWithType); + LOGGER.info("Resetting all segments of table: {}", tableNameWithType); for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) { - // resetPartition takes a segment which is in ERROR state, to OFFLINE state - _helixAdmin.resetPartition(_helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); - } - for (Map.Entry<String, Set<String>> entry : instanceToDisableSegmentsMap.entrySet()) { - // enablePartition takes a segment which is NOT in ERROR state, to OFFLINE state - // TODO: If the controller fails to re-enable the partition, it will be left in disabled state - _helixAdmin.enablePartition(false, _helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); - } - - // Wait for external view to stabilize - LOGGER.info("Waiting {} ms for external view to stabilize after disable/reset of segments of table: {}", - externalViewWaitTimeMs, tableNameWithType); - long startTime = System.currentTimeMillis(); - while (!segmentInstancesToCheck.isEmpty() && System.currentTimeMillis() - startTime < externalViewWaitTimeMs) { - ExternalView newExternalView = getTableExternalView(tableNameWithType); - Preconditions.checkState(newExternalView != null, "Could not find external view for table: %s", - tableNameWithType); - Iterator<Map.Entry<String, Set<String>>> iterator = segmentInstancesToCheck.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry<String, Set<String>> entryToCheck = iterator.next(); - String segmentToCheck = entryToCheck.getKey(); - Set<String> instancesToCheck = entryToCheck.getValue(); - Map<String, String> newExternalViewStateMap = newExternalView.getStateMap(segmentToCheck); - if (newExternalViewStateMap == null) { - continue; - } - boolean allOffline = true; - for (String instance : instancesToCheck) { - if (!SegmentStateModel.OFFLINE.equals(newExternalViewStateMap.get(instance))) { - allOffline = false; - break; - } - } - if (allOffline) { - iterator.remove(); - } - } - Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); + resetPartitionAllState(entry.getKey(), tableNameWithType, entry.getValue()); + } + + LOGGER.info("Reset segments for table {} finished. With the following segments skipped: {}", tableNameWithType, + instanceToSkippedSegmentsMap); + } + + private static Set<String> parseInstanceSet(IdealState idealState, String segmentName, + @Nullable String targetInstance) { + Set<String> instanceSet = idealState.getInstanceSet(segmentName); + Preconditions.checkState(CollectionUtils.isNotEmpty(instanceSet), + "Could not find segment: %s in ideal state", segmentName); + if (targetInstance != null) { + return instanceSet.contains(targetInstance) ? Collections.singleton(targetInstance) : Collections.emptySet(); + } else { + return instanceSet; } - if (!segmentInstancesToCheck.isEmpty()) { - throw new TimeoutException(String.format( - "Timed out waiting for external view to stabilize after call to disable/reset segments. " - + "Disable/reset might complete in the background, but skipping enable of segments of table: %s", - tableNameWithType)); + } + + /** + * This util is similar to {@link HelixAdmin#resetPartition(String, String, String, List)}. + * However instead of resetting only the ERROR state to its initial state. we reset all state regardless. + */ + private void resetPartitionAllState(String instanceName, String resourceName, + Set<String> resetPartitionNames) { + LOGGER.info("Reset partitions {} for resource {} on instance {} in cluster {}.", + resetPartitionNames == null ? "NULL" : resetPartitionNames, resourceName, + instanceName, _helixClusterName); + HelixDataAccessor accessor = _helixZkManager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + // check the instance is alive + LiveInstance liveInstance = accessor.getProperty(keyBuilder.liveInstance(instanceName)); + if (liveInstance == null) { + // check if the instance exists in the cluster + String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName); + throw new RuntimeException(String.format("Can't find instance: %s on %s", instanceName, instanceConfigPath)); } - // Lastly, enable segments - LOGGER.info("Enabling segments of table: {}", tableNameWithType); - for (Map.Entry<String, Set<String>> entry : instanceToResetSegmentsMap.entrySet()) { - _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); + // gather metadata for sending state transition message. + // we skip through the sanity checks normally done on Helix because in Pinot these are guaranteed to be safe. + // TODO: these are static in Pinot's resource reset (for each resource type). + IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resourceName)); + String stateModelDef = idealState.getStateModelDefRef(); + StateModelDefinition stateModel = accessor.getProperty(keyBuilder.stateModelDef(stateModelDef)); + + // get current state. + String sessionId = liveInstance.getEphemeralOwner(); + CurrentState curState = + accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName)); + + // check there is no pending messages for the partitions exist + List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true); + for (Message message : messages) { + if (!Message.MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || !sessionId + .equals(message.getTgtSessionId()) || !resourceName.equals(message.getResourceName()) + || !resetPartitionNames.contains(message.getPartitionName())) { + continue; + } + throw new RuntimeException(String.format("Can't reset state for %s.%s on %s, " + + "because a pending message %s exists for resource %s", resourceName, resetPartitionNames, instanceName, + message.toString(), message.getResourceName())); } - for (Map.Entry<String, Set<String>> entry : instanceToDisableSegmentsMap.entrySet()) { - _helixAdmin.enablePartition(true, _helixClusterName, entry.getKey(), tableNameWithType, - Lists.newArrayList(entry.getValue())); + + String adminName = null; + try { + adminName = InetAddress.getLocalHost().getCanonicalHostName() + "-ADMIN"; + } catch (UnknownHostException e) { + // can ignore it + LOGGER.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e); + adminName = "UNKNOWN"; + } + + List<Message> resetMessages = new ArrayList<Message>(); + List<PropertyKey> messageKeys = new ArrayList<PropertyKey>(); + for (String partitionName : resetPartitionNames) { + // send currentState to initialState message + String msgId = UUID.randomUUID().toString(); + Message message = new Message(Message.MessageType.STATE_TRANSITION, msgId); + message.setSrcName(adminName); + message.setTgtName(instanceName); + message.setMsgState(Message.MessageState.NEW); + message.setPartitionName(partitionName); + message.setResourceName(resourceName); + message.setTgtSessionId(sessionId); + message.setStateModelDef(stateModelDef); + message.setFromState(curState.getState(partitionName)); + message.setToState(stateModel.getInitialState()); + message.setStateModelFactoryName(idealState.getStateModelFactoryName()); + + if (idealState.getResourceGroupName() != null) { + message.setResourceGroupName(idealState.getResourceGroupName()); + } + if (idealState.getInstanceGroupTag() != null) { + message.setResourceTag(idealState.getInstanceGroupTag()); + } + + resetMessages.add(message); + messageKeys.add(keyBuilder.message(instanceName, message.getId())); } + + accessor.setChildren(messageKeys, resetMessages); } /** diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index 21535e5ebe..71d2a60c97 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -55,6 +55,7 @@ import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; import org.apache.pinot.spi.stream.StreamDataServerStartable; import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.tools.utils.KafkaStarterUtils; import org.apache.pinot.util.TestUtils; import org.testng.Assert; @@ -594,6 +595,15 @@ public abstract class BaseClusterIntegrationTest extends ClusterTest { }, 100L, timeoutMs, "Failed to load " + countStarResult + " documents", raiseError); } + /** + * Reset table utils. + */ + protected void resetTable(String tableName, TableType tableType, @Nullable String targetInstance) + throws IOException { + getControllerRequestClient().resetTable(TableNameBuilder.forType(tableType).tableNameWithType(tableName), + targetInstance); + } + /** * Run equivalent Pinot and H2 query and compare the results. */ diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java index 5e1eeac0e7..6d13e4a6bd 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java @@ -23,14 +23,19 @@ import java.io.BufferedReader; import java.io.InputStream; import java.io.InputStreamReader; import java.util.List; +import java.util.Map; import org.apache.commons.lang3.StringUtils; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.Message; import org.apache.pinot.client.ResultSet; import org.apache.pinot.client.ResultSetGroup; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.query.utils.idset.IdSet; import org.apache.pinot.core.query.utils.idset.IdSets; import org.apache.pinot.server.starter.helix.BaseServerStarter; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.DimensionFieldSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.MetricFieldSpec; @@ -38,6 +43,7 @@ import org.apache.pinot.spi.data.Schema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.InstanceTypeUtils; import org.apache.pinot.spi.utils.JsonUtils; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.util.TestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -531,6 +537,42 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati }, 60_000L, errorMessage); } + public void testReset(TableType tableType) + throws Exception { + String rawTableName = getTableName(); + + // reset the table. + resetTable(rawTableName, tableType, null); + + // wait for all live messages clear the queue. + List<String> instances = _helixResourceManager.getServerInstancesForTable(rawTableName, tableType); + PropertyKey.Builder keyBuilder = _helixDataAccessor.keyBuilder(); + TestUtils.waitForCondition(aVoid -> { + int liveMessageCount = 0; + for (String instanceName : instances) { + List<Message> messages = _helixDataAccessor.getChildValues(keyBuilder.messages(instanceName), true); + liveMessageCount += messages.size(); + } + return liveMessageCount == 0; + }, 30_000L, "Failed to wait for all segment reset messages clear helix state transition!"); + + // Check that all segment states come back to ONLINE. + TestUtils.waitForCondition(aVoid -> { + // check external view and wait for everything to come back online + ExternalView externalView = _helixAdmin.getResourceExternalView(getHelixClusterName(), + TableNameBuilder.forType(tableType).tableNameWithType(rawTableName)); + for (Map<String, String> externalViewStateMap : externalView.getRecord().getMapFields().values()) { + for (String state : externalViewStateMap.values()) { + if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state) + && !CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) { + return false; + } + } + } + return true; + }, 30_000L, "Failed to wait for all segments come back online"); + } + /** * TODO: Support removing new added columns for MutableSegment and remove the new added columns before running the * next test. Use this to replace {@link OfflineClusterIntegrationTest#testDefaultColumns()}. diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 7fb120cfb7..c0272f8954 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -277,6 +277,12 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio testReload(false); } + @Test + public void testReset() + throws Exception { + super.testReset(TableType.REALTIME); + } + @Test @Override public void testHardcodedServerPartitionedSqlQueries() diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index 053af685ef..0e0a79c4a9 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -2663,6 +2663,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(response.get(MAX_NUM_MULTI_VALUES_MAP_KEY).size(), numMVColumn); } + @Test + public void testReset() + throws Exception { + super.testReset(TableType.OFFLINE); + } @Test public void testJDBCClient() diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java index 2df8f5ffa0..8ec5172e56 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java @@ -223,6 +223,11 @@ public class ControllerRequestURLBuilder { return StringUtil.join("/", _baseUrl, "segments", tableName, query); } + public String forTableReset(String tableNameWithType, @Nullable String targetInstance) { + String query = targetInstance == null ? "reset" : String.format("reset?targetInstance=%s", targetInstance); + return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, query); + } + public String forControllerJobStatus(String jobId) { return StringUtil.join("/", _baseUrl, "segments", "segmentReloadStatus", jobId); } @@ -316,6 +321,11 @@ public class ControllerRequestURLBuilder { "reload?forceDownload=" + forceDownload); } + public String forSegmentReset(String tableNameWithType, String segmentName, String targetInstance) { + String query = targetInstance == null ? "reset" : String.format("reset?targetInstance=%s", targetInstance); + return StringUtil.join("/", _baseUrl, "segments", tableNameWithType, encode(segmentName), query); + } + public String forSegmentDownload(String tableName, String segmentName) { return StringUtil.join("/", _baseUrl, "segments", tableName, encode(segmentName)); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org