This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch full-auto-same-state-model in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 7408de4fd0069bf5caea938b4936d9f3a1f6a556 Author: jlli_LinkedIn <j...@linkedin.com> AuthorDate: Wed Mar 6 12:57:45 2024 -0800 Leverage ONLINE-OFFLINE state model for realtime tables --- .../pinot/broker/routing/BrokerRoutingManager.java | 3 +- .../instanceselector/BaseInstanceSelector.java | 2 +- .../SegmentPartitionMetadataManager.java | 2 +- .../instanceselector/InstanceSelectorTest.java | 13 +++-- .../helix/core/PinotHelixResourceManager.java | 25 +++++---- .../helix/core/PinotTableIdealStateHelper.java | 2 +- .../segment/RealtimeSegmentAssignment.java | 5 +- .../assignment/segment/SegmentAssignmentUtils.java | 22 ++++++-- .../segment/StrictRealtimeSegmentAssignment.java | 8 +-- .../realtime/MissingConsumingSegmentFinder.java | 23 ++++++-- .../realtime/PinotLLCRealtimeSegmentManager.java | 57 ++++++++++---------- ...altimeNonReplicaGroupSegmentAssignmentTest.java | 4 +- ...NonReplicaGroupTieredSegmentAssignmentTest.java | 6 +-- .../RealtimeReplicaGroupSegmentAssignmentTest.java | 4 +- .../StrictRealtimeSegmentAssignmentTest.java | 2 +- .../PinotLLCRealtimeSegmentManagerTest.java | 23 ++++---- .../helix/core/rebalance/TableRebalancerTest.java | 7 ++- .../manager/realtime/RealtimeTableDataManager.java | 2 +- .../ControllerPeriodicTasksIntegrationTest.java | 2 +- .../tests/LLCRealtimeClusterIntegrationTest.java | 4 +- ...PartialUpsertTableRebalanceIntegrationTest.java | 2 +- .../UpsertTableSegmentPreloadIntegrationTest.java | 2 +- .../UpsertTableSegmentUploadIntegrationTest.java | 2 +- .../local/utils/tablestate/TableStateUtils.java | 25 ++++++--- .../pinot/server/api/resources/TablesResource.java | 62 ++++++++++++++-------- .../server/starter/helix/BaseServerStarter.java | 17 ++++-- ...flineSegmentOnlineOfflineStateModelFactory.java | 11 ++-- 27 files changed, 204 insertions(+), 133 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index cc3a5354ef..01e1296a26 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -543,8 +543,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size())); for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { Map<String, String> instanceStateMap = entry.getValue(); - if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap.containsValue( - SegmentStateModel.CONSUMING)) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { onlineSegments.add(entry.getKey()); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java index de0a5f9600..91ee512f0c 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java @@ -122,7 +122,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { * Returns whether the instance state is online for routing purpose (ONLINE/CONSUMING). */ static boolean isOnlineForRouting(@Nullable String state) { - return SegmentStateModel.ONLINE.equals(state) || SegmentStateModel.CONSUMING.equals(state); + return SegmentStateModel.ONLINE.equals(state); } /** diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java index 3623955591..b3ba0ac28a 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpartition/SegmentPartitionMetadataManager.java @@ -127,7 +127,7 @@ public class SegmentPartitionMetadataManager implements SegmentZkMetadataFetchLi List<String> onlineServers = new ArrayList<>(instanceStateMap.size()); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { String instanceState = entry.getValue(); - if (instanceState.equals(SegmentStateModel.ONLINE) || instanceState.equals(SegmentStateModel.CONSUMING)) { + if (instanceState.equals(SegmentStateModel.ONLINE)) { onlineServers.add(entry.getKey()); } } diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java index c748be4885..c44614c88b 100644 --- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java +++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java @@ -61,7 +61,6 @@ import org.testng.annotations.Test; import static org.apache.pinot.broker.routing.instanceselector.InstanceSelector.NEW_SEGMENT_EXPIRATION_MILLIS; import static org.apache.pinot.spi.config.table.RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE; import static org.apache.pinot.spi.config.table.RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE; -import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING; import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR; import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE; import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; @@ -1105,15 +1104,15 @@ public class InstanceSelectorTest { String segment0 = "segment0"; String segment1 = "segment1"; Map<String, String> idealStateInstanceStateMap = new TreeMap<>(); - idealStateInstanceStateMap.put(instance, CONSUMING); + idealStateInstanceStateMap.put(instance, ONLINE); idealStateInstanceStateMap.put(errorInstance, ONLINE); idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap); idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap); Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>(); - externalViewInstanceStateMap0.put(instance, CONSUMING); + externalViewInstanceStateMap0.put(instance, ONLINE); externalViewInstanceStateMap0.put(errorInstance, ERROR); Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>(); - externalViewInstanceStateMap1.put(instance, CONSUMING); + externalViewInstanceStateMap1.put(instance, ONLINE); externalViewInstanceStateMap1.put(errorInstance, ERROR); externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0); externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap1); @@ -1359,10 +1358,10 @@ public class InstanceSelectorTest { // (enabled) errorInstance: ERROR // } // } - idealStateInstanceStateMap.put(instance, CONSUMING); - externalViewInstanceStateMap0.put(instance, CONSUMING); + idealStateInstanceStateMap.put(instance, ONLINE); + externalViewInstanceStateMap0.put(instance, ONLINE); externalViewInstanceStateMap0.put(errorInstance, ERROR); - externalViewInstanceStateMap1.put(instance, CONSUMING); + externalViewInstanceStateMap1.put(instance, ONLINE); balancedInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments); strictReplicaGroupInstanceSelector.onAssignmentChange(idealState, externalView, onlineSegments); selectionResult = balancedInstanceSelector.select(brokerRequest, segments, 0); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 8a5584eab7..57c75d7618 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2814,13 +2814,20 @@ public class PinotHelixResourceManager { if (idealState == null) { throw new IllegalStateException("Ideal state does not exist for table: " + tableNameWithType); } - Set<String> consumingSegments = new HashSet<>(); - for (String segment : idealState.getPartitionSet()) { - Map<String, String> instanceStateMap = idealState.getInstanceStateMap(segment); - if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { - consumingSegments.add(segment); + return getConsumingSegments(idealState); + } + + public Set<String> getConsumingSegments(IdealState idealState) { + Set<String> consumingSegments = new TreeSet<>(); + idealState.getRecord().getMapFields().forEach((segmentName, instanceToStateMap) -> { + if (instanceToStateMap.containsValue(SegmentStateModel.ONLINE)) { + SegmentZKMetadata segmentZKMetadata = getSegmentZKMetadata(idealState.getResourceName(), segmentName); + if (segmentZKMetadata != null + && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) { + consumingSegments.add(segmentName); + } } - } + }); return consumingSegments; } @@ -3913,8 +3920,7 @@ public class PinotHelixResourceManager { Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size())); for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { Map<String, String> instanceStateMap = entry.getValue(); - if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || (includeConsuming - && instanceStateMap.containsValue(SegmentStateModel.CONSUMING))) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { matchingSegments.add(entry.getKey()); } } @@ -3931,8 +3937,7 @@ public class PinotHelixResourceManager { Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size())); for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { Map<String, String> instanceStateMap = entry.getValue(); - if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap.containsValue( - SegmentStateModel.CONSUMING)) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { onlineSegments.add(entry.getKey()); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java index 1135d05f5d..37c4b7555b 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java @@ -68,7 +68,7 @@ public class PinotTableIdealStateHelper { PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; } else { stateModel = - PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; } // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java index 434dbec4e3..8d69262d0f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java @@ -194,7 +194,8 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment { consumingInstancePartitions, includeConsuming, bootstrap); SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment completedConsumingOfflineSegmentAssignment = - new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(nonTierAssignment); + new SegmentAssignmentUtils.CompletedConsumingOfflineSegmentAssignment(_helixManager, _tableNameWithType, + nonTierAssignment); Map<String, Map<String, String>> newAssignment; // Reassign COMPLETED segments first @@ -237,7 +238,7 @@ public class RealtimeSegmentAssignment extends BaseSegmentAssignment { for (String segmentName : consumingSegmentAssignment.keySet()) { List<String> instancesAssigned = assignConsumingSegment(segmentName, consumingInstancePartitions); Map<String, String> instanceStateMap = - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING); + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE); newAssignment.put(segmentName, instanceStateMap); } } else { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index 46e4cc4eca..0aaee3b0ac 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -39,6 +39,7 @@ import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.tier.Tier; import org.apache.pinot.common.utils.SegmentUtils; import org.apache.pinot.segment.spi.partition.metadata.ColumnPartitionMetadata; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.Pairs; @@ -324,14 +325,27 @@ public class SegmentAssignmentUtils { // 1. At least one instance ONLINE -> COMPLETED segment // 2. At least one instance CONSUMING -> CONSUMING segment // 3. All instances OFFLINE (all instances encountered error while consuming) -> OFFLINE segment - CompletedConsumingOfflineSegmentAssignment(Map<String, Map<String, String>> segmentAssignment) { + CompletedConsumingOfflineSegmentAssignment(HelixManager helixManager, String tableName, + Map<String, Map<String, String>> segmentAssignment) { for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { String segmentName = entry.getKey(); Map<String, String> instanceStateMap = entry.getValue(); if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { - _completedSegmentAssignment.put(segmentName, instanceStateMap); - } else if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { - _consumingSegmentAssignment.put(segmentName, instanceStateMap); + SegmentZKMetadata segmentZKMetadata = + ZKMetadataProvider.getSegmentZKMetadata(helixManager.getHelixPropertyStore(), tableName, segmentName); + if (segmentZKMetadata == null) { + _offlineSegmentAssignment.put(segmentName, instanceStateMap); + } else { + CommonConstants.Segment.Realtime.Status status = segmentZKMetadata.getStatus(); + if (status == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) { + _consumingSegmentAssignment.put(segmentName, instanceStateMap); + } else { + _completedSegmentAssignment.put(segmentName, instanceStateMap); + } + } +// _completedSegmentAssignment.put(segmentName, instanceStateMap); +// } else if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { +// _consumingSegmentAssignment.put(segmentName, instanceStateMap); } else { _offlineSegmentAssignment.put(segmentName, instanceStateMap); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java index 2bc0ada5bd..ceeb111203 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignment.java @@ -132,8 +132,7 @@ public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment { * Returns {@code true} if all instances are OFFLINE (neither ONLINE nor CONSUMING), {@code false} otherwise. */ private boolean isOfflineSegment(Map<String, String> instanceStateMap) { - return !instanceStateMap.containsValue(SegmentStateModel.ONLINE) && !instanceStateMap.containsValue( - SegmentStateModel.CONSUMING); + return !instanceStateMap.containsValue(SegmentStateModel.ONLINE); } /** @@ -180,8 +179,9 @@ public class StrictRealtimeSegmentAssignment extends RealtimeSegmentAssignment { // Reassign CONSUMING and COMPLETED segments List<String> instancesAssigned = assignConsumingSegment(getPartitionIdUsingCache(segmentName), instancePartitions); - String state = instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? SegmentStateModel.CONSUMING - : SegmentStateModel.ONLINE; + String state = SegmentStateModel.ONLINE; +// String state = instanceStateMap.containsValue(SegmentStateModel.CONSUMING) ? SegmentStateModel.CONSUMING +// : SegmentStateModel.ONLINE; newAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, state)); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index d248f52d4c..c9850856cd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -42,7 +42,7 @@ import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory; -import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.zookeeper.data.Stat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -62,6 +62,7 @@ public class MissingConsumingSegmentFinder { private final SegmentMetadataFetcher _segmentMetadataFetcher; private final Map<Integer, StreamPartitionMsgOffset> _partitionGroupIdToLargestStreamOffsetMap; private final StreamPartitionMsgOffsetFactory _streamPartitionMsgOffsetFactory; + private final ZkHelixPropertyStore<ZNRecord> _propertyStore; private ControllerMetrics _controllerMetrics; @@ -69,6 +70,7 @@ public class MissingConsumingSegmentFinder { ControllerMetrics controllerMetrics, StreamConfig streamConfig) { _realtimeTableName = realtimeTableName; _controllerMetrics = controllerMetrics; + _propertyStore = propertyStore; _segmentMetadataFetcher = new SegmentMetadataFetcher(propertyStore, controllerMetrics); _streamPartitionMsgOffsetFactory = StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); @@ -94,6 +96,7 @@ public class MissingConsumingSegmentFinder { StreamPartitionMsgOffsetFactory streamPartitionMsgOffsetFactory) { _realtimeTableName = realtimeTableName; _segmentMetadataFetcher = segmentMetadataFetcher; + _propertyStore = segmentMetadataFetcher._propertyStore; _partitionGroupIdToLargestStreamOffsetMap = partitionGroupIdToLargestStreamOffsetMap; _streamPartitionMsgOffsetFactory = streamPartitionMsgOffsetFactory; } @@ -118,11 +121,21 @@ public class MissingConsumingSegmentFinder { idealStateMap.forEach((segmentName, instanceToStatusMap) -> { LLCSegmentName llcSegmentName = LLCSegmentName.of(segmentName); if (llcSegmentName != null) { // Skip the uploaded realtime segments that don't conform to llc naming - if (instanceToStatusMap.containsValue(SegmentStateModel.CONSUMING)) { - updateMap(partitionGroupIdToLatestConsumingSegmentMap, llcSegmentName); - } else if (instanceToStatusMap.containsValue(SegmentStateModel.ONLINE)) { - updateMap(partitionGroupIdToLatestCompletedSegmentMap, llcSegmentName); + SegmentZKMetadata segmentZKMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, _realtimeTableName, segmentName); + if (segmentZKMetadata != null) { + CommonConstants.Segment.Realtime.Status status = segmentZKMetadata.getStatus(); + if (status == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) { + updateMap(partitionGroupIdToLatestConsumingSegmentMap, llcSegmentName); + } else if (status == CommonConstants.Segment.Realtime.Status.DONE) { + updateMap(partitionGroupIdToLatestCompletedSegmentMap, llcSegmentName); + } } +// if (instanceToStatusMap.containsValue(SegmentStateModel.CONSUMING)) { +// updateMap(partitionGroupIdToLatestConsumingSegmentMap, llcSegmentName); +// } else if (instanceToStatusMap.containsValue(SegmentStateModel.ONLINE)) { +// updateMap(partitionGroupIdToLatestCompletedSegmentMap, llcSegmentName); +// } } }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 30f10f0021..d98930535a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -30,7 +30,6 @@ import java.util.List; import java.util.Map; import java.util.Random; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -533,17 +532,17 @@ public class PinotLLCRealtimeSegmentManager { TableConfig tableConfig = getTableConfig(realtimeTableName); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); IdealState idealState = getIdealState(realtimeTableName); - ExternalView externalView = getExternalView(realtimeTableName); +// ExternalView externalView = getExternalView(realtimeTableName); // Check whether there is at least 1 replica in ONLINE state for full-auto mode. - if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) { - Preconditions.checkState( - idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE), - "Failed to find instance in ONLINE state in IdealState for segment: %s", committingSegmentName); - } else { - Preconditions.checkState( - idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING), - "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName); - } +// if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) { +// Preconditions.checkState( +// idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE), +// "Failed to find instance in ONLINE state in IdealState for segment: %s", committingSegmentName); +// } else { +// Preconditions.checkState( +// idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE), +// "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName); +// } int numReplicas = getNumReplicas(tableConfig, instancePartitions); /* @@ -842,8 +841,12 @@ public class PinotLLCRealtimeSegmentManager { // TODO: how to handle such state updates for FULL-AUTO mode? So far we don't enable FULL-AUTO for REALTIME Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName); String state = stateMap.get(instanceName); - if (SegmentStateModel.CONSUMING.equals(state)) { - stateMap.put(instanceName, SegmentStateModel.OFFLINE); + if (SegmentStateModel.ONLINE.equals(state)) { + SegmentZKMetadata segmentZKMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_propertyStore, realtimeTableName, segmentName); + if (segmentZKMetadata != null && segmentZKMetadata.getStatus() == Status.IN_PROGRESS) { + stateMap.put(instanceName, SegmentStateModel.OFFLINE); + } } else { LOGGER.info("Segment {} in state {} when trying to register consumption stop from {}", segmentName, state, instanceName); @@ -1195,7 +1198,7 @@ public class PinotLLCRealtimeSegmentManager { Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegmentName); if (instanceStateMap != null) { // Latest segment of metadata is in idealstate. - if (instanceStateMap.containsValue(SegmentStateModel.CONSUMING)) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { if (latestSegmentZKMetadata.getStatus() == Status.DONE) { // step-1 of commmitSegmentMetadata is done (i.e. marking old segment as DONE) @@ -1294,13 +1297,20 @@ public class PinotLLCRealtimeSegmentManager { if (latestSegmentZKMetadata.getStatus() == Status.IN_PROGRESS) { // Find the previous CONSUMING segment String previousConsumingSegment = null; - for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) { - if (segmentEntry.getValue().containsValue(SegmentStateModel.CONSUMING) - && new LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == partitionGroupId) { - previousConsumingSegment = segmentEntry.getKey(); + Set<String> consumingSegments = findConsumingSegments(idealState); + for (String consumingSegment : consumingSegments) { + if (new LLCSegmentName(consumingSegment).getPartitionGroupId() == partitionGroupId) { + previousConsumingSegment = consumingSegment; break; } } +// for (Map.Entry<String, Map<String, String>> segmentEntry : instanceStatesMap.entrySet()) { +// if (segmentEntry.getValue().containsValue(SegmentStateModel.CONSUMING) +// && new LLCSegmentName(segmentEntry.getKey()).getPartitionGroupId() == partitionGroupId) { +// previousConsumingSegment = segmentEntry.getKey(); +// break; +// } +// } if (previousConsumingSegment == null) { LOGGER.error( "Failed to find previous CONSUMING segment for partition: {} of table: {}, potential data loss", @@ -1755,16 +1765,7 @@ public class PinotLLCRealtimeSegmentManager { } private Set<String> findConsumingSegments(IdealState idealState) { - Set<String> consumingSegments = new TreeSet<>(); - idealState.getRecord().getMapFields().forEach((segmentName, instanceToStateMap) -> { - for (String state : instanceToStateMap.values()) { - if (state.equals(SegmentStateModel.CONSUMING)) { - consumingSegments.add(segmentName); - break; - } - } - }); - return consumingSegments; + return _helixResourceManager.getConsumingSegments(idealState); } /** diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java index 9d921d3af8..280214854d 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupSegmentAssignmentTest.java @@ -244,7 +244,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { // Check the CONSUMING segments newAssignment.get(_segments.get(segmentId)).forEach((instance, state) -> { assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); - assertEquals(state, SegmentStateModel.CONSUMING); + assertEquals(state, SegmentStateModel.ONLINE); }); } } @@ -345,7 +345,7 @@ public class RealtimeNonReplicaGroupSegmentAssignmentTest { // Add the new segment into the assignment as CONSUMING currentAssignment.put(_segments.get(segmentId), - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } private HelixManager createHelixManager() { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java index e03a8d4a46..e7b5895a47 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeNonReplicaGroupTieredSegmentAssignmentTest.java @@ -196,7 +196,7 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest { Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), SegmentStateModel.CONSUMING); + assertEquals(entry.getValue(), SegmentStateModel.ONLINE); } } } @@ -227,7 +227,7 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest { Map<String, String> instanceStateMap = newAssignment.get(_segments.get(segmentId)); for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { assertTrue(entry.getKey().startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); - assertEquals(entry.getValue(), SegmentStateModel.CONSUMING); + assertEquals(entry.getValue(), SegmentStateModel.ONLINE); } } } @@ -345,7 +345,7 @@ public class RealtimeNonReplicaGroupTieredSegmentAssignmentTest { // Add the new segment into the assignment as CONSUMING currentAssignment.put(_segments.get(segmentId), - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } /** diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java index 2c590ef25e..cbac3133cf 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeReplicaGroupSegmentAssignmentTest.java @@ -231,7 +231,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { // check CONSUMING segments newAssignment.get(_segments.get(segmentId)).forEach((instance, state) -> { assertTrue(instance.startsWith(CONSUMING_INSTANCE_NAME_PREFIX)); - assertEquals(state, SegmentStateModel.CONSUMING); + assertEquals(state, SegmentStateModel.ONLINE); }); } } @@ -446,7 +446,7 @@ public class RealtimeReplicaGroupSegmentAssignmentTest { // Add the new segment into the assignment as CONSUMING currentAssignment.put(_segments.get(segmentId), - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } private HelixManager createHelixManager() { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java index 13520bb4f8..524a9b9606 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/segment/StrictRealtimeSegmentAssignmentTest.java @@ -274,7 +274,7 @@ public class StrictRealtimeSegmentAssignmentTest { // Add the new segment into the assignment as CONSUMING currentAssignment.put(_segments.get(segmentId), - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); + SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.ONLINE)); } private HelixManager createHelixManager() { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index a882a47ec7..26641d515c 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -179,7 +179,7 @@ public class PinotLLCRealtimeSegmentManagerTest { assertNotNull(instanceStateMap); assertEquals(instanceStateMap.size(), numReplicas); for (String state : instanceStateMap.values()) { - assertEquals(state, SegmentStateModel.CONSUMING); + assertEquals(state, SegmentStateModel.ONLINE); } SegmentZKMetadata segmentZKMetadata = segmentManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, segmentName, null); @@ -223,7 +223,7 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment); assertNotNull(consumingSegmentInstanceStateMap); assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()), - Collections.singleton(SegmentStateModel.CONSUMING)); + Collections.singleton(SegmentStateModel.ONLINE)); // Verify segment ZK metadata for committed segment and new consuming segment SegmentZKMetadata committedSegmentZKMetadata = segmentManager._segmentZKMetadataMap.get(committingSegment); @@ -260,7 +260,7 @@ public class PinotLLCRealtimeSegmentManagerTest { consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment); assertNotNull(consumingSegmentInstanceStateMap); assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()), - Collections.singleton(SegmentStateModel.CONSUMING)); + Collections.singleton(SegmentStateModel.ONLINE)); // Illegal segment commit - commit the segment again try { @@ -421,7 +421,7 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> instanceStateMap = instanceStatesMap.get(segmentName); assertEquals(instanceStateMap.size(), segmentManager._numReplicas); for (String state : instanceStateMap.values()) { - assertEquals(state, SegmentStateModel.CONSUMING); + assertEquals(state, SegmentStateModel.ONLINE); } // NOTE: Old segment ZK metadata might exist when previous round failed due to not enough instances assertTrue(segmentZKMetadataMap.containsKey(segmentName)); @@ -622,7 +622,7 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.remove(consumingSegment); assertNotNull(consumingSegmentInstanceStateMap); assertEquals(new HashSet<>(consumingSegmentInstanceStateMap.values()), - Collections.singleton(SegmentStateModel.CONSUMING)); + Collections.singleton(SegmentStateModel.ONLINE)); if (latestCommittedSegment != null) { Map<String, String> latestCommittedSegmentInstanceStateMap = instanceStatesMap.get(latestCommittedSegment); @@ -630,7 +630,7 @@ public class PinotLLCRealtimeSegmentManagerTest { for (Map.Entry<String, String> entry : latestCommittedSegmentInstanceStateMap.entrySet()) { // Latest committed segment should have all instances in ONLINE state assertEquals(entry.getValue(), SegmentStateModel.ONLINE); - entry.setValue(SegmentStateModel.CONSUMING); + entry.setValue(SegmentStateModel.ONLINE); } } } @@ -644,7 +644,7 @@ public class PinotLLCRealtimeSegmentManagerTest { assertNotNull(consumingSegmentInstanceStateMap); for (Map.Entry<String, String> entry : consumingSegmentInstanceStateMap.entrySet()) { // Consuming segment should have all instances in CONSUMING state - assertEquals(entry.getValue(), SegmentStateModel.CONSUMING); + assertEquals(entry.getValue(), SegmentStateModel.ONLINE); entry.setValue(SegmentStateModel.OFFLINE); } } @@ -657,7 +657,7 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> consumingSegmentInstanceStateMap = instanceStatesMap.get(consumingSegment); assertNotNull(consumingSegmentInstanceStateMap); for (Map.Entry<String, String> entry : consumingSegmentInstanceStateMap.entrySet()) { - entry.setValue(SegmentStateModel.CONSUMING); + entry.setValue(SegmentStateModel.ONLINE); } } @@ -702,8 +702,7 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> instanceStateMap = entry.getValue(); // Skip segments with all instances OFFLINE - if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap - .containsValue(SegmentStateModel.CONSUMING)) { + if (instanceStateMap.containsValue(SegmentStateModel.ONLINE)) { LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName); int partitionsId = llcSegmentName.getPartitionGroupId(); Map<Integer, String> sequenceNumberToSegmentMap = partitionGroupIdToSegmentsMap.get(partitionsId); @@ -723,8 +722,8 @@ public class PinotLLCRealtimeSegmentManagerTest { Map<String, String> instanceStateMap = instanceStatesMap.get(latestSegment); if (!shardsEnded.contains(partitionGroupId)) { // Latest segment should have CONSUMING instance but no ONLINE instance in ideal state - assertTrue(instanceStateMap.containsValue(SegmentStateModel.CONSUMING)); - assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE)); + assertTrue(instanceStateMap.containsValue(SegmentStateModel.ONLINE)); +// assertFalse(instanceStateMap.containsValue(SegmentStateModel.ONLINE)); // Latest segment ZK metadata should be IN_PROGRESS assertEquals(segmentManager._segmentZKMetadataMap.get(latestSegment).getStatus(), Status.IN_PROGRESS); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java index ecf1e0feda..cf0476b3b0 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerTest.java @@ -29,7 +29,6 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; import org.testng.annotations.Test; -import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING; import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ERROR; import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE; import static org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE; @@ -1293,7 +1292,7 @@ public class TableRebalancerTest { true, null)); // Should fail when a segment has CONSUMING instance in IdealState but does not exist in ExternalView - instanceStateMap.put("instance2", CONSUMING); + instanceStateMap.put("instance2", ONLINE); assertFalse( TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, false, null)); @@ -1321,7 +1320,7 @@ public class TableRebalancerTest { true, null)); // Should pass when instance state matches - instanceStateMap.put("instance2", CONSUMING); + instanceStateMap.put("instance2", ONLINE); assertTrue( TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, false, null)); @@ -1330,7 +1329,7 @@ public class TableRebalancerTest { true, null)); // Should pass when there are extra instances in ExternalView - instanceStateMap.put("instance3", CONSUMING); + instanceStateMap.put("instance3", ONLINE); assertTrue( TableRebalancer.isExternalViewConverged(offlineTableName, externalViewSegmentStates, idealStateSegmentStates, false, null)); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 2ddc0f7eb5..9ed0a8a395 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -311,7 +311,7 @@ public class RealtimeTableDataManager extends BaseTableDataManager { public Set<Integer> getHostedPartitionsGroupIds() { Set<Integer> partitionsHostedByThisServer = new HashSet<>(); List<String> segments = TableStateUtils.getSegmentsInGivenStateForThisInstance(_helixManager, _tableNameWithType, - CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING); + Status.IN_PROGRESS); for (String segmentNameStr : segments) { LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); partitionsHostedByThisServer.add(segmentName.getPartitionGroupId()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java index 39e4855f63..69fb078c94 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java @@ -302,7 +302,7 @@ public class ControllerPeriodicTasksIntegrationTest extends BaseClusterIntegrati for (Map<String, String> instanceStateMap : idealState.getRecord().getMapFields().values()) { for (Map.Entry<String, String> entry : instanceStateMap.entrySet()) { String state = entry.getValue(); - if (state.equals(SegmentStateModel.CONSUMING)) { + if (state.equals("CONSUMING")) { consumingServers.add(entry.getKey()); } else if (state.equals(SegmentStateModel.ONLINE)) { completedServers.add(entry.getKey()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 50693aaf73..c7c30ebca1 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -415,7 +415,7 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr Set<String> matchingSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size())); for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { Map<String, String> instanceStateMap = entry.getValue(); - if (instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) { + if (instanceStateMap.containsValue(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) { matchingSegments.add(entry.getKey()); } } @@ -492,7 +492,7 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr is.getPartitionSet().forEach(segmentNameStr -> { if (LLCSegmentName.isLLCSegment(segmentNameStr)) { if (is.getInstanceStateMap(segmentNameStr).values().contains( - CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING)) { + CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE)) { LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); if (segmentName.getPartitionGroupId() == partition) { seqNum.set(segmentName.getSequenceNumber()); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java index 8afcbe9177..8f8602d87f 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PartialUpsertTableRebalanceIntegrationTest.java @@ -247,7 +247,7 @@ public class PartialUpsertTableRebalanceIntegrationTest extends BaseClusterInteg if (llcSegmentName.getSequenceNumber() < maxSequenceNumber) { assertEquals(state, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE); } else { - assertEquals(state, CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING); + assertEquals(state, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE); } } else { assertEquals(state, CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java index 9123c9660c..c6a1a7f992 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentPreloadIntegrationTest.java @@ -260,7 +260,7 @@ public class UpsertTableSegmentPreloadIntegrationTest extends BaseClusterIntegra if (llcSegmentName.getSequenceNumber() < maxSequenceNumber) { assertEquals(state, SegmentStateModel.ONLINE); } else { - assertEquals(state, SegmentStateModel.CONSUMING); + assertEquals(state, SegmentStateModel.ONLINE); } } else { assertEquals(state, SegmentStateModel.ONLINE); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java index 2e9f472d98..4f2cd5d977 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java @@ -192,7 +192,7 @@ public class UpsertTableSegmentUploadIntegrationTest extends BaseClusterIntegrat Map.Entry<String, String> instanceIdAndState = instanceStateMap.entrySet().iterator().next(); String state = instanceIdAndState.getValue(); if (LLCSegmentName.isLLCSegment(segmentName)) { - assertEquals(state, SegmentStateModel.CONSUMING); + assertEquals(state, SegmentStateModel.ONLINE); } else { assertEquals(state, SegmentStateModel.ONLINE); } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java index 6de4536de1..4f35c7f9c6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java @@ -27,6 +27,11 @@ import org.apache.helix.PropertyKey; import org.apache.helix.model.CurrentState; import org.apache.helix.model.IdealState; import org.apache.helix.model.LiveInstance; +import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,12 +49,13 @@ public class TableStateUtils { * * @param helixManager instance of Helix manager * @param tableNameWithType table for which we are obtaining segments in a given state - * @param state state of the segments to be returned + * @param status valid status of the segments to be returned * * @return List of segment names in a given state. */ public static List<String> getSegmentsInGivenStateForThisInstance(HelixManager helixManager, String tableNameWithType, - String state) { + CommonConstants.Segment.Realtime.Status status) { + ZkHelixPropertyStore<ZNRecord> propertyStore = helixManager.getHelixPropertyStore(); HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); IdealState idealState = dataAccessor.getProperty(keyBuilder.idealStates(tableNameWithType)); @@ -66,11 +72,15 @@ public class TableStateUtils { String segmentName = entry.getKey(); Map<String, String> instanceStateMap = entry.getValue(); String expectedState = instanceStateMap.get(instanceName); - // Only track state segments assigned to the current instance - if (!state.equals(expectedState)) { - continue; + if (!CommonConstants.Helix.StateModel.SegmentStateModel.ERROR.equals(expectedState)) { + SegmentZKMetadata segmentZKMetadata = + ZKMetadataProvider.getSegmentZKMetadata(propertyStore, tableNameWithType, segmentName); + // Only track state segments assigned to the current instance + if (segmentZKMetadata != null && segmentZKMetadata.getStatus() != status) { + continue; + } + segmentsInGivenState.add(segmentName); } - segmentsInGivenState.add(segmentName); } return segmentsInGivenState; } @@ -85,7 +95,8 @@ public class TableStateUtils { */ public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) { List<String> onlineSegments = - getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType, SegmentStateModel.ONLINE); + getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType, + CommonConstants.Segment.Realtime.Status.DONE); if (onlineSegments.isEmpty()) { LOGGER.info("No ONLINE segment found for table: {}", tableNameWithType); return true; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java index b08833b966..fd067f353f 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java @@ -95,7 +95,6 @@ import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.FieldSpec.DataType; import org.apache.pinot.spi.stream.ConsumerPartitionState; -import org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -737,30 +736,47 @@ public class TablesResource { // Segment hosted by this server. Validate segment state String segmentName = entry.getKey(); SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName); + if (segmentDataManager == null) { + return new TableSegmentValidationInfo(false, -1); + } try { - switch (segmentState) { - case SegmentStateModel.CONSUMING: - // Only validate presence of segment - if (segmentDataManager == null) { - return new TableSegmentValidationInfo(false, -1); - } - break; - case SegmentStateModel.ONLINE: - // Validate segment CRC - SegmentZKMetadata zkMetadata = - ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(), - tableNameWithType, segmentName); - Preconditions.checkState(zkMetadata != null, - "Segment zk metadata not found for segment : " + segmentName); - if (segmentDataManager == null || !segmentDataManager.getSegment().getSegmentMetadata().getCrc() - .equals(String.valueOf(zkMetadata.getCrc()))) { - return new TableSegmentValidationInfo(false, -1); - } - maxEndTimeMs = Math.max(maxEndTimeMs, zkMetadata.getEndTimeMs()); - break; - default: - break; + IndexSegment indexSegment = segmentDataManager.getSegment(); + if (indexSegment instanceof ImmutableSegment) { + // Validate segment CRC + SegmentZKMetadata zkMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(), + tableNameWithType, segmentName); + Preconditions.checkState(zkMetadata != null, + "Segment zk metadata not found for segment : " + segmentName); + if (!segmentDataManager.getSegment().getSegmentMetadata().getCrc() + .equals(String.valueOf(zkMetadata.getCrc()))) { + return new TableSegmentValidationInfo(false, -1); + } + maxEndTimeMs = Math.max(maxEndTimeMs, zkMetadata.getEndTimeMs()); } +// switch (segmentState) { +// case SegmentStateModel.CONSUMING: +// // Only validate presence of segment +// if (segmentDataManager == null) { +// return new TableSegmentValidationInfo(false, -1); +// } +// break; +// case SegmentStateModel.ONLINE: +// // Validate segment CRC +// SegmentZKMetadata zkMetadata = +// ZKMetadataProvider.getSegmentZKMetadata(_serverInstance.getHelixManager().getHelixPropertyStore(), +// tableNameWithType, segmentName); +// Preconditions.checkState(zkMetadata != null, +// "Segment zk metadata not found for segment : " + segmentName); +// if (segmentDataManager == null || !segmentDataManager.getSegment().getSegmentMetadata().getCrc() +// .equals(String.valueOf(zkMetadata.getCrc()))) { +// return new TableSegmentValidationInfo(false, -1); +// } +// maxEndTimeMs = Math.max(maxEndTimeMs, zkMetadata.getEndTimeMs()); +// break; +// default: +// break; +// } } finally { if (segmentDataManager != null) { tableDataManager.releaseSegment(segmentDataManager); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 9862ffb7dd..e0faea64c2 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -51,6 +51,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.Utils; import org.apache.pinot.common.config.TlsConfig; import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.restlet.resources.SystemResourceInfo; @@ -298,10 +299,20 @@ public abstract class BaseServerStarter implements ServiceStartable { } if (checkRealtime && TableNameBuilder.isRealtimeTableResource(resourceName)) { for (String partitionName : idealState.getPartitionSet()) { - if (StateModel.SegmentStateModel.CONSUMING.equals( + if (StateModel.SegmentStateModel.ONLINE.equals( idealState.getInstanceStateMap(partitionName).get(_instanceId))) { - consumingSegments.add(partitionName); + SegmentZKMetadata segmentZKMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), resourceName, + partitionName); + if (segmentZKMetadata != null + && segmentZKMetadata.getStatus() == CommonConstants.Segment.Realtime.Status.IN_PROGRESS) { + consumingSegments.add(partitionName); + } } +// if (StateModel.SegmentStateModel.CONSUMING.equals( +// idealState.getInstanceStateMap(partitionName).get(_instanceId))) { +// consumingSegments.add(partitionName); +// } } } } @@ -871,7 +882,7 @@ public abstract class BaseServerStarter implements ServiceStartable { for (String partition : externalView.getPartitionSet()) { Map<String, String> instanceStateMap = externalView.getStateMap(partition); String state = instanceStateMap.get(_instanceId); - if (StateModel.SegmentStateModel.ONLINE.equals(state) || StateModel.SegmentStateModel.CONSUMING.equals(state)) { + if (StateModel.SegmentStateModel.ONLINE.equals(state)) { return false; } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java index 0a0608b44d..ddb5dca96e 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.server.starter.helix; -import com.google.common.base.Preconditions; import org.apache.helix.NotificationContext; import org.apache.helix.model.Message; import org.apache.helix.participant.statemachine.StateModel; @@ -76,10 +75,14 @@ public class OfflineSegmentOnlineOfflineStateModelFactory extends StateModelFact String tableNameWithType = message.getResourceName(); String segmentName = message.getPartitionName(); TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME), - "TableType is null or is a REALTIME table, offline state model should not be called fo RT"); +// Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME), +// "TableType is null or is a REALTIME table, offline state model should not be called fo RT"); try { - _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName); + if (tableType == TableType.OFFLINE) { + _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName); + } else if (tableType == TableType.REALTIME) { + _instanceDataManager.addRealtimeSegment(tableNameWithType, segmentName); + } } catch (Exception e) { String errorMessage = String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org