This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch full-auto-poc in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 37dc958fbc8cc33ab500167e43e78d764f239297 Author: jlli_LinkedIn <j...@linkedin.com> AuthorDate: Mon Feb 12 23:36:47 2024 -0800 Initial POC code for hybrid table --- .../pinot/broker/routing/BrokerRoutingManager.java | 11 +- .../instanceselector/BaseInstanceSelector.java | 59 ++-- .../StrictReplicaGroupInstanceSelector.java | 34 ++- .../controller/helix/SegmentStatusChecker.java | 6 +- ...imeSegmentOnlineOfflineStateModelGenerator.java | 72 +++++ .../helix/core/PinotHelixResourceManager.java | 30 +- .../helix/core/PinotTableIdealStateBuilder.java | 17 +- .../realtime/MissingConsumingSegmentFinder.java | 7 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 51 +++- .../helix/core/util/HelixSetupUtils.java | 23 +- .../realtime/RealtimeSegmentDataManager.java | 7 + .../tests/LLCRealtimeClusterIntegrationTest.java | 8 +- .../server/starter/helix/BaseServerStarter.java | 13 +- ...ltimeSegmentOnlineOfflineStateModelFactory.java | 325 +++++++++++++++++++++ .../airlineStats_realtime_table_config.json | 4 +- 15 files changed, 574 insertions(+), 93 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java index cc3a5354ef..b6be06653b 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java @@ -431,7 +431,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle externalViewVersion = externalView.getRecord().getVersion(); } - Set<String> onlineSegments = getOnlineSegments(idealState); + Set<String> onlineSegments = getOnlineSegments(idealState, externalView); SegmentPreSelector segmentPreSelector = SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, _propertyStore); @@ -480,7 +480,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle 
if (offlineTableExternalView == null) { offlineTableExternalView = new ExternalView(offlineTableName); } - Set<String> offlineTableOnlineSegments = getOnlineSegments(offlineTableIdealState); + Set<String> offlineTableOnlineSegments = getOnlineSegments(offlineTableIdealState, offlineTableExternalView); SegmentPreSelector offlineTableSegmentPreSelector = SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig, _propertyStore); Set<String> offlineTablePreSelectedOnlineSegments = @@ -538,8 +538,11 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle /** * Returns the online segments (with ONLINE/CONSUMING instances) in the given ideal state. */ - private static Set<String> getOnlineSegments(IdealState idealState) { + private static Set<String> getOnlineSegments(IdealState idealState, ExternalView externalView) { Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields(); + if (segmentAssignment.isEmpty()) { + segmentAssignment = externalView.getRecord().getMapFields(); + } Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size())); for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) { Map<String, String> instanceStateMap = entry.getValue(); @@ -777,7 +780,7 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle // inconsistency between components, which is fine because the inconsistency only exists for the newly changed // segments and only lasts for a very short time. 
void onAssignmentChange(IdealState idealState, ExternalView externalView) { - Set<String> onlineSegments = getOnlineSegments(idealState); + Set<String> onlineSegments = getOnlineSegments(idealState, externalView); Set<String> preSelectedOnlineSegments = _segmentPreSelector.preSelect(onlineSegments); _segmentZkMetadataFetcher.onAssignmentChange(idealState, externalView, preSelectedOnlineSegments); _segmentSelector.onAssignmentChange(idealState, externalView, preSelectedOnlineSegments); diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java index b2961eef94..aebafa7741 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java @@ -27,6 +27,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; +import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; import javax.annotation.Nullable; @@ -130,11 +131,11 @@ abstract class BaseInstanceSelector implements InstanceSelector { Map<String, Long> getNewSegmentCreationTimeMapFromZK(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) { List<String> potentialNewSegments = new ArrayList<>(); - Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields(); +// Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields(); Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields(); for (String segment : onlineSegments) { - assert idealStateAssignment.containsKey(segment); - if (isPotentialNewSegment(idealStateAssignment.get(segment), externalViewAssignment.get(segment))) { + assert 
externalViewAssignment.containsKey(segment); + if (isPotentialNewSegment(externalViewAssignment.get(segment))) { potentialNewSegments.add(segment); } } @@ -169,14 +170,13 @@ abstract class BaseInstanceSelector implements InstanceSelector { * - Any instance for the segment is in ERROR state * - External view for the segment converges with ideal state */ - static boolean isPotentialNewSegment(Map<String, String> idealStateInstanceStateMap, - @Nullable Map<String, String> externalViewInstanceStateMap) { + static boolean isPotentialNewSegment(@Nullable Map<String, String> externalViewInstanceStateMap) { if (externalViewInstanceStateMap == null) { return true; } boolean hasConverged = true; // Only track ONLINE/CONSUMING instances within the ideal state - for (Map.Entry<String, String> entry : idealStateInstanceStateMap.entrySet()) { + for (Map.Entry<String, String> entry : externalViewInstanceStateMap.entrySet()) { if (isOnlineForRouting(entry.getValue())) { String externalViewState = externalViewInstanceStateMap.get(entry.getKey()); if (externalViewState == null || externalViewState.equals(SegmentStateModel.OFFLINE)) { @@ -192,14 +192,13 @@ abstract class BaseInstanceSelector implements InstanceSelector { /** * Returns the online instances for routing purpose. 
*/ - static TreeSet<String> getOnlineInstances(Map<String, String> idealStateInstanceStateMap, - Map<String, String> externalViewInstanceStateMap) { + static TreeSet<String> getOnlineInstances(Map<String, String> externalViewInstanceStateMap) { TreeSet<String> onlineInstances = new TreeSet<>(); // Only track ONLINE/CONSUMING instances within the ideal state - for (Map.Entry<String, String> entry : idealStateInstanceStateMap.entrySet()) { + for (Map.Entry<String, String> entry : externalViewInstanceStateMap.entrySet()) { String instance = entry.getKey(); // NOTE: DO NOT check if EV matches IS because it is a valid state when EV is CONSUMING while IS is ONLINE - if (isOnlineForRouting(entry.getValue()) && isOnlineForRouting(externalViewInstanceStateMap.get(instance))) { + if (isOnlineForRouting(externalViewInstanceStateMap.get(instance))) { onlineInstances.add(instance); } } @@ -217,6 +216,14 @@ abstract class BaseInstanceSelector implements InstanceSelector { } } + static SortedSet<String> convertToSortedSet(Set<String> set) { + if (set instanceof SortedSet) { + return (SortedSet<String>) set; + } else { + return new TreeSet<>(set); + } + } + /** * Updates the segment maps based on the given ideal state, external view, online segments (segments with * ONLINE/CONSUMING instances in the ideal state and pre-selected by the {@link SegmentPreSelector}) and new segments. 
@@ -229,20 +236,22 @@ abstract class BaseInstanceSelector implements InstanceSelector { _oldSegmentCandidatesMap.clear(); _newSegmentStateMap = new HashMap<>(HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size())); - Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields(); +// Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields(); + Set<String> idealStateSegmentSet = idealState.getPartitionSet(); Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields(); for (String segment : onlineSegments) { - Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); +// Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); + Long newSegmentCreationTimeMs = newSegmentCreationTimeMap.get(segment); Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment); if (externalViewInstanceStateMap == null) { if (newSegmentCreationTimeMs != null) { // New segment - List<SegmentInstanceCandidate> candidates = new ArrayList<>(idealStateInstanceStateMap.size()); - for (Map.Entry<String, String> entry : convertToSortedMap(idealStateInstanceStateMap).entrySet()) { - if (isOnlineForRouting(entry.getValue())) { - candidates.add(new SegmentInstanceCandidate(entry.getKey(), false)); - } + List<SegmentInstanceCandidate> candidates = new ArrayList<>(Integer.parseInt(idealState.getReplicas())); + for (String segmentName : convertToSortedSet(idealStateSegmentSet)) { +// if (isOnlineForRouting(entry.getValue())) { + candidates.add(new SegmentInstanceCandidate(segmentName, false)); +// } } _newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMs, candidates)); } else { @@ -250,11 +259,11 @@ abstract class BaseInstanceSelector implements InstanceSelector { _oldSegmentCandidatesMap.put(segment, Collections.emptyList()); } } else { - TreeSet<String> onlineInstances = 
getOnlineInstances(idealStateInstanceStateMap, externalViewInstanceStateMap); + TreeSet<String> onlineInstances = getOnlineInstances(externalViewInstanceStateMap); if (newSegmentCreationTimeMs != null) { // New segment - List<SegmentInstanceCandidate> candidates = new ArrayList<>(idealStateInstanceStateMap.size()); - for (Map.Entry<String, String> entry : convertToSortedMap(idealStateInstanceStateMap).entrySet()) { + List<SegmentInstanceCandidate> candidates = new ArrayList<>(externalViewInstanceStateMap.size()); + for (Map.Entry<String, String> entry : convertToSortedMap(externalViewInstanceStateMap).entrySet()) { if (isOnlineForRouting(entry.getValue())) { String instance = entry.getKey(); candidates.add(new SegmentInstanceCandidate(instance, onlineInstances.contains(instance))); @@ -365,7 +374,7 @@ abstract class BaseInstanceSelector implements InstanceSelector { @Override public void onAssignmentChange(IdealState idealState, ExternalView externalView, Set<String> onlineSegments) { Map<String, Long> newSegmentCreationTimeMap = - getNewSegmentCreationTimeMapFromExistingStates(idealState, externalView, onlineSegments); + getNewSegmentCreationTimeMapFromExistingStates(externalView, onlineSegments); updateSegmentMaps(idealState, externalView, onlineSegments, newSegmentCreationTimeMap); refreshSegmentStates(); } @@ -373,11 +382,11 @@ abstract class BaseInstanceSelector implements InstanceSelector { /** * Returns a map from new segment to their creation time based on the existing in-memory states. 
*/ - Map<String, Long> getNewSegmentCreationTimeMapFromExistingStates(IdealState idealState, ExternalView externalView, + Map<String, Long> getNewSegmentCreationTimeMapFromExistingStates(ExternalView externalView, Set<String> onlineSegments) { Map<String, Long> newSegmentCreationTimeMap = new HashMap<>(); long currentTimeMs = _clock.millis(); - Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields(); +// Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields(); Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields(); for (String segment : onlineSegments) { NewSegmentState newSegmentState = _newSegmentStateMap.get(segment); @@ -393,8 +402,8 @@ abstract class BaseInstanceSelector implements InstanceSelector { } // For recently created segment, check if it is qualified as new segment if (creationTimeMs > 0) { - assert idealStateAssignment.containsKey(segment); - if (isPotentialNewSegment(idealStateAssignment.get(segment), externalViewAssignment.get(segment))) { + assert externalViewAssignment.containsKey(segment); + if (isPotentialNewSegment(externalViewAssignment.get(segment))) { newSegmentCreationTimeMap.put(segment, creationTimeMs); } } diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java index 8c352bdbe6..5b34371a8d 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java @@ -96,7 +96,7 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele int newSegmentMapCapacity = HashUtil.getHashMapCapacity(newSegmentCreationTimeMap.size()); 
_newSegmentStateMap = new HashMap<>(newSegmentMapCapacity); - Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields(); +// Map<String, Map<String, String>> idealStateAssignment = idealState.getRecord().getMapFields(); Map<String, Map<String, String>> externalViewAssignment = externalView.getRecord().getMapFields(); // Get the online instances for the segments @@ -104,14 +104,14 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele new HashMap<>(HashUtil.getHashMapCapacity(onlineSegments.size())); Map<String, Set<String>> newSegmentToOnlineInstancesMap = new HashMap<>(newSegmentMapCapacity); for (String segment : onlineSegments) { - Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); - assert idealStateInstanceStateMap != null; +// Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); +// assert idealStateInstanceStateMap != null; Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment); Set<String> onlineInstances; if (externalViewInstanceStateMap == null) { onlineInstances = Collections.emptySet(); } else { - onlineInstances = getOnlineInstances(idealStateInstanceStateMap, externalViewInstanceStateMap); + onlineInstances = getOnlineInstances(externalViewInstanceStateMap); } if (newSegmentCreationTimeMap.containsKey(segment)) { newSegmentToOnlineInstancesMap.put(segment, onlineInstances); @@ -126,16 +126,18 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele for (Map.Entry<String, Set<String>> entry : oldSegmentToOnlineInstancesMap.entrySet()) { String segment = entry.getKey(); Set<String> onlineInstances = entry.getValue(); - Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); - Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet(); +// Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); +// 
Set<String> instancesInIdealState = idealStateInstanceStateMap.keySet(); + Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment); + Set<String> instanceInExternalView = externalViewInstanceStateMap.keySet(); Set<String> unavailableInstances = - unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k -> new HashSet<>()); - for (String instance : instancesInIdealState) { + unavailableInstancesMap.computeIfAbsent(instanceInExternalView, k -> new HashSet<>()); + for (String instance : instanceInExternalView) { if (!onlineInstances.contains(instance)) { if (unavailableInstances.add(instance)) { LOGGER.warn( "Found unavailable instance: {} in instance group: {} for segment: {}, table: {} (IS: {}, EV: {})", - instance, instancesInIdealState, segment, _tableNameWithType, idealStateInstanceStateMap, + instance, instanceInExternalView, segment, _tableNameWithType, externalViewInstanceStateMap, externalViewAssignment.get(segment)); } } @@ -147,8 +149,9 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele String segment = entry.getKey(); // NOTE: onlineInstances is either a TreeSet or an EmptySet (sorted) Set<String> onlineInstances = entry.getValue(); - Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); - Set<String> unavailableInstances = unavailableInstancesMap.get(idealStateInstanceStateMap.keySet()); + Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment); +// Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); + Set<String> unavailableInstances = unavailableInstancesMap.get(externalViewInstanceStateMap.keySet()); List<SegmentInstanceCandidate> candidates = new ArrayList<>(onlineInstances.size()); for (String instance : onlineInstances) { if (!unavailableInstances.contains(instance)) { @@ -161,11 +164,12 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele for 
(Map.Entry<String, Set<String>> entry : newSegmentToOnlineInstancesMap.entrySet()) { String segment = entry.getKey(); Set<String> onlineInstances = entry.getValue(); - Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); + Map<String, String> externalViewInstanceStateMap = externalViewAssignment.get(segment); +// Map<String, String> idealStateInstanceStateMap = idealStateAssignment.get(segment); Set<String> unavailableInstances = - unavailableInstancesMap.getOrDefault(idealStateInstanceStateMap.keySet(), Collections.emptySet()); - List<SegmentInstanceCandidate> candidates = new ArrayList<>(idealStateInstanceStateMap.size()); - for (String instance : convertToSortedMap(idealStateInstanceStateMap).keySet()) { + unavailableInstancesMap.getOrDefault(externalViewInstanceStateMap.keySet(), Collections.emptySet()); + List<SegmentInstanceCandidate> candidates = new ArrayList<>(externalViewInstanceStateMap.size()); + for (String instance : convertToSortedMap(externalViewInstanceStateMap).keySet()) { if (!unavailableInstances.contains(instance)) { candidates.add(new SegmentInstanceCandidate(instance, onlineInstances.contains(instance))); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index d0af31044f..2b9431b0f2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -256,8 +256,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh int nReplicas = 0; int nIdeal = 0; nSegments++; + Map<String, String> partitionMap = idealState.getInstanceStateMap(partitionName); + if (partitionMap == null) { + continue; + } // Skip segments not online in ideal state - for (Map.Entry<String, String> serverAndState : 
idealState.getInstanceStateMap(partitionName).entrySet()) { + for (Map.Entry<String, String> serverAndState : partitionMap.entrySet()) { if (serverAndState == null) { break; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java new file mode 100644 index 0000000000..ec640131cb --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core; + +import org.apache.helix.model.StateModelDefinition; + + +/** + * Offline Segment state model generator describes the transitions for offline segment states. 
+ * + * Online to Offline, Online to Dropped + * Offline to Online, Offline to Dropped + * + * This does not include the state transitions for realtime segments (which includes the CONSUMING state) + */ +public class PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator { + private PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator() { + } + + public static final String PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL = + "RealtimeSegmentOnlineOfflineStateModel"; + + public static final String ONLINE_STATE = "ONLINE"; + public static final String CONSUMING_STATE = "CONSUMING"; + public static final String OFFLINE_STATE = "OFFLINE"; + public static final String DROPPED_STATE = "DROPPED"; + + public static StateModelDefinition generatePinotStateModelDefinition() { + StateModelDefinition.Builder builder = + new StateModelDefinition.Builder(PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL); + builder.initialState(OFFLINE_STATE); + + builder.addState(ONLINE_STATE); + builder.addState(CONSUMING_STATE); + builder.addState(OFFLINE_STATE); + builder.addState(DROPPED_STATE); + // Set the initial state when the node starts + + // Add transitions between the states. + builder.addTransition(CONSUMING_STATE, ONLINE_STATE); + builder.addTransition(OFFLINE_STATE, CONSUMING_STATE); +// builder.addTransition(OFFLINE_STATE, ONLINE_STATE); + builder.addTransition(CONSUMING_STATE, OFFLINE_STATE); + builder.addTransition(ONLINE_STATE, OFFLINE_STATE); + builder.addTransition(OFFLINE_STATE, DROPPED_STATE); + + // set constraints on states. + // static constraint + builder.dynamicUpperBound(ONLINE_STATE, "R"); + // dynamic constraint, R means it should be derived based on the replication + // factor. 
+ + StateModelDefinition statemodelDefinition = builder.build(); + return statemodelDefinition; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ae208715fd..0a308fb514 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1583,17 +1583,19 @@ public class PinotHelixResourceManager { Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME, "Invalid table type: %s", tableType); - IdealState idealState; - if (tableType == TableType.REALTIME) { - idealState = - PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), - _enableBatchMessageMode); - } else { - // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only - idealState = - PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), - _enableBatchMessageMode); - } + IdealState idealState = + PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), + _enableBatchMessageMode); +// if (tableType == TableType.REALTIME) { +// idealState = +// PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), +// _enableBatchMessageMode); +// } else { +// // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only +// idealState = +// PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), +// _enableBatchMessageMode); +// } // IdealState idealState = // PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), // 
_enableBatchMessageMode); @@ -2270,8 +2272,10 @@ public class PinotHelixResourceManager { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); if (tableType == TableType.REALTIME) { // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); + currentAssignmentList.put(segmentName, Collections.emptyList() + /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */); +// currentAssignment.put(segmentName, +// SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); } else { // TODO: Assess whether to pass in an empty instance list or to set the preferred list currentAssignmentList.put(segmentName, Collections.emptyList() diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 63222df7e3..f02ea86dc7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -23,10 +23,12 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.builder.CustomModeISBuilder; import org.apache.helix.model.builder.FullAutoModeISBuilder; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; 
import org.apache.pinot.spi.utils.retry.RetryPolicy; import org.slf4j.Logger; @@ -57,12 +59,23 @@ public class PinotTableIdealStateBuilder { public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas, boolean enableBatchMessageMode) { LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + String stateModel; + if (tableType == null) { + throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType); + } else if (TableType.OFFLINE.equals(tableType)) { + stateModel = + PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + } else { + stateModel = + PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + } + // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default // TODO: The state model used only works for OFFLINE tables today. 
Add support for REALTIME state model too FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType); idealStateBuilder - .setStateModel( - PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) + .setStateModel(stateModel) .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1) // TODO: Revisit the rebalance strategy to use (maybe we add a custom one) .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index f4192a5a1a..03d72600c7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -25,6 +25,7 @@ import java.time.Instant; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.helix.AccessOption; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -141,8 +142,12 @@ public class MissingConsumingSegmentFinder { // Note that there is no problem in case the partition group has reached its end of life. 
SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher .fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName()); + String endOffset = segmentZKMetadata.getEndOffset(); + if (StringUtils.isEmpty(endOffset)) { + return; + } StreamPartitionMsgOffset completedSegmentEndOffset = - _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset()); + _streamPartitionMsgOffsetFactory.create(endOffset); if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) { // there are unconsumed messages available on the stream missingSegmentInfo._totalCount++; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 003f985e3f..88f3c1431c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -46,6 +46,7 @@ import org.apache.helix.Criteria; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.InstanceType; +import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -71,7 +72,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; -import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater; @@ -448,6 +448,18 @@ public class PinotLLCRealtimeSegmentManager { } } + @VisibleForTesting + ExternalView getExternalView(String realtimeTableName) { + try { + ExternalView externalView = HelixHelper.getExternalViewForResource(_helixAdmin, _clusterName, realtimeTableName); + Preconditions.checkState(externalView != null, "Failed to find ExternalView for table: " + realtimeTableName); + return externalView; + } catch (Exception e) { + _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_ZOOKEEPER_FETCH_FAILURES, 1L); + throw e; + } + } + /** * This method moves the segment file from another location to its permanent location. * When splitCommit is enabled, segment file is uploaded to the segmentLocation in the committingSegmentDescriptor, @@ -521,9 +533,17 @@ public class PinotLLCRealtimeSegmentManager { TableConfig tableConfig = getTableConfig(realtimeTableName); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); IdealState idealState = getIdealState(realtimeTableName); - Preconditions.checkState( - idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING), - "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName); + ExternalView externalView = getExternalView(realtimeTableName); + // Check whether there is at least 1 replica in ONLINE state for full-auto mode. 
+ if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) { + Preconditions.checkState( + externalView.getStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE), + "Failed to find instance in ONLINE state in IdealState for segment: %s", committingSegmentName); + } else { + Preconditions.checkState( + idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING), + "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName); + } int numReplicas = getNumReplicas(tableConfig, instancePartitions); /* @@ -993,9 +1013,11 @@ public class PinotLLCRealtimeSegmentManager { // TODO: Need to figure out the best way to handle committed segments' state change if (committingSegmentName != null) { // Change committing segment state to ONLINE - Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); - instanceStatesMap.put(committingSegmentName, - SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE)); +// Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); +// instanceStatesMap.put(committingSegmentName, +// SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE)); + instanceStatesList.put(newSegmentName, Collections.emptyList() + /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName); } @@ -1029,14 +1051,14 @@ public class PinotLLCRealtimeSegmentManager { List<String> instancesAssigned = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); // No need to check for tableType as offline tables can never go to CONSUMING state. 
All callers are for REALTIME - instanceStatesMap.put(newSegmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); +// instanceStatesMap.put(newSegmentName, +// SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list. // Assess whether we should set am empty InstanceStateList for the segment or not. i.e. do we support // this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it // looks like it does) - // instanceStatesList.put(newSegmentName, Collections.emptyList() - // /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); + instanceStatesList.put(newSegmentName, Collections.emptyList() + /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); } } @@ -1247,8 +1269,11 @@ public class PinotLLCRealtimeSegmentManager { newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList, segmentAssignment, instancePartitionsMap, startOffset); } else { - LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, - latestSegmentName); + LOGGER.error( + "Got unexpected instance state map: {} for segment: {}. Segment status: {}. " + + "recreateDeletedConsumingSegment: {}", + instanceStateMap, latestSegmentName, latestSegmentZKMetadata.getStatus(), + recreateDeletedConsumingSegment); } } // else, the partition group has reached end of life. 
This is an acceptable state diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java index 61f6112365..033bf59405 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java @@ -47,7 +47,7 @@ import org.apache.pinot.common.utils.helix.LeadControllerUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator; import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator; -import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; +import org.apache.pinot.controller.helix.core.PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +82,7 @@ public class HelixSetupUtils { Map<String, String> configMap = new HashMap<>(); configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.toString(true)); configMap.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(), - Boolean.toString(true)); + Boolean.toString(false)); configMap.put(ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(DEFAULT_ENABLE_CASE_INSENSITIVE)); configMap.put(DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M)); configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.toString(false)); @@ -139,23 +139,24 @@ public class HelixSetupUtils { private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin, HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) { - String 
segmentStateModelName = - PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName); + String realtimeSegmentStateModelName = + PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + StateModelDefinition stateModelDefinition = + helixAdmin.getStateModelDef(helixClusterName, realtimeSegmentStateModelName); if (stateModelDefinition == null || isUpdateStateModel) { if (stateModelDefinition == null) { - LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName); + LOGGER.info("Adding state model: {} with CONSUMING state", realtimeSegmentStateModelName); } else { - LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName); + LOGGER.info("Updating state model: {} to contain CONSUMING state", realtimeSegmentStateModelName); } - helixDataAccessor - .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); + helixDataAccessor.createStateModelDef( + PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); } String offlineSegmentStateModelName = PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - StateModelDefinition offlineStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, - offlineSegmentStateModelName); + StateModelDefinition offlineStateModelDefinition = + helixAdmin.getStateModelDef(helixClusterName, offlineSegmentStateModelName); if (offlineStateModelDefinition == null || isUpdateStateModel) { if (stateModelDefinition == null) { LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineSegmentStateModelName); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 282f3c72a1..5cdb3dda3a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -862,6 +862,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING; } + /** + * Returns the state of the realtime segment. + */ + public State getState() { + return _state; + } + /** * Returns the timestamp of the last consumed message. */ diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index 2389fe8ba6..50693aaf73 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -500,8 +500,12 @@ public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegr } } }); - Assert.assertTrue("No consuming segment found in partition " + partition, seqNum.get() >= 0); - return seqNum.get(); + if (seqNum.get() == -1) { + return 0; + } else { + Assert.assertTrue("No consuming segment found in partition " + partition, seqNum.get() >= 0); + return seqNum.get(); + } } public class ExceptingKafkaConsumer extends KafkaPartitionLevelConsumer { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 6e0d157075..9862ffb7dd 100644 --- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -579,15 +579,20 @@ public abstract class BaseServerStarter implements ServiceStartable { Tracing.ThreadAccountantOps .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId); initSegmentFetcher(_serverConf); - StateModelFactory<?> stateModelFactoryWithRealtime = - new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); +// StateModelFactory<?> stateModelFactoryWithRealtime = +// new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); StateModelFactory<?> stateModelFactory = new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); + StateModelFactory<?> realtimeSegmentStateModelFactory = + new RealtimeSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); _helixManager.getStateMachineEngine() .registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory); _helixManager.getStateMachineEngine() - .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), - stateModelFactoryWithRealtime); + .registerStateModelFactory(RealtimeSegmentOnlineOfflineStateModelFactory.getStateModelName(), + realtimeSegmentStateModelFactory); +// _helixManager.getStateMachineEngine() +// .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), +// stateModelFactoryWithRealtime); // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access // the property store, but before receiving state transitions _helixManager.addPreConnectCallback(_serverInstance::startDataManager); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java new file mode 100644 index 0000000000..d3a937eaef --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.starter.helix; + +import com.google.common.base.Preconditions; +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.pinot.common.Utils; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Data Server layer state model that defines how a server operates on REALTIME segments: + * 1. Start consuming a new segment (OFFLINE -> CONSUMING). + * 2. Complete a consuming segment (CONSUMING -> ONLINE). + * 3. Offload or delete an existing segment (-> OFFLINE / DROPPED).
+ * + * This state model is for REALTIME table segments and handles the CONSUMING state + */ +public class RealtimeSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { + private final String _instanceId; + private final InstanceDataManager _instanceDataManager; + + public RealtimeSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) { + _instanceId = instanceId; + _instanceDataManager = instanceDataManager; + } + + public static String getStateModelName() { + return "RealtimeSegmentOnlineOfflineStateModel"; + } + + @Override + public StateModel createNewStateModel(String resourceName, String partitionName) { + return new RealtimeSegmentOnlineOfflineStateModel(); + } + + + // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field. + // The transitions in the helix messages indicate the from/to states, and helix uses the + // Transition annotations (but only if StateModelInfo is defined). + @SuppressWarnings("unused") + @StateModelInfo(states = "{'OFFLINE', 'CONSUMING', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE") + public class RealtimeSegmentOnlineOfflineStateModel extends StateModel { + private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - RealtimeSegmentOnlineOfflineStateModel"); + + @Transition(from = "OFFLINE", to = "CONSUMING") + public void onBecomeConsumingFromOffline(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message); + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + + // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME.
Commented out for now +// TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); +// Preconditions.checkNotNull(tableType); +// if (tableType == TableType.OFFLINE) { +// _logger.info("OFFLINE->CONSUMING state transition called for OFFLINE table, treat this as a no-op"); +// return; +// } + + try { + _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName); + } catch (Exception e) { + String errorMessage = + String.format("Caught exception in state transition OFFLINE -> CONSUMING for table: %s, segment: %s", + realtimeTableName, segmentName); + _logger.error(errorMessage, e); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + if (tableDataManager != null) { + tableDataManager.addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + } + Utils.rethrowException(e); + } + } + + @Transition(from = "CONSUMING", to = "ONLINE") + public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message); + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); + Preconditions.checkNotNull(tableType); + + // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME.
Commented out for now +// if (tableType == TableType.OFFLINE) { +// try { +// _instanceDataManager.addOrReplaceSegment(realtimeTableName, segmentName); +// } catch (Exception e) { +// String errorMessage = String.format( +// "Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s", +// realtimeTableName, segmentName); +// _logger.error(errorMessage, e); +// TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); +// if (tableDataManager != null) { +// tableDataManager.addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(), +// errorMessage, e)); +// } +// Utils.rethrowException(e); +// } +// } else { + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName); + tableDataManager.onConsumingToOnline(segmentName); + boolean isConsumingDone = false; + while (!isConsumingDone) { + SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentName); + // For this transition to be correct in helix, we should already have a segment that is consuming + Preconditions.checkState(acquiredSegment != null, "Failed to find segment: %s in table: %s", segmentName, + realtimeTableName); + + // TODO: https://github.com/apache/pinot/issues/10049 + try { + // This indicates that the realtime segment is already a completed immutable segment. + // So nothing to do for this transition. + if (!(acquiredSegment instanceof RealtimeSegmentDataManager)) { + // We found an LLC segment that is not consuming right now, must be that we already swapped it with a + // segment that has been built. Nothing to do for this state transition. + _logger.info( + "Segment {} not an instance of RealtimeSegmentDataManager.
Reporting success for the transition", + acquiredSegment.getSegmentName()); + return; + } + RealtimeSegmentDataManager segmentDataManager = (RealtimeSegmentDataManager) acquiredSegment; + RealtimeSegmentDataManager.State state = segmentDataManager.getState(); + if ((!state.isFinal()) || state.shouldConsume()) { + Thread.sleep(10_000L); + } else { + SegmentZKMetadata segmentZKMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName, + segmentName); + segmentDataManager.goOnlineFromConsuming(segmentZKMetadata); + isConsumingDone = true; + } + +// // Use a single thread to wait for the consuming segment to be fully completed +// ExecutorService executorService = Executors.newSingleThreadExecutor(); +// Future<Boolean> future = executorService.submit(() -> { +// try { +// while (true) { +// RealtimeSegmentDataManager.State state = segmentDataManager.getState(); +// if (state.shouldConsume()) { +// Thread.sleep(5_000L); +// } else { +// segmentDataManager.goOnlineFromConsuming(segmentZKMetadata); +// return true; +// } +//// RealtimeSegmentDataManager.State state = segmentDataManager.getState(); +//// if (state.isFinal()) { +//// return true; +//// } +// } +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// }); +// future.get(); +// executorService.shutdown(); + } catch (Exception e) { + String errorMessage = + String.format("Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s", + realtimeTableName, segmentName); + _logger.error(errorMessage, e); + tableDataManager.addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + Utils.rethrowException(e); + } finally { + tableDataManager.releaseSegment(acquiredSegment); + } + } + } + + @Transition(from = "CONSUMING", to = "OFFLINE") + public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) { + 
_logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : " + message); + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.offloadSegment(realtimeTableName, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}", + realtimeTableName, segmentName, e); + Utils.rethrowException(e); + } + } + + @Transition(from = "CONSUMING", to = "DROPPED") + public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message); + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName); + tableDataManager.onConsumingToDropped(segmentName); + try { + _instanceDataManager.offloadSegment(realtimeTableName, segmentName); + _instanceDataManager.deleteSegment(realtimeTableName, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}", + realtimeTableName, segmentName, e); + Utils.rethrowException(e); + } + } + + +// @Transition(from = "OFFLINE", to = "ONLINE") +// public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { +// _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message); +// String tableNameWithType = message.getResourceName(); +// String segmentName = message.getPartitionName(); +// TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); +// Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME), +// "TableType 
is null or is a REALTIME table, offline state model should not be called fo RT"); +// try { +// _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName); +// } catch (Exception e) { +// String errorMessage = +// String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s", +// tableNameWithType, segmentName); +// _logger.error(errorMessage, e); +// TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); +// if (tableDataManager != null) { +// tableDataManager.addSegmentError(segmentName, +// new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); +// } +// Utils.rethrowException(e); +// } +// } + + // Remove segment from InstanceDataManager. + // Still keep the data files in local. + @Transition(from = "ONLINE", to = "OFFLINE") + public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.offloadSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + + // Delete segment from local directory.
+ @Transition(from = "OFFLINE", to = "DROPPED") + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + + @Transition(from = "ONLINE", to = "DROPPED") + public void onBecomeDroppedFromOnline(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.offloadSegment(tableNameWithType, segmentName); + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + + @Transition(from = "ERROR", to = "OFFLINE") + public void onBecomeOfflineFromError(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message); + } + + @Transition(from = "ERROR", to = "DROPPED") + public void onBecomeDroppedFromError(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { 
+ _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + } +} diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json index 9d1dcd7e14..018ee3b6e8 100644 --- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json @@ -6,7 +6,7 @@ "timeColumnName": "DaysSinceEpoch", "retentionTimeUnit": "DAYS", "retentionTimeValue": "5", - "replication": "1" + "replication": "3" }, "tableIndexConfig": {}, "routing": { @@ -26,7 +26,7 @@ "stream.kafka.zk.broker.url": "localhost:2191/kafka", "stream.kafka.broker.list": "localhost:19092", "realtime.segment.flush.threshold.time": "3600000", - "realtime.segment.flush.threshold.size": "50000" + "realtime.segment.flush.threshold.size": "500" } ] }, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org