This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch full-auto-poc in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 35e96d1eb5a5662a62ab43dac4ad7ea9716f8b42 Author: jlli_LinkedIn <j...@linkedin.com> AuthorDate: Mon Feb 12 23:36:47 2024 -0800 Initial POC code for hybrid table --- .../controller/helix/SegmentStatusChecker.java | 6 +- ...imeSegmentOnlineOfflineStateModelGenerator.java | 72 +++++ .../helix/core/PinotHelixResourceManager.java | 30 +- .../helix/core/PinotTableIdealStateBuilder.java | 17 +- .../realtime/MissingConsumingSegmentFinder.java | 7 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 30 +- .../helix/core/util/HelixSetupUtils.java | 21 +- .../realtime/RealtimeSegmentDataManager.java | 7 + .../server/starter/helix/BaseServerStarter.java | 13 +- ...ltimeSegmentOnlineOfflineStateModelFactory.java | 325 +++++++++++++++++++++ .../airlineStats_realtime_table_config.json | 4 +- 11 files changed, 488 insertions(+), 44 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java index d0af31044f..2b9431b0f2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/SegmentStatusChecker.java @@ -256,8 +256,12 @@ public class SegmentStatusChecker extends ControllerPeriodicTask<SegmentStatusCh int nReplicas = 0; int nIdeal = 0; nSegments++; + Map<String, String> partitionMap = idealState.getInstanceStateMap(partitionName); + if (partitionMap == null) { + continue; + } // Skip segments not online in ideal state - for (Map.Entry<String, String> serverAndState : idealState.getInstanceStateMap(partitionName).entrySet()) { + for (Map.Entry<String, String> serverAndState : partitionMap.entrySet()) { if (serverAndState == null) { break; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java new file mode 100644 index 0000000000..ec640131cb --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core; + +import org.apache.helix.model.StateModelDefinition; + + +/** + * Realtime segment state model generator describes the transitions for realtime segment states.
+ * + * Offline to Consuming, Consuming to Online, Consuming to Offline + * Online to Offline, Offline to Dropped + * + * This model includes the CONSUMING state used by realtime segments + */ +public class PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator { + private PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator() { + } + + public static final String PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL = + "RealtimeSegmentOnlineOfflineStateModel"; + + public static final String ONLINE_STATE = "ONLINE"; + public static final String CONSUMING_STATE = "CONSUMING"; + public static final String OFFLINE_STATE = "OFFLINE"; + public static final String DROPPED_STATE = "DROPPED"; + + public static StateModelDefinition generatePinotStateModelDefinition() { + StateModelDefinition.Builder builder = + new StateModelDefinition.Builder(PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL); + builder.initialState(OFFLINE_STATE); + + builder.addState(ONLINE_STATE); + builder.addState(CONSUMING_STATE); + builder.addState(OFFLINE_STATE); + builder.addState(DROPPED_STATE); + // The initial state (OFFLINE) is set above via initialState() + + // Add transitions between the states. + builder.addTransition(CONSUMING_STATE, ONLINE_STATE); + builder.addTransition(OFFLINE_STATE, CONSUMING_STATE); +// builder.addTransition(OFFLINE_STATE, ONLINE_STATE); + builder.addTransition(CONSUMING_STATE, OFFLINE_STATE); + builder.addTransition(ONLINE_STATE, OFFLINE_STATE); + builder.addTransition(OFFLINE_STATE, DROPPED_STATE); + + // set constraints on states. + // dynamic constraint on the ONLINE state (no static constraint is set) + builder.dynamicUpperBound(ONLINE_STATE, "R"); + // "R" means the upper bound should be derived based on the replication + // factor.
+ + StateModelDefinition statemodelDefinition = builder.build(); + return statemodelDefinition; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ae208715fd..0a308fb514 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1583,17 +1583,19 @@ public class PinotHelixResourceManager { Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME, "Invalid table type: %s", tableType); - IdealState idealState; - if (tableType == TableType.REALTIME) { - idealState = - PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), - _enableBatchMessageMode); - } else { - // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only - idealState = - PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), - _enableBatchMessageMode); - } + IdealState idealState = + PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), + _enableBatchMessageMode); +// if (tableType == TableType.REALTIME) { +// idealState = +// PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), +// _enableBatchMessageMode); +// } else { +// // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only +// idealState = +// PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), +// _enableBatchMessageMode); +// } // IdealState idealState = // PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), // 
_enableBatchMessageMode); @@ -2270,8 +2272,10 @@ public class PinotHelixResourceManager { TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); if (tableType == TableType.REALTIME) { // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); + currentAssignmentList.put(segmentName, Collections.emptyList() + /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */); +// currentAssignment.put(segmentName, +// SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); } else { // TODO: Assess whether to pass in an empty instance list or to set the preferred list currentAssignmentList.put(segmentName, Collections.emptyList() diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 63222df7e3..f02ea86dc7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -23,10 +23,12 @@ import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.builder.CustomModeISBuilder; import org.apache.helix.model.builder.FullAutoModeISBuilder; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; 
import org.apache.pinot.spi.utils.retry.RetryPolicy; import org.slf4j.Logger; @@ -57,12 +59,23 @@ public class PinotTableIdealStateBuilder { public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas, boolean enableBatchMessageMode) { LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + String stateModel; + if (tableType == null) { + throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType); + } else if (TableType.OFFLINE.equals(tableType)) { + stateModel = + PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + } else { + stateModel = + PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + } + // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default // TODO: The state model used only works for OFFLINE tables today. 
Add support for REALTIME state model too FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType); idealStateBuilder - .setStateModel( - PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) + .setStateModel(stateModel) .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1) // TODO: Revisit the rebalance strategy to use (maybe we add a custom one) .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index f4192a5a1a..03d72600c7 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -25,6 +25,7 @@ import java.time.Instant; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import org.apache.commons.lang.StringUtils; import org.apache.helix.AccessOption; import org.apache.helix.model.IdealState; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -141,8 +142,12 @@ public class MissingConsumingSegmentFinder { // Note that there is no problem in case the partition group has reached its end of life. 
SegmentZKMetadata segmentZKMetadata = _segmentMetadataFetcher .fetchSegmentZkMetadata(_realtimeTableName, latestCompletedSegment.getSegmentName()); + String endOffset = segmentZKMetadata.getEndOffset(); + if (StringUtils.isEmpty(endOffset)) { + return; + } StreamPartitionMsgOffset completedSegmentEndOffset = - _streamPartitionMsgOffsetFactory.create(segmentZKMetadata.getEndOffset()); + _streamPartitionMsgOffsetFactory.create(endOffset); if (completedSegmentEndOffset.compareTo(largestStreamOffset) < 0) { // there are unconsumed messages available on the stream missingSegmentInfo._totalCount++; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 003f985e3f..80aed78c04 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -71,7 +71,6 @@ import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; -import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater; @@ -521,9 +520,16 @@ public class PinotLLCRealtimeSegmentManager { TableConfig tableConfig = getTableConfig(realtimeTableName); InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig); IdealState idealState = getIdealState(realtimeTableName); - Preconditions.checkState( - idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING), - "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName); + // Check whether there is at least 1 replica in ONLINE state for full-auto mode. + if (idealState.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO) { + Preconditions.checkState( + idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.ONLINE), + "Failed to find instance in ONLINE state in IdealState for segment: %s", committingSegmentName); + } else { + Preconditions.checkState( + idealState.getInstanceStateMap(committingSegmentName).containsValue(SegmentStateModel.CONSUMING), + "Failed to find instance in CONSUMING state in IdealState for segment: %s", committingSegmentName); + } int numReplicas = getNumReplicas(tableConfig, instancePartitions); /* @@ -993,9 +999,11 @@ public class PinotLLCRealtimeSegmentManager { // TODO: Need to figure out the best way to handle committed segments' state change if (committingSegmentName != null) { // Change committing segment state to ONLINE - Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); - instanceStatesMap.put(committingSegmentName, - SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE)); +// Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); +// instanceStatesMap.put(committingSegmentName, +// SegmentAssignmentUtils.getInstanceStateMap(instances, SegmentStateModel.ONLINE)); + instanceStatesList.put(newSegmentName, Collections.emptyList() + /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); LOGGER.info("Updating segment: {} to ONLINE state", committingSegmentName); } @@ -1029,14 +1037,14 @@ public class PinotLLCRealtimeSegmentManager { List<String> 
instancesAssigned = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME - instanceStatesMap.put(newSegmentName, - SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); +// instanceStatesMap.put(newSegmentName, +// SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list. // Assess whether we should set am empty InstanceStateList for the segment or not. i.e. do we support // this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it // looks like it does) - // instanceStatesList.put(newSegmentName, Collections.emptyList() - // /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); + instanceStatesList.put(newSegmentName, Collections.emptyList() + /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java index 61f6112365..a14010f17f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java @@ -47,7 +47,7 @@ import org.apache.pinot.common.utils.helix.LeadControllerUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator; import 
org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator; -import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; +import org.apache.pinot.controller.helix.core.PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -139,23 +139,24 @@ public class HelixSetupUtils { private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin, HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) { - String segmentStateModelName = - PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName); + String realtimeSegmentStateModelName = + PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.PINOT_REALTIME_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + StateModelDefinition stateModelDefinition = + helixAdmin.getStateModelDef(helixClusterName, realtimeSegmentStateModelName); if (stateModelDefinition == null || isUpdateStateModel) { if (stateModelDefinition == null) { - LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName); + LOGGER.info("Adding state model: {} with CONSUMING state", realtimeSegmentStateModelName); } else { - LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName); + LOGGER.info("Updating state model: {} to contain CONSUMING state", realtimeSegmentStateModelName); } - helixDataAccessor - .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); + helixDataAccessor.createStateModelDef( + PinotHelixRealtimeSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); } String offlineSegmentStateModelName = 
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - StateModelDefinition offlineStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, - offlineSegmentStateModelName); + StateModelDefinition offlineStateModelDefinition = + helixAdmin.getStateModelDef(helixClusterName, offlineSegmentStateModelName); if (offlineStateModelDefinition == null || isUpdateStateModel) { if (stateModelDefinition == null) { LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineSegmentStateModelName); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 282f3c72a1..5cdb3dda3a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -862,6 +862,13 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { return _state == State.ERROR ? ConsumerState.NOT_CONSUMING : ConsumerState.CONSUMING; } + /** + * Returns the state of the realtime segment. + */ + public State getState() { + return _state; + } + /** * Returns the timestamp of the last consumed message. 
*/ diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 6e0d157075..9862ffb7dd 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -579,15 +579,20 @@ public abstract class BaseServerStarter implements ServiceStartable { Tracing.ThreadAccountantOps .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId); initSegmentFetcher(_serverConf); - StateModelFactory<?> stateModelFactoryWithRealtime = - new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); +// StateModelFactory<?> stateModelFactoryWithRealtime = +// new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); StateModelFactory<?> stateModelFactory = new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); + StateModelFactory<?> realtimeSegmentStateModelFactory = + new RealtimeSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); _helixManager.getStateMachineEngine() .registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory); _helixManager.getStateMachineEngine() - .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), - stateModelFactoryWithRealtime); + .registerStateModelFactory(RealtimeSegmentOnlineOfflineStateModelFactory.getStateModelName(), + realtimeSegmentStateModelFactory); +// _helixManager.getStateMachineEngine() +// .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), +// stateModelFactoryWithRealtime); // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access // the property store, but before receiving state 
transitions _helixManager.addPreConnectCallback(_serverInstance::startDataManager); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java new file mode 100644 index 0000000000..58c785583a --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/RealtimeSegmentOnlineOfflineStateModelFactory.java @@ -0,0 +1,325 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.server.starter.helix; + +import com.google.common.base.Preconditions; +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.pinot.common.Utils; +import org.apache.pinot.common.metadata.ZKMetadataProvider; +import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; +import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager; +import org.apache.pinot.segment.local.data.manager.SegmentDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Data Server layer state model that handles how to: + * 1. Add a new segment + * 2. Refresh an existing, currently serving segment. + * 3. Delete an existing segment.
+ * + * This handles REALTIME table segments, including the CONSUMING state + */ +public class RealtimeSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { + private final String _instanceId; + private final InstanceDataManager _instanceDataManager; + + public RealtimeSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) { + _instanceId = instanceId; + _instanceDataManager = instanceDataManager; + } + + public static String getStateModelName() { + return "RealtimeSegmentOnlineOfflineStateModel"; + } + + @Override + public StateModel createNewStateModel(String resourceName, String partitionName) { + return new RealtimeSegmentOnlineOfflineStateModel(); + } + + + // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field. + // The transitions in the helix messages indicate the from/to states, and helix uses the + // Transition annotations (but only if StateModelInfo is defined). + @SuppressWarnings("unused") + @StateModelInfo(states = "{'OFFLINE', 'CONSUMING', 'ONLINE', 'DROPPED'}", initialState = "OFFLINE") + public class RealtimeSegmentOnlineOfflineStateModel extends StateModel { + private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - RealtimeSegmentOnlineOfflineStateModel"); + + @Transition(from = "OFFLINE", to = "CONSUMING") + public void onBecomeConsumingFromOffline(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message); + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + + // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME.
Commented out for now +// TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); +// Preconditions.checkNotNull(tableType); +// if (tableType == TableType.OFFLINE) { +// _logger.info("OFFLINE->CONSUMING state transition called for OFFLINE table, treat this as a no-op"); +// return; +// } + + try { + _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName); + } catch (Exception e) { + String errorMessage = + String.format("Caught exception in state transition OFFLINE -> CONSUMING for table: %s, segment: %s", + realtimeTableName, segmentName); + _logger.error(errorMessage, e); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + if (tableDataManager != null) { + tableDataManager.addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + } + Utils.rethrowException(e); + } + } + + @Transition(from = "CONSUMING", to = "ONLINE") + public void onBecomeOnlineFromConsuming(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message); + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); + Preconditions.checkNotNull(tableType); + + // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. 
Commented out for now +// if (tableType == TableType.OFFLINE) { +// try { +// _instanceDataManager.addOrReplaceSegment(realtimeTableName, segmentName); +// } catch (Exception e) { +// String errorMessage = String.format( +// "Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s", +// realtimeTableName, segmentName); +// _logger.error(errorMessage, e); +// TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); +// if (tableDataManager != null) { +// tableDataManager.addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(), +// errorMessage, e)); +// } +// Utils.rethrowException(e); +// } +// } else { + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName); + tableDataManager.onConsumingToOnline(segmentName); + boolean isConsumingDone = false; + while (!isConsumingDone) { + SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentName); + // For this transition to be correct in helix, we should already have a segment that is consuming + Preconditions.checkState(acquiredSegment != null, "Failed to find segment: %s in table: %s", segmentName, + realtimeTableName); + + // TODO: https://github.com/apache/pinot/issues/10049 + try { + // This indicates that the realtime segment is already a completed immutable segment. + // So nothing to do for this transition. + if (!(acquiredSegment instanceof RealtimeSegmentDataManager)) { + // We found an LLC segment that is not consuming right now, must be that we already swapped it with a + // segment that has been built. Nothing to do for this state transition. + _logger.info( + "Segment {} not an instance of RealtimeSegmentDataManager. 
Reporting success for the transition", + acquiredSegment.getSegmentName()); + return; + } + RealtimeSegmentDataManager segmentDataManager = (RealtimeSegmentDataManager) acquiredSegment; + RealtimeSegmentDataManager.State state = segmentDataManager.getState(); + if (state.shouldConsume()) { + Thread.sleep(10_000L); + } else { + SegmentZKMetadata segmentZKMetadata = + ZKMetadataProvider.getSegmentZKMetadata(_instanceDataManager.getPropertyStore(), realtimeTableName, + segmentName); + segmentDataManager.goOnlineFromConsuming(segmentZKMetadata); + isConsumingDone = true; + } + +// // Use a single thread to wait for the consuming segment to be fully completed +// ExecutorService executorService = Executors.newSingleThreadExecutor(); +// Future<Boolean> future = executorService.submit(() -> { +// try { +// while (true) { +// RealtimeSegmentDataManager.State state = segmentDataManager.getState(); +// if (state.shouldConsume()) { +// Thread.sleep(5_000L); +// } else { +// segmentDataManager.goOnlineFromConsuming(segmentZKMetadata); +// return true; +// } +//// RealtimeSegmentDataManager.State state = segmentDataManager.getState(); +//// if (state.isFinal()) { +//// return true; +//// } +// } +// } catch (InterruptedException e) { +// throw new RuntimeException(e); +// } +// }); +// future.get(); +// executorService.shutdown(); + } catch (Exception e) { + String errorMessage = + String.format("Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s", + realtimeTableName, segmentName); + _logger.error(errorMessage, e); + tableDataManager.addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + Utils.rethrowException(e); + } finally { + tableDataManager.releaseSegment(acquiredSegment); + } + } + } + + @Transition(from = "CONSUMING", to = "OFFLINE") + public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) { + 
_logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromConsuming() : " + message); + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.offloadSegment(realtimeTableName, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition CONSUMING -> OFFLINE for table: {}, segment: {}", + realtimeTableName, segmentName, e); + Utils.rethrowException(e); + } + } + + @Transition(from = "CONSUMING", to = "DROPPED") + public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message); + String realtimeTableName = message.getResourceName(); + String segmentName = message.getPartitionName(); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName); + tableDataManager.onConsumingToDropped(segmentName); + try { + _instanceDataManager.offloadSegment(realtimeTableName, segmentName); + _instanceDataManager.deleteSegment(realtimeTableName, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition CONSUMING -> DROPPED for table: {}, segment: {}", + realtimeTableName, segmentName, e); + Utils.rethrowException(e); + } + } + + +// @Transition(from = "OFFLINE", to = "ONLINE") +// public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { +// _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message); +// String tableNameWithType = message.getResourceName(); +// String segmentName = message.getPartitionName(); +// TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); +// Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME), +// "TableType 
is null or is a REALTIME table, offline state model should not be called fo RT"); +// try { +// _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName); +// } catch (Exception e) { +// String errorMessage = +// String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s", +// tableNameWithType, segmentName); +// _logger.error(errorMessage, e); +// TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); +// if (tableDataManager != null) { +// tableDataManager.addSegmentError(segmentName, +// new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); +// } +// Utils.rethrowException(e); +// } +// } + + // Remove segment from InstanceDataManager. + // Still keep the data files in local. + @Transition(from = "ONLINE", to = "OFFLINE") + public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.offloadSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + + // Delete segment from local directory. 
+ @Transition(from = "OFFLINE", to = "DROPPED") + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + + @Transition(from = "ONLINE", to = "DROPPED") + public void onBecomeDroppedFromOnline(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.offloadSegment(tableNameWithType, segmentName); + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + + @Transition(from = "ERROR", to = "OFFLINE") + public void onBecomeOfflineFromError(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message); + } + + @Transition(from = "ERROR", to = "DROPPED") + public void onBecomeDroppedFromError(Message message, NotificationContext context) { + _logger.info("RealtimeSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { 
+ _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + } +} diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json index 9d1dcd7e14..018ee3b6e8 100644 --- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json +++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_realtime_table_config.json @@ -6,7 +6,7 @@ "timeColumnName": "DaysSinceEpoch", "retentionTimeUnit": "DAYS", "retentionTimeValue": "5", - "replication": "1" + "replication": "3" }, "tableIndexConfig": {}, "routing": { @@ -26,7 +26,7 @@ "stream.kafka.zk.broker.url": "localhost:2191/kafka", "stream.kafka.broker.list": "localhost:19092", "realtime.segment.flush.threshold.time": "3600000", - "realtime.segment.flush.threshold.size": "50000" + "realtime.segment.flush.threshold.size": "500" } ] }, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org