This is an automated email from the ASF dual-hosted git repository.
mcvsubbu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 6a216c5a4f [feature] Add a Tracker class to support aggregate-worst
case consumption delay… (#9994)
6a216c5a4f is described below
commit 6a216c5a4fa8cc91dcdbb4c40949d2d2b83e6453
Author: Juan Gomez <[email protected]>
AuthorDate: Mon Jan 9 17:43:28 2023 -0800
[feature] Add a Tracker class to support aggregate-worst case consumption
delay… (#9994)
* Add a Tracker class to support aggregate-worst case consumption delay in
Pinot
Early draft for review; no significant unit or end-to-end testing done
yet.
Missing: code to read ideal state, unit tests.
-Changed code to verify, rather than remove, a partition when a CONSUMING to
OFFLINE transition comes
Tests: make sure server, core tests pass.
-Added support for reading ideal state
-Added some unit tests for tracker object
-Added more unit tests that deal with partition removal.
-Removed caching the maximum to simplify the code.
-Use concurrent hash maps instead of synchronized methods.
-Renamed some variables for consistency
-Refactored timeout code
-Fix failing unit tests and a number of renames
-Use int for partitionGroupIds instead of longs
-Some renames and minor edits.
-Move some uses of partition id to integers for consistency with
previous code.
Remove use of type cast by adding interfaces to super class.
* Remove initial delay of timer thread from configurable parameters, clean
throw message and
update comments to reflect new method names.
* Added support for per partition metrics and some renames.
Added support for prefix to metric name so we can add multiple trackers and
metrics per table.
* Add code to support disabling emitting per partition or aggregate metric.
* Do per-batch sampling of delay metric and not per event for efficiency
reasons.
Some renames to avoid the word Pinot in the name of the ingestion delay.
* Add Navina's code to not sample partition delay during catching up phase.
* Rename Millis to Ms and rearrange final private to private final
* Rename Consumption Delay to ingestion delay, remove unnecessary casts,
factor out zeroing out the ingestion delay.
* Use anonymous class for timer tasks, some final variables and add TODO
for some issues we noted in the code while working on this.
* Remove checks for _ingestionDelayTracker being null as a number of other
things break before if that happens. Roll back method to
set delay to zero after offline discussion.
* Consolidate segment state transition actions in a single method, some
minor edits to comments to reflect we now track per partition ingestion delay
* Roll back change where we use one interface for all segment state
transitions, code is simpler with interface per transition, also helps with
consistency with other parts of the code.
* Optimization to only check ideal state when we really need to.
* Cleanup some comments with stale references, some minor refactors.
* Consolidate all ingestion delay measures under the same Gauge.
* Eliminate the use of optional aging and fix tests to use fixed clocks and
get predictable results.
* Remove the per-partition metric mute and use server ready to serve
queries as signal to start collecting metrics.
* Address most of the last round of comments.
* Remove isServerReady code from Segment data provider to keep code
simpler, encapsulate in IngestionDelay class instead.
* Use CONSUMING segments to determine hosted partitions instead of ONLINE,
getSegmentsInGivenStateForThisInstance returns set instead of list.
* Added a few unit tests for per-partition measures, a few comment edits
* Removing aggregate metric and some other things requested to simplify the
change.
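
The delay aging and fixed-clock testing mentioned in the notes above can be illustrated with a minimal standalone sketch. It assumes the behavior of getAgedDelay in IngestionDelayTracker.java from this commit: the reported delay is the sampled delay plus the time elapsed since the sample was taken, with the elapsed time clamped at zero to absorb clock shifts. The class name AgedDelaySketch and the method agedDelayMs are hypothetical and are not part of Pinot.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical standalone model of delay aging; not the Pinot implementation.
class AgedDelaySketch {

  // Returns the sampled delay plus the age of the sample, as seen by the given clock.
  static long agedDelayMs(long delayMs, long sampleTimeMs, Clock clock) {
    // Clamp the age at zero in case the clock moved backwards (e.g. an NTP adjustment).
    long ageMs = Math.max(clock.millis() - sampleTimeMs, 0);
    return delayMs + ageMs;
  }

  public static void main(String[] args) {
    // A fixed clock makes the result predictable, which is what the unit tests rely on.
    Clock atSample = Clock.fixed(Instant.ofEpochMilli(10_000L), ZoneOffset.UTC);
    Clock oneSecondLater = Clock.offset(atSample, Duration.ofMillis(1_000L));

    System.out.println(agedDelayMs(500L, 10_000L, atSample));       // 500
    System.out.println(agedDelayMs(500L, 10_000L, oneSecondLater)); // 1500
    // A sample time ahead of the clock does not reduce the reported delay.
    System.out.println(agedDelayMs(500L, 12_000L, atSample));       // 500
  }
}
```

Passing the clock as a parameter keeps the sketch free of mutable state; the tracker in this commit instead stores the clock in a field and swaps it through setClock for tests.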
---
.../pinot/common/metrics/AbstractMetrics.java | 31 +++
.../apache/pinot/common/metrics/ServerGauge.java | 4 +-
.../core/data/manager/InstanceDataManager.java | 8 +
.../manager/offline/TableDataManagerProvider.java | 10 +-
.../manager/realtime/IngestionDelayTracker.java | 308 +++++++++++++++++++++
.../realtime/LLRealtimeSegmentDataManager.java | 19 +-
.../manager/realtime/RealtimeTableDataManager.java | 68 ++++-
.../realtime/IngestionDelayTrackerTest.java | 204 ++++++++++++++
.../local/data/manager/TableDataManager.java | 16 ++
.../local/utils/tablestate/TableStateUtils.java | 45 ++-
.../server/starter/helix/BaseServerStarter.java | 4 +-
.../starter/helix/HelixInstanceDataManager.java | 9 +-
.../SegmentOnlineOfflineStateModelFactory.java | 9 +-
13 files changed, 715 insertions(+), 20 deletions(-)
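
The partition verification flow in IngestionDelayTracker.java (mark a partition on CONSUMING -> ONLINE, then consult ideal state only once a timeout has expired) might be sketched as follows. This is a simplified standalone model under stated assumptions, not the Pinot code: VerificationSketch and its method names are hypothetical, the set of hosted partitions is passed in rather than read from ideal state, and the current time is passed explicitly instead of being read from a clock.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical standalone model of the verification timeout; not the Pinot implementation.
class VerificationSketch {
  static final long PARTITION_TIMEOUT_MS = 600_000L; // 10 minutes, as in the commit

  private final Set<Integer> tracked = ConcurrentHashMap.newKeySet();
  private final Map<Integer, Long> markedAtMs = new ConcurrentHashMap<>();

  // A delay update means the partition is consuming, so any pending verification is dropped.
  void onDelayUpdate(int partitionId) {
    tracked.add(partitionId);
    markedAtMs.remove(partitionId);
  }

  // Called on CONSUMING -> ONLINE: remember when the partition stopped consuming.
  void markForVerification(int partitionId, long nowMs) {
    markedAtMs.put(partitionId, nowMs);
  }

  // Partitions whose mark is older than the timeout.
  List<Integer> expired(long nowMs) {
    List<Integer> result = new ArrayList<>();
    for (Map.Entry<Integer, Long> entry : markedAtMs.entrySet()) {
      if (nowMs - entry.getValue() > PARTITION_TIMEOUT_MS) {
        result.add(entry.getKey());
      }
    }
    return result;
  }

  // Stop tracking expired partitions that are no longer hosted by this server.
  void timeoutInactive(long nowMs, Set<Integer> hostedPartitions) {
    for (int partitionId : expired(nowMs)) {
      if (!hostedPartitions.contains(partitionId)) {
        tracked.remove(partitionId);
        markedAtMs.remove(partitionId);
      }
    }
  }

  boolean isTracked(int partitionId) {
    return tracked.contains(partitionId);
  }

  public static void main(String[] args) {
    VerificationSketch sketch = new VerificationSketch();
    sketch.onDelayUpdate(0);
    sketch.onDelayUpdate(1);
    sketch.markForVerification(0, 0L);
    sketch.markForVerification(1, 0L);
    // After the timeout, only partition 0 is still hosted here.
    sketch.timeoutInactive(PARTITION_TIMEOUT_MS + 1, Collections.singleton(0));
    System.out.println(sketch.isTracked(0) + " " + sketch.isTracked(1)); // true false
  }
}
```

Deferring the hosted-partition lookup until a mark has expired is what keeps ideal state reads rare; in the commit that lookup is getHostedPartitionsGroupIds(), which involves a Zookeeper read.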
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 9add8f98e8..e4eb329c7d 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -537,6 +537,23 @@ public abstract class AbstractMetrics<QP extends
AbstractMetrics.QueryPhase, M e
addCallbackGaugeIfNeeded(fullGaugeName, valueCallback);
}
+ /**
+ * Install a per-partition table gauge if needed.
+ *
+ * @param tableName The table name
+ * @param partitionId The partition id
+ * @param gauge The gauge to use
+ * @param valueCallback the callback function to be called while reading the metric.
+ */
+ public void addCallbackPartitionGaugeIfNeeded(final String tableName, final
int partitionId, final G gauge,
+ final Callable<Long> valueCallback) {
+ final String fullGaugeName;
+ String gaugeName = gauge.getGaugeName();
+ fullGaugeName = gaugeName + "." + getTableName(tableName) + "." +
partitionId;
+
+ addCallbackGaugeIfNeeded(fullGaugeName, valueCallback);
+ }
+
/**
* Similar to addCallbackGauge method.
* This method may be called multiple times, while it will be registered to
callback function only once.
@@ -602,6 +619,20 @@ public abstract class AbstractMetrics<QP extends
AbstractMetrics.QueryPhase, M e
removeGauge(fullGaugeName);
}
+
+ /**
+ * Removes a per-partition table gauge given the table name, the partition id and the gauge.
+ * The add/remove is expected to work correctly in case of being invoked across multiple threads.
+ * @param tableName table name
+ * @param partitionId the partition id
+ * @param gauge the gauge to be removed
+ */
+ public void removePartitionGauge(final String tableName, final int
partitionId, final G gauge) {
+ final String fullGaugeName;
+ String gaugeName = gauge.getGaugeName();
+ fullGaugeName = gaugeName + "." + getTableName(tableName) + "." +
partitionId;
+ removeGauge(fullGaugeName);
+ }
+
/**
* Remove gauge from Pinot metrics.
* @param gaugeName gauge name
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index 015e75aae9..7dd52e90ea 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -45,7 +45,9 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
// Dedup metrics
DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
- JVM_HEAP_USED_BYTES("bytes", true);
+ JVM_HEAP_USED_BYTES("bytes", true),
+ // Ingestion delay metric
+ REALTIME_INGESTION_DELAY_MS("milliseconds", false);
private final String _gaugeName;
private final String _unit;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index 77cab8135a..0e041021d2 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.manager;
import java.io.File;
import java.util.List;
import java.util.Set;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
@@ -183,4 +184,11 @@ public interface InstanceDataManager {
* Immediately stop consumption and start committing the consuming segments.
*/
void forceCommit(String tableNameWithType, Set<String> segmentNames);
+
+ /**
+ * Enables the installation of a method to determine if a server is ready to serve queries.
+ *
+ * @param isServerReadyToServeQueries supplier to retrieve state of server.
+ */
+ void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean>
isServerReadyToServeQueries);
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
index 15f284ae09..2a2671047a 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/offline/TableDataManagerProvider.java
@@ -20,6 +20,7 @@ package org.apache.pinot.core.data.manager.offline;
import com.google.common.cache.LoadingCache;
import java.util.concurrent.Semaphore;
+import java.util.function.Supplier;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -54,6 +55,13 @@ public class TableDataManagerProvider {
public static TableDataManager getTableDataManager(TableDataManagerConfig
tableDataManagerConfig, String instanceId,
ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics
serverMetrics, HelixManager helixManager,
LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache) {
+ return getTableDataManager(tableDataManagerConfig, instanceId,
propertyStore, serverMetrics, helixManager,
+ errorCache, () -> true);
+ }
+
+ public static TableDataManager getTableDataManager(TableDataManagerConfig
tableDataManagerConfig, String instanceId,
+ ZkHelixPropertyStore<ZNRecord> propertyStore, ServerMetrics
serverMetrics, HelixManager helixManager,
+ LoadingCache<Pair<String, String>, SegmentErrorInfo> errorCache,
Supplier<Boolean> isServerReadyToServeQueries) {
TableDataManager tableDataManager;
switch (tableDataManagerConfig.getTableType()) {
case OFFLINE:
@@ -64,7 +72,7 @@ public class TableDataManagerProvider {
}
break;
case REALTIME:
- tableDataManager = new
RealtimeTableDataManager(_segmentBuildSemaphore);
+ tableDataManager = new
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries);
break;
default:
throw new IllegalStateException();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
new file mode 100644
index 0000000000..3031aee91b
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A Class to track realtime ingestion delay for a given table on a given
server.
+ * Highlights:
+ * 1-An object of this class is hosted by each RealtimeTableDataManager.
+ * 2-The object tracks ingestion delays for all partitions hosted by the
current server for the given Realtime table.
+ * 3-Partition delays are updated by all LLRealtimeSegmentDataManager objects
hosted in the corresponding
+ * RealtimeTableDataManager.
+ * 4-A Metric is derived from reading the maximum tracked by this class. In
addition, individual metrics are associated
+ * with each partition being tracked.
+ * 5-Delays reported for partitions that do not have events to consume are
reported as zero.
+ * 6-We track the time at which each delay sample was collected so that delay
can be increased when partition stops
+ * consuming for any reason other than no events being available for
consumption.
+ * 7-Partitions whose Segments go from CONSUMING to DROPPED state stop being
tracked so their delays do not cloud
+ * delays of active partitions.
+ * 8-When a segment goes from CONSUMING to ONLINE, we start a timeout for the
corresponding partition.
+ * If no consumption is noticed after the timeout, we then read ideal state
to confirm the server still hosts the
+ * partition. If not, we stop tracking the respective partition.
+ * 9-A timer thread is started by this object to track timeouts of partitions
and drive the reading of their ideal
+ * state.
+ *
+ * The following diagram illustrates the object interactions with main
external APIs
+ *
+ *           (CONSUMING -> ONLINE state change)
+ *                            |
+ *        markPartitionForVerification(partitionId)
+ *                            |                  |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 0)}
+ *                            |                  |
+ *  __________________________V__________________V_
+ * |           (Table X)                           |<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition 1)}
+ * | IngestionDelayTracker                         |           ...
+ * |_______________________________________________|<-updateIngestionDelay()-{LLRealtimeSegmentDataManager(Partition n)}
+ *             ^                       ^
+ *             |                        \
+ * timeoutInactivePartitions()    stopTrackingPartitionIngestionDelay(partitionId)
+ *    _________|_________                 \
+ *   | TimerTrackingTask |         (CONSUMING -> DROPPED state change)
+ *   |___________________|
+ *
+ * TODO: handle bug situations like the one where a partition is not allocated
to a given server due to a bug.
+ */
+
+public class IngestionDelayTracker {
+
+ // Sleep interval for timer thread that triggers read of ideal state
+ private static final int TIMER_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts
+ // Once a partition is marked for verification, we wait 10 minutes to pull its ideal state.
+ private static final int PARTITION_TIMEOUT_MS = 600000; // 10 minutes timeouts
+ // Delay Timer thread for this amount of time after starting timer
+ private static final int INITIAL_TIMER_THREAD_DELAY_MS = 100;
+ private static final Logger _logger =
LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName());
+
+ /*
+ * Class to keep an ingestion delay measure and the time when the sample was
taken (i.e. sample time)
+ * We will use the sample time to increase ingestion delay when a partition
stops consuming: the time
+ * difference between the sample time and current time will be added to the
metric when read.
+ */
+ static private class DelayMeasure {
+ public DelayMeasure(long t, long d) {
+ _delayMs = d;
+ _sampleTime = t;
+ }
+ public final long _delayMs;
+ public final long _sampleTime;
+ }
+
+ // HashMap used to store delay measures for all partitions active for the
current table.
+ private final ConcurrentHashMap<Integer, DelayMeasure>
_partitionToDelaySampleMap = new ConcurrentHashMap<>();
+ // We mark partitions that go from CONSUMING to ONLINE in
_partitionsMarkedForVerification: if they do not
+ // go back to CONSUMING in some period of time, we confirm whether they are
still hosted in this server by reading
+ // ideal state. This is done with the goal of minimizing reading ideal state
for efficiency reasons.
+ private final ConcurrentHashMap<Integer, Long>
_partitionsMarkedForVerification = new ConcurrentHashMap<>();
+
+ final int _timerThreadTickIntervalMs;
+ // Timer task to check partitions that are inactive against ideal state.
+ private final Timer _timer;
+
+ private final ServerMetrics _serverMetrics;
+ private final String _tableNameWithType;
+ private final String _metricName;
+
+ private final RealtimeTableDataManager _realTimeTableDataManager;
+ private final Supplier<Boolean> _isServerReadyToServeQueries;
+
+ private Clock _clock;
+
+ public IngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
+ RealtimeTableDataManager realtimeTableDataManager, int
timerThreadTickIntervalMs,
+ Supplier<Boolean> isServerReadyToServeQueries)
+ throws RuntimeException {
+ _serverMetrics = serverMetrics;
+ _tableNameWithType = tableNameWithType;
+ _metricName = tableNameWithType;
+ _realTimeTableDataManager = realtimeTableDataManager;
+ _clock = Clock.systemUTC();
+ _isServerReadyToServeQueries = isServerReadyToServeQueries;
+ // Reject non-positive timer values
+ if (timerThreadTickIntervalMs <= 0) {
+ throw new RuntimeException(String.format("Illegal timer timeout
argument, expected > 0, got=%d for table=%s",
+ timerThreadTickIntervalMs, _tableNameWithType));
+ }
+ _timerThreadTickIntervalMs = timerThreadTickIntervalMs;
+ _timer = new Timer("IngestionDelayTimerThread-" +
TableNameBuilder.extractRawTableName(tableNameWithType));
+ _timer.schedule(new TimerTask() {
+ @Override
+ public void run() {
+ timeoutInactivePartitions();
+ }
+ }, INITIAL_TIMER_THREAD_DELAY_MS, _timerThreadTickIntervalMs);
+ }
+
+ public IngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
+ RealtimeTableDataManager tableDataManager, Supplier<Boolean>
isServerReadyToServeQueries) {
+ this(serverMetrics, tableNameWithType, tableDataManager,
TIMER_THREAD_TICK_INTERVAL_MS,
+ isServerReadyToServeQueries);
+ }
+
+ /*
+ * Helper function to age a delay measure. Aging means adding the time
elapsed since the measure was
+ * taken till the measure is being reported.
+ *
+ * @param currentDelay original sample delay to which we will add the age of
the measure.
+ */
+ private long getAgedDelay(DelayMeasure currentDelay) {
+ if (currentDelay == null) {
+ return 0; // return 0 when not initialized
+ }
+ // Add age of measure to the reported value
+ long measureAgeInMs = _clock.millis() - currentDelay._sampleTime;
+ // Correct to zero for any time shifts due to NTP or time reset.
+ measureAgeInMs = Math.max(measureAgeInMs, 0);
+ return currentDelay._delayMs + measureAgeInMs;
+ }
+
+ /*
+ * Helper function to be called when we should stop tracking a given
partition. Removes the partition from
+ * all our maps.
+ *
+ * @param partitionGroupId partition ID which we should stop tracking.
+ */
+ private void removePartitionId(int partitionGroupId) {
+ _partitionToDelaySampleMap.remove(partitionGroupId);
+ // If we are removing a partition we should stop reading its ideal state.
+ _partitionsMarkedForVerification.remove(partitionGroupId);
+ _serverMetrics.removePartitionGauge(_metricName, partitionGroupId,
ServerGauge.REALTIME_INGESTION_DELAY_MS);
+ }
+
+ /*
+ * Helper function that creates a list of all the partitions that are marked for verification and whose
+ * timeouts have expired. This helps us optimize checks of the ideal state.
+ */
+ private ArrayList<Integer> getPartitionsToBeVerified() {
+ ArrayList<Integer> partitionsToVerify = new ArrayList<>();
+ for (ConcurrentHashMap.Entry<Integer, Long> entry :
_partitionsMarkedForVerification.entrySet()) {
+ long timeMarked = _clock.millis() - entry.getValue();
+ if (timeMarked > PARTITION_TIMEOUT_MS) {
+ // Partition must be verified
+ partitionsToVerify.add(entry.getKey());
+ }
+ }
+ return partitionsToVerify;
+ }
+
+
+ /**
+ * Function that enables us to set predictable clocks for testing purposes.
+ *
+ * @param clock clock to be used by the class
+ */
+ @VisibleForTesting
+ void setClock(Clock clock) {
+ _clock = clock;
+ }
+
+ /*
+ * Called by LLRealTimeSegmentDataManagers to post delay updates to this
tracker class.
+ *
+ * @param delayMs ingestion delay being recorded.
+ * @param sampleTime sample time.
+ * @param partitionGroupId partition ID for which this delay is being
recorded.
+ */
+ public void updateIngestionDelay(long delayMs, long sampleTime, int
partitionGroupId) {
+ // Store new measure and wipe old one for this partition
+ // TODO: see if we can install gauges after the server is ready.
+ if (!_isServerReadyToServeQueries.get()) {
+ // Do not update the ingestion delay metrics during server startup period
+ return;
+ }
+ DelayMeasure previousMeasure =
_partitionToDelaySampleMap.put(partitionGroupId,
+ new DelayMeasure(sampleTime, delayMs));
+ if (previousMeasure == null) {
+ // First time we start tracking a partition we should start tracking it
via metric
+ _serverMetrics.addCallbackPartitionGaugeIfNeeded(_metricName,
partitionGroupId,
+ ServerGauge.REALTIME_INGESTION_DELAY_MS, () ->
getPartitionIngestionDelay(partitionGroupId));
+ }
+ // If we are consuming we do not need to track this partition for removal.
+ _partitionsMarkedForVerification.remove(partitionGroupId);
+ }
+
+ /*
+ * Handle partition removal event. This must be invoked when we stop serving
a given partition for
+ * this table in the current server.
+ *
+ * @param partitionGroupId partition id that we should stop tracking.
+ */
+ public void stopTrackingPartitionIngestionDelay(int partitionGroupId) {
+ removePartitionId(partitionGroupId);
+ }
+
+ /*
+ * This method is used for timing out inactive partitions, so we don't
display their metrics on current server.
+ * When the inactive time exceeds some threshold, we read from ideal state
to confirm we still host the partition,
+ * if not we remove the partition from being tracked locally.
+ * This call is to be invoked by a timer thread that will periodically wake
up and invoke this function.
+ */
+ public void timeoutInactivePartitions() {
+ Set<Integer> partitionsHostedByThisServer = null;
+ // Check if we have any partition to verify, else don't make the call to
check ideal state as that
+ // involves network traffic and may be inefficient.
+ ArrayList<Integer> partitionsToVerify = getPartitionsToBeVerified();
+ if (partitionsToVerify.size() == 0) {
+ // Don't make the call to getHostedPartitionsGroupIds() as it involves
checking ideal state.
+ return;
+ }
+ try {
+ partitionsHostedByThisServer =
_realTimeTableDataManager.getHostedPartitionsGroupIds();
+ } catch (Exception e) {
+ _logger.error("Failed to get partitions hosted by this server,
table={}", _tableNameWithType);
+ return;
+ }
+ for (int partitionGroupId : partitionsToVerify) {
+ if (!partitionsHostedByThisServer.contains(partitionGroupId)) {
+ // Partition is not hosted in this server anymore, stop tracking it
+ removePartitionId(partitionGroupId);
+ }
+ }
+ }
+
+ /*
+ * This function is invoked when a partition goes from CONSUMING to ONLINE,
so we can assert whether the
+ * partition is still hosted by this server after some interval of time.
+ *
+ * @param partitionGroupId Partition id that we need confirmed via ideal
state as still hosted by this server.
+ */
+ public void markPartitionForVerification(int partitionGroupId) {
+ _partitionsMarkedForVerification.put(partitionGroupId, _clock.millis());
+ }
+
+ /*
+ * Method to get ingestion delay for a given partition.
+ *
+ * @param partitionGroupId partition for which we are retrieving the delay
+ *
+ * @return ingestion delay in milliseconds for the given partition ID.
+ */
+ public long getPartitionIngestionDelay(int partitionGroupId) {
+ DelayMeasure currentMeasure =
_partitionToDelaySampleMap.get(partitionGroupId);
+ return getAgedDelay(currentMeasure);
+ }
+
+ /*
+ * We use this method to clean up when a table is being removed. No updates
are expected at this time
+ * as all LLRealtimeSegmentManagers should be down now.
+ */
+ public void shutdown() {
+ // Now that segments can't report metric, destroy metric for this table
+ _timer.cancel();
+ // Remove partitions so their related metrics get uninstalled.
+ for (ConcurrentHashMap.Entry<Integer, DelayMeasure> entry :
_partitionToDelaySampleMap.entrySet()) {
+ removePartitionId(entry.getKey());
+ }
+ }
+}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
index 2f38640568..fe7c0f30b1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java
@@ -600,12 +600,12 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
}
}
}
-
_currentOffset =
messagesAndOffsets.getNextStreamPartitionMsgOffsetAtIndex(index);
_numRowsIndexed = _realtimeSegment.getNumDocsIndexed();
_numRowsConsumed++;
streamMessageCount++;
}
+ updateIngestionDelay(indexedMessageCount);
updateCurrentDocumentCountMetrics();
if (messagesAndOffsets.getUnfilteredMessageCount() > 0) {
_hasMessagesFetched = true;
@@ -617,6 +617,8 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
if (_segmentLogger.isDebugEnabled()) {
_segmentLogger.debug("empty batch received - sleeping for {}ms",
idlePipeSleepTimeMillis);
}
+ // Record Pinot ingestion delay as zero since we are up-to-date and no
new events
+ _realtimeTableDataManager.updateIngestionDelay(0,
System.currentTimeMillis(), _partitionGroupId);
// If there were no messages to be fetched from stream, wait for a
little bit as to avoid hammering the stream
Uninterruptibles.sleepUninterruptibly(idlePipeSleepTimeMillis,
TimeUnit.MILLISECONDS);
}
@@ -1560,6 +1562,21 @@ public class LLRealtimeSegmentDataManager extends
RealtimeSegmentDataManager {
_partitionMetadataProvider =
_streamConsumerFactory.createPartitionMetadataProvider(_clientId,
_partitionGroupId);
}
+ /*
+ * Updates the ingestion delay if messages were processed, using the timestamp of the last consumed event.
+ *
+ * @param indexedMessageCount number of messages indexed in the batch just processed
+ */
+ private void updateIngestionDelay(int indexedMessageCount) {
+ if ((indexedMessageCount > 0) && (_lastRowMetadata != null)) {
+ long ingestionDelayMs = _lastConsumedTimestampMs -
_lastRowMetadata.getRecordIngestionTimeMs();
+ ingestionDelayMs = Math.max(ingestionDelayMs, 0);
+ // Record Ingestion delay for this partition
+ _realtimeTableDataManager.updateIngestionDelay(ingestionDelayMs,
_lastConsumedTimestampMs,
+ _partitionGroupId);
+ }
+ }
+
// This should be done during commit? We may not always commit when we build
a segment....
// TODO Call this method when we are loading the segment, which we do from
table datamanager afaik
private void updateCurrentDocumentCountMetrics() {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 89b45f1cce..fb53de64ea 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -22,8 +22,10 @@ import com.google.common.base.Preconditions;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
@@ -31,6 +33,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
@@ -115,15 +118,25 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
private TableDedupMetadataManager _tableDedupMetadataManager;
private TableUpsertMetadataManager _tableUpsertMetadataManager;
+ // Object to track ingestion delay for all partitions
+ private IngestionDelayTracker _ingestionDelayTracker;
+ private final Supplier<Boolean> _isServerReadyToServeQueries;
public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
+ this(segmentBuildSemaphore, () -> true);
+ }
+
+ public RealtimeTableDataManager(Semaphore segmentBuildSemaphore,
Supplier<Boolean> isServerReadyToServeQueries) {
_segmentBuildSemaphore = segmentBuildSemaphore;
+ _isServerReadyToServeQueries = isServerReadyToServeQueries;
}
@Override
protected void doInit() {
_leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId,
_serverMetrics, _tableNameWithType);
-
+ // Tracks ingestion delay of all partitions being served for this table
+ _ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics,
_tableNameWithType, this,
+ _isServerReadyToServeQueries);
File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
try {
_statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
@@ -212,6 +225,59 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
if (_leaseExtender != null) {
_leaseExtender.shutDown();
}
+ // Make sure we do metric cleanup when we shut down the table.
+ _ingestionDelayTracker.shutdown();
+ }
+
+ /*
+ * Method used by LLRealtimeSegmentManagers to update their partition delays
+ *
+ * @param ingestionDelayMs Ingestion delay being reported.
+ * @param currentTimeMs Timestamp of the measure being provided, i.e. when this delay was computed.
+ * @param partitionGroupId Partition ID for which delay is being updated.
+ */
+ public void updateIngestionDelay(long ingestionDelayMs, long currentTimeMs, int partitionGroupId) {
+ _ingestionDelayTracker.updateIngestionDelay(ingestionDelayMs, currentTimeMs, partitionGroupId);
+ }
+
+ /*
+ * Method to handle CONSUMING -> DROPPED segment state transitions:
+ * We stop tracking partitions whose segments are dropped.
+ *
+ * @param segmentNameStr name of segment which is transitioning state.
+ */
+ @Override
+ public void onConsumingToDropped(String segmentNameStr) {
+ LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+
_ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName.getPartitionGroupId());
+ }
+
+ /*
+ * Method to handle CONSUMING -> ONLINE segment state transitions:
+ * We mark partitions for verification against ideal state when we do not
see a consuming segment for some time
+ * for that partition. The idea is to remove the related metrics when the
partition moves from the current server.
+ *
+ * @param segmentNameStr name of segment which is transitioning state.
+ */
+ @Override
+ public void onConsumingToOnline(String segmentNameStr) {
+ LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+
_ingestionDelayTracker.markPartitionForVerification(segmentName.getPartitionGroupId());
+ }
+
+ /**
+ * Returns all partitionGroupIds for the partitions hosted by this server
for current table.
+ * @Note: this involves Zookeeper read and should not be used frequently due
to efficiency concerns.
+ */
+ public Set<Integer> getHostedPartitionsGroupIds() {
+ Set<Integer> partitionsHostedByThisServer = new HashSet<>();
+ List<String> segments =
TableStateUtils.getSegmentsInGivenStateForThisInstance(_helixManager,
_tableNameWithType,
+ CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING);
+ for (String segmentNameStr : segments) {
+ LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+ partitionsHostedByThisServer.add(segmentName.getPartitionGroupId());
+ }
+ return partitionsHostedByThisServer;
}
public RealtimeSegmentStatsHistory getStatsHistory() {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
new file mode 100644
index 0000000000..b91d714e11
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.metrics.PinotMetricUtils;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class IngestionDelayTrackerTest {
+
+ private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100;
+
+ private IngestionDelayTracker createTracker() {
+ ServerMetrics serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+ RealtimeTableDataManager realtimeTableDataManager = new
RealtimeTableDataManager(null);
+ IngestionDelayTracker ingestionDelayTracker =
+ new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+ realtimeTableDataManager, () -> true);
+ // With no samples, the time reported must be zero
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0),
0);
+ return ingestionDelayTracker;
+ }
+
+ @Test
+ public void testTrackerConstructors() {
+ ServerMetrics serverMetrics = new
ServerMetrics(PinotMetricUtils.getPinotMetricsRegistry());
+ RealtimeTableDataManager realtimeTableDataManager = new
RealtimeTableDataManager(null);
+ // Test regular constructor
+ IngestionDelayTracker ingestionDelayTracker =
+ new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+ realtimeTableDataManager, () -> true);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0),
0);
+ ingestionDelayTracker.shutdown();
+ // Test constructor with timer arguments
+ ingestionDelayTracker =
+ new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+ realtimeTableDataManager, TIMER_THREAD_TICK_INTERVAL_MS, () ->
true);
+ Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(0),
0);
+ // Test bad timer args to the constructor
+ try {
+ ingestionDelayTracker =
+ new IngestionDelayTracker(serverMetrics, "dummyTable_RT",
+ realtimeTableDataManager, 0, () -> true);
+ Assert.fail("Constructor must throw for bad timer args");
+ } catch (Exception e) {
+ Assert.assertTrue(e instanceof RuntimeException);
+ }
+ }
+
+ @Test
+ public void testRecordIngestionDelayWithNoAging() {
+ final long maxTestDelay = 100;
+ final int partition0 = 0;
+ final int partition1 = 1;
+
+ IngestionDelayTracker ingestionDelayTracker = createTracker();
+ // Use fixed clock so samples don't age
+ Instant now = Instant.now();
+ ZoneId zoneId = ZoneId.systemDefault();
+ Clock clock = Clock.fixed(now, zoneId);
+ ingestionDelayTracker.setClock(clock);
+
+ // Test we follow a single partition up and down
+ for (long i = 0; i <= maxTestDelay; i++) {
+ ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
i + clock.millis());
+ }
+
+ // Test tracking down a measure for a given partition
+ for (long i = maxTestDelay; i >= 0; i--) {
+ ingestionDelayTracker.updateIngestionDelay(i, 0, partition0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
i + clock.millis());
+ }
+
+ // Make the current partition maximum
+ ingestionDelayTracker.updateIngestionDelay(maxTestDelay, 0, partition0);
+
+ // Bring partition1 delay up and verify values
+ for (long i = 0; i <= 2 * maxTestDelay; i++) {
+ ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
i + clock.millis());
+ }
+
+ // Bring down values of partition1 and verify values
+ for (long i = 2 * maxTestDelay; i >= 0; i--) {
+ ingestionDelayTracker.updateIngestionDelay(i, 0, partition1);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
i + clock.millis());
+ }
+
+ ingestionDelayTracker.shutdown();
+ Assert.assertTrue(true);
+ }
+
+ @Test
+ public void testRecordIngestionDelayWithAging() {
+ final int partition0 = 0;
+ final long partition0Delay0 = 1000;
+ final long partition0Delay1 = 10; // Record a lower delay to make sure max gets reduced
+ final long partition0Offset0Ms = 300;
+ final long partition0Offset1Ms = 1000;
+ final int partition1 = 1;
+ final long partition1Delay0 = 11; // Record something slightly higher than previous max
+ final long partition1Delay1 = 8; // Record something lower so that partition0 is the current max again
+ final long partition1Offset0Ms = 150;
+ final long partition1Offset1Ms = 450;
+
+ final long sleepMs = 500;
+
+ IngestionDelayTracker ingestionDelayTracker = createTracker();
+
+ // With samples for a single partition, test that sample is aged as
expected
+ Instant now = Instant.now();
+ ZoneId zoneId = ZoneId.systemDefault();
+ Clock clock = Clock.fixed(now, zoneId);
+ ingestionDelayTracker.setClock(clock);
+ ingestionDelayTracker.updateIngestionDelay(partition0Delay0,
clock.millis(), partition0);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
partition0Delay0);
+ // Advance clock and test aging
+ Clock offsetClock = Clock.offset(clock,
Duration.ofMillis(partition0Offset0Ms));
+ ingestionDelayTracker.setClock(offsetClock);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+ (partition0Delay0 + partition0Offset0Ms));
+
+ // Add a new value below max and verify we are tracking the new max
correctly
+ ingestionDelayTracker.updateIngestionDelay(partition0Delay1,
offsetClock.millis(), partition0);
+ Clock partition0LastUpdate = Clock.offset(offsetClock, Duration.ZERO); // Save this as we need to verify aging later
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
partition0Delay1);
+ // Add some offset to the last sample and make sure we age that measure
properly
+ offsetClock = Clock.offset(offsetClock,
Duration.ofMillis(partition0Offset1Ms));
+ ingestionDelayTracker.setClock(offsetClock);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition0),
+ (partition0Delay1 + partition0Offset1Ms));
+
+ // Now try setting a new maximum in another partition
+ ingestionDelayTracker.updateIngestionDelay(partition1Delay0,
offsetClock.millis(), partition1);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
partition1Delay0);
+ // Add some offset to the last sample and make sure we age that measure
properly
+ offsetClock = Clock.offset(offsetClock,
Duration.ofMillis(partition1Offset0Ms));
+ ingestionDelayTracker.setClock(offsetClock);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partition1),
+ (partition1Delay0 + partition1Offset0Ms));
+
+ ingestionDelayTracker.shutdown();
+ }
+
+ @Test
+ public void testStopTrackingIngestionDelay() {
+ final long maxTestDelay = 100;
+ final int maxPartition = 100;
+
+ IngestionDelayTracker ingestionDelayTracker = createTracker();
+ // Use fixed clock so samples don't age
+ Instant now = Instant.now();
+ ZoneId zoneId = ZoneId.systemDefault();
+ Clock clock = Clock.fixed(now, zoneId);
+ ingestionDelayTracker.setClock(clock);
+
+ // Record a number of partitions with delay equal to partition id
+ for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
+ ingestionDelayTracker.updateIngestionDelay(partitionGroupId,
clock.millis(), partitionGroupId);
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId),
partitionGroupId);
+ }
+ // Stop tracking every partition, starting from the highest partition id
+ for (int partitionGroupId = maxPartition; partitionGroupId >= 0; partitionGroupId--) {
+ ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
+ }
+ for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay;
partitionGroupId++) {
+ // Untracked partitions must return 0
+
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelay(partitionGroupId),
0);
+ }
+ }
+
+ @Test
+ public void testTickInactivePartitions() {
+ Assert.assertTrue(true);
+ }
+
+ @Test
+ public void testMarkPartitionForConfirmation() {
+ Assert.assertTrue(true);
+ }
+}
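The aging behaviour these tests exercise can be sketched in isolation. The class below is a hypothetical stand-in (AgingDelaySketch and its method names are assumptions, not the Pinot IngestionDelayTracker): it assumes the reported delay is the last sample plus the time elapsed since that sample was taken, which is what the assertions above appear to rely on.

```java
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of delay aging with an injectable clock; not Pinot code.
class AgingDelaySketch {
  // One sample per partition: the delay that was reported and when it was measured.
  private static final class Sample {
    final long delayMs;
    final long sampleTimeMs;

    Sample(long delayMs, long sampleTimeMs) {
      this.delayMs = delayMs;
      this.sampleTimeMs = sampleTimeMs;
    }
  }

  private final Map<Integer, Sample> samples = new ConcurrentHashMap<>();
  private Clock clock = Clock.systemUTC();

  // Tests swap in a fixed or offset clock so that aging is deterministic.
  void setClock(Clock clock) {
    this.clock = clock;
  }

  void update(long delayMs, long sampleTimeMs, int partition) {
    samples.put(partition, new Sample(delayMs, sampleTimeMs));
  }

  void stopTracking(int partition) {
    samples.remove(partition);
  }

  // Untracked partitions report 0; tracked ones report the sample plus its age.
  long getDelay(int partition) {
    Sample sample = samples.get(partition);
    if (sample == null) {
      return 0;
    }
    return sample.delayMs + (clock.millis() - sample.sampleTimeMs);
  }

  public static void main(String[] args) {
    AgingDelaySketch tracker = new AgingDelaySketch();
    Clock fixed = Clock.fixed(Instant.ofEpochMilli(1_000_000L), ZoneId.of("UTC"));
    tracker.setClock(fixed);
    tracker.update(1000, fixed.millis(), 0);
    System.out.println(tracker.getDelay(0)); // sample has not aged yet

    // Advance the clock by 300 ms without recording a new sample.
    tracker.setClock(Clock.offset(fixed, Duration.ofMillis(300)));
    System.out.println(tracker.getDelay(0)); // sample plus 300 ms of age
  }
}
```

Injecting the clock rather than calling System.currentTimeMillis() directly is what lets a test avoid sleeping: Clock.fixed freezes time and Clock.offset moves it forward by an exact amount.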
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index a316669277..121e47f6f6 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -200,4 +200,20 @@ public interface TableDataManager {
* @return List of {@link SegmentErrorInfo}
*/
Map<String, SegmentErrorInfo> getSegmentErrors();
+
+ /**
+ * Hook to handle segment state transitions from CONSUMING to DROPPED. The default implementation is a no-op.
+ *
+ * @param segmentNameStr name of segment for which the state change is being handled
+ */
+ default void onConsumingToDropped(String segmentNameStr) {
+ }
+
+ /**
+ * Hook to handle segment state transitions from CONSUMING to ONLINE. The default implementation is a no-op.
+ *
+ * @param segmentNameStr name of segment for which the state change is being handled
+ */
+ default void onConsumingToOnline(String segmentNameStr) {
+ }
}
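The two default methods added to TableDataManager follow a common pattern: the interface supplies no-op hooks so that only implementations that care about the transition need to override them. A minimal sketch of that pattern, with hypothetical names (SegmentTransitionHooks, OfflineTable, RealtimeTable are illustrations only and do not exist in Pinot):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of no-op default hooks on an interface; not Pinot code.
class DefaultHookSketch {
  interface SegmentTransitionHooks {
    // No-op by default, so existing implementations compile unchanged.
    default void onConsumingToDropped(String segmentName) {
    }

    default void onConsumingToOnline(String segmentName) {
    }
  }

  // An implementation that has no consuming segments simply inherits the no-ops.
  static class OfflineTable implements SegmentTransitionHooks {
  }

  // An implementation that tracks consumption overrides both hooks.
  static class RealtimeTable implements SegmentTransitionHooks {
    final List<String> events = new ArrayList<>();

    @Override
    public void onConsumingToDropped(String segmentName) {
      events.add("dropped:" + segmentName);
    }

    @Override
    public void onConsumingToOnline(String segmentName) {
      events.add("online:" + segmentName);
    }
  }

  public static void main(String[] args) {
    SegmentTransitionHooks offline = new OfflineTable();
    offline.onConsumingToOnline("segmentA"); // inherited no-op, nothing happens

    RealtimeTable realtime = new RealtimeTable();
    realtime.onConsumingToOnline("segmentA");
    realtime.onConsumingToDropped("segmentB");
    System.out.println(realtime.events);
  }
}
```

The caller (here, the state model factory) can then invoke the hook on any table data manager without a type check or cast, which matches the commit note about removing casts by adding interfaces to the super class.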
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
index 4e74682220..67be761616 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/tablestate/TableStateUtils.java
@@ -39,37 +39,56 @@ public class TableStateUtils {
}
/**
- * Checks if all segments for the given @param tableNameWithType are
succesfully loaded
- * This function will get all segments in IDEALSTATE and CURRENTSTATE for
the given table,
- * and then check if all ONLINE segments in IDEALSTATE match with
CURRENTSTATE.
- * @param helixManager helix manager for the server instance
- * @param tableNameWithType table name for which segment state is to be
checked
- * @return true if all segments for the given table are succesfully loaded.
False otherwise
+ * Returns all segments of a given table that the ideal state assigns to this instance in a given state.
+ *
+ * @param helixManager instance of Helix manager
+ * @param tableNameWithType table for which we are obtaining segments
+ * @param state state of the segments to be returned
+ *
+ * @return List of segment names in the given state; empty if the ideal state cannot be found.
*/
- public static boolean isAllSegmentsLoaded(HelixManager helixManager, String
tableNameWithType) {
+ public static List<String>
getSegmentsInGivenStateForThisInstance(HelixManager helixManager, String
tableNameWithType,
+ String state) {
HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
IdealState idealState =
dataAccessor.getProperty(keyBuilder.idealStates(tableNameWithType));
+ List<String> segmentsInGivenState = new ArrayList<>();
if (idealState == null) {
LOGGER.warn("Failed to find ideal state for table: {}",
tableNameWithType);
- return false;
+ return segmentsInGivenState;
}
- // Get all ONLINE segments from idealState
+ // Get all segments in the given state from idealState
String instanceName = helixManager.getInstanceName();
- List<String> onlineSegments = new ArrayList<>();
Map<String, Map<String, String>> idealStatesMap =
idealState.getRecord().getMapFields();
for (Map.Entry<String, Map<String, String>> entry :
idealStatesMap.entrySet()) {
String segmentName = entry.getKey();
Map<String, String> instanceStateMap = entry.getValue();
String expectedState = instanceStateMap.get(instanceName);
- // Only track ONLINE segments assigned to the current instance
- if
(!CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(expectedState))
{
+ // Only track segments assigned to the current instance in the given state
+ if (!state.equals(expectedState)) {
continue;
}
- onlineSegments.add(segmentName);
+ segmentsInGivenState.add(segmentName);
}
+ return segmentsInGivenState;
+ }
+
+ /**
+ * Checks if all segments for the given table were successfully loaded.
+ * This function will get all segments in IDEALSTATE and CURRENTSTATE for the given table,
+ * and then check if all ONLINE segments in IDEALSTATE match with CURRENTSTATE.
+ * @param helixManager helix manager for the server instance
+ * @param tableNameWithType table name for which segment state is to be checked
+ * @return true if all segments for the given table are successfully loaded. False otherwise
+ */
+ public static boolean isAllSegmentsLoaded(HelixManager helixManager, String
tableNameWithType) {
+ HelixDataAccessor dataAccessor = helixManager.getHelixDataAccessor();
+ PropertyKey.Builder keyBuilder = dataAccessor.keyBuilder();
+ String instanceName = helixManager.getInstanceName();
+ List<String> onlineSegments =
getSegmentsInGivenStateForThisInstance(helixManager, tableNameWithType,
+ CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE);
if (onlineSegments.size() > 0) {
LiveInstance liveInstance =
dataAccessor.getProperty(keyBuilder.liveInstance(instanceName));
if (liveInstance == null) {
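The filtering step factored out into getSegmentsInGivenStateForThisInstance can be illustrated without Helix. The sketch below assumes the ideal state has already been read into a plain map of segment name to (instance name to state), mirroring the shape returned by getMapFields() in the diff; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of filtering an ideal-state map by instance and state; not Pinot code.
class SegmentStateFilterSketch {
  static List<String> segmentsInState(Map<String, Map<String, String>> idealStateMap, String instanceName,
      String state) {
    List<String> result = new ArrayList<>();
    for (Map.Entry<String, Map<String, String>> entry : idealStateMap.entrySet()) {
      // get() returns null when the segment is not assigned to this instance,
      // so state.equals(null) is false and the segment is skipped.
      String expectedState = entry.getValue().get(instanceName);
      if (state.equals(expectedState)) {
        result.add(entry.getKey());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    // TreeMap keeps iteration order deterministic for the example.
    Map<String, Map<String, String>> idealState = new TreeMap<>();
    Map<String, String> seg0 = new HashMap<>();
    seg0.put("server1", "ONLINE");
    seg0.put("server2", "ONLINE");
    Map<String, String> seg1 = new HashMap<>();
    seg1.put("server1", "CONSUMING");
    Map<String, String> seg2 = new HashMap<>();
    seg2.put("server2", "CONSUMING");
    idealState.put("seg0", seg0);
    idealState.put("seg1", seg1);
    idealState.put("seg2", seg2);

    System.out.println(segmentsInState(idealState, "server1", "CONSUMING"));
    System.out.println(segmentsInState(idealState, "server1", "ONLINE"));
  }
}
```

Taking the state as a parameter is what lets one helper serve both callers in the diff: isAllSegmentsLoaded asks for ONLINE segments, while getHostedPartitionsGroupIds asks for CONSUMING ones.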
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 68773ab89c..fda6caf5c1 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -136,6 +136,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
protected ServerQueriesDisabledTracker _serverQueriesDisabledTracker;
protected RealtimeLuceneIndexRefreshState _realtimeLuceneIndexRefreshState;
protected PinotEnvironmentProvider _pinotEnvironmentProvider;
+ protected volatile boolean _isServerReadyToServeQueries = false;
@Override
public void init(PinotConfiguration serverConf)
@@ -533,6 +534,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_serverInstance = new ServerInstance(serverConf, _helixManager,
accessControlFactory);
ServerMetrics serverMetrics = _serverInstance.getServerMetrics();
InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
+ instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() ->
_isServerReadyToServeQueries);
initSegmentFetcher(_serverConf);
StateModelFactory<?> stateModelFactory =
new SegmentOnlineOfflineStateModelFactory(_instanceId,
instanceDataManager);
@@ -582,7 +584,7 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_serverInstance.startQueryServer();
_helixAdmin.setConfig(_instanceConfigScope,
Collections.singletonMap(Helix.IS_SHUTDOWN_IN_PROGRESS,
Boolean.toString(false)));
-
+ _isServerReadyToServeQueries = true;
// Throttling for realtime consumption is disabled up to this point to
allow maximum consumption during startup time
RealtimeConsumptionRateManager.getInstance().enableThrottling();
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index a254009742..25abdfc9d7 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -35,6 +35,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
+import java.util.function.Supplier;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.configuration.ConfigurationException;
@@ -94,11 +95,17 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
private ServerMetrics _serverMetrics;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private SegmentUploader _segmentUploader;
+ private Supplier<Boolean> _isReadyToServeQueries = () -> true;
// Fixed size LRU cache for storing last N errors on the instance.
// Key is TableNameWithType-SegmentName pair.
private LoadingCache<Pair<String, String>, SegmentErrorInfo> _errorCache;
+ @Override
+ public void setSupplierOfIsServerReadyToServeQueries(Supplier<Boolean>
isServingQueries) {
+ _isReadyToServeQueries = isServingQueries;
+ }
+
@Override
public synchronized void init(PinotConfiguration config, HelixManager
helixManager, ServerMetrics serverMetrics)
throws ConfigurationException {
@@ -225,7 +232,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
TableDataManagerConfig tableDataManagerConfig = new
TableDataManagerConfig(_instanceDataManagerConfig, tableConfig);
TableDataManager tableDataManager =
TableDataManagerProvider.getTableDataManager(tableDataManagerConfig,
_instanceId, _propertyStore,
- _serverMetrics, _helixManager, _errorCache);
+ _serverMetrics, _helixManager, _errorCache,
_isReadyToServeQueries);
tableDataManager.start();
LOGGER.info("Created table data manager for table: {}", tableNameWithType);
return tableDataManager;
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
index 8c5fa50b51..b85d13b953 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java
@@ -91,15 +91,17 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(realtimeTableName);
Preconditions.checkNotNull(tableDataManager);
+ tableDataManager.onConsumingToOnline(segmentNameStr);
SegmentDataManager acquiredSegment =
tableDataManager.acquireSegment(segmentNameStr);
// For this transition to be correct in helix, we should already have a
segment that is consuming
if (acquiredSegment == null) {
throw new RuntimeException("Segment " + segmentNameStr + " + not
present ");
}
+ // TODO: https://github.com/apache/pinot/issues/10049
try {
if (!(acquiredSegment instanceof LLRealtimeSegmentDataManager)) {
- // We found a LLC segment that is not consuming right now, must be
that we already swapped it with a
+ // We found an LLC segment that is not consuming right now, must be
that we already swapped it with a
// segment that has been built. Nothing to do for this state
transition.
_logger
.info("Segment {} not an instance of
LLRealtimeSegmentDataManager. Reporting success for the transition",
@@ -138,6 +140,11 @@ public class SegmentOnlineOfflineStateModelFactory extends
StateModelFactory<Sta
@Transition(from = "CONSUMING", to = "DROPPED")
public void onBecomeDroppedFromConsuming(Message message,
NotificationContext context) {
_logger.info("SegmentOnlineOfflineStateModel.onBecomeDroppedFromConsuming() : "
+ message);
+ String realtimeTableName = message.getResourceName();
+ String segmentNameStr = message.getPartitionName();
+ TableDataManager tableDataManager =
_instanceDataManager.getTableDataManager(realtimeTableName);
+ Preconditions.checkNotNull(tableDataManager);
+ tableDataManager.onConsumingToDropped(segmentNameStr);
try {
onBecomeOfflineFromConsuming(message, context);
onBecomeDroppedFromOffline(message, context);
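The readiness wiring in BaseServerStarter and HelixInstanceDataManager hands a Supplier<Boolean> to downstream components instead of a boolean value, so they observe the flag as it changes. A minimal sketch of that idea follows; Starter and Worker are hypothetical names, and what the real tracker does with the supplier is not shown in this part of the diff, so the Worker behaviour here is an assumption.

```java
import java.util.function.Supplier;

// Hypothetical sketch of passing readiness as a Supplier backed by a volatile flag; not Pinot code.
class ReadinessSupplierSketch {
  static class Starter {
    // volatile so that a write from the startup thread is visible to other threads.
    private volatile boolean ready = false;

    // The lambda reads the field on every call, so callers always see the current value.
    Supplier<Boolean> readinessSupplier() {
      return () -> ready;
    }

    void markReady() {
      ready = true;
    }
  }

  static class Worker {
    private final Supplier<Boolean> isServerReady;

    Worker(Supplier<Boolean> isServerReady) {
      this.isServerReady = isServerReady;
    }

    // Assumed behaviour: skip the periodic work until the server reports ready.
    boolean tick() {
      return isServerReady.get();
    }
  }

  public static void main(String[] args) {
    Starter starter = new Starter();
    Worker worker = new Worker(starter.readinessSupplier());
    System.out.println(worker.tick()); // false: startup not finished
    starter.markReady();
    System.out.println(worker.tick()); // true: same supplier, updated flag
  }
}
```

Passing the supplier rather than the flag's value avoids a dependency from the worker on the starter class and lets tests substitute a constant, as the tracker tests do with `() -> true`.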