This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4d2c3f54a6 Remove ingestion metrics when consuming segment relocates (#13668) 4d2c3f54a6 is described below commit 4d2c3f54a64d4607c1d663994327c81896bd80f1 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Tue Aug 13 16:33:53 2024 -0700 Remove ingestion metrics when consuming segment relocates (#13668) --- .../pinot/common/messages/ForceCommitMessage.java | 2 +- .../messages/IngestionMetricsRemoveMessage.java | 47 ++++ .../realtime/PinotLLCRealtimeSegmentManager.java | 59 ++++- .../PinotLLCRealtimeSegmentManagerTest.java | 3 +- .../manager/realtime/IngestionDelayTracker.java | 290 ++++++++++----------- .../realtime/RealtimeSegmentDataManager.java | 25 +- .../manager/realtime/RealtimeTableDataManager.java | 71 ++--- .../realtime/IngestionDelayTrackerTest.java | 201 +++++++------- .../helix/SegmentMessageHandlerFactory.java | 26 ++ 9 files changed, 428 insertions(+), 296 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java index 8d9d1b02bd..4535789f42 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/ForceCommitMessage.java @@ -49,7 +49,7 @@ public class ForceCommitMessage extends Message { super(message.getRecord()); String msgSubType = message.getMsgSubType(); Preconditions.checkArgument(msgSubType.equals(FORCE_COMMIT_MSG_SUB_TYPE), - "Invalid message sub type: " + msgSubType + " for SegmentReloadMessage"); + "Invalid message sub type: " + msgSubType + " for ForceCommitMessage"); } public String getTableName() { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/messages/IngestionMetricsRemoveMessage.java b/pinot-common/src/main/java/org/apache/pinot/common/messages/IngestionMetricsRemoveMessage.java new file mode 100644 index 0000000000..3c2100a46a --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/messages/IngestionMetricsRemoveMessage.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.messages; + +import com.google.common.base.Preconditions; +import java.util.UUID; +import org.apache.helix.model.Message; + + +/** + * Ingestion metrics remove message is created on controller and get sent to servers to instruct them to remove + * ingestion metrics for the stream partition of the given segment when the new consuming segment is no longer served by + * the server. 
+ */ +public class IngestionMetricsRemoveMessage extends Message { + public static final String INGESTION_METRICS_REMOVE_MSG_SUB_TYPE = "INGESTION_METRICS_REMOVE"; + + public IngestionMetricsRemoveMessage() { + super(MessageType.USER_DEFINE_MSG, UUID.randomUUID().toString()); + setMsgSubType(INGESTION_METRICS_REMOVE_MSG_SUB_TYPE); + // Give it infinite time to process the message, as long as session is alive + setExecutionTimeout(-1); + } + + public IngestionMetricsRemoveMessage(Message message) { + super(message.getRecord()); + String msgSubType = message.getMsgSubType(); + Preconditions.checkArgument(msgSubType.equals(INGESTION_METRICS_REMOVE_MSG_SUB_TYPE), + "Invalid message sub type: " + msgSubType + " for IngestionMetricsRemoveMessage"); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index c471277590..bb331f2bc4 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Random; @@ -40,6 +41,7 @@ import java.util.stream.Collectors; import javax.annotation.Nullable; import org.apache.commons.lang3.StringUtils; import org.apache.helix.AccessOption; +import org.apache.helix.ClusterMessagingService; import org.apache.helix.Criteria; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; @@ -50,6 +52,7 @@ import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.assignment.InstancePartitions; import org.apache.pinot.common.assignment.InstancePartitionsUtils; import org.apache.pinot.common.messages.ForceCommitMessage; +import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage; import org.apache.pinot.common.metadata.ZKMetadataProvider; import org.apache.pinot.common.metadata.segment.SegmentPartitionMetadata; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -576,8 +579,9 @@ public class PinotLLCRealtimeSegmentManager { // to reduce this contention. We may still contend with RetentionManager, or other updates // to idealstate from other controllers, but then we have the retry mechanism to get around that. 
synchronized (_helixResourceManager.getIdealStateUpdaterLock(realtimeTableName)) { - updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName, - segmentAssignment, instancePartitionsMap); + idealState = + updateIdealStateOnSegmentCompletion(realtimeTableName, committingSegmentName, newConsumingSegmentName, + segmentAssignment, instancePartitionsMap); } long endTimeNs = System.nanoTime(); @@ -598,6 +602,12 @@ public class PinotLLCRealtimeSegmentManager { // Trigger the metadata event notifier _metadataEventNotifierFactory.create().notifyOnSegmentFlush(tableConfig); + + // Handle segment movement if necessary + if (newConsumingSegmentName != null) { + handleSegmentMovement(realtimeTableName, idealState.getRecord().getMapFields(), committingSegmentName, + newConsumingSegmentName); + } } /** @@ -937,10 +947,10 @@ public class PinotLLCRealtimeSegmentManager { * Updates ideal state after completion of a realtime segment */ @VisibleForTesting - void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, + IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { - HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { + return HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { assert idealState != null; // When segment completion begins, the zk metadata is updated, followed by ideal state. // We allow only {@link PinotLLCRealtimeSegmentManager::MAX_SEGMENT_COMPLETION_TIME_MILLIS} ms for a segment to @@ -1014,6 +1024,47 @@ public class PinotLLCRealtimeSegmentManager { } } + /** + * Handles segment movement between instances. + * If the new consuming segment is served by a different set of servers than the committed segment, notify the + * servers no longer serving the stream partition to remove the ingestion metrics. This can prevent servers from + * emitting high ingestion delay alerts on stream partitions no longer served. 
+ */ + private void handleSegmentMovement(String realtimeTableName, Map<String, Map<String, String>> instanceStatesMap, + String committedSegment, String newConsumingSegment) { + Set<String> oldInstances = instanceStatesMap.get(committedSegment).keySet(); + Set<String> newInstances = instanceStatesMap.get(newConsumingSegment).keySet(); + if (newInstances.containsAll(oldInstances)) { + return; + } + Set<String> instancesNoLongerServe = new HashSet<>(oldInstances); + instancesNoLongerServe.removeAll(newInstances); + LOGGER.info("Segment movement detected for committed segment: {} (served by: {}), " + + "consuming segment: {} (served by: {}) in table: {}, " + + "sending message to instances: {} to remove ingestion metrics", committedSegment, oldInstances, + newConsumingSegment, newInstances, realtimeTableName, instancesNoLongerServe); + + ClusterMessagingService messagingService = _helixManager.getMessagingService(); + List<String> instancesSent = new ArrayList<>(instancesNoLongerServe.size()); + for (String instance : instancesNoLongerServe) { + Criteria recipientCriteria = new Criteria(); + recipientCriteria.setInstanceName(instance); + recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT); + recipientCriteria.setResource(realtimeTableName); + recipientCriteria.setPartition(committedSegment); + recipientCriteria.setSessionSpecific(true); + IngestionMetricsRemoveMessage message = new IngestionMetricsRemoveMessage(); + if (messagingService.send(recipientCriteria, message, null, -1) > 0) { + instancesSent.add(instance); + } else { + LOGGER.warn("Failed to send ingestion metrics remove message for table: {} segment: {} to instance: {}", + realtimeTableName, committedSegment, instance); + } + } + LOGGER.info("Sent ingestion metrics remove message for table: {} segment: {} to instances: {}", realtimeTableName, + committedSegment, instancesSent); + } + /* * A segment commit takes 3 modifications to zookeeper: * - Change old segment metadata (mark it DONE, and then other things) diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 5af7fb92de..16d1983a21 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1212,13 +1212,14 @@ public class PinotLLCRealtimeSegmentManagerTest { } @Override - void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, + IdealState updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null, segmentAssignment, instancePartitionsMap); updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName, segmentAssignment, instancePartitionsMap); + return _idealState; } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java index 6953ddaf33..fd31d8f72b 
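For readers unfamiliar with Helix user-defined messages, the round trip of the new message type added above can be sketched as follows. This is a minimal illustration, not code from this commit; the real receiving side is the handler registered in SegmentMessageHandlerFactory (whose diff appears at the end of this patch), and the re-wrapping helper below is hypothetical.

import org.apache.helix.model.Message;
import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;

public class IngestionMetricsRemoveMessageSketch {
  // Hypothetical helper: re-wrap a Helix message received on the server side. The copy
  // constructor validates that the sub type is INGESTION_METRICS_REMOVE and throws otherwise.
  static IngestionMetricsRemoveMessage reWrap(Message received) {
    return new IngestionMetricsRemoveMessage(received);
  }

  public static void main(String[] args) {
    // Controller side: handleSegmentMovement() builds this message and sends it through
    // ClusterMessagingService with session-specific criteria targeting each instance that no
    // longer serves the stream partition of the committed segment.
    IngestionMetricsRemoveMessage outgoing = new IngestionMetricsRemoveMessage();
    // USER_DEFINE_MSG with an unlimited execution timeout (-1), valid as long as the session lives.
    IngestionMetricsRemoveMessage received = reWrap(outgoing);
    System.out.println(received.getMsgSubType()); // INGESTION_METRICS_REMOVE
  }
}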
100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java @@ -16,10 +16,11 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.pinot.core.data.manager.realtime; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import java.time.Clock; import java.util.ArrayList; import java.util.List; @@ -32,9 +33,13 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; +import javax.annotation.Nullable; import org.apache.pinot.common.metrics.ServerGauge; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.spi.stream.LongMsgOffset; +import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; @@ -81,43 +86,46 @@ import org.slf4j.LoggerFactory; public class IngestionDelayTracker { - // Class to wrap supported timestamps collected for an ingested event - private static class IngestionTimestamps { - private final long _firstStreamIngestionTimeMs; - private final long _ingestionTimeMs; - IngestionTimestamps(long ingestionTimesMs, long firstStreamIngestionTimeMs) { - _ingestionTimeMs = ingestionTimesMs; - _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs; - } - } + private static class IngestionInfo { + final long _ingestionTimeMs; + final long _firstStreamIngestionTimeMs; + final StreamPartitionMsgOffset _currentOffset; + final StreamPartitionMsgOffset _latestOffset; - private static class IngestionOffsets { - private final StreamPartitionMsgOffset _latestOffset; - private final StreamPartitionMsgOffset _offset; - IngestionOffsets(StreamPartitionMsgOffset offset, StreamPartitionMsgOffset latestOffset) { - _offset = offset; + IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs, + @Nullable StreamPartitionMsgOffset currentOffset, @Nullable StreamPartitionMsgOffset latestOffset) { + _ingestionTimeMs = ingestionTimeMs; + _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs; + _currentOffset = currentOffset; _latestOffset = latestOffset; } } + private static final Logger LOGGER = LoggerFactory.getLogger(IngestionDelayTracker.class); + // Sleep interval for scheduled executor service thread that triggers read of ideal state private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS = 300000; // 5 minutes +/- precision in timeouts // Once a partition is marked for verification, we wait 10 minutes to pull its ideal state. private static final int PARTITION_TIMEOUT_MS = 600000; // 10 minutes timeouts // Delay scheduled executor service for this amount of time after starting service private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100; - private static final Logger _logger = LoggerFactory.getLogger(IngestionDelayTracker.class.getSimpleName()); - // HashMap used to store ingestion time measures for all partitions active for the current table. 
- private final Map<Integer, IngestionTimestamps> _partitionToIngestionTimestampsMap = new ConcurrentHashMap<>(); + // Cache expire time for ignored segment if there is no update from the segment. + private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10; + + // Per partition info for all partitions active for the current table. + private final Map<Integer, IngestionInfo> _ingestionInfoMap = new ConcurrentHashMap<>(); - private final Map<Integer, IngestionOffsets> _partitionToOffsetMap = new ConcurrentHashMap<>(); // We mark partitions that go from CONSUMING to ONLINE in _partitionsMarkedForVerification: if they do not // go back to CONSUMING in some period of time, we verify whether they are still hosted in this server by reading // ideal state. This is done with the goal of minimizing reading ideal state for efficiency reasons. + // TODO: Consider removing this mechanism after releasing 1.2.0, and use {@link #stopTrackingPartitionIngestionDelay} + // instead. private final Map<Integer, Long> _partitionsMarkedForVerification = new ConcurrentHashMap<>(); - final int _scheduledExecutorThreadTickIntervalMs; + private final Cache<String, Boolean> _segmentsToIgnore = + CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES, TimeUnit.MINUTES).build(); + // TODO: Make thread pool a server/cluster level config // ScheduledExecutorService to check partitions that are inactive against ideal state. private final ScheduledExecutorService _scheduledExecutor = Executors.newScheduledThreadPool(2); @@ -131,6 +139,7 @@ public class IngestionDelayTracker { private Clock _clock; + @VisibleForTesting public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType, RealtimeTableDataManager realtimeTableDataManager, int scheduledExecutorThreadTickIntervalMs, Supplier<Boolean> isServerReadyToServeQueries) @@ -144,9 +153,8 @@ public class IngestionDelayTracker { // Handle negative timer values if (scheduledExecutorThreadTickIntervalMs <= 0) { throw new RuntimeException(String.format("Illegal timer timeout argument, expected > 0, got=%d for table=%s", - scheduledExecutorThreadTickIntervalMs, _tableNameWithType)); + scheduledExecutorThreadTickIntervalMs, _tableNameWithType)); } - _scheduledExecutorThreadTickIntervalMs = scheduledExecutorThreadTickIntervalMs; // ThreadFactory to set the thread's name ThreadFactory threadFactory = new ThreadFactory() { @@ -162,7 +170,7 @@ public class IngestionDelayTracker { ((ScheduledThreadPoolExecutor) _scheduledExecutor).setThreadFactory(threadFactory); _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions, - INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, _scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS); + INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS); } public IngestionDelayTracker(ServerMetrics serverMetrics, String tableNameWithType, @@ -188,41 +196,26 @@ public class IngestionDelayTracker { return agedIngestionDelayMs; } - private long getPartitionOffsetLag(IngestionOffsets offset) { - if (offset == null) { - return 0; - } - StreamPartitionMsgOffset currentOffset = offset._offset; - StreamPartitionMsgOffset latestOffset = offset._latestOffset; - - if (currentOffset == null || latestOffset == null) { - return 0; - } - - // Compute aged delay for current partition - // TODO: Support other types of offsets - if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) { - return 0; - } - - return 
((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset(); - } - /* * Helper function to be called when we should stop tracking a given partition. Removes the partition from * all our maps. * - * @param partitionGroupId partition ID which we should stop tracking. + * @param partitionId partition ID which we should stop tracking. */ - private void removePartitionId(int partitionGroupId) { - _partitionToIngestionTimestampsMap.remove(partitionGroupId); - _partitionToOffsetMap.remove(partitionGroupId); + private void removePartitionId(int partitionId) { + _ingestionInfoMap.compute(partitionId, (k, v) -> { + if (v != null) { + // Remove all metrics associated with this partition + _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS); + _serverMetrics.removePartitionGauge(_metricName, partitionId, + ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS); + _serverMetrics.removePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG); + } + return null; + }); + // If we are removing a partition we should stop reading its ideal state. - _partitionsMarkedForVerification.remove(partitionGroupId); - _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS); - _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, - ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS); - _serverMetrics.removePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG); + _partitionsMarkedForVerification.remove(partitionId); } /* @@ -241,7 +234,6 @@ public class IngestionDelayTracker { return partitionsToVerify; } - /** * Function that enable use to set predictable clocks for testing purposes. * @@ -255,80 +247,74 @@ public class IngestionDelayTracker { /** * Called by RealTimeSegmentDataManagers to update the ingestion delay metrics for a given partition. * - * @param ingestionTimeMs ingestion time being recorded. - * @param firstStreamIngestionTimeMs time the event was ingested in the first stage of the ingestion pipeline. - * @param msgOffset message offset of the event being ingested. - * @param latestOffset latest message offset in the stream. - * @param partitionGroupId partition ID for which the ingestion metrics are being recorded. 
+ * @param segmentName name of the consuming segment + * @param partitionId partition id of the consuming segment (directly passed in to avoid parsing the segment name) + * @param ingestionTimeMs ingestion time of the last consumed message (from {@link RowMetadata}) + * @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from + * {@link RowMetadata}) + * @param currentOffset offset of the last consumed message (from {@link RowMetadata}) + * @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider}) */ - public void updateIngestionMetrics(long ingestionTimeMs, long firstStreamIngestionTimeMs, - StreamPartitionMsgOffset msgOffset, StreamPartitionMsgOffset latestOffset, - int partitionGroupId) { + public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs, + long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset, + @Nullable StreamPartitionMsgOffset latestOffset) { if (!_isServerReadyToServeQueries.get() || _realTimeTableDataManager.isShutDown()) { // Do not update the ingestion delay metrics during server startup period // or once the table data manager has been shutdown. return; } - updateIngestionDelay(ingestionTimeMs, firstStreamIngestionTimeMs, partitionGroupId); - updateIngestionOffsets(msgOffset, latestOffset, partitionGroupId); - - // If we are consuming we do not need to track this partition for removal. - _partitionsMarkedForVerification.remove(partitionGroupId); - } - - public void updateIngestionDelay(long ingestionTimeMs, long firstStreamIngestionTimeMs, int partitionGroupId) { - if ((ingestionTimeMs < 0) && (firstStreamIngestionTimeMs < 0)) { - // If stream does not return a valid ingestion timestamps don't publish a metric + if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 && (currentOffset == null || latestOffset == null)) { + // Do not publish metrics if stream does not return valid ingestion time or offset. return; } - IngestionTimestamps previousMeasure = _partitionToIngestionTimestampsMap.put(partitionGroupId, - new IngestionTimestamps(ingestionTimeMs, firstStreamIngestionTimeMs)); - if (previousMeasure == null) { - // First time we start tracking a partition we should start tracking it via metric - // Only publish the metric if supported by the underlying stream. If not supported the stream - // returns Long.MIN_VALUE - if (ingestionTimeMs >= 0) { - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId, ServerGauge.REALTIME_INGESTION_DELAY_MS, - () -> getPartitionIngestionDelayMs(partitionGroupId)); + + _ingestionInfoMap.compute(partitionId, (k, v) -> { + if (_segmentsToIgnore.getIfPresent(segmentName) != null) { + // Do not update the metrics for the segment that is marked to be ignored. + return v; } - if (firstStreamIngestionTimeMs >= 0) { - // Only publish this metric when creation time is supported by the underlying stream - // When this timestamp is not supported it always returns the value Long.MIN_VALUE - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId, - ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, - () -> getPartitionEndToEndIngestionDelayMs(partitionGroupId)); + if (v == null) { + // Add metric when we start tracking a partition. Only publish the metric if supported by the stream. 
+ if (ingestionTimeMs > 0) { + _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_DELAY_MS, + () -> getPartitionIngestionDelayMs(partitionId)); + } + if (firstStreamIngestionTimeMs > 0) { + _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, + ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS, + () -> getPartitionEndToEndIngestionDelayMs(partitionId)); + } + if (currentOffset != null && latestOffset != null) { + _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId, ServerGauge.REALTIME_INGESTION_OFFSET_LAG, + () -> getPartitionIngestionOffsetLag(partitionId)); + } } - } - } + return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset, latestOffset); + }); - public void updateIngestionOffsets(StreamPartitionMsgOffset currentOffset, StreamPartitionMsgOffset latestOffset, - int partitionGroupId) { - if ((currentOffset == null)) { - // If stream does not return a valid ingestion offset don't publish a metric - return; - } - IngestionOffsets previousMeasure = - _partitionToOffsetMap.put(partitionGroupId, new IngestionOffsets(currentOffset, latestOffset)); - if (previousMeasure == null) { - // First time we start tracking a partition we should start tracking it via metric - // Only publish the metric if supported by the underlying stream. If not supported the stream - // returns Long.MIN_VALUE - if (currentOffset != null) { - _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionGroupId, - ServerGauge.REALTIME_INGESTION_OFFSET_LAG, () -> getPartitionIngestionOffsetLag(partitionGroupId)); - } - } + // If we are consuming we do not need to track this partition for removal. + _partitionsMarkedForVerification.remove(partitionId); } /* * Handle partition removal event. This must be invoked when we stop serving a given partition for * this table in the current server. * - * @param partitionGroupId partition id that we should stop tracking. + * @param partitionId partition id that we should stop tracking. */ - public void stopTrackingPartitionIngestionDelay(int partitionGroupId) { - removePartitionId(partitionGroupId); + public void stopTrackingPartitionIngestionDelay(int partitionId) { + removePartitionId(partitionId); + } + + /** + * Stops tracking the partition ingestion delay, and also ignores the updates from the given segment. This is useful + * when we want to stop tracking the ingestion delay for a partition when the segment might still be consuming, e.g. + * when the new consuming segment is created on a different server. + */ + public void stopTrackingPartitionIngestionDelay(String segmentName) { + _segmentsToIgnore.put(segmentName, true); + removePartitionId(new LLCSegmentName(segmentName).getPartitionGroupId()); } /* @@ -345,7 +331,7 @@ public class IngestionDelayTracker { // Check if we have any partition to verify, else don't make the call to check ideal state as that // involves network traffic and may be inefficient. List<Integer> partitionsToVerify = getPartitionsToBeVerified(); - if (partitionsToVerify.size() == 0) { + if (partitionsToVerify.isEmpty()) { // Don't make the call to getHostedPartitionsGroupIds() as it involves checking ideal state. 
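The new _segmentsToIgnore cache is what makes stopTrackingPartitionIngestionDelay(segmentName) sticky: a segment that is still consuming on this server keeps refreshing its "ignored" entry on every attempted metric update, and the entry only expires after 10 minutes without access. Below is a standalone sketch of that Guava cache behavior, not code from this commit; the segment name is hypothetical.

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import java.util.concurrent.TimeUnit;

public class IgnoredSegmentCacheSketch {
  public static void main(String[] args) {
    // Same shape as IngestionDelayTracker._segmentsToIgnore: entries expire 10 minutes after the
    // last access (a getIfPresent() hit also counts as an access and resets the timer).
    Cache<String, Boolean> segmentsToIgnore =
        CacheBuilder.newBuilder().expireAfterAccess(10, TimeUnit.MINUTES).build();
    String segmentName = "testTable__0__0__20240813T0000Z"; // hypothetical LLC segment name
    segmentsToIgnore.put(segmentName, true);                // done by stopTrackingPartitionIngestionDelay()
    // updateIngestionMetrics() checks this before touching the ingestion info map:
    boolean ignored = segmentsToIgnore.getIfPresent(segmentName) != null;
    System.out.println(ignored); // true until the segment stops updating for 10 minutes
  }
}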
return; } @@ -353,87 +339,81 @@ public class IngestionDelayTracker { try { partitionsHostedByThisServer = _realTimeTableDataManager.getHostedPartitionsGroupIds(); } catch (Exception e) { - _logger.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType, + LOGGER.error("Failed to get partitions hosted by this server, table={}, exception={}:{}", _tableNameWithType, e.getClass(), e.getMessage()); return; } - for (int partitionGroupId : partitionsToVerify) { - if (!partitionsHostedByThisServer.contains(partitionGroupId)) { + for (int partitionId : partitionsToVerify) { + if (!partitionsHostedByThisServer.contains(partitionId)) { // Partition is not hosted in this server anymore, stop tracking it - removePartitionId(partitionGroupId); + removePartitionId(partitionId); } } } - /* - * This function is invoked when a partition goes from CONSUMING to ONLINE, so we can assert whether the - * partition is still hosted by this server after some interval of time. - * - * @param partitionGroupId Partition id that we need confirmed via ideal state as still hosted by this server. + /** + * This function is invoked when a segment goes from CONSUMING to ONLINE, so we can assert whether the partition of + * the segment is still hosted by this server after some interval of time. */ - public void markPartitionForVerification(int partitionGroupId) { - if (!_isServerReadyToServeQueries.get()) { - // Do not update the tracker state during server startup period + public void markPartitionForVerification(String segmentName) { + if (!_isServerReadyToServeQueries.get() || _segmentsToIgnore.getIfPresent(segmentName) != null) { + // Do not update the tracker state during server startup period or if the segment is marked to be ignored return; } - _partitionsMarkedForVerification.put(partitionGroupId, _clock.millis()); + _partitionsMarkedForVerification.put(new LLCSegmentName(segmentName).getPartitionGroupId(), _clock.millis()); } /* * Method to get timestamp used for the ingestion delay for a given partition. * - * @param partitionGroupId partition for which we are retrieving the delay + * @param partitionId partition for which we are retrieving the delay * * @return ingestion delay timestamp in milliseconds for the given partition ID. */ - public long getPartitionIngestionTimeMs(int partitionGroupId) { - // Not protected as this will only be invoked when metric is installed which happens after server ready - IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId); - if (currentMeasure == null) { // Guard just in case we read the metric without initializing it - return Long.MIN_VALUE; - } - return currentMeasure._ingestionTimeMs; + public long getPartitionIngestionTimeMs(int partitionId) { + IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId); + return ingestionInfo != null ? ingestionInfo._ingestionTimeMs : Long.MIN_VALUE; } /* * Method to get ingestion delay for a given partition. * - * @param partitionGroupId partition for which we are retrieving the delay + * @param partitionId partition for which we are retrieving the delay * * @return ingestion delay in milliseconds for the given partition ID. 
*/ - public long getPartitionIngestionDelayMs(int partitionGroupId) { - // Not protected as this will only be invoked when metric is installed which happens after server ready - IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId); - if (currentMeasure == null) { // Guard just in case we read the metric without initializing it - return 0; - } - return getIngestionDelayMs(currentMeasure._ingestionTimeMs); - } - - public long getPartitionIngestionOffsetLag(int partitionGroupId) { - // Not protected as this will only be invoked when metric is installed which happens after server ready - IngestionOffsets currentMeasure = _partitionToOffsetMap.get(partitionGroupId); - if (currentMeasure == null) { // Guard just in case we read the metric without initializing it - return 0; - } - return getPartitionOffsetLag(currentMeasure); + public long getPartitionIngestionDelayMs(int partitionId) { + IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId); + return ingestionInfo != null ? getIngestionDelayMs(ingestionInfo._ingestionTimeMs) : 0; } /* * Method to get end to end ingestion delay for a given partition. * - * @param partitionGroupId partition for which we are retrieving the delay + * @param partitionId partition for which we are retrieving the delay * * @return End to end ingestion delay in milliseconds for the given partition ID. */ - public long getPartitionEndToEndIngestionDelayMs(int partitionGroupId) { - // Not protected as this will only be invoked when metric is installed which happens after server ready - IngestionTimestamps currentMeasure = _partitionToIngestionTimestampsMap.get(partitionGroupId); - if (currentMeasure == null) { // Guard just in case we read the metric without initializing it + public long getPartitionEndToEndIngestionDelayMs(int partitionId) { + IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId); + return ingestionInfo != null ? getIngestionDelayMs(ingestionInfo._firstStreamIngestionTimeMs) : 0; + } + + public long getPartitionIngestionOffsetLag(int partitionId) { + IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId); + if (ingestionInfo == null) { return 0; } - return getIngestionDelayMs(currentMeasure._firstStreamIngestionTimeMs); + StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset; + StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset; + if (currentOffset == null || latestOffset == null) { + return 0; + } + // TODO: Support other types of offsets + if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof LongMsgOffset)) { + return 0; + } + return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset) currentOffset).getOffset(); } /* @@ -448,8 +428,8 @@ public class IngestionDelayTracker { return; } // Remove partitions so their related metrics get uninstalled. 
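The offset-lag gauge reduces to simple subtraction for long-based offsets; other StreamPartitionMsgOffset implementations currently report a lag of 0. A minimal sketch of that arithmetic, not code from this commit (the values mirror the expectations in testRecordIngestionDelayOffset later in this patch):

import org.apache.pinot.spi.stream.LongMsgOffset;

public class OffsetLagSketch {
  public static void main(String[] args) {
    LongMsgOffset currentOffset = new LongMsgOffset(150); // offset of the last consumed message
    LongMsgOffset latestOffset = new LongMsgOffset(200);  // latest offset in the stream partition
    long lag = latestOffset.getOffset() - currentOffset.getOffset();
    System.out.println(lag); // 50, the value getPartitionIngestionOffsetLag() would report
  }
}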
- for (Map.Entry<Integer, IngestionTimestamps> entry : _partitionToIngestionTimestampsMap.entrySet()) { - removePartitionId(entry.getKey()); + for (Integer partitionId : _ingestionInfoMap.keySet()) { + removePartitionId(partitionId); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 5acd5d57ee..648bed6080 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -964,6 +964,7 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { return _currentOffset; } + @Nullable public StreamPartitionMsgOffset getLatestStreamOffsetAtStartupTime() { return _latestStreamOffsetAtStartupTime; } @@ -1683,22 +1684,27 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { return _idleTimer.getTimeSinceEventLastConsumedMs(); } + @Nullable public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs, boolean useDebugLog) { return fetchStreamOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA, maxWaitTimeMs, useDebugLog); } + @Nullable public StreamPartitionMsgOffset fetchLatestStreamOffset(long maxWaitTimeMs) { return fetchLatestStreamOffset(maxWaitTimeMs, false); } + @Nullable public StreamPartitionMsgOffset fetchEarliestStreamOffset(long maxWaitTimeMs, boolean useDebugLog) { return fetchStreamOffset(OffsetCriteria.SMALLEST_OFFSET_CRITERIA, maxWaitTimeMs, useDebugLog); } + @Nullable public StreamPartitionMsgOffset fetchEarliestStreamOffset(long maxWaitTimeMs) { return fetchEarliestStreamOffset(maxWaitTimeMs, false); } + @Nullable private StreamPartitionMsgOffset fetchStreamOffset(OffsetCriteria offsetCriteria, long maxWaitTimeMs, boolean useDebugLog) { if (_partitionMetadataProvider == null) { @@ -1797,9 +1803,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { * Assumes there is a valid instance of {@link PartitionGroupConsumer} */ private void recreateStreamConsumer(String reason) { - _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason); - _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset); - closePartitionGroupConsumer(); + _segmentLogger.info("Recreating stream consumer for topic partition {}, reason: {}", _clientId, reason); + _currentOffset = _partitionGroupConsumer.checkpoint(_currentOffset); + closePartitionGroupConsumer(); try { _partitionGroupConsumer = _streamConsumerFactory.createPartitionGroupConsumer(_clientId, _partitionGroupConsumptionStatus); @@ -1825,20 +1831,23 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { if (metadata != null) { try { StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000, true); - _realtimeTableDataManager.updateIngestionMetrics(metadata.getRecordIngestionTimeMs(), - metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(), latestOffset, _partitionGroupId); + _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId, + metadata.getRecordIngestionTimeMs(), metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(), + latestOffset); } catch (Exception e) { _segmentLogger.warn("Failed to fetch latest offset for updating ingestion delay", e); } } } - /* + /** * Sets ingestion delay to zero in situations where we are caught 
up processing events. + * TODO: Revisit if we should preserve the offset info. */ private void setIngestionDelayToZero() { long currentTimeMs = System.currentTimeMillis(); - _realtimeTableDataManager.updateIngestionMetrics(currentTimeMs, currentTimeMs, null, null, _partitionGroupId); + _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr, _partitionGroupId, currentTimeMs, currentTimeMs, + null, null); } // This should be done during commit? We may not always commit when we build a segment.... @@ -1878,14 +1887,12 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { /** * Creates a {@link StreamMessageDecoder} using properties in {@link StreamConfig}. * - * @param streamConfig The stream config from the table config * @param fieldsToRead The fields to read from the source stream * @return The initialized StreamMessageDecoder */ private StreamMessageDecoder createMessageDecoder(Set<String> fieldsToRead) { String decoderClass = _streamConfig.getDecoderClass(); try { - Map<String, String> decoderProperties = _streamConfig.getDecoderProperties(); StreamMessageDecoder decoder = PluginManager.get().createInstance(decoderClass); decoder.init(fieldsToRead, _streamConfig, _tableConfig, _schema); return decoder; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index 75e8a4c235..8393da3884 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.function.BooleanSupplier; import java.util.function.Supplier; +import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.io.FileUtils; @@ -74,6 +75,8 @@ import org.apache.pinot.spi.data.DateTimeFieldSpec; import org.apache.pinot.spi.data.DateTimeFormatSpec; import org.apache.pinot.spi.data.FieldSpec; import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.stream.RowMetadata; +import org.apache.pinot.spi.stream.StreamMetadataProvider; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status; @@ -262,57 +265,63 @@ public class RealtimeTableDataManager extends BaseTableDataManager { } } - /* - * Method used by RealtimeSegmentManagers to update their partition delays + /** + * Updates the ingestion metrics for the given partition. * - * @param ingestionTimeMs Ingestion delay being reported. - * @param firstStreamIngestionTimeMs Ingestion time of the first message in the stream. - * @param partitionGroupId Partition ID for which delay is being updated. - * @param offset last offset received for the partition. - * @param latestOffset latest upstream offset for the partition. 
+ * @param segmentName name of the consuming segment + * @param partitionId partition id of the consuming segment (directly passed in to avoid parsing the segment name) + * @param ingestionTimeMs ingestion time of the last consumed message (from {@link RowMetadata}) + * @param firstStreamIngestionTimeMs ingestion time of the last consumed message in the first stream (from + * {@link RowMetadata}) + * @param currentOffset offset of the last consumed message (from {@link RowMetadata}) + * @param latestOffset offset of the latest message in the partition (from {@link StreamMetadataProvider}) */ - public void updateIngestionMetrics(long ingestionTimeMs, long firstStreamIngestionTimeMs, - StreamPartitionMsgOffset offset, StreamPartitionMsgOffset latestOffset, int partitionGroupId) { - _ingestionDelayTracker.updateIngestionMetrics(ingestionTimeMs, firstStreamIngestionTimeMs, offset, latestOffset, - partitionGroupId); + public void updateIngestionMetrics(String segmentName, int partitionId, long ingestionTimeMs, + long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset currentOffset, + @Nullable StreamPartitionMsgOffset latestOffset) { + _ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, firstStreamIngestionTimeMs, + currentOffset, latestOffset); } - /* - * Method used during query execution (ServerQueryExecutorV1Impl) to get the current timestamp for the ingestion - * delay for a partition - * - * @param segmentNameStr name of segment for which we want the ingestion delay timestamp. - * @return timestamp of the ingestion delay for the partition. + /** + * Returns the ingestion time of the last consumed message for the partition of the given segment. Returns + * {@code Long.MIN_VALUE} when it is not available. */ - public long getPartitionIngestionTimeMs(String segmentNameStr) { - LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); - int partitionGroupId = segmentName.getPartitionGroupId(); - return _ingestionDelayTracker.getPartitionIngestionTimeMs(partitionGroupId); + public long getPartitionIngestionTimeMs(String segmentName) { + return _ingestionDelayTracker.getPartitionIngestionTimeMs(new LLCSegmentName(segmentName).getPartitionGroupId()); } - /* + /** + * Removes the ingestion metrics for the partition of the given segment, and also ignores the updates from the given + * segment. This is useful when we want to stop tracking the ingestion delay for a partition when the segment might + * still be consuming, e.g. when the new consuming segment is created on a different server. + */ + public void removeIngestionMetrics(String segmentName) { + _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName); + } + + /** * Method to handle CONSUMING -> DROPPED segment state transitions: * We stop tracking partitions whose segments are dropped. * - * @param segmentNameStr name of segment which is transitioning state. + * @param segmentName name of segment which is transitioning state. */ @Override - public void onConsumingToDropped(String segmentNameStr) { - LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); - _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName.getPartitionGroupId()); + public void onConsumingToDropped(String segmentName) { + // NOTE: No need to mark segment ignored here because it should have already been dropped. 
+ _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(new LLCSegmentName(segmentName).getPartitionGroupId()); } - /* + /** * Method to handle CONSUMING -> ONLINE segment state transitions: * We mark partitions for verification against ideal state when we do not see a consuming segment for some time * for that partition. The idea is to remove the related metrics when the partition moves from the current server. * - * @param segmentNameStr name of segment which is transitioning state. + * @param segmentName name of segment which is transitioning state. */ @Override - public void onConsumingToOnline(String segmentNameStr) { - LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); - _ingestionDelayTracker.markPartitionForVerification(segmentName.getPartitionGroupId()); + public void onConsumingToOnline(String segmentName) { + _ingestionDelayTracker.markPartitionForVerification(segmentName); } @Override diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java index 8072731583..9cb527b121 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.pinot.core.data.manager.realtime; import java.time.Clock; @@ -24,8 +23,10 @@ import java.time.Duration; import java.time.Instant; import java.time.ZoneId; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.spi.stream.LongMsgOffset; import org.apache.pinot.spi.stream.StreamPartitionMsgOffset; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.testng.Assert; import org.testng.annotations.Test; @@ -33,7 +34,8 @@ import static org.mockito.Mockito.mock; public class IngestionDelayTrackerTest { - private static final String REALTIME_TABLE_NAME = "testTable_REALTIME"; + private static final String RAW_TABLE_NAME = "testTable"; + private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME); private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100; private final ServerMetrics _serverMetrics = mock(ServerMetrics.class); @@ -66,8 +68,7 @@ public class IngestionDelayTrackerTest { Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE); // Test bad timer args to the constructor try { - new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager, - 0, () -> true); + new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, _realtimeTableDataManager, 0, () -> true); Assert.fail("Must have asserted due to invalid arguments"); // Constructor must assert } catch (Exception e) { if ((e instanceof NullPointerException) || !(e instanceof RuntimeException)) { @@ -80,7 +81,9 @@ public class IngestionDelayTrackerTest { public void testRecordIngestionDelayWithNoAging() { final long maxTestDelay = 100; final int partition0 = 0; + final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName(); final int partition1 = 1; + final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 234).getSegmentName(); IngestionDelayTracker ingestionDelayTracker = createTracker(); // 
Use fixed clock so samples dont age @@ -90,43 +93,54 @@ public class IngestionDelayTrackerTest { ingestionDelayTracker.setClock(clock); // Test we follow a single partition up and down - for (long i = 0; i <= maxTestDelay; i++) { - long firstStreamIngestionTimeMs = i + 1; - ingestionDelayTracker.updateIngestionDelay(i, firstStreamIngestionTimeMs, partition0); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i); + for (long ingestionTimeMs = 0; ingestionTimeMs <= maxTestDelay; ingestionTimeMs++) { + long firstStreamIngestionTimeMs = ingestionTimeMs + 1; + ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, firstStreamIngestionTimeMs, + null, null); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), + clock.millis() - ingestionTimeMs); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), clock.millis() - firstStreamIngestionTimeMs); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), i); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs); } // Test tracking down a measure for a given partition - for (long i = maxTestDelay; i >= 0; i--) { - ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition0); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - i); + for (long ingestionTimeMs = maxTestDelay; ingestionTimeMs >= 0; ingestionTimeMs--) { + long firstStreamIngestionTimeMs = ingestionTimeMs + 1; + ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, firstStreamIngestionTimeMs, + null, null); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), + clock.millis() - ingestionTimeMs); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), - clock.millis() - (i + 1)); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), i); + clock.millis() - (ingestionTimeMs + 1)); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs); } // Make the current partition maximum - ingestionDelayTracker.updateIngestionDelay(maxTestDelay, maxTestDelay, partition0); + ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, maxTestDelay, maxTestDelay, null, null); // Bring up partition1 delay up and verify values - for (long i = 0; i <= 2 * maxTestDelay; i++) { - ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i); + for (long ingestionTimeMs = 0; ingestionTimeMs <= 2 * maxTestDelay; ingestionTimeMs++) { + long firstStreamIngestionTimeMs = ingestionTimeMs + 1; + ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, firstStreamIngestionTimeMs, + null, null); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), + clock.millis() - ingestionTimeMs); Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1), - clock.millis() - (i + 1)); - Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), i); + clock.millis() - firstStreamIngestionTimeMs); + Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs); } // Bring down values of partition1 and verify values - 
    for (long i = 2 * maxTestDelay; i >= 0; i--) {
-      ingestionDelayTracker.updateIngestionDelay(i, (i + 1), partition1);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - i);
+    for (long ingestionTimeMs = 2 * maxTestDelay; ingestionTimeMs >= 0; ingestionTimeMs--) {
+      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
+      ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, firstStreamIngestionTimeMs,
+          null, null);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1),
+          clock.millis() - ingestionTimeMs);
       Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
-          clock.millis() - (i + 1));
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), i);
+          clock.millis() - firstStreamIngestionTimeMs);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs);
     }
 
     ingestionDelayTracker.shutdown();
@@ -136,11 +150,13 @@ public class IngestionDelayTrackerTest {
   @Test
   public void testRecordIngestionDelayWithAging() {
     final int partition0 = 0;
+    final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName();
     final long partition0Delay0 = 1000;
     final long partition0Delay1 = 10; // record lower delay to make sure max gets reduced
     final long partition0Offset0Ms = 300;
     final long partition0Offset1Ms = 1000;
     final int partition1 = 1;
+    final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 234).getSegmentName();
     final long partition1Delay0 = 11;
     final long partition1Offset0Ms = 150;
 
@@ -151,14 +167,12 @@ public class IngestionDelayTrackerTest {
     ZoneId zoneId = ZoneId.systemDefault();
     Clock clock = Clock.fixed(now, zoneId);
     ingestionDelayTracker.setClock(clock);
-    long ingestionTimeMs = (clock.millis() - partition0Delay0);
-    ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
-        (clock.millis() - partition0Delay0), partition0);
-    ingestionDelayTracker.updateIngestionDelay((clock.millis() - partition0Delay0), (clock.millis() - partition0Delay0),
-        partition0);
+    long ingestionTimeMs = clock.millis() - partition0Delay0;
+    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, null, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
+
     // Advance clock and test aging
     Clock offsetClock = Clock.offset(clock, Duration.ofMillis(partition0Offset0Ms));
     ingestionDelayTracker.setClock(offsetClock);
@@ -168,9 +182,8 @@ public class IngestionDelayTrackerTest {
         (partition0Delay0 + partition0Offset0Ms));
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
 
-    ingestionTimeMs = (offsetClock.millis() - partition0Delay1);
-    ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
-        (offsetClock.millis() - partition0Delay1), partition0);
+    ingestionTimeMs = offsetClock.millis() - partition0Delay1;
+    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, null, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
@@ -181,9 +194,8 @@ public class IngestionDelayTrackerTest {
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay1 + partition0Offset1Ms));
 
-    ingestionTimeMs = (offsetClock.millis() - partition1Delay0);
-    ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs,
-        (offsetClock.millis() - partition1Delay0), partition1);
+    ingestionTimeMs = offsetClock.millis() - partition1Delay0;
+    ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, ingestionTimeMs, null, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), partition1Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1), partition1Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs);
@@ -210,26 +222,54 @@ public class IngestionDelayTrackerTest {
     ingestionDelayTracker.setClock(clock);
 
     // Record a number of partitions with delay equal to partition id
-    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
-      long ingestionTimeMs = (clock.millis() - partitionGroupId);
-      ingestionDelayTracker.updateIngestionDelay(ingestionTimeMs, ingestionTimeMs, partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
-          partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionGroupId), ingestionTimeMs);
+    for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
+      String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 123).getSegmentName();
+      long ingestionTimeMs = clock.millis() - partitionId;
+      ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, null,
+          null);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), partitionId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), partitionId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId), ingestionTimeMs);
     }
-    for (int partitionGroupId = maxPartition; partitionGroupId >= 0; partitionGroupId--) {
-      ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionGroupId);
+    for (int partitionId = maxPartition; partitionId >= 0; partitionId--) {
+      ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionId);
     }
-    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
+    for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
       // Untracked partitions must return 0
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), 0);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId), 0);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(
-          partitionGroupId), Long.MIN_VALUE);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), 0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), 0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId), Long.MIN_VALUE);
     }
   }
 
+  @Test
+  public void testStopTrackingIngestionDelayWithSegment() {
+    IngestionDelayTracker ingestionDelayTracker = createTracker();
+    // Use fixed clock so samples don't age
+    Instant now = Instant.now();
+    ZoneId zoneId = ZoneId.systemDefault();
+    Clock clock = Clock.fixed(now, zoneId);
+    ingestionDelayTracker.setClock(clock);
+
+    String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 123).getSegmentName();
+    long ingestionTimeMs = clock.millis() - 10;
+    ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, null, null);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 10);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 10);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), ingestionTimeMs);
+
+    ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
+
+    // Should not update metrics for removed segment
+    ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, null, null);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 0);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
+  }
+
   @Test
   public void testShutdown() {
     final long maxTestDelay = 100;
@@ -242,12 +282,13 @@ public class IngestionDelayTrackerTest {
     ingestionDelayTracker.setClock(clock);
 
     // Test Shutdown with partitions active
-    for (int partitionGroupId = 0; partitionGroupId <= maxTestDelay; partitionGroupId++) {
-      ingestionDelayTracker.updateIngestionDelay((clock.millis() - partitionGroupId),
-          (clock.millis() - partitionGroupId), partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionGroupId), partitionGroupId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionGroupId),
-          partitionGroupId);
+    for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
+      String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 123).getSegmentName();
+      long ingestionTimeMs = clock.millis() - partitionId;
+      ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, null,
+          null);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), partitionId);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), partitionId);
     }
 
     ingestionDelayTracker.shutdown();
@@ -257,65 +298,35 @@ public class IngestionDelayTrackerTest {
   }
 
   @Test
-  public void testRecordIngestionDelayOffsetWithNoAging() {
+  public void testRecordIngestionDelayOffset() {
    final int partition0 = 0;
+    final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName();
     final int partition1 = 1;
+    final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 234).getSegmentName();
 
     IngestionDelayTracker ingestionDelayTracker = createTracker();
-    // Use fixed clock so samples don't age
-    Instant now = Instant.now();
-    ZoneId zoneId = ZoneId.systemDefault();
-    Clock clock = Clock.fixed(now, zoneId);
-    ingestionDelayTracker.setClock(clock);
 
     // Test tracking offset lag for a single partition
     StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
     StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0, partition0);
+    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
+        latestOffset0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 100);
 
     // Test tracking offset lag for another partition
     StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50);
     StreamPartitionMsgOffset latestOffset1 = new LongMsgOffset(150);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset1, latestOffset1, partition1);
+    ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1,
+        latestOffset1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1), 100);
 
     // Update offset lag for partition0
     msgOffset0 = new LongMsgOffset(150);
     latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0, partition0);
+    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
+        latestOffset0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 50);
 
     ingestionDelayTracker.shutdown();
   }
-
-  @Test
-  public void testRecordIngestionDelayOffsetWithAging() {
-    final int partition0 = 0;
-    final long partition0OffsetLag0 = 100;
-    final long partition0OffsetLag1 = 50;
-
-    IngestionDelayTracker ingestionDelayTracker = createTracker();
-
-    // With samples for a single partition, test that sample is aged as expected
-    Instant now = Instant.now();
-    ZoneId zoneId = ZoneId.systemDefault();
-    Clock clock = Clock.fixed(now, zoneId);
-    ingestionDelayTracker.setClock(clock);
-
-    StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(100);
-    StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0, partition0);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), partition0OffsetLag0);
-
-    // Update offset lag and test aging
-    msgOffset0 = new LongMsgOffset(150);
-    latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionOffsets(msgOffset0, latestOffset0, partition0);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), partition0OffsetLag1);
-
-
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), partition0OffsetLag1);
-    ingestionDelayTracker.shutdown();
-  }
 }
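Note: the tests above show the shape of the reworked tracker API. Ingestion metrics are now registered per consuming segment name (with the partition id passed alongside it), and the former updateIngestionDelay(...)/updateIngestionOffsets(...) pair is folded into a single updateIngestionMetrics(...) call, with Long.MIN_VALUE/null standing in for metrics that are not being reported. The snippet below is a minimal usage sketch, not part of the commit: it is written as if it were one more method inside IngestionDelayTrackerTest, reusing that class's createTracker() helper, RAW_TABLE_NAME constant and imports, and it only uses calls that appear in the diff above.

  // Minimal sketch (illustrative only): assumes IngestionDelayTrackerTest's imports,
  // its createTracker() helper and the RAW_TABLE_NAME constant.
  private void sketchUpdateIngestionMetrics() {
    IngestionDelayTracker tracker = createTracker();
    Clock clock = Clock.fixed(Instant.now(), ZoneId.systemDefault());
    tracker.setClock(clock);

    // Metrics are keyed by the consuming segment name; the partition id is passed alongside it.
    int partitionId = 0;
    String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 123).getSegmentName();

    // One call now carries both the time-based and the offset-based metrics.
    long ingestionTimeMs = clock.millis() - 10;  // record arrived 10 ms ago
    StreamPartitionMsgOffset currentOffset = new LongMsgOffset(100);
    StreamPartitionMsgOffset latestOffset = new LongMsgOffset(200);
    tracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, currentOffset,
        latestOffset);

    // Delay is measured against the tracker's clock; offset lag is latest minus current (200 - 100 = 100).
    Assert.assertEquals(tracker.getPartitionIngestionDelayMs(partitionId), 10);
    Assert.assertEquals(tracker.getPartitionIngestionOffsetLag(partitionId), 100);

    // Once the consuming segment is no longer served, its metrics are dropped and late updates are ignored.
    tracker.stopTrackingPartitionIngestionDelay(segmentName);
    Assert.assertEquals(tracker.getPartitionIngestionDelayMs(partitionId), 0);
  }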
diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
index 11729316c4..0b7111a75e 100644
--- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
+++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentMessageHandlerFactory.java
@@ -30,6 +30,7 @@ import org.apache.helix.messaging.handling.MessageHandlerFactory;
 import org.apache.helix.model.Message;
 import org.apache.pinot.common.Utils;
 import org.apache.pinot.common.messages.ForceCommitMessage;
+import org.apache.pinot.common.messages.IngestionMetricsRemoveMessage;
 import org.apache.pinot.common.messages.SegmentRefreshMessage;
 import org.apache.pinot.common.messages.SegmentReloadMessage;
 import org.apache.pinot.common.messages.TableDeletionMessage;
@@ -39,7 +40,9 @@ import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
 import org.apache.pinot.common.metrics.ServerTimer;
 import org.apache.pinot.core.data.manager.InstanceDataManager;
+import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
 import org.apache.pinot.core.util.SegmentRefreshSemaphore;
+import org.apache.pinot.segment.local.data.manager.TableDataManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,6 +75,8 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
         return new TableDeletionMessageHandler(new TableDeletionMessage(message), _metrics, context);
       case ForceCommitMessage.FORCE_COMMIT_MSG_SUB_TYPE:
         return new ForceCommitMessageHandler(new ForceCommitMessage(message), _metrics, context);
+      case IngestionMetricsRemoveMessage.INGESTION_METRICS_REMOVE_MSG_SUB_TYPE:
+        return new IngestionMetricsRemoveMessageHandler(new IngestionMetricsRemoveMessage(message), _metrics, context);
       default:
         LOGGER.warn("Unsupported user defined message sub type: {} for segment: {}", msgSubType,
             message.getPartitionName());
@@ -229,6 +234,27 @@ public class SegmentMessageHandlerFactory implements MessageHandlerFactory {
     }
   }
 
+  private class IngestionMetricsRemoveMessageHandler extends DefaultMessageHandler {
+
+    IngestionMetricsRemoveMessageHandler(IngestionMetricsRemoveMessage message, ServerMetrics metrics,
+        NotificationContext context) {
+      super(message, metrics, context);
+    }
+
+    @Override
+    public HelixTaskResult handleMessage() {
+      _logger.info("Handling ingestion metrics remove message for table: {}, segment: {}", _tableNameWithType,
+          _segmentName);
+      TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(_tableNameWithType);
+      if (tableDataManager instanceof RealtimeTableDataManager) {
+        ((RealtimeTableDataManager) tableDataManager).removeIngestionMetrics(_segmentName);
+      }
+      HelixTaskResult helixTaskResult = new HelixTaskResult();
+      helixTaskResult.setSuccess(true);
+      return helixTaskResult;
+    }
+  }
+
   private static class DefaultMessageHandler extends MessageHandler {
     final String _segmentName;
     final String _tableNameWithType;
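Note: the new handler above is deliberately thin. On receiving the message it looks up the table's data manager and, only for realtime tables, asks it to drop the ingestion metrics for the stream partition of the given segment. The commit's RealtimeTableDataManager and IngestionDelayTracker changes are not reproduced in this excerpt; based on the tracker API shown in the test diff, the delegation presumably ends at stopTrackingPartitionIngestionDelay(segmentName). The condensed sketch below is a hypothetical illustration of that server-side chain, not the committed code; the method name onIngestionMetricsRemove is invented for illustration, while getTableDataManager() and removeIngestionMetrics() come from the diff above.

  // Hypothetical condensation of the server-side flow -- not the committed code.
  static void onIngestionMetricsRemove(InstanceDataManager instanceDataManager, String tableNameWithType,
      String segmentName) {
    TableDataManager tableDataManager = instanceDataManager.getTableDataManager(tableNameWithType);
    if (tableDataManager instanceof RealtimeTableDataManager) {
      // Presumably forwards to IngestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName),
      // removing the per-partition gauges so a relocated consuming segment stops reporting delay here.
      ((RealtimeTableDataManager) tableDataManager).removeIngestionMetrics(segmentName);
    }
  }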
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org