This is an automated email from the ASF dual-hosted git repository. mcvsubbu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:

new 6a216c5a4f [feature] Add a Tracker class to support aggregate-worst case consumption delay… (#9994)

6a216c5a4f is described below

commit 6a216c5a4fa8cc91dcdbb4c40949d2d2b83e6453
Author: Juan Gomez <jugo...@linkedin.com>
AuthorDate: Mon Jan 9 17:43:28 2023 -0800

[feature] Add a Tracker class to support aggregate-worst case consumption delay… (#9994)

* Add a Tracker class to support aggregate-worst case consumption delay in Pinot
  Early draft for review; no significant unit tests or end-to-end testing done yet. Missing: code to read ideal state, unit tests.
  - Changed code to not remove but verify a partition when the CONSUMING to OFFLINE transition comes.
  Tests: make sure server and core tests pass.
  - Added support for reading ideal state.
  - Added some unit tests for the tracker object.
  - Added more unit tests that deal with partition removal.
  - Removed caching of the maximum to simplify the code.
  - Use concurrent hash maps instead of synchronized methods.
  - Renamed some variables for consistency.
  - Refactored timeout code.
  - Fix failing unit tests and a number of renames.
  - Use int for partitionGroupIds instead of longs.
  - Some renames and minor edits.
  - Move some uses of partition id to integers for consistency with previous code. Remove use of type cast by adding interfaces to super class.
* Remove initial delay of timer thread from configurable parameters, clean up throw message and update comments to reflect new method names.
* Added support for per-partition metrics and some renames. Added support for a prefix to the metric name so we can add multiple trackers and metrics per table.
* Add code to support disabling emitting per-partition or aggregate metric.
* Do per-batch sampling of the delay metric, not per event, for efficiency reasons. Some renames to avoid the word Pinot in the name of ingestion delay.
* Add Navina's code to not sample partition delay during the catching-up phase.
* Rename Millis to Ms and rearrange "final private" to "private final".
* Rename Consumption Delay to ingestion delay, remove unnecessary casts, factor out zeroing out the ingestion delay.
* Use anonymous class for timer tasks, make some variables final and add TODOs for some issues we noted in the code while working on this.
* Remove checks for _ingestionDelayTracker being null, as a number of other things break before that can happen. Roll back method to set delay to zero after offline discussion.
* Consolidate segment state transition actions in a single method; minor edits to comments to reflect that we now track per-partition ingestion delay.
* Roll back the change where we use one interface for all segment state transitions; code is simpler with one interface per transition, which also helps consistency with other parts of the code.
* Optimization to only check ideal state when we really need to.
* Clean up some comments with stale references; some minor refactors.
* Consolidate all ingestion delay measures under the same Gauge.
* Eliminate the use of optional aging and fix tests to use fixed clocks and get predictable results.
* Remove the per-partition metric mute and use "server ready to serve queries" as the signal to start collecting metrics.
* Address most of the last round of comments.
* Remove isServerReady code from Segment data provider to keep code simpler; encapsulate it in the IngestionDelay class instead.
* Use CONSUMING segments to determine hosted partitions instead of ONLINE; getSegmentsInGivenStateForThisInstance returns a set instead of a list.
* Added a few unit tests for per-partition measures and a few comment edits.
* Removing aggregate metric and some other things requested to simplify the change.
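The commit message above summarizes how per-partition delay is reported: each consumer posts a delay sample together with the time it was taken, the reported value is aged by the time elapsed since that sample, and a partition that goes from CONSUMING to ONLINE is only checked against ideal state after a timeout. The Java below is a minimal, hypothetical sketch of those two ideas, not the committed IngestionDelayTracker. The class name DelayTrackerSketch and the helpers partitionsToVerify and stopTracking are illustrative. The sketch assumes the same 10-minute timeout as PARTITION_TIMEOUT_MS in the diff, and it leaves out metrics, the Timer thread and the Helix ideal-state read.

```java
import java.time.Clock;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified sketch: no metrics, no timer thread, no Helix ideal-state access.
class DelayTrackerSketch {
  // Assumed to match PARTITION_TIMEOUT_MS in the commit (10 minutes).
  static final long PARTITION_TIMEOUT_MS = 600_000L;

  // A delay sample plus the wall-clock time at which it was taken.
  private static final class DelayMeasure {
    final long delayMs;
    final long sampleTimeMs;

    DelayMeasure(long delayMs, long sampleTimeMs) {
      this.delayMs = delayMs;
      this.sampleTimeMs = sampleTimeMs;
    }
  }

  private final Map<Integer, DelayMeasure> samples = new ConcurrentHashMap<>();
  // Partition -> time it was marked (CONSUMING -> ONLINE) for a later ideal-state check.
  private final Map<Integer, Long> markedForVerification = new ConcurrentHashMap<>();
  private volatile Clock clock;

  DelayTrackerSketch(Clock clock) {
    this.clock = clock;
  }

  // Test hook: swap in a fixed or offset clock.
  void setClock(Clock clock) {
    this.clock = clock;
  }

  // Posted by a consumer after each batch; consuming again cancels any pending verification.
  void updateIngestionDelay(long delayMs, long sampleTimeMs, int partitionId) {
    samples.put(partitionId, new DelayMeasure(delayMs, sampleTimeMs));
    markedForVerification.remove(partitionId);
  }

  // Reported delay = sampled delay + age of the sample; a negative age (clock moved back) counts as zero.
  long getPartitionIngestionDelay(int partitionId) {
    DelayMeasure measure = samples.get(partitionId);
    if (measure == null) {
      return 0; // untracked partitions report zero
    }
    long ageMs = Math.max(clock.millis() - measure.sampleTimeMs, 0);
    return measure.delayMs + ageMs;
  }

  // CONSUMING -> ONLINE: remember when the partition stopped consuming.
  void markPartitionForVerification(int partitionId) {
    markedForVerification.put(partitionId, clock.millis());
  }

  // Partitions marked longer than the timeout ago; only these would justify reading ideal state.
  List<Integer> partitionsToVerify() {
    long nowMs = clock.millis();
    List<Integer> result = new ArrayList<>();
    for (Map.Entry<Integer, Long> entry : markedForVerification.entrySet()) {
      if (nowMs - entry.getValue() > PARTITION_TIMEOUT_MS) {
        result.add(entry.getKey());
      }
    }
    return result;
  }

  // CONSUMING -> DROPPED, or ideal state says the partition moved: forget it entirely.
  void stopTracking(int partitionId) {
    samples.remove(partitionId);
    markedForVerification.remove(partitionId);
  }
}
```

Injecting a java.time.Clock lets a test advance time with Clock.fixed and Clock.offset instead of sleeping. The committed class does this through its @VisibleForTesting setClock method. For example, a test can check that a 500 ms sample reads as 2,500 ms two seconds later.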
--- .../pinot/common/metrics/AbstractMetrics.java | 31 +++ .../apache/pinot/common/metrics/ServerGauge.java | 4 +- .../core/data/manager/InstanceDataManager.java | 8 + .../manager/offline/TableDataManagerProvider.java | 10 +- .../manager/realtime/IngestionDelayTracker.java | 308 +++++++++++++++++++++ .../realtime/LLRealtimeSegmentDataManager.java | 19 +- .../manager/realtime/RealtimeTableDataManager.java | 68 ++++- .../realtime/IngestionDelayTrackerTest.java | 204 ++++++++++++++ .../local/data/manager/TableDataManager.java | 16 ++ .../local/utils/tablestate/TableStateUtils.java | 45 ++- .../server/starter/helix/BaseServerStarter.java | 4 +- .../starter/helix/HelixInstanceDataManager.java | 9 +- .../SegmentOnlineOfflineStateModelFactory.java | 9 +- 13 files changed, 715 insertions(+), 20 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java index 9add8f98e8..e4eb329c7d 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java @@ -537,6 +537,23 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e addCallbackGaugeIfNeeded(fullGaugeName, valueCallback); } + /** + * Install a per-partition table gauge if needed. + * + * @param tableName The table name + * @param partitionId The partition id + * @param gauge The gauge to use + * @param valueCallback the callback function to be called while reading the metric. + */ + public void addCallbackPartitionGaugeIfNeeded(final String tableName, final int partitionId, final G gauge, + final Callable<Long> valueCallback) { + final String fullGaugeName; + String gaugeName = gauge.getGaugeName(); + fullGaugeName = gaugeName + "." + getTableName(tableName) + "."
+ partitionId; + + addCallbackGaugeIfNeeded(fullGaugeName, valueCallback); + } + /** * Similar to addCallbackGauge method. * This method may be called multiple times, while it will be registered to callback function only once. @@ -602,6 +619,20 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e removeGauge(fullGaugeName); } + + /** + * Removes a per-partition table gauge given the table name, the partition id and the gauge. + * The add/remove is expected to work correctly in case of being invoked across multiple threads. + * @param tableName table name + * @param gauge the gauge to be removed + */ + public void removePartitionGauge(final String tableName, final int partitionId, final G gauge) { + final String fullGaugeName; + String gaugeName = gauge.getGaugeName(); + fullGaugeName = gaugeName + "." + getTableName(tableName) + "." + partitionId; + removeGauge(fullGaugeName); + } + /** * Remove gauge from Pinot metrics. * @param gaugeName gauge name diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java index 015e75aae9..7dd52e90ea 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java @@ -45,7 +45,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge { // Dedup metrics DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false), CONSUMPTION_QUOTA_UTILIZATION("ratio", false), - JVM_HEAP_USED_BYTES("bytes", true); + JVM_HEAP_USED_BYTES("bytes", true), + // Ingestion delay metric + REALTIME_INGESTION_DELAY_MS("milliseconds", false); private final String _gaugeName; private final String _unit; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java index 77cab8135a..0e041021d2 100644 --- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java @@ -21,6 +21,7 @@ package org.apache.pinot.core.data.manager; import java.io.File; import java.util.List; import java.util.Set; +import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.ConfigurationException; @@ -183,4 +184,11 @@ public interface InstanceDataManager { * Immediately stop consumption and start committing the consuming segments. */ void forceCommit(String tableNameWithType, Set<String> segmentNames); + + /** + * Installs a supplier used to determine whether the server is ready to serve queries. + * + * @param isServerReadyToServeQueries supplier that reports whether the server is ready to serve queries. + */ + void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServerReadyToServeQueries); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java index 15f284ae09..2a2671047a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java @@ -20,6 +20,7 @@ package org.apache.pinot.core.data.manager.offline; import com.google.common.cache.LoadingCache; import java.util.concurrent.Semaphore; +import java.util.function.Supplier; import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.HelixManager; import org.apache.helix.store.zk.ZkHelixPropertyStore; @@ -54,6 +55,13 @@ public class TableDataManagerProvider { public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId, ZkHelixPropertyStore<ZNRecord> 
propertyStore,
ServerMetrics serverMetrics, HelixManager helixManager, LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) { + return getTableDataManager(tableDataManagerConfig, instanceId, propertyStore, serverMetrics, helixManager, + errorCache, () -> true); + } + + public static TableDataManager getTableDataManager(TableDataManagerConfig tableDataManagerConfig, String instanceId, + ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics serverMetrics, HelixManager helixManager, + LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache, Supplier<Boolean> isServerReadyToServeQueries) { TableDataManager tableDataManager; switch (tableDataManagerConfig.getTableType()) { case OFFLINE: @@ -64,7 +72,7 @@ public class TableDataManagerProvider { } break; case REALTIME: - tableDataManager = new RealtimeTableDataManager(_segmentBuildSemaphore); + tableDataManager = new RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries); break; default: throw new IllegalStateException(); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java new file mode 100644 index 0000000000..3031aee91b --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -0,0 +1,308 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.core.data.manager.realtime; + +import com.google.common.annotations.VisibleForTesting; +import java.time.Clock; +import java.util.ArrayList; +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; +import org.apache.pinot.common.metrics.ServerGauge; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * A Class to track realtime ingestion delay for a given table on a given server. + * Highlights: + * 1-An object of this class is hosted by each RealtimeTableDataManager. + * 2-The object tracks ingestion delays for all partitions hosted by the current server for the given Realtime table. + * 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects hosted in the corresponding + * RealtimeTableDataManager. + * 4-A Metric is derived from reading the maximum tracked by this class. In addition, individual metrics are associated + * with each partition being tracked. + * 5-Delays reported for partitions that do not have events to consume are reported as zero. + * 6-We track the time at which each delay sample was collected so that delay can be increased when partition stops + * consuming for any reason other than no events being available for consumption. 
+ * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being tracked so their delays do not cloud + * delays of active partitions. + * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the corresponding partition. + * If no consumption is noticed after the timeout, we then read ideal state to confirm the server still hosts the + * partition. If not, we stop tracking the respective partition. + * 9-A timer thread is started by this object to track timeouts of partitions and drive the reading of their ideal + * state. + * + * The following diagram illustrates the object interactions with main external APIs + * + * (CONSUMING -> ONLINE state change) + * | + * markPartitionForConfirmation(partitionId) + * | |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 0}} + * | | + * ___________V_________________________V_ + * | (Table X) |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 1}} + * | IngestionDelayTracker | ... + * |____________________________________|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager (Partition n}} + * ^ ^ + * | \ + * timeoutInactivePartitions() stopTrackingPartitionIngestionDelay(partitionId) + * _________|__________ \ + * | TimerTrackingTask | (CONSUMING -> DROPPED state change) + * |___________________| + * + * TODO: handle bug situations like the one where a partition is not allocated to a given server due to a bug. + */ + +public class IngestionDelayTracker { + + // Sleep interval for timer thread that triggers read of ideal state + private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts + // Once a partition is marked for verification, we wait 10 minutes to pull its ideal state. 
+ private static final int PARTITION_TIMEOUT_MS = 600000; // 10 minutes timeouts + // Delay Timer thread for this amount of time after starting timer + private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100; + private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName()); + + /* + * Class to keep an ingestion delay measure and the time when the sample was taken (i.e. sample time) + * We will use the sample time to increase ingestion delay when a partition stops consuming: the time + * difference between the sample time and current time will be added to the metric when read. + */ + static private class DelayMeasure { + public DelayMeasure(long t, long d) { + _delayMs = d; + _sampleTime = t; + } + public final long _delayMs; + public final long _sampleTime; + } + + // HashMap used to store delay measures for all partitions active for the current table. + private final ConcurrentHashMap<Integer, DelayMeasure> _partitionToDelaySampleMap = new ConcurrentHashMap<>(); + // We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not + // go back to CONSUMING in some period of time, we confirm whether they are still hosted in this server by reading + // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons. + private final ConcurrentHashMap<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>(); + + final int _timerThreadTickIntervalMs; + // Timer task to check partitions that are inactive against ideal state. 
+ private final Timer _timer; + + private final ServerMetrics _serverMetrics; + private final String _tableNameWithType; + private final String _metricName; + + private final RealtimeTableDataManager _realTimeTableDataManager; + private final Supplier<Boolean> _isServerReadyToServeQueries; + + private Clock _clock; + + public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType, + RealtimeTableDataManager realtimeTableDataManager, int timerThreadTickIntervalMs, + Supplier<Boolean> isServerReadyToServeQueries) + throws RuntimeException { + _serverMetrics = serverMetrics; + _tableNameWithType = tableNameWithType; + _metricName = tableNameWithType; + _realTimeTableDataManager = realtimeTableDataManager; + _clock = Clock.systemUTC(); + _isServerReadyToServeQueries = isServerReadyToServeQueries; + // Handle negative timer values + if (timerThreadTickIntervalMs <= 0) { + throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s", + timerThreadTickIntervalMs, _tableNameWithType)); + } + _timerThreadTickIntervalMs = timerThreadTickIntervalMs; + _timer = new Timer("IngestionDelayTimerThread-" + TableNameBuilder.extractRawTableName(tableNameWithType)); + _timer.schedule(new TimerTask() { + @Override + public void run() { + timeoutInactivePartitions(); + } + }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs); + } + + public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType, + RealtimeTableDataManager tableDataManager, Supplier<Boolean> isServerReadyToServeQueries) { + this(serverMetrics, tableNameWithType, tableDataManager, TIMER_THREAD_TICK_INTERVAL_MS, + isServerReadyToServeQueries); + } + + /* + * Helper function to age a delay measure. Aging means adding the time elapsed since the measure was + * taken till the measure is being reported. + * + * @param currentDelay original sample delay to which we will add the age of the measure. 
+ */ + private long getAgedDelay(DelayMeasure currentDelay) { + if (currentDelay == null) { + return 0; // return 0 when not initialized + } + // Add age of measure to the reported value + long measureAgeInMs = _clock.millis() - currentDelay._sampleTime; + // Correct to zero for any time shifts due to NTP or time reset. + measureAgeInMs = Math.max(measureAgeInMs, 0); + return currentDelay._delayMs + measureAgeInMs; + } + + /* + * Helper function to be called when we should stop tracking a given partition. Removes the partition from + * all our maps. + * + * @param partitionGroupId partition ID which we should stop tracking. + */ + private void removePartitionId(int partitionGroupId) { + _partitionToDelaySampleMap.remove(partitionGroupId); + // If we are removing a partition we should stop reading its ideal state. + _partitionsMarkedForVerification.remove(partitionGroupId); + _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS); + } + + /* + * Helper function that creates a list of all the partitions that are marked for verification and whose + * timeouts have expired. This helps us optimize checks of the ideal state. + */ + private ArrayList<Integer> getPartitionsToBeVerified() { + ArrayList<Integer> partitionsToVerify = new ArrayList<>(); + for (ConcurrentHashMap.Entry<Integer, Long> entry : _partitionsMarkedForVerification.entrySet()) { + long timeMarked = _clock.millis() - entry.getValue(); + if (timeMarked > PARTITION_TIMEOUT_MS) { + // Partition must be verified + partitionsToVerify.add(entry.getKey()); + } + } + return partitionsToVerify; + } + + + /** + * Allows tests to install a predictable clock. + * + * @param clock clock to be used by the class + */ + @VisibleForTesting + void setClock(Clock clock) { + _clock = clock; + } + + /* + * Called by LLRealTimeSegmentDataManagers to post delay updates to this tracker class. + * + * @param delayMs ingestion delay being recorded.
+ * @param sampleTime sample time. + * @param partitionGroupId partition ID for which this delay is being recorded. + */ + public void updateIngestionDelay(long delayMs, long sampleTime, int partitionGroupId) { + // Store new measure and wipe old one for this partition + // TODO: see if we can install gauges after the server is ready. + if (!_isServerReadyToServeQueries.get()) { + // Do not update the ingestion delay metrics during server startup period + return; + } + DelayMeasure previousMeasure = _partitionToDelaySampleMap.put(partitionGroupId, + new DelayMeasure(sampleTime, delayMs)); + if (previousMeasure == null) { + // First time we start tracking a partition we should start tracking it via metric + _serverMetrics.addCallbackPartitionGaugeIfNeeded(_metricName, partitionGroupId, + ServerGauge.REALTIME_INGESTION_DELAY_MS, () -> getPartitionIngestionDelay(partitionGroupId)); + } + // If we are consuming we do not need to track this partition for removal. + _partitionsMarkedForVerification.remove(partitionGroupId); + } + + /* + * Handle partition removal event. This must be invoked when we stop serving a given partition for + * this table in the current server. + * + * @param partitionGroupId partition id that we should stop tracking. + */ + public void stopTrackingPartitionIngestionDelay(int partitionGroupId) { + removePartitionId(partitionGroupId); + } + + /* + * This method is used for timing out inactive partitions, so we don't display their metrics on current server. + * When the inactive time exceeds some threshold, we read from ideal state to confirm we still host the partition, + * if not we remove the partition from being tracked locally. + * This call is to be invoked by a timer thread that will periodically wake up and invoke this function. 
+ */ + public void timeoutInactivePartitions() { + Set<Integer> partitionsHostedByThisServer = null; + // Check if we have any partition to verify, else don't make the call to check ideal state as that + // involves network traffic and may be inefficient. + ArrayList<Integer> partitionsToVerify = getPartitionsToBeVerified(); + if (partitionsToVerify.size() == 0) { + // Don't make the call to getHostedPartitionsGroupIds() as it involves checking ideal state. + return; + } + try { + partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds(); + } catch (Exception e) { + _logger.error("Failed to get partitions hosted by this server, table={}", _tableNameWithType); + return; + } + for (int partitionGroupId : partitionsToVerify) { + if (!partitionsHostedByThisServer.contains(partitionGroupId)) { + // Partition is not hosted in this server anymore, stop tracking it + removePartitionId(partitionGroupId); + } + } + } + + /* + * This function is invoked when a partition goes from CONSUMING to ONLINE, so we can assert whether the + * partition is still hosted by this server after some interval of time. + * + * @param partitionGroupId Partition id that we need confirmed via ideal state as still hosted by this server. + */ + public void markPartitionForVerification(int partitionGroupId) { + _partitionsMarkedForVerification.put(partitionGroupId, _clock.millis()); + } + + /* + * Method to get ingestion delay for a given partition. + * + * @param partitionGroupId partition for which we are retrieving the delay + * + * @return ingestion delay in milliseconds for the given partition ID. + */ + public long getPartitionIngestionDelay(int partitionGroupId) { + DelayMeasure currentMeasure = _partitionToDelaySampleMap.get(partitionGroupId); + return getAgedDelay(currentMeasure); + } + + /* + * We use this method to clean up when a table is being removed. No updates are expected at this time + * as all LLRealtimeSegmentManagers should be down now. 
+ */ + public void shutdown() { + // Now that segments can't report metric, destroy metric for this table + _timer.cancel(); + // Remove partitions so their related metrics get uninstalled. + for (ConcurrentHashMap.Entry<Integer, DelayMeasure> entry : _partitionToDelaySampleMap.entrySet()) { + removePartitionId(entry.getKey()); + } + } +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java index 2f38640568..fe7c0f30b1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java @@ -600,12 +600,12 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { } } } - _currentOffset = messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index); _numRowsIndexed = _realtimeSegment.getNumDocsIndexed(); _numRowsConsumed++; streamMessageCount++; } + updateIngestionDelay(indexedMessageCount); updateCurrentDocumentCountMetrics(); if (messagesAndOffsets.getUnfilteredMessageCount() > 0) { _hasMessagesFetched = true; @@ -617,6 +617,8 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { if (_segmentLogger.isDebugEnabled()) { _segmentLogger.debug("empty batch received - sleeping for {}ms", idlePipeSleepTimeMillis); } + // Record Pinot ingestion delay as zero since we are up-to-date and no new events + _realtimeTableDataManager.updateIngestionDelay(0, System.currentTimeMillis(), _partitionGroupId); // If there were no messages to be fetched from stream, wait for a little bit as to avoid hammering the stream Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis, TimeUnit.MILLISECONDS); } @@ -1560,6 +1562,21 @@ public class LLRealtimeSegmentDataManager extends RealtimeSegmentDataManager { 
_partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId); } + /* + * If messages were indexed, updates the ingestion delay using the timestamp of the last consumed event. + * + * @param indexedMessageCount number of messages indexed in the current batch + */ + private void updateIngestionDelay(int indexedMessageCount) { + if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) { + long ingestionDelayMs = _lastConsumedTimestampMs - _lastRowMetadata.getRecordIngestionTimeMs(); + ingestionDelayMs = Math.max(ingestionDelayMs, 0); + // Record Ingestion delay for this partition + _realtimeTableDataManager.updateIngestionDelay(ingestionDelayMs, _lastConsumedTimestampMs, + _partitionGroupId); + } + } + // This should be done during commit? We may not always commit when we build a segment.... // TODO Call this method when we are loading the segment, which we do from table datamanager afaik private void updateCurrentDocumentCountMetrics() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 89b45f1cce..fb53de64ea 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -22,8 +22,10 @@ import com.google.common.base.Preconditions; import java.io.File; import java.io.IOException; import java.net.URI; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; @@ -31,6 +33,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; import
javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.io.FileUtils; @@ -115,15 +118,25 @@ public class RealtimeTableDataManager extends BaseTableDataManager { private TableDedupMetadataManager _tableDedupMetadataManager; private TableUpsertMetadataManager _tableUpsertMetadataManager; + // Object to track ingestion delay for all partitions + private IngestionDelayTracker _ingestionDelayTracker; + private final Supplier<Boolean> _isServerReadyToServeQueries; public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) { + this(segmentBuildSemaphore, () -> true); + } + + public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, Supplier<Boolean> isServerReadyToServeQueries) { _segmentBuildSemaphore = segmentBuildSemaphore; + _isServerReadyToServeQueries = isServerReadyToServeQueries; } @Override protected void doInit() { _leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId, _serverMetrics, _tableNameWithType); - + // Tracks ingestion delay of all partitions being served for this table + _ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this, + _isServerReadyToServeQueries); File statsFile = new File(_tableDataDir, STATS_FILE_NAME); try { _statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile); @@ -212,6 +225,59 @@ public class RealtimeTableDataManager extends BaseTableDataManager { if (_leaseExtender != null) { _leaseExtender.shutDown(); } + // Make sure we do metric cleanup when we shut down the table. + _ingestionDelayTracker.shutdown(); + } + + /* + * Method used by LLRealtimeSegmentManagers to update their partition delays + * + * @param ingestionDelayMs Ingestion delay being reported. + * @param currentTimeMs Timestamp of the measure being provided, i.e. when this delay was computed. + * @param partitionGroupId Partition ID for which delay is being updated. 
+ */ + public void updateIngestionDelay(long ingestionDelayMs, long currentTimeMs, int partitionGroupId) { + _ingestionDelayTracker.updateIngestionDelay(ingestionDelayMs, currentTimeMs, partitionGroupId); + } + + /* + * Method to handle CONSUMING -> DROPPED segment state transitions: + * We stop tracking partitions whose segments are dropped. + * + * @param segmentNameStr name of segment which is transitioning state. + */ + @Override + public void onConsumingToDropped(String segmentNameStr) { + LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); + _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName.getPartitionGroupId()); + } + + /* + * Method to handle CONSUMING -> ONLINE segment state transitions: + * We mark partitions for verification against ideal state when we do not see a consuming segment for some time + * for that partition. The idea is to remove the related metrics when the partition moves off the current server. + * + * @param segmentNameStr name of segment which is transitioning state. + */ + @Override + public void onConsumingToOnline(String segmentNameStr) { + LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); + _ingestionDelayTracker.markPartitionForVerification(segmentName.getPartitionGroupId()); + } + + /** + * Returns all partitionGroupIds for the partitions hosted by this server for current table. + * @Note: this involves Zookeeper read and should not be used frequently due to efficiency concerns.
+ */ + public Set<Integer> getHostedPartitionsGroupIds() { + Set<Integer> partitionsHostedByThisServer = new HashSet<>(); + List<String> segments = TableStateUtils.getSegmentsInGivenStateForThisInstance(_helixManager, _tableNameWithType, + CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING); + for (String segmentNameStr : segments) { + LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); + partitionsHostedByThisServer.add(segmentName.getPartitionGroupId()); + } + return partitionsHostedByThisServer; } public RealtimeSegmentStatsHistory getStatsHistory() { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java new file mode 100644 index 0000000000..b91d714e11 --- /dev/null +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pinot.core.data.manager.realtime; + +import java.time.Clock; +import java.time.Duration; +import java.time.Instant; +import java.time.ZoneId; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.spi.metrics.PinotMetricUtils; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class IngestionDelayTrackerTest { + + private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100; + + private IngestionDelayTracker createTracker() { + ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); + RealtimeTableDataManager realtimeTableDataManager = new RealtimeTableDataManager(null); + IngestionDelayTracker ingestionDelayTracker = + new IngestionDelayTracker(serverMetrics, "dummyTable_RT", + realtimeTableDataManager, () -> true); + // With no samples, the time reported must be zero + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0); + return ingestionDelayTracker; + } + + @Test + public void testTrackerConstructors() { + ServerMetrics serverMetrics = new ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry()); + RealtimeTableDataManager realtimeTableDataManager = new RealtimeTableDataManager(null); + // Test regular constructor + IngestionDelayTracker ingestionDelayTracker = + new IngestionDelayTracker(serverMetrics, "dummyTable_RT", + realtimeTableDataManager, () -> true); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0); + ingestionDelayTracker.shutdown(); + // Test constructor with timer arguments + ingestionDelayTracker = + new IngestionDelayTracker(serverMetrics, "dummyTable_RT", + realtimeTableDataManager, TIMER_THREAD_TICK_INTERVAL_MS, () -> true); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0), 0); + // Test bad timer args to the constructor + try { + ingestionDelayTracker = + new IngestionDelayTracker(serverMetrics, "dummyTable_RT", + realtimeTableDataManager, 0, () -> true); 
+ Assert.assertTrue(false); // Constructor must assert + } catch (Exception e) { + Assert.assertTrue(e instanceof RuntimeException); + } + } + + @Test + public void testRecordIngestionDelayWithNoAging() { + final long maxTestDelay = 100; + final int partition0 = 0; + final int partition1 = 1; + + IngestionDelayTracker ingestionDelayTracker = createTracker(); + // Use fixed clock so samples don't age + Instant now = Instant.now(); + ZoneId zoneId = ZoneId.systemDefault(); + Clock clock = Clock.fixed(now, zoneId); + ingestionDelayTracker.setClock(clock); + + // Test we follow a single partition up and down + for (long i = 0; i <= maxTestDelay; i++) { + ingestionDelayTracker.updateIngestionDelay(i, 0, partition0); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), i + clock.millis()); + } + + // Test tracking down a measure for a given partition + for (long i = maxTestDelay; i >= 0; i--) { + ingestionDelayTracker.updateIngestionDelay(i, 0, partition0); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), i + clock.millis()); + } + + // Make the current partition maximum + ingestionDelayTracker.updateIngestionDelay(maxTestDelay, 0, partition0); + + // Bring partition1 delay up and verify values + for (long i = 0; i <= 2 * maxTestDelay; i++) { + ingestionDelayTracker.updateIngestionDelay(i, 0, partition1); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), i + clock.millis()); + } + + // Bring down values of partition1 and verify values + for (long i = 2 * maxTestDelay; i >= 0; i--) { + ingestionDelayTracker.updateIngestionDelay(i, 0, partition1); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), i + clock.millis()); + } + + ingestionDelayTracker.shutdown(); + Assert.assertTrue(true); + } + + @Test + public void testRecordIngestionDelayWithAging() { + final int partition0 = 0; + final long partition0Delay0 = 1000; + final long
partition0Delay1 = 10; // record lower delay to make sure max gets reduced + final long partition0Offset0Ms = 300; + final long partition0Offset1Ms = 1000; + final int partition1 = 1; + final long partition1Delay0 = 11; // Record something slightly higher than previous max + final long partition1Delay1 = 8; // Record something lower so that partition0 is the current max again + final long partition1Offset0Ms = 150; + final long partition1Offset1Ms = 450; + + final long sleepMs = 500; + + IngestionDelayTracker ingestionDelayTracker = createTracker(); + + // With samples for a single partition, test that sample is aged as expected + Instant now = Instant.now(); + ZoneId zoneId = ZoneId.systemDefault(); + Clock clock = Clock.fixed(now, zoneId); + ingestionDelayTracker.setClock(clock); + ingestionDelayTracker.updateIngestionDelay(partition0Delay0, clock.millis(), partition0); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), partition0Delay0); + // Advance clock and test aging + Clock offsetClock = Clock.offset(clock, Duration.ofMillis(partition0Offset0Ms)); + ingestionDelayTracker.setClock(offsetClock); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), + (partition0Delay0 + partition0Offset0Ms)); + + // Add a new value below max and verify we are tracking the new max correctly + ingestionDelayTracker.updateIngestionDelay(partition0Delay1, offsetClock.millis(), partition0); + Clock partition0LastUpdate = Clock.offset(offsetClock, Duration.ZERO); // Save this as we need to verify aging later + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), partition0Delay1); + // Add some offset to the last sample and make sure we age that measure properly + offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition0Offset1Ms)); + ingestionDelayTracker.setClock(offsetClock); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0), + (partition0Delay1 
+ partition0Offset1Ms)); + + // Now try setting a new maximum in another partition + ingestionDelayTracker.updateIngestionDelay(partition1Delay0, offsetClock.millis(), partition1); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), partition1Delay0); + // Add some offset to the last sample and make sure we age that measure properly + offsetClock = Clock.offset(offsetClock, Duration.ofMillis(partition1Offset0Ms)); + ingestionDelayTracker.setClock(offsetClock); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1), + (partition1Delay0 + partition1Offset0Ms)); + + ingestionDelayTracker.shutdown(); + } + + @Test + public void testStopTrackingIngestionDelay() { + final long maxTestDelay = 100; + final int maxPartition = 100; + + IngestionDelayTracker ingestionDelayTracker = createTracker(); + // Use fixed clock so samples don't age + Instant now = Instant.now(); + ZoneId zoneId = ZoneId.systemDefault(); + Clock clock = Clock.fixed(now, zoneId); + ingestionDelayTracker.setClock(clock); + + // Record a number of partitions with delay equal to partition id + for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) { + ingestionDelayTracker.updateIngestionDelay(partitionGroupId, clock.millis(), partitionGroupId); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId), partitionGroupId); + } + // Stop tracking every partition, starting from the highest partition id + for (int partitionGroupId = maxPartition; partitionGroupId >= 0; partitionGroupId--) { + ingestionDelayTracker.stopTrackingPartitionIngestionDelay((int) partitionGroupId); + } + for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) { + // Untracked partitions must return 0 + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId), 0); + } + } + + @Test + public void testTickInactivePartitions() { + 
Assert.assertTrue(true);
} + + @Test + public void testMarkPartitionForConfirmation() { + Assert.assertTrue(true); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java index a316669277..121e47f6f6 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java @@ -200,4 +200,20 @@ public interface TableDataManager { * @return List of {@link SegmentErrorInfo} */ Map<String, SegmentErrorInfo> getSegmentErrors(); + + /** + * Interface to handle segment state transitions from CONSUMING to DROPPED + * + * @param segmentNameStr name of segment for which the state change is being handled + */ + default void onConsumingToDropped(String segmentNameStr) { + }; + + /** + * Interface to handle segment state transitions from CONSUMING to ONLINE + * + * @param segmentNameStr name of segment for which the state change is being handled + */ + default void onConsumingToOnline(String segmentNameStr) { + }; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java index 4e74682220..67be761616 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java @@ -39,37 +39,56 @@ public class TableStateUtils { } /** - * Checks if all segments for the given @param tableNameWithType are succesfully loaded - * This function will get all segments in IDEALSTATE and CURRENTSTATE for the given table, - * and then check if all ONLINE segments in IDEALSTATE match with CURRENTSTATE. 
- * @param helixManager helix manager for the server instance - * @param tableNameWithType table name for which segment state is to be checked - * @return true if all segments for the given table are succesfully loaded. False otherwise + * Returns all segments of the given table that the ideal state assigns to this instance in the given state. + * + * @param helixManager instance of Helix manager + * @param tableNameWithType table for which we are obtaining segments + * @param state state of the segments to be returned + * + * @return List of names of the segments assigned to this instance in the given state. */ - public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) { + public static List<String> getSegmentsInGivenStateForThisInstance(HelixManager helixManager, String tableNameWithType, + String state) { HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); IdealState idealState = dataAccessor.getProperty(keyBuilder.idealStates(tableNameWithType)); + List<String> segmentsInGivenState = new ArrayList<>(); if (idealState == null) { LOGGER.warn("Failed to find ideal state for table: {}", tableNameWithType); - return false; + return segmentsInGivenState; } - // Get all ONLINE segments from idealState + // Get all segments in the given state from idealState String instanceName = helixManager.getInstanceName(); - List<String> onlineSegments = new ArrayList<>(); Map<String, Map<String, String>> idealStatesMap = idealState.getRecord().getMapFields(); for (Map.Entry<String, Map<String, String>> entry : idealStatesMap.entrySet()) { String segmentName = entry.getKey(); Map<String, String> instanceStateMap = entry.getValue(); String expectedState = instanceStateMap.get(instanceName); - // Only track ONLINE segments assigned to the current instance - if (!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(expectedState)) { + // Only track segments in the 
given state assigned to the current instance + if (!state.equals(expectedState)) { continue;
} - onlineSegments.add(segmentName); + segmentsInGivenState.add(segmentName); } + return segmentsInGivenState; + } + + /** + * Checks if all segments for the given table were successfully loaded. + * This function will get all segments in IDEALSTATE and CURRENTSTATE for the given table, + * and then check if all ONLINE segments in IDEALSTATE match with CURRENTSTATE. + * @param helixManager helix manager for the server instance + * @param tableNameWithType table name for which segment state is to be checked + * @return true if all segments for the given table are successfully loaded, false otherwise + */ + public static boolean isAllSegmentsLoaded(HelixManager helixManager, String tableNameWithType) { + HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder(); + String instanceName = helixManager.getInstanceName(); + List<String> onlineSegments = getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType, + CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE); if (onlineSegments.size() > 0) { LiveInstance liveInstance = dataAccessor.getProperty(keyBuilder.liveInstance(instanceName)); if (liveInstance == null) { diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 68773ab89c..fda6caf5c1 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -136,6 +136,7 @@ public abstract class BaseServerStarter implements ServiceStartable { protected ServerQueriesDisabledTracker _serverQueriesDisabledTracker; protected RealtimeLuceneIndexRefreshState _realtimeLuceneIndexRefreshState; protected PinotEnvironmentProvider _pinotEnvironmentProvider; + protected volatile boolean
_isServerReadyToServeQueries = false; @Override public void init(PinotConfiguration serverConf) @@ -533,6 +534,7 @@ public abstract class BaseServerStarter implements ServiceStartable { _serverInstance = new ServerInstance(serverConf, _helixManager, accessControlFactory); ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); + instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries); initSegmentFetcher(_serverConf); StateModelFactory<?> stateModelFactory = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); @@ -582,7 +584,7 @@ public abstract class BaseServerStarter implements ServiceStartable { _serverInstance.startQueryServer(); _helixAdmin.setConfig(_instanceConfigScope, Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS, Boolean.toString(false))); - + _isServerReadyToServeQueries = true; // Throttling for realtime consumption is disabled up to this point to allow maximum consumption during startup time RealtimeConsumptionRateManager.getInstance().enableThrottling(); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index a254009742..25abdfc9d7 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -35,6 +35,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.configuration.ConfigurationException; @@ -94,11 +95,17 @@ public 
class HelixInstanceDataManager implements InstanceDataManager { private ServerMetrics _serverMetrics; private ZkHelixPropertyStore<ZNRecord> _propertyStore; private SegmentUploader _segmentUploader; + private Supplier<Boolean> _isReadyToServeQueries = () -> true; // Fixed size LRU cache for storing last N errors on the instance. // Key is TableNameWithType-SegmentName pair. private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache; + @Override + public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean> isServingQueries) { + _isReadyToServeQueries = isServingQueries; + } + @Override public synchronized void init(PinotConfiguration config, HelixManager helixManager, ServerMetrics serverMetrics) throws ConfigurationException { @@ -225,7 +232,7 @@ public class HelixInstanceDataManager implements InstanceDataManager { TableDataManagerConfig tableDataManagerConfig = new TableDataManagerConfig(_instanceDataManagerConfig, tableConfig); TableDataManager tableDataManager = TableDataManagerProvider.getTableDataManager(tableDataManagerConfig, _instanceId, _propertyStore, - _serverMetrics, _helixManager, _errorCache); + _serverMetrics, _helixManager, _errorCache, _isReadyToServeQueries); tableDataManager.start(); LOGGER.info("Created table data manager for table: {}", tableNameWithType); return tableDataManager; diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index 8c5fa50b51..b85d13b953 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -91,15 +91,17 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta TableDataManager tableDataManager = 
_instanceDataManager.getTableDataManager(realtimeTableName); Preconditions.checkNotNull(tableDataManager); + tableDataManager.onConsumingToOnline(segmentNameStr); SegmentDataManager acquiredSegment = tableDataManager.acquireSegment(segmentNameStr); // For this transition to be correct in helix, we should already have a segment that is consuming if (acquiredSegment == null) { throw new RuntimeException("Segment " + segmentNameStr + " + not present "); } + // TODO: https://github.com/apache/pinot/issues/10049 try { if (!(acquiredSegment instanceof LLRealtimeSegmentDataManager)) { - // We found a LLC segment that is not consuming right now, must be that we already swapped it with a + // We found an LLC segment that is not consuming right now, must be that we already swapped it with a // segment that has been built. Nothing to do for this state transition. _logger .info("Segment {} not an instance of LLRealtimeSegmentDataManager. Reporting success for the transition", @@ -138,6 +140,11 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta @Transition(from = "CONSUMING", to = "DROPPED") public void onBecomeDroppedFromConsuming(Message message, NotificationContext context) { _logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : " + message); + String realtimeTableName = message.getResourceName(); + String segmentNameStr = message.getPartitionName(); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); + Preconditions.checkNotNull(tableDataManager); + tableDataManager.onConsumingToDropped(segmentNameStr); try { onBecomeOfflineFromConsuming(message, context); onBecomeDroppedFromOffline(message, context);
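IngestionDelayTrackerTest defines how a recorded delay ages. The value reported for a partition is its last recorded delay plus the time that has elapsed on the tracker's clock since that sample was taken. A partition with no samples, or one that is no longer tracked, reports 0. The commit message also states that the worst-case maximum is no longer cached. The following is a minimal sketch of these semantics, assuming one sample per partition stored in a ConcurrentHashMap. AgingDelaySketch and its getMaximumIngestionDelay method are hypothetical names. This is not Pinot's IngestionDelayTracker, and it omits the timer thread, metrics, and ideal-state verification.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified model of the aging semantics exercised by IngestionDelayTrackerTest.
// NOT Pinot's IngestionDelayTracker; method names mirror the test only for readability.
final class AgingDelaySketch {
  // Last sample per partition: the observed delay and the time (ms) at which it was observed.
  private static final class Sample {
    final long _delayMs;
    final long _sampleTimeMs;

    Sample(long delayMs, long sampleTimeMs) {
      _delayMs = delayMs;
      _sampleTimeMs = sampleTimeMs;
    }
  }

  private final Map<Integer, Sample> _samples = new ConcurrentHashMap<>();
  private volatile Clock _clock = Clock.systemUTC();

  // Injectable clock so tests can freeze or advance time, as the Pinot test does with setClock().
  void setClock(Clock clock) {
    _clock = clock;
  }

  // Overwrites the previous sample, so a lower delay immediately replaces a higher one.
  void updateIngestionDelay(long delayMs, long sampleTimeMs, int partitionGroupId) {
    _samples.put(partitionGroupId, new Sample(delayMs, sampleTimeMs));
  }

  // Reported delay = last recorded delay + time elapsed since it was recorded; 0 if untracked.
  long getPartitionIngestionDelay(int partitionGroupId) {
    Sample sample = _samples.get(partitionGroupId);
    return sample == null ? 0 : sample._delayMs + (_clock.millis() - sample._sampleTimeMs);
  }

  // Hypothetical aggregate: worst case across partitions, recomputed on each call instead of cached.
  long getMaximumIngestionDelay() {
    long max = 0;
    for (int partitionGroupId : _samples.keySet()) {
      max = Math.max(max, getPartitionIngestionDelay(partitionGroupId));
    }
    return max;
  }

  void stopTrackingPartitionIngestionDelay(int partitionGroupId) {
    _samples.remove(partitionGroupId);
  }

  public static void main(String[] args) {
    Clock base = Clock.fixed(Instant.ofEpochMilli(1_000_000L), ZoneId.of("UTC"));
    AgingDelaySketch tracker = new AgingDelaySketch();
    tracker.setClock(base);
    tracker.updateIngestionDelay(1000, base.millis(), 0);
    tracker.updateIngestionDelay(11, base.millis(), 1);
    System.out.println(tracker.getPartitionIngestionDelay(0)); // 1000
    tracker.setClock(Clock.offset(base, Duration.ofMillis(300)));
    System.out.println(tracker.getPartitionIngestionDelay(0)); // 1300 (aged by 300 ms)
    System.out.println(tracker.getMaximumIngestionDelay()); // 1300
    tracker.stopTrackingPartitionIngestionDelay(0);
    System.out.println(tracker.getMaximumIngestionDelay()); // 311 (partition 1 aged by 300 ms)
  }
}
```

Recomputing the maximum on each read costs a scan over the hosted partitions. In exchange, stopping tracking of a partition never leaves a stale cached maximum behind.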
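getSegmentsInGivenStateForThisInstance iterates over the ideal state's map fields, which map each segment name to an instance-to-state map. It keeps the segments whose state for this instance equals the requested one. getHostedPartitionsGroupIds then reduces the CONSUMING segments to their partition group ids through LLCSegmentName. The sketch below reproduces that filtering over a plain Map instead of a Helix IdealState, so it runs without ZooKeeper. The helper names are hypothetical. partitionGroupIdOf assumes LLC segment names of the form table__partitionGroupId__sequenceNumber__creationTime and does not call the real LLCSegmentName parser.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the ideal-state filtering in this commit, over plain maps instead of Helix.
final class HostedPartitionsSketch {
  private HostedPartitionsSketch() {
  }

  // idealStateMapFields: segment name -> (instance name -> expected state), as in ZNRecord map fields.
  static List<String> segmentsInState(Map<String, Map<String, String>> idealStateMapFields, String instanceName,
      String state) {
    List<String> segmentsInGivenState = new ArrayList<>();
    for (Map.Entry<String, Map<String, String>> entry : idealStateMapFields.entrySet()) {
      // A segment not assigned to this instance yields null, which never equals the requested state.
      if (state.equals(entry.getValue().get(instanceName))) {
        segmentsInGivenState.add(entry.getKey());
      }
    }
    return segmentsInGivenState;
  }

  // Assumption: LLC names look like table__partitionGroupId__sequenceNumber__creationTime.
  static int partitionGroupIdOf(String llcSegmentName) {
    String[] parts = llcSegmentName.split("__");
    if (parts.length != 4) {
      throw new IllegalArgumentException("Not an LLC segment name: " + llcSegmentName);
    }
    return Integer.parseInt(parts[1]);
  }

  // Partition group ids for which this instance is expected to host a CONSUMING segment.
  static Set<Integer> hostedPartitionGroupIds(Map<String, Map<String, String>> idealStateMapFields,
      String instanceName) {
    Set<Integer> partitionGroupIds = new HashSet<>();
    for (String segmentName : segmentsInState(idealStateMapFields, instanceName, "CONSUMING")) {
      partitionGroupIds.add(partitionGroupIdOf(segmentName));
    }
    return partitionGroupIds;
  }

  public static void main(String[] args) {
    Map<String, Map<String, String>> mapFields = new LinkedHashMap<>();
    mapFields.put("myTable__0__5__20230109T1743Z", Map.of("Server_1", "ONLINE"));
    mapFields.put("myTable__0__6__20230109T1800Z", Map.of("Server_1", "CONSUMING"));
    mapFields.put("myTable__1__6__20230109T1800Z", Map.of("Server_2", "CONSUMING"));
    mapFields.put("myTable__2__3__20230109T1800Z", Map.of("Server_1", "CONSUMING", "Server_2", "CONSUMING"));
    System.out.println(segmentsInState(mapFields, "Server_1", "ONLINE")); // [myTable__0__5__20230109T1743Z]
    System.out.println(hostedPartitionGroupIds(mapFields, "Server_1"));
  }
}
```

As the @Note on getHostedPartitionsGroupIds warns, the real method reads the ideal state from ZooKeeper on every call. That is why the commit consults it only to verify partitions marked after a CONSUMING to ONLINE transition, not on every sample.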
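BaseServerStarter hands the instance data manager the lambda `() -> _isServerReadyToServeQueries` rather than the flag's current value. The field is volatile, and it is set to true only after the query server starts. As a result, table data managers created during startup still see the flag change later. According to the commit message, this readiness signal keeps the tracker from sampling partition delay while the server is still catching up. The minimal sketch below shows the pattern. Starter and DelaySampler are hypothetical stand-ins. The gating rule in maybeSample, which skips samples until the server is ready, is an assumption, because the tracker code that consumes the supplier is not part of this excerpt.

```java
import java.util.function.Supplier;

// Hypothetical sketch of the readiness-supplier pattern; not Pinot's actual classes.
final class ReadinessSupplierSketch {
  // Stand-in for BaseServerStarter: volatile so a write on the startup thread is visible to other threads.
  static final class Starter {
    private volatile boolean _isServerReadyToServeQueries = false;

    Supplier<Boolean> readinessSupplier() {
      // The lambda captures `this`, not the current value, so each get() re-reads the volatile field.
      return () -> _isServerReadyToServeQueries;
    }

    void markReady() {
      _isServerReadyToServeQueries = true;
    }
  }

  // Stand-in for a delay tracker: assumed to ignore samples until the server is ready.
  // The counter is not thread-safe; this demo calls it from a single thread only.
  static final class DelaySampler {
    private final Supplier<Boolean> _isReady;
    private int _samplesTaken = 0;

    DelaySampler(Supplier<Boolean> isReady) {
      _isReady = isReady;
    }

    void maybeSample() {
      if (_isReady.get()) {
        _samplesTaken++;
      }
    }

    int samplesTaken() {
      return _samplesTaken;
    }
  }

  public static void main(String[] args) {
    Starter starter = new Starter();
    DelaySampler sampler = new DelaySampler(starter.readinessSupplier());
    sampler.maybeSample(); // ignored: server still catching up
    starter.markReady();
    sampler.maybeSample(); // counted
    System.out.println(sampler.samplesTaken()); // 1
  }
}
```

Passing a supplier instead of a boolean lets the consumer be constructed early, before the server is ready, without a separate callback to notify it later. This also explains why the tests construct the tracker with `() -> true`.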