This is an automated email from the ASF dual-hosted git repository.
manishswaminathan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f73e756d06d Fix Realtime Ingestion Metrics (#16783)
f73e756d06d is described below
commit f73e756d06d7ba5aa6592e6d77a53401848fae94
Author: NOOB <[email protected]>
AuthorDate: Tue Oct 7 15:54:03 2025 +0530
Fix Realtime Ingestion Metrics (#16783)
* Enhance and fix realtime ingestion delay calculation
* wip
* wip
* updates tracker
* Fixes tracker test
* Fixes lint and tests
* Removes endToEndIngestionDelay
* updates code comments and improve perf
* Adds asserts
* nit
* refactors code
* Adds condn to check _isServerReadyToServeQueries
* Updates code doc
* Adds test to verify metrics
* updates test
* supports multi-stream ingestion tracking
* Adds test for fetching latest stream offset
* Updates tests and adds metric description
* Fixes lint
* Removes e2e ingestion delay ms gauge
* nit
* Misc fixes and refactoring
* misc fixes
* minor refactoring
* refactoring and perf fix
* Fixes tests
* Addresses comments
* continue if failed to create stream metadata provider
* Adds ability to update stream metadata provider
* fixes lint
* Adds metrics and code comments
* minor refactoring
* Adds test for IngestionconfigUtil
* Fixes currentOffset bug
* restore StreamMessageMetadata class
* nit
---
.../apache/pinot/common/metrics/ServerGauge.java | 12 +-
.../apache/pinot/common/metrics/ServerMeter.java | 4 +-
.../apache/pinot/common/metrics/ServerTimer.java | 5 +-
.../prometheus/ServerPrometheusMetricsTest.java | 5 +-
.../manager/realtime/IngestionDelayTracker.java | 470 ++++++++++++++-------
.../realtime/RealtimeSegmentDataManager.java | 9 +-
.../manager/realtime/RealtimeTableDataManager.java | 27 +-
.../realtime/IngestionDelayTrackerTest.java | 329 ++++++++++++---
.../fakestream/FakeStreamMetadataProvider.java | 18 +
.../kafka20/KafkaStreamMetadataProvider.java | 28 ++
.../kafka20/KafkaPartitionLevelConsumerTest.java | 26 +-
.../kafka30/KafkaStreamMetadataProvider.java | 28 ++
.../kafka30/KafkaPartitionLevelConsumerTest.java | 26 +-
.../kinesis/KinesisStreamMetadataProvider.java | 5 +
.../pulsar/PulsarStreamMetadataProvider.java | 5 +
.../pinot/spi/stream/StreamMetadataProvider.java | 17 +
.../pinot/spi/utils/IngestionConfigUtils.java | 24 ++
.../pinot/spi/utils/IngestionConfigUtilsTest.java | 25 ++
18 files changed, 791 insertions(+), 272 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index dcc8d6776c5..a0401497b3c 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -85,9 +85,6 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
*/
NETTY_POOLED_THREADLOCALCACHE("bytes", true),
NETTY_POOLED_CHUNK_SIZE("bytes", true),
- // Ingestion delay metrics
- REALTIME_INGESTION_DELAY_MS("milliseconds", false),
- END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false),
LUCENE_INDEXING_DELAY_MS("milliseconds", false),
LUCENE_INDEXING_DELAY_DOCS("documents", false),
// Needed to track if valid doc id snapshots are present for faster restarts
@@ -95,9 +92,12 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   UPSERT_QUERYABLE_DOC_ID_SNAPSHOT_COUNT("upsertQueryableDocIdSnapshotCount", false),
   UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT("upsertPrimaryKeysInSnapshotCount", false),
   UPSERT_QUERYABLE_DOCS_IN_SNAPSHOT_COUNT("upsertQueryableDocIdsInSnapshot", false),
-  REALTIME_INGESTION_OFFSET_LAG("offsetLag", false),
-  REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false),
-  REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false),
+  REALTIME_INGESTION_OFFSET_LAG("offsetLag", false,
+      "The difference between the latest message offset and the last consumed message offset."),
+  REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false, "The offset of the latest message in the upstream."),
+  REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false, "The offset of the last consumed message."),
+  REALTIME_INGESTION_DELAY_MS("milliseconds", false,
+      "The difference between the current timestamp and the timestamp of the last consumed message record."),
REALTIME_CONSUMER_DIR_USAGE("bytes", true),
SEGMENT_DOWNLOAD_SPEED("bytes", true),
PREDOWNLOAD_SPEED("bytes", true),
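[Editor's note] The gauges above are supplier-backed and are registered per partition by the tracker changes
further down in this diff. A minimal sketch of that pattern; the ServerMetrics method signatures are assumed
from their usage later in this commit, and the variable names are placeholders:

    // Sketch only: register a per-partition gauge backed by a value supplier,
    // mirroring IngestionDelayTracker.createMetrics() below.
    serverMetrics.setOrUpdatePartitionGauge(tableNameWithType, partitionId,
        ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
        () -> tracker.getPartitionIngestionOffsetLag(partitionId));
    // ... and remove it once the partition is no longer hosted:
    serverMetrics.removePartitionGauge(tableNameWithType, partitionId,
        ServerGauge.REALTIME_INGESTION_OFFSET_LAG);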
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index fbf006ec3d0..a10f25661c8 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -224,7 +224,9 @@ public enum ServerMeter implements AbstractMetrics.Meter {
*/
MUTABLE_JSON_INDEX_MEMORY_USAGE("bytes", false),
// Workload Budget exceeded counter
-  WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times workload budget exceeded");
+  WORKLOAD_BUDGET_EXCEEDED("workloadBudgetExceeded", false, "Number of times workload budget exceeded"),
+  INGESTION_DELAY_TRACKING_ERRORS("errors", false,
+      "Number of errors encountered while tracking ingestion delay.");
private final String _meterName;
private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
index 0f72acd8264..5c6604ad193 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerTimer.java
@@ -106,7 +106,10 @@ public enum ServerTimer implements AbstractMetrics.Timer {
   SEGMENT_DOWNLOAD_FROM_DEEP_STORE_TIME_MS("millis", false,
       "Time taken to download a segment from deep store (including untar and move operations)"),
   SEGMENT_DOWNLOAD_FROM_PEERS_TIME_MS("millis", false,
-      "Time taken to download a segment from peers (including untar and move operations)");
+      "Time taken to download a segment from peers (including untar and move operations)"),
+
+  // Ingestion metrics
+  INGESTION_DELAY_TRACKING_MS("milliseconds", false, "Time taken to run a trackIngestionDelay cycle");
private final String _timerName;
private final boolean _global;
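[Editor's note] The new timer above is paired with the INGESTION_DELAY_TRACKING_ERRORS meter added to ServerMeter;
both are emitted around each tracking cycle. A minimal sketch of that try/catch/finally pattern, mirroring
trackIngestionDelay() later in this diff (variable names are placeholders):

    long startMs = System.currentTimeMillis();
    try {
      // ... run one ingestion-delay tracking cycle ...
    } catch (Throwable t) {
      // Count the failure so tracking problems are visible as a metric.
      serverMetrics.addMeteredTableValue(tableName, ServerMeter.INGESTION_DELAY_TRACKING_ERRORS, 1);
    } finally {
      // Always record how long the cycle took, even on failure.
      serverMetrics.addTimedTableValue(tableName, ServerTimer.INGESTION_DELAY_TRACKING_MS,
          System.currentTimeMillis() - startMs, TimeUnit.MILLISECONDS);
    }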
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
index 4a4a3c14407..79fb88f56ed 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/metrics/prometheus/ServerPrometheusMetricsTest.java
@@ -55,9 +55,8 @@ public abstract class ServerPrometheusMetricsTest extends PinotPrometheusMetrics
   private static final List<ServerGauge> GAUGES_ACCEPTING_PARTITION =
       List.of(ServerGauge.UPSERT_VALID_DOC_ID_SNAPSHOT_COUNT, ServerGauge.UPSERT_PRIMARY_KEYS_IN_SNAPSHOT_COUNT,
           ServerGauge.REALTIME_INGESTION_OFFSET_LAG, ServerGauge.REALTIME_INGESTION_DELAY_MS,
-          ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
-          ServerGauge.DEDUP_PRIMARY_KEYS_COUNT, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET,
-          ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
+          ServerGauge.UPSERT_PRIMARY_KEYS_COUNT, ServerGauge.DEDUP_PRIMARY_KEYS_COUNT,
+          ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
   private static final List<ServerGauge> GAUGES_ACCEPTING_RAW_TABLE_NAME =
       List.of(ServerGauge.REALTIME_OFFHEAP_MEMORY_USED, ServerGauge.REALTIME_SEGMENT_NUM_PARTITIONS,
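[Editor's note] The IngestionDelayTracker rewrite below reports offset lag as the latest upstream offset minus
the last consumed offset, floored at zero. A minimal sketch of that arithmetic, assuming both offsets are
LongMsgOffset (per the javadoc in this diff, the only offset type supported for lag):

    long latest = ((LongMsgOffset) latestOffset).getOffset();    // e.g. 1200
    long current = ((LongMsgOffset) currentOffset).getOffset();  // e.g. 1150
    long offsetLag = Math.max(0, latest - current);              // 50 messages behind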
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
index 90e19ead18f..cbdda8b8903 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTracker.java
@@ -21,6 +21,7 @@ package org.apache.pinot.core.data.manager.realtime;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
+import java.io.IOException;
import java.time.Clock;
import java.util.ArrayList;
import java.util.HashSet;
@@ -30,30 +31,34 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.common.metrics.ServerTimer;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* A Class to track realtime ingestion delay for table partitions on a given
server.
* Highlights:
* 1-An object of this class is hosted by each RealtimeTableDataManager.
* 2-The object tracks ingestion delays for all partitions hosted by the
current server for the given Realtime table.
- * 3-Partition delays are updated by all RealtimeSegmentDataManager objects
hosted in the corresponding
- * RealtimeTableDataManager.
+ * 3-The current consumption status of partitions is updated by the RealtimeSegmentDataManager objects hosted in
+ * the corresponding RealtimeTableDataManager.
* 4-Individual metrics are associated with each partition being tracked.
* 5-Delays for partitions that do not have events to consume are reported as
zero.
* 6-Partitions whose Segments go from CONSUMING to DROPPED state stop being
tracked so their delays do not cloud
@@ -63,6 +68,8 @@ import org.slf4j.LoggerFactory;
* partition. If not, we stop tracking the respective partition.
* 8-A scheduled executor thread is started by this object to track timeouts
of partitions and drive the reading
* of their ideal state.
+ * 9-A scheduled executor thread is started by this object to fetch the latest stream offset from upstream and
+ * create metrics for new partitions for which there is no consumer present on the server.
*
* The following diagram illustrates the object interactions with main
external APIs
*
@@ -82,139 +89,253 @@ import org.slf4j.LoggerFactory;
* | TimerTrackingTask | (CONSUMING -> DROPPED state change)
* |___________________|
*
- * TODO: handle bug situations like the one where a partition is not allocated
to a given server due to a bug.
*/
public class IngestionDelayTracker {
private static class IngestionInfo {
- final long _ingestionTimeMs;
- final long _firstStreamIngestionTimeMs;
- final StreamPartitionMsgOffset _currentOffset;
- final StreamPartitionMsgOffset _latestOffset;
+ volatile long _ingestionTimeMs;
+ @Nullable
+ volatile StreamPartitionMsgOffset _currentOffset;
+
+ IngestionInfo(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset
currentOffset) {
+ _ingestionTimeMs = ingestionTimeMs;
+ _currentOffset = currentOffset;
+ }
- IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs,
- @Nullable StreamPartitionMsgOffset currentOffset, @Nullable
StreamPartitionMsgOffset latestOffset) {
+ void update(long ingestionTimeMs, @Nullable StreamPartitionMsgOffset
currentOffset) {
_ingestionTimeMs = ingestionTimeMs;
- _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs;
_currentOffset = currentOffset;
- _latestOffset = latestOffset;
}
}
private static final Logger LOGGER =
LoggerFactory.getLogger(IngestionDelayTracker.class);
-
// Sleep interval for scheduled executor service thread that triggers read
of ideal state
- private static final int SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS =
300000; // 5 minutes +/- precision in timeouts
+ // 5 minutes +/- precision in timeouts
+ private static final long METRICS_REMOVAL_INTERVAL_MS =
TimeUnit.MINUTES.toMillis(5);
+ // 30 seconds +/- precision in timeouts
+ private static final long METRICS_TRACKING_INTERVAL_MS =
TimeUnit.SECONDS.toMillis(30);
// Once a partition is marked for verification, we wait 10 minutes to pull
its ideal state.
- private static final int PARTITION_TIMEOUT_MS = 600000; // 10
minutes timeouts
+ private static final long PARTITION_TIMEOUT_MS =
TimeUnit.MINUTES.toMillis(10);
// Delay scheduled executor service for this amount of time after starting
service
private static final int INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS = 100;
-
// Cache expire time for ignored segment if there is no update from the
segment.
private static final int IGNORED_SEGMENT_CACHE_TIME_MINUTES = 10;
-
- // Per partition info for all partitions active for the current table.
- private final Map<Integer, IngestionInfo> _ingestionInfoMap = new
ConcurrentHashMap<>();
-
- // We mark partitions that go from CONSUMING to ONLINE in
_partitionsMarkedForVerification: if they do not
- // go back to CONSUMING in some period of time, we verify whether they are
still hosted in this server by reading
- // ideal state. This is done with the goal of minimizing reading ideal state
for efficiency reasons.
- // TODO: Consider removing this mechanism after releasing 1.2.0, and use
{@link #stopTrackingPartitionIngestionDelay}
- // instead.
- private final Map<Integer, Long> _partitionsMarkedForVerification = new
ConcurrentHashMap<>();
-
- private final Cache<String, Boolean> _segmentsToIgnore =
-
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES,
TimeUnit.MINUTES).build();
-
- // TODO: Make thread pool a server/cluster level config
- // ScheduledExecutorService to check partitions that are inactive against
ideal state.
- private final ScheduledExecutorService _scheduledExecutor =
Executors.newScheduledThreadPool(2);
+ // Timeout after 5 seconds while fetching the latest stream offset.
+ private static final long LATEST_STREAM_OFFSET_FETCH_TIMEOUT_MS =
TimeUnit.SECONDS.toMillis(5);
private final ServerMetrics _serverMetrics;
private final String _tableNameWithType;
private final String _metricName;
-
private final RealtimeTableDataManager _realTimeTableDataManager;
private final Supplier<Boolean> _isServerReadyToServeQueries;
+ private final Cache<String, Boolean> _segmentsToIgnore =
+
CacheBuilder.newBuilder().expireAfterAccess(IGNORED_SEGMENT_CACHE_TIME_MINUTES,
TimeUnit.MINUTES).build();
+ // Map to describe the partitions for which the metrics are being reported.
+ // This map is accessed by:
+ // 1. _ingestionDelayTrackingScheduler thread.
+ // 2. All threads that can remove metrics - Consumer thread, Helix thread, Server API thread, etc.
+ // 3. Consumer thread when it updates the ingestion info of the partition for the first time.
+ private final Map<Integer, Boolean> _partitionsTracked = new
ConcurrentHashMap<>();
+ // Map to hold the ingestion info reported by the consumer.
+ // This map is accessed by:
+ // 1. _ingestionDelayTrackingScheduler thread.
+ // 2. All threads that can remove metrics - Consumer thread, Helix thread, Server API thread, etc.
+ // 3. Consumer thread when it updates the ingestion info of the partition.
+ private final Map<Integer, IngestionInfo> _ingestionInfoMap = new
ConcurrentHashMap<>();
+ private final Map<Integer, Long> _partitionsMarkedForVerification = new
ConcurrentHashMap<>();
+ private final ScheduledExecutorService _metricsRemovalScheduler;
+ private final ScheduledExecutorService _ingestionDelayTrackingScheduler;
- private Clock _clock;
+ private Clock _clock = Clock.systemUTC();
+
+ protected volatile Map<Integer, StreamPartitionMsgOffset>
_partitionIdToLatestOffset;
+ protected volatile Set<Integer> _partitionsHostedByThisServer = new
HashSet<>();
+ // Map of StreamMetadataProvider used to fetch the latest upstream stream offset (a table can have multiple
+ // upstream topics).
+ // This map is accessed by:
+ // 1. _ingestionDelayTrackingScheduler thread.
+ // 2. All threads that can remove metrics - Consumer thread, Helix thread, Server API thread, etc.
+ protected Map<Integer, StreamMetadataProvider>
_streamConfigIndexToStreamMetadataProvider = new ConcurrentHashMap<>();
- @VisibleForTesting
public IngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
- RealtimeTableDataManager realtimeTableDataManager, int
scheduledExecutorThreadTickIntervalMs,
- Supplier<Boolean> isServerReadyToServeQueries)
+ RealtimeTableDataManager realtimeTableDataManager)
throws RuntimeException {
+ this(serverMetrics, tableNameWithType, realtimeTableDataManager,
METRICS_REMOVAL_INTERVAL_MS,
+ METRICS_TRACKING_INTERVAL_MS,
realtimeTableDataManager.getIsServerReadyToServeQueries());
+ }
+
+ @VisibleForTesting
+ public IngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
+ RealtimeTableDataManager realtimeTableDataManager, long
metricsRemovalIntervalMs, long metricsTrackingIntervalMs,
+ Supplier<Boolean> isServerReadyToServeQueries) {
_serverMetrics = serverMetrics;
_tableNameWithType = tableNameWithType;
_metricName = tableNameWithType;
_realTimeTableDataManager = realtimeTableDataManager;
- _clock = Clock.systemUTC();
_isServerReadyToServeQueries = isServerReadyToServeQueries;
- // Handle negative timer values
- if (scheduledExecutorThreadTickIntervalMs <= 0) {
- throw new RuntimeException("Illegal timer timeout argument, expected >
0, got="
- + scheduledExecutorThreadTickIntervalMs + " for table=" +
_tableNameWithType);
+
+ _metricsRemovalScheduler =
Executors.newSingleThreadScheduledExecutor(getThreadFactory(
+ "IngestionDelayMetricsRemovalThread-" +
TableNameBuilder.extractRawTableName(tableNameWithType)));
+ // Schedule a periodic task to remove metrics for inactive partitions.
+
_metricsRemovalScheduler.scheduleWithFixedDelay(this::timeoutInactivePartitions,
+ INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, metricsRemovalIntervalMs,
TimeUnit.MILLISECONDS);
+
+ createOrUpdateStreamMetadataProvider();
+
+ _ingestionDelayTrackingScheduler =
Executors.newSingleThreadScheduledExecutor(
+ getThreadFactory("IngestionDelayTrackingThread-" +
TableNameBuilder.extractRawTableName(tableNameWithType)));
+ // Schedule a periodic task to update the latest upstream offsets and create metrics for newly added partitions.
+
_ingestionDelayTrackingScheduler.scheduleWithFixedDelay(this::trackIngestionDelay,
+ INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS, metricsTrackingIntervalMs,
TimeUnit.MILLISECONDS);
+ }
+
+ private void createOrUpdateStreamMetadataProvider() {
+ List<StreamConfig> streamConfigs =
+
IngestionConfigUtils.getStreamConfigs(_realTimeTableDataManager.getCachedTableConfigAndSchema().getLeft());
+
+ for (int streamConfigIndex = 0; streamConfigIndex < streamConfigs.size();
streamConfigIndex++) {
+ if
(_streamConfigIndexToStreamMetadataProvider.containsKey(streamConfigIndex)) {
+ continue;
+ }
+ StreamConfig streamConfig = null;
+ StreamMetadataProvider streamMetadataProvider;
+ try {
+ streamConfig = streamConfigs.get(streamConfigIndex);
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ String clientId =
+
IngestionConfigUtils.getTableTopicUniqueClientId(IngestionDelayTracker.class.getSimpleName(),
streamConfig);
+ streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(clientId);
+ } catch (Exception e) {
+ LOGGER.error("Failed to create stream metadata provider for
streamConfig: {}", streamConfig, e);
+ continue;
+ }
+
+ assert streamMetadataProvider != null;
+ _streamConfigIndexToStreamMetadataProvider.put(streamConfigIndex,
streamMetadataProvider);
+
+ if ((streamMetadataProvider.supportsOffsetLag()) &&
(_partitionIdToLatestOffset == null)) {
+ _partitionIdToLatestOffset = new ConcurrentHashMap<>();
+ }
}
+ }
+
+ private void trackIngestionDelay() {
+ long startMs = System.currentTimeMillis();
+ try {
+ if (!_isServerReadyToServeQueries.get() ||
_realTimeTableDataManager.isShutDown()) {
+ // Do not update the ingestion delay metrics during the server startup period
+ // or once the table data manager has been shut down.
+ return;
+ }
+ Set<Integer> partitionsHosted = _partitionsHostedByThisServer;
+
+ if (_ingestionInfoMap.size() > partitionsHosted.size()) {
+ // In case a new partition was assigned to this server before _partitionsHostedByThisServer was updated.
+ partitionsHosted.addAll(_ingestionInfoMap.keySet());
+ }
+
+ if (partitionsHosted.isEmpty()) {
+ return;
+ }
+
+ updateLatestStreamOffset(partitionsHosted);
+
+ for (Integer partitionId : partitionsHosted) {
+ _partitionsTracked.computeIfAbsent(partitionId, k -> {
+ createMetrics(partitionId);
+ return true;
+ });
+ }
+ } catch (Throwable t) {
+ LOGGER.error("Failed to track ingestion delay metrics.", t);
+
_serverMetrics.addMeteredTableValue(_realTimeTableDataManager.getTableName(),
+ ServerMeter.INGESTION_DELAY_TRACKING_ERRORS, 1);
+ } finally {
+ _serverMetrics.addTimedTableValue(_metricName,
ServerTimer.INGESTION_DELAY_TRACKING_MS,
+ System.currentTimeMillis() - startMs, TimeUnit.MILLISECONDS);
+ }
+ }
- // ThreadFactory to set the thread's name
- ThreadFactory threadFactory = new ThreadFactory() {
+ @VisibleForTesting
+ void updateLatestStreamOffset(Set<Integer> partitionsHosted) {
+ Map<Integer, Set<Integer>> streamIndexToStreamPartitionIds =
+
IngestionConfigUtils.getStreamConfigIndexToStreamPartitions(partitionsHosted);
+
+ if (streamIndexToStreamPartitionIds.size() >
_streamConfigIndexToStreamMetadataProvider.size()) {
+ // A new stream config might have been added, or we might need to retry creating the
+ // streamMetadataProvider for a stream for which creation previously failed.
+ createOrUpdateStreamMetadataProvider();
+ }
+
+ for (Map.Entry<Integer, StreamMetadataProvider> entry :
_streamConfigIndexToStreamMetadataProvider.entrySet()) {
+ int streamIndex = entry.getKey();
+ StreamMetadataProvider streamMetadataProvider = entry.getValue();
+
+ if (!streamIndexToStreamPartitionIds.containsKey(streamIndex)) {
+ // Server is not hosting any partitions of this stream.
+ continue;
+ }
+
+ if (streamMetadataProvider.supportsOffsetLag()) {
+ Set<Integer> streamPartitionIds =
streamIndexToStreamPartitionIds.get(streamIndex);
+ try {
+ Map<Integer, StreamPartitionMsgOffset>
streamPartitionIdToLatestOffset =
+
streamMetadataProvider.fetchLatestStreamOffset(streamPartitionIds,
+ LATEST_STREAM_OFFSET_FETCH_TIMEOUT_MS);
+ if (streamIndex > 0) {
+ // Need to convert stream partition ids back to Pinot partition ids.
+ for (Map.Entry<Integer, StreamPartitionMsgOffset> latestOffsetEntry
+ : streamPartitionIdToLatestOffset.entrySet()) {
+ _partitionIdToLatestOffset.put(
+
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(latestOffsetEntry.getKey(),
+ streamIndex), latestOffsetEntry.getValue());
+ }
+ } else {
+ _partitionIdToLatestOffset.putAll(streamPartitionIdToLatestOffset);
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to update latest stream offsets for partitions:
{}", streamPartitionIds, e);
+ }
+ }
+ }
+ }
+
+ private ThreadFactory getThreadFactory(String threadName) {
+ return new ThreadFactory() {
private final ThreadFactory _defaultFactory =
Executors.defaultThreadFactory();
@Override
public Thread newThread(Runnable r) {
Thread thread = _defaultFactory.newThread(r);
- thread.setName("IngestionDelayTimerThread-" +
TableNameBuilder.extractRawTableName(tableNameWithType));
+ thread.setName(threadName);
return thread;
}
};
- ((ScheduledThreadPoolExecutor)
_scheduledExecutor).setThreadFactory(threadFactory);
-
- _scheduledExecutor.scheduleWithFixedDelay(this::timeoutInactivePartitions,
- INITIAL_SCHEDULED_EXECUTOR_THREAD_DELAY_MS,
scheduledExecutorThreadTickIntervalMs, TimeUnit.MILLISECONDS);
}
- public IngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
- RealtimeTableDataManager tableDataManager, Supplier<Boolean>
isServerReadyToServeQueries) {
- this(serverMetrics, tableNameWithType, tableDataManager,
SCHEDULED_EXECUTOR_THREAD_TICK_INTERVAL_MS,
- isServerReadyToServeQueries);
- }
-
- /*
- * Helper function to get the ingestion delay for a given ingestion time.
- * Ingestion delay == Current Time - Ingestion Time
- *
- * @param ingestionTimeMs original ingestion time in milliseconds.
- */
- private long getIngestionDelayMs(long ingestionTimeMs) {
- if (ingestionTimeMs < 0) {
- return 0;
- }
- // Compute aged delay for current partition
- long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
- // Correct to zero for any time shifts due to NTP or time reset.
- agedIngestionDelayMs = Math.max(agedIngestionDelayMs, 0);
- return agedIngestionDelayMs;
- }
-
- /*
+ /**
* Helper function to be called when we should stop tracking a given
partition. Removes the partition from
* all our maps.
*
* @param partitionId partition ID which we should stop tracking.
*/
private void removePartitionId(int partitionId) {
- _ingestionInfoMap.compute(partitionId, (k, v) -> {
+ _partitionsTracked.compute(partitionId, (k, v) -> {
if (v != null) {
+ int streamConfigIndex =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
+ StreamMetadataProvider streamMetadataProvider =
+ _streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
// Remove all metrics associated with this partition
+ if (streamMetadataProvider != null &&
streamMetadataProvider.supportsOffsetLag()) {
+ _serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
+ _serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
+ _serverMetrics.removePartitionGauge(_metricName, partitionId,
+ ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
+ }
_serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_DELAY_MS);
- _serverMetrics.removePartitionGauge(_metricName, partitionId,
- ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS);
- _serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG);
- _serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET);
- _serverMetrics.removePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET);
LOGGER.info("Successfully removed ingestion metrics for partition id:
{}", partitionId);
}
+ _ingestionInfoMap.remove(partitionId);
return null;
});
@@ -222,7 +343,7 @@ public class IngestionDelayTracker {
_partitionsMarkedForVerification.remove(partitionId);
}
- /*
+ /**
* Helper functions that creates a list of all the partitions that are
marked for verification and whose
* timeouts are expired. This helps us optimize checks of the ideal state.
*/
@@ -248,27 +369,41 @@ public class IngestionDelayTracker {
_clock = clock;
}
+ public void createMetrics(int partitionId) {
+ int streamConfigIndex =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
+ StreamMetadataProvider streamMetadataProvider =
_streamConfigIndexToStreamMetadataProvider.get(streamConfigIndex);
+
+ if (streamMetadataProvider != null &&
streamMetadataProvider.supportsOffsetLag()) {
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
+ () -> getPartitionIngestionOffsetLag(partitionId));
+
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+ ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () ->
getPartitionIngestionConsumingOffset(partitionId));
+
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
+ ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () ->
getLatestPartitionOffset(partitionId));
+ }
+ _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_DELAY_MS,
+ () -> getPartitionIngestionDelayMs(partitionId));
+ }
+
/**
* Called by RealTimeSegmentDataManagers to update the ingestion delay
metrics for a given partition.
*
* @param segmentName name of the consuming segment
* @param partitionId partition id of the consuming segment (directly passed
in to avoid parsing the segment name)
* @param ingestionTimeMs ingestion time of the last consumed message (from
{@link StreamMessageMetadata})
- * @param firstStreamIngestionTimeMs ingestion time of the last consumed
message in the first stream (from
- * {@link StreamMessageMetadata})
* @param currentOffset offset of the last consumed message (from {@link
StreamMessageMetadata})
- * @param latestOffset offset of the latest message in the partition (from
{@link StreamMetadataProvider})
*/
- public void updateIngestionMetrics(String segmentName, int partitionId, long
ingestionTimeMs,
- long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset
currentOffset,
- @Nullable StreamPartitionMsgOffset latestOffset) {
+ public void updateMetrics(String segmentName, int partitionId, long
ingestionTimeMs,
+ @Nullable StreamPartitionMsgOffset currentOffset) {
if (!_isServerReadyToServeQueries.get() ||
_realTimeTableDataManager.isShutDown()) {
// Do not update the ingestion delay metrics during server startup period
// or once the table data manager has been shutdown.
return;
}
- if (ingestionTimeMs < 0 && firstStreamIngestionTimeMs < 0 &&
(currentOffset == null || latestOffset == null)) {
+ if ((ingestionTimeMs < 0) && (currentOffset == null)) {
// Do not publish metrics if stream does not return valid ingestion time
or offset.
return;
}
@@ -279,45 +414,27 @@ public class IngestionDelayTracker {
return v;
}
if (v == null) {
- // Add metric when we start tracking a partition. Only publish the
metric if supported by the stream.
- if (ingestionTimeMs > 0) {
- _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_DELAY_MS,
- () -> getPartitionIngestionDelayMs(partitionId));
- }
- if (firstStreamIngestionTimeMs > 0) {
- _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
- ServerGauge.END_TO_END_REALTIME_INGESTION_DELAY_MS,
- () -> getPartitionEndToEndIngestionDelayMs(partitionId));
- }
- if (currentOffset != null && latestOffset != null) {
- _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
ServerGauge.REALTIME_INGESTION_OFFSET_LAG,
- () -> getPartitionIngestionOffsetLag(partitionId));
- }
-
- if (currentOffset != null) {
- _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
- ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET, () ->
getPartitionIngestionConsumingOffset(partitionId));
- }
-
- if (latestOffset != null) {
- _serverMetrics.setOrUpdatePartitionGauge(_metricName, partitionId,
- ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET, () ->
getPartitionIngestionUpstreamOffset(partitionId));
- }
+ _partitionsTracked.computeIfAbsent(partitionId, pid -> {
+ createMetrics(partitionId);
+ return true;
+ });
+ return new IngestionInfo(ingestionTimeMs, currentOffset);
}
- return new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs,
currentOffset, latestOffset);
+ v.update(ingestionTimeMs, currentOffset);
+ return v;
});
// If we are consuming we do not need to track this partition for removal.
_partitionsMarkedForVerification.remove(partitionId);
}
- /*
+ /**
* Handle partition removal event. This must be invoked when we stop serving
a given partition for
* this table in the current server.
*
* @param partitionId partition id that we should stop tracking.
*/
- public void stopTrackingPartitionIngestionDelay(int partitionId) {
+ public void stopTrackingPartition(int partitionId) {
removePartitionId(partitionId);
}
@@ -327,7 +444,7 @@ public class IngestionDelayTracker {
*
* @return Set of partitionIds for which ingestion metrics were removed.
*/
- public Set<Integer> stopTrackingIngestionDelayForAllPartitions() {
+ public Set<Integer> stopTrackingAllPartitions() {
Set<Integer> removedPartitionIds = new
HashSet<>(_ingestionInfoMap.keySet());
for (Integer partitionId : _ingestionInfoMap.keySet()) {
removePartitionId(partitionId);
@@ -340,12 +457,12 @@ public class IngestionDelayTracker {
* when we want to stop tracking the ingestion delay for a partition when
the segment might still be consuming, e.g.
* when the new consuming segment is created on a different server.
*/
- public void stopTrackingPartitionIngestionDelay(String segmentName) {
+ public void stopTrackingPartition(String segmentName) {
_segmentsToIgnore.put(segmentName, true);
removePartitionId(new LLCSegmentName(segmentName).getPartitionGroupId());
}
- /*
+ /**
* This method is used for timing out inactive partitions, so we don't
display their metrics on current server.
* When the inactive time exceeds some threshold, we read from ideal state
to confirm we still host the partition,
* if not we remove the partition from being tracked locally.
@@ -377,6 +494,7 @@ public class IngestionDelayTracker {
removePartitionId(partitionId);
}
}
+ _partitionsHostedByThisServer = partitionsHostedByThisServer;
}
/**
@@ -391,7 +509,7 @@ public class IngestionDelayTracker {
_partitionsMarkedForVerification.put(new
LLCSegmentName(segmentName).getPartitionGroupId(), _clock.millis());
}
- /*
+ /**
* Method to get timestamp used for the ingestion delay for a given
partition.
*
* @param partitionId partition for which we are retrieving the delay
@@ -403,7 +521,7 @@ public class IngestionDelayTracker {
return ingestionInfo != null ? ingestionInfo._ingestionTimeMs :
Long.MIN_VALUE;
}
- /*
+ /**
* Method to get ingestion delay for a given partition.
*
* @param partitionId partition for which we are retrieving the delay
@@ -412,79 +530,103 @@ public class IngestionDelayTracker {
*/
public long getPartitionIngestionDelayMs(int partitionId) {
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- return ingestionInfo != null ?
getIngestionDelayMs(ingestionInfo._ingestionTimeMs) : 0;
+ long ingestionTimeMs = 0;
+ if ((ingestionInfo != null) && (ingestionInfo._ingestionTimeMs > 0)) {
+ ingestionTimeMs = ingestionInfo._ingestionTimeMs;
+ }
+ // Compute aged delay for current partition
+ long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
+ // Correct to zero for any time shifts due to NTP or time reset.
+ return Math.max(agedIngestionDelayMs, 0);
}
- /*
- * Method to get end to end ingestion delay for a given partition.
- *
- * @param partitionId partition for which we are retrieving the delay
+ /**
+ * Computes the ingestion lag for the given partition based on offset
difference.
+ * <p>
+ * The lag is calculated as the difference between the latest upstream offset
+ * and the current consuming offset. Only {@link LongMsgOffset} types are
supported.
*
- * @return End to end ingestion delay in milliseconds for the given
partition ID.
+ * @param partitionId partition for which the ingestion lag is computed
+ * @return offset lag for the given partition
*/
- public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
- IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- return ingestionInfo != null ?
getIngestionDelayMs(ingestionInfo._firstStreamIngestionTimeMs) : 0;
- }
-
public long getPartitionIngestionOffsetLag(int partitionId) {
- IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- if (ingestionInfo == null) {
- return 0;
- }
- StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
- StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
- if (currentOffset == null || latestOffset == null) {
+ StreamPartitionMsgOffset latestOffset =
_partitionIdToLatestOffset.get(partitionId);
+ if (latestOffset == null) {
return 0;
}
- // TODO: Support other types of offsets
- if (!(currentOffset instanceof LongMsgOffset && latestOffset instanceof
LongMsgOffset)) {
- return 0;
+ IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
+ long currentOffset = 0;
+ if (ingestionInfo != null) {
+ StreamPartitionMsgOffset currentMsgOffset = ingestionInfo._currentOffset;
+ if (currentMsgOffset == null) {
+ // If currentOffset is null, it means either:
+ // 1. The stream does not support offset lag (e.g. Kinesis). But in that case the
+ // IngestionOffsetLag gauge is never created and this method is never called.
+ // 2. The server has caught up, e.g. after a restart when it was already caught up.
+ return 0;
+ }
+ assert currentMsgOffset instanceof LongMsgOffset;
+ currentOffset = ((LongMsgOffset) currentMsgOffset).getOffset();
}
- return ((LongMsgOffset) latestOffset).getOffset() - ((LongMsgOffset)
currentOffset).getOffset();
+ assert latestOffset instanceof LongMsgOffset;
+ return Math.max(0, ((LongMsgOffset) latestOffset).getOffset() -
currentOffset);
}
- // Get the consuming offset for a given partition
+ /**
+ * Retrieves the latest offset consumed for the given partition.
+ *
+ * @param partitionId partition for which the consuming offset is retrieved
+ * @return consuming offset value for the given partition, or {@code 0} if no ingestion info is available
+ */
public long getPartitionIngestionConsumingOffset(int partitionId) {
IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
if (ingestionInfo == null) {
return 0;
}
- StreamPartitionMsgOffset currentOffset = ingestionInfo._currentOffset;
- if (currentOffset == null) {
- return 0;
- }
- // TODO: Support other types of offsets
- if (!(currentOffset instanceof LongMsgOffset)) {
+ StreamPartitionMsgOffset currentMsgOffset = ingestionInfo._currentOffset;
+ if (currentMsgOffset == null) {
+ // If currentOffset is null, it means either:
+ // 1. The stream does not support offset lag (e.g. Kinesis). But in that case the
+ // IngestionOffsetLag gauge is never created and this method is never called.
+ // 2. The server has caught up, e.g. after a restart when it was already caught up.
return 0;
}
- return ((LongMsgOffset) currentOffset).getOffset();
+ assert currentMsgOffset instanceof LongMsgOffset;
+ return ((LongMsgOffset) currentMsgOffset).getOffset();
}
- // Get the latest offset in upstream data source for a given partition
- public long getPartitionIngestionUpstreamOffset(int partitionId) {
- IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
- if (ingestionInfo == null) {
- return 0;
- }
- StreamPartitionMsgOffset latestOffset = ingestionInfo._latestOffset;
+ /**
+ * Retrieves the latest offset in the upstream data source for the given
partition.
+ *
+ * @param partitionId partition for which the latest upstream offset is
retrieved
+ * @return latest offset value for the given partition, or {@code 0} if not
available
+ */
+ public long getLatestPartitionOffset(int partitionId) {
+ StreamPartitionMsgOffset latestOffset =
_partitionIdToLatestOffset.get(partitionId);
if (latestOffset == null) {
return 0;
}
- // TODO: Support other types of offsets
- if (!(latestOffset instanceof LongMsgOffset)) {
- return 0;
- }
+ assert latestOffset instanceof LongMsgOffset;
return ((LongMsgOffset) latestOffset).getOffset();
}
- /*
+ /**
* We use this method to clean up when a table is being removed. No updates
are expected at this time as all
* RealtimeSegmentManagers should be down now.
*/
public void shutdown() {
// Now that segments can't report metric, destroy metric for this table
- _scheduledExecutor.shutdown(); // ScheduledExecutor is installed in
constructor so must always be cancelled
+ _metricsRemovalScheduler.shutdown(); // ScheduledExecutor is installed in
constructor so must always be cancelled
+ if (_ingestionDelayTrackingScheduler != null) {
+ _ingestionDelayTrackingScheduler.shutdown();
+ }
+ for (StreamMetadataProvider streamMetadataProvider :
_streamConfigIndexToStreamMetadataProvider.values()) {
+ try {
+ streamMetadataProvider.close();
+ } catch (IOException e) {
+ LOGGER.error("Failed to close streamMetadataProvider", e);
+ }
+ }
if (!_isServerReadyToServeQueries.get()) {
// Do not update the tracker state during server startup period
return;
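[Editor's note] Both background tasks in the tracker above follow the same setup: a single-thread scheduled
executor with a named thread, started shortly after construction. A minimal sketch of that setup using only
standard java.util.concurrent APIs (names and intervals are placeholders taken from the constants in this diff):

    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
      Thread thread = Executors.defaultThreadFactory().newThread(r);
      thread.setName("IngestionDelayTrackingThread-" + rawTableName);
      return thread;
    });
    // First run after the initial delay; subsequent runs are separated by a fixed
    // delay measured from the end of the previous run.
    scheduler.scheduleWithFixedDelay(trackingTask, 100, 30_000, TimeUnit.MILLISECONDS);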
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 597aa40ae2c..17336e89ff2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -2013,12 +2013,10 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private void updateIngestionMetrics(StreamMessageMetadata metadata) {
if (metadata != null) {
try {
- StreamPartitionMsgOffset latestOffset = fetchLatestStreamOffset(5000,
true);
_realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr,
_partitionGroupId,
- metadata.getRecordIngestionTimeMs(),
metadata.getFirstStreamRecordIngestionTimeMs(), metadata.getOffset(),
- latestOffset);
+ metadata.getRecordIngestionTimeMs(), metadata.getOffset());
} catch (Exception e) {
- _segmentLogger.warn("Failed to fetch latest offset for updating
ingestion delay", e);
+ _segmentLogger.warn("Failed to update the ingestion metrics", e);
}
}
}
@@ -2029,8 +2027,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
*/
private void setIngestionDelayToZero() {
long currentTimeMs = System.currentTimeMillis();
- _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr,
_partitionGroupId, currentTimeMs, currentTimeMs,
- null, null);
+ _realtimeTableDataManager.updateIngestionMetrics(_segmentNameStr,
_partitionGroupId, currentTimeMs, null);
}
// This should be done during commit? We may not always commit when we build
a segment....
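[Editor's note] setIngestionDelayToZero() above relies on how the tracker computes delay: the reported value is
the current clock time minus the last reported ingestion time, floored at zero, so passing the current wall-clock
time reads back as (approximately) zero. A minimal sketch of the computation, matching
getPartitionIngestionDelayMs() earlier in this diff:

    // Correct to zero for any time shifts due to NTP or time reset.
    long delayMs = Math.max(0, clock.millis() - ingestionTimeMs);
    // Reporting ingestionTimeMs == clock.millis() therefore yields a delay of 0
    // until the clock advances.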
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 406712e9d68..69184e2bfc0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -84,7 +84,6 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
-import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.Status;
@@ -160,8 +159,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
protected void doInit() {
_leaseExtender = SegmentBuildTimeLeaseExtender.getOrCreate(_instanceId,
_serverMetrics, _tableNameWithType);
// Tracks ingestion delay of all partitions being served for this table
- _ingestionDelayTracker =
- new IngestionDelayTracker(_serverMetrics, _tableNameWithType, this,
_isServerReadyToServeQueries);
+ _ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics,
_tableNameWithType, this);
File statsFile = new File(_tableDataDir, STATS_FILE_NAME);
try {
_statsHistory = RealtimeSegmentStatsHistory.deserialzeFrom(statsFile);
@@ -294,16 +292,11 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
* @param segmentName name of the consuming segment
* @param partitionId partition id of the consuming segment (directly passed
in to avoid parsing the segment name)
* @param ingestionTimeMs ingestion time of the last consumed message (from
{@link StreamMessageMetadata})
- * @param firstStreamIngestionTimeMs ingestion time of the last consumed
message in the first stream (from
- * {@link StreamMessageMetadata})
* @param currentOffset offset of the last consumed message (from {@link
StreamMessageMetadata})
- * @param latestOffset offset of the latest message in the partition (from
{@link StreamMetadataProvider})
*/
public void updateIngestionMetrics(String segmentName, int partitionId, long
ingestionTimeMs,
- long firstStreamIngestionTimeMs, @Nullable StreamPartitionMsgOffset
currentOffset,
- @Nullable StreamPartitionMsgOffset latestOffset) {
- _ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId,
ingestionTimeMs, firstStreamIngestionTimeMs,
- currentOffset, latestOffset);
+ @Nullable StreamPartitionMsgOffset currentOffset) {
+ _ingestionDelayTracker.updateMetrics(segmentName, partitionId,
ingestionTimeMs, currentOffset);
}
/**
@@ -320,7 +313,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
* still be consuming, e.g. when the new consuming segment is created on a
different server.
*/
public void removeIngestionMetrics(String segmentName) {
- _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+ _ingestionDelayTracker.stopTrackingPartition(segmentName);
}
/**
@@ -332,7 +325,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
@Override
public void onConsumingToDropped(String segmentName) {
// NOTE: No need to mark segment ignored here because it should have
already been dropped.
- _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(new
LLCSegmentName(segmentName).getPartitionGroupId());
+ _ingestionDelayTracker.stopTrackingPartition(new
LLCSegmentName(segmentName).getPartitionGroupId());
}
/**
@@ -345,7 +338,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
*/
@Override
public void onConsumingToOffline(String segmentName) {
- _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
+ _ingestionDelayTracker.stopTrackingPartition(segmentName);
}
@Override
@@ -511,10 +504,10 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
public Set<Integer> stopTrackingPartitionIngestionDelay(@Nullable
Set<Integer> partitionIds) {
if (CollectionUtils.isEmpty(partitionIds)) {
- return
_ingestionDelayTracker.stopTrackingIngestionDelayForAllPartitions();
+ return _ingestionDelayTracker.stopTrackingAllPartitions();
}
for (Integer partitionId: partitionIds) {
- _ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionId);
+ _ingestionDelayTracker.stopTrackingPartition(partitionId);
}
return partitionIds;
}
@@ -926,6 +919,10 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_enforceConsumptionInOrder = enforceConsumptionInOrder;
}
+ public Supplier<Boolean> getIsServerReadyToServeQueries() {
+ return _isServerReadyToServeQueries;
+ }
+
/**
* Validate a schema against the table config for real-time record
consumption.
* Ideally, we should validate these things when schema is added or table is
created, but either of these
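[Editor's note] When a table consumes from multiple topics, the tracker converts between stream-local partition
ids and Pinot partition ids via IngestionConfigUtils (getPinotPartitionIdFromStreamPartitionId and
getStreamConfigIndexFromPinotPartitionId in this diff). A hypothetical sketch of such a round trip; the real
encoding and its padding constant live in IngestionConfigUtils, so the 10000 below is only an assumption for
illustration:

    static final int PADDING = 10000; // assumed padding constant; see IngestionConfigUtils

    // A stream-local partition id plus the stream config index map to one global Pinot partition id ...
    static int toPinotPartitionId(int streamPartitionId, int streamConfigIndex) {
      return streamConfigIndex * PADDING + streamPartitionId;
    }

    // ... and the stream config index can be recovered from the Pinot partition id.
    static int toStreamConfigIndex(int pinotPartitionId) {
      return pinotPartitionId / PADDING;
    }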
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
index 1fdd12e00e7..cf25c1d4a1e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/IngestionDelayTrackerTest.java
@@ -22,53 +22,154 @@ import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.LLCSegmentName;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class IngestionDelayTrackerTest {
private static final String RAW_TABLE_NAME = "testTable";
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
- private static final int TIMER_THREAD_TICK_INTERVAL_MS = 100;
+ private static final int METRICS_CLEANUP_TICK_INTERVAL_MS = 100;
+ private static final int METRICS_TRACKING_TICK_INTERVAL_MS = 100;
private final ServerMetrics _serverMetrics = mock(ServerMetrics.class);
- private final RealtimeTableDataManager _realtimeTableDataManager =
mock(RealtimeTableDataManager.class);
+ private static final RealtimeTableDataManager REALTIME_TABLE_DATA_MANAGER =
mock(RealtimeTableDataManager.class);
+
+ static {
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setTimeColumnName("ts")
+ .setNullHandlingEnabled(true)
+ .setStreamConfigs(getStreamConfigs())
+ .build();
+
when(REALTIME_TABLE_DATA_MANAGER.getCachedTableConfigAndSchema()).thenReturn(Pair.of(tableConfig,
null));
+ }
+
+ private static Map<String, String> getStreamConfigs() {
+ Map<String, String> streamConfigs = new HashMap<>();
+ streamConfigs.put("streamType", "kafka");
+ streamConfigs.put("stream.kafka.topic.name", "test");
+ streamConfigs.put("stream.kafka.decoder.class.name",
+ "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder");
+ streamConfigs.put("stream.kafka.consumer.factory.class.name",
+
"org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConsumerFactory");
+ return streamConfigs;
+ }
+
+ private static class MockIngestionDelayTracker extends IngestionDelayTracker
{
+
+ private Map<Integer, Map<String, List<Long>>> _partitionToMetricToValues;
+ private ScheduledExecutorService _scheduledExecutorService;
+
+ public MockIngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
+ RealtimeTableDataManager realtimeTableDataManager)
+ throws RuntimeException {
+ this(serverMetrics, tableNameWithType, realtimeTableDataManager, 5, 5,
() -> true);
+ }
+
+ public MockIngestionDelayTracker(ServerMetrics serverMetrics, String
tableNameWithType,
+ RealtimeTableDataManager realtimeTableDataManager, int
timerThreadTickIntervalMs, int metricTrackingIntervalMs,
+ Supplier<Boolean> isServerReadyToServeQueries) {
+ super(serverMetrics, tableNameWithType, realtimeTableDataManager,
timerThreadTickIntervalMs,
+ metricTrackingIntervalMs, isServerReadyToServeQueries);
+ }
+
+ @Override
+ public void createMetrics(int partitionId) {
+ if (_partitionToMetricToValues == null) {
+ _partitionToMetricToValues = new ConcurrentHashMap<>();
+ _scheduledExecutorService = Executors.newScheduledThreadPool(2);
+ }
+ Map<String, List<Long>> metricToValues = new HashMap<>();
+ _partitionToMetricToValues.put(partitionId, metricToValues);
+ if
(_streamConfigIndexToStreamMetadataProvider.get(0).supportsOffsetLag()) {
+
metricToValues.put(ServerGauge.REALTIME_INGESTION_OFFSET_LAG.getGaugeName(),
new ArrayList<>());
+
metricToValues.put(ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET.getGaugeName(),
new ArrayList<>());
+
metricToValues.put(ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET.getGaugeName(),
new ArrayList<>());
+ }
+
metricToValues.put(ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName(), new
ArrayList<>());
+
+ _scheduledExecutorService.scheduleWithFixedDelay(() ->
_partitionToMetricToValues.compute(partitionId, (k, v) -> {
+ Map<String, List<Long>> metricToValuesForPartition =
_partitionToMetricToValues.get(partitionId);
+ if
(_streamConfigIndexToStreamMetadataProvider.get(0).supportsOffsetLag()) {
+
metricToValuesForPartition.get(ServerGauge.REALTIME_INGESTION_OFFSET_LAG.getGaugeName())
+ .add(getPartitionIngestionOffsetLag(partitionId));
+
metricToValuesForPartition.get(ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET.getGaugeName())
+ .add(getPartitionIngestionConsumingOffset(partitionId));
+
metricToValuesForPartition.get(ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET.getGaugeName())
+ .add(getLatestPartitionOffset(partitionId));
+ }
+
metricToValuesForPartition.get(ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName())
+ .add(getPartitionIngestionDelayMs(partitionId));
+ return metricToValuesForPartition;
+ }), 0, 10, TimeUnit.MILLISECONDS);
+ }
+
+ void updatePartitionIdToLatestOffset(Map<Integer,
StreamPartitionMsgOffset> partitionIdToLatestOffset) {
+ _partitionIdToLatestOffset = partitionIdToLatestOffset;
+ }
+
+ @Override
+ public void shutdown() {
+ if (_scheduledExecutorService != null) {
+ _scheduledExecutorService.shutdown();
+ }
+ super.shutdown();
+ }
+ }
private IngestionDelayTracker createTracker() {
- IngestionDelayTracker ingestionDelayTracker =
- new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME,
_realtimeTableDataManager, () -> true);
- // With no samples, the time reported must be zero
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0),
0);
- return ingestionDelayTracker;
+ return new MockIngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME,
REALTIME_TABLE_DATA_MANAGER);
}
@Test
public void testTrackerConstructors() {
// Test regular constructor
IngestionDelayTracker ingestionDelayTracker =
- new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME,
_realtimeTableDataManager, () -> true);
+ new MockIngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME,
REALTIME_TABLE_DATA_MANAGER);
Clock clock = Clock.systemUTC();
ingestionDelayTracker.setClock(clock);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0),
0);
+ Assert.assertTrue(ingestionDelayTracker.getPartitionIngestionDelayMs(0) <=
clock.millis());
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0),
Long.MIN_VALUE);
ingestionDelayTracker.shutdown();
// Test constructor with timer arguments
- ingestionDelayTracker = new IngestionDelayTracker(_serverMetrics,
REALTIME_TABLE_NAME, _realtimeTableDataManager,
- TIMER_THREAD_TICK_INTERVAL_MS, () -> true);
- Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0),
0);
+ ingestionDelayTracker =
+ new MockIngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME,
REALTIME_TABLE_DATA_MANAGER,
+ METRICS_CLEANUP_TICK_INTERVAL_MS,
METRICS_TRACKING_TICK_INTERVAL_MS, () -> true);
+ Assert.assertTrue(ingestionDelayTracker.getPartitionIngestionDelayMs(0) <=
clock.millis());
Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0),
Long.MIN_VALUE);
// Test bad timer args to the constructor
try {
- new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME,
_realtimeTableDataManager, 0, () -> true);
+ new IngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME,
REALTIME_TABLE_DATA_MANAGER, 0, 0, () -> true);
Assert.fail("Must have asserted due to invalid arguments"); //
Constructor must assert
} catch (Exception e) {
if ((e instanceof NullPointerException) || !(e instanceof
RuntimeException)) {
@@ -86,7 +187,7 @@ public class IngestionDelayTrackerTest {
final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0,
234).getSegmentName();
IngestionDelayTracker ingestionDelayTracker = createTracker();
- // Use fixed clock so samples dont age
+ // Use fixed clock so samples don't age
Instant now = Instant.now();
ZoneId zoneId = ZoneId.systemDefault();
Clock clock = Clock.fixed(now, zoneId);
@@ -94,52 +195,37 @@ public class IngestionDelayTrackerTest {
// Test we follow a single partition up and down
     for (long ingestionTimeMs = 0; ingestionTimeMs <= maxTestDelay; ingestionTimeMs++) {
-      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
-      ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, firstStreamIngestionTimeMs,
-          null, null);
+      ingestionDelayTracker.updateMetrics(segment0, partition0, ingestionTimeMs, null);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - ingestionTimeMs);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
-          clock.millis() - firstStreamIngestionTimeMs);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
}
// Test tracking down a measure for a given partition
     for (long ingestionTimeMs = maxTestDelay; ingestionTimeMs >= 0; ingestionTimeMs--) {
-      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
-      ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, firstStreamIngestionTimeMs,
-          null, null);
+      ingestionDelayTracker.updateMetrics(segment0, partition0, ingestionTimeMs, null);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), clock.millis() - ingestionTimeMs);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
-          clock.millis() - (ingestionTimeMs + 1));
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
}
// Make the current partition maximum
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, maxTestDelay, maxTestDelay, null, null);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, maxTestDelay, null);
// Bring up partition1 delay up and verify values
     for (long ingestionTimeMs = 0; ingestionTimeMs <= 2 * maxTestDelay; ingestionTimeMs++) {
       long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
-      ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, firstStreamIngestionTimeMs,
-          null, null);
+      ingestionDelayTracker.updateMetrics(segment1, partition1, ingestionTimeMs, null);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - ingestionTimeMs);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
-          clock.millis() - firstStreamIngestionTimeMs);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs);
}
// Bring down values of partition1 and verify values
     for (long ingestionTimeMs = 2 * maxTestDelay; ingestionTimeMs >= 0; ingestionTimeMs--) {
-      long firstStreamIngestionTimeMs = ingestionTimeMs + 1;
-      ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, firstStreamIngestionTimeMs,
-          null, null);
+      ingestionDelayTracker.updateMetrics(segment1, partition1, ingestionTimeMs, null);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), clock.millis() - ingestionTimeMs);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1),
-          clock.millis() - firstStreamIngestionTimeMs);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs);
}
@@ -168,9 +254,8 @@ public class IngestionDelayTrackerTest {
Clock clock = Clock.fixed(now, zoneId);
ingestionDelayTracker.setClock(clock);
long ingestionTimeMs = clock.millis() - partition0Delay0;
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, null, null);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, ingestionTimeMs, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay0);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
// Advance clock and test aging
@@ -178,14 +263,11 @@ public class IngestionDelayTrackerTest {
ingestionDelayTracker.setClock(offsetClock);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0),
         (partition0Delay0 + partition0Offset0Ms));
-    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0),
-        (partition0Delay0 + partition0Offset0Ms));
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
     ingestionTimeMs = offsetClock.millis() - partition0Delay1;
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, ingestionTimeMs, ingestionTimeMs, null, null);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, ingestionTimeMs, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition0), partition0Delay1);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition0), partition0Delay1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition0), ingestionTimeMs);
     // Add some offset to the last sample and make sure we age that measure properly
@@ -195,9 +277,8 @@ public class IngestionDelayTrackerTest {
(partition0Delay1 + partition0Offset1Ms));
ingestionTimeMs = offsetClock.millis() - partition1Delay0;
-    ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, ingestionTimeMs, ingestionTimeMs, null, null);
+    ingestionDelayTracker.updateMetrics(segment1, partition1, ingestionTimeMs, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partition1), partition1Delay0);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partition1), partition1Delay0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partition1), ingestionTimeMs);
     // Add some offset to the last sample and make sure we age that measure properly
@@ -225,19 +306,16 @@ public class IngestionDelayTrackerTest {
for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
       String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 123).getSegmentName();
       long ingestionTimeMs = clock.millis() - partitionId;
-      ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, null,
-          null);
+      ingestionDelayTracker.updateMetrics(segmentName, partitionId, ingestionTimeMs, null);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), partitionId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), partitionId);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId), ingestionTimeMs);
}
for (int partitionId = maxPartition; partitionId >= 0; partitionId--) {
- ingestionDelayTracker.stopTrackingPartitionIngestionDelay(partitionId);
+ ingestionDelayTracker.stopTrackingPartition(partitionId);
}
for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
       // With no sample, untracked partitions report the full clock time as the delay
-      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), 0);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), 0);
+      Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), clock.millis());
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(partitionId), Long.MIN_VALUE);
}
}
@@ -253,20 +331,17 @@ public class IngestionDelayTrackerTest {
     String segmentName = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 123).getSegmentName();
     long ingestionTimeMs = clock.millis() - 10;
-    ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, null, null);
+    ingestionDelayTracker.updateMetrics(segmentName, 0, ingestionTimeMs, null);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 10);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 10);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), ingestionTimeMs);
-    ingestionDelayTracker.stopTrackingPartitionIngestionDelay(segmentName);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 0);
+    ingestionDelayTracker.stopTrackingPartition(segmentName);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), clock.millis());
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
     // Should not update metrics for removed segment
-    ingestionDelayTracker.updateIngestionMetrics(segmentName, 0, ingestionTimeMs, ingestionTimeMs, null, null);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), 0);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(0), 0);
+    ingestionDelayTracker.updateMetrics(segmentName, 0, ingestionTimeMs, null);
+    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(0), clock.millis());
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionTimeMs(0), Long.MIN_VALUE);
}
@@ -285,10 +360,8 @@ public class IngestionDelayTrackerTest {
for (int partitionId = 0; partitionId <= maxTestDelay; partitionId++) {
       String segmentName = new LLCSegmentName(RAW_TABLE_NAME, partitionId, 0, 123).getSegmentName();
       long ingestionTimeMs = clock.millis() - partitionId;
-      ingestionDelayTracker.updateIngestionMetrics(segmentName, partitionId, ingestionTimeMs, ingestionTimeMs, null,
-          null);
+      ingestionDelayTracker.updateMetrics(segmentName, partitionId, ingestionTimeMs, null);
       Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionDelayMs(partitionId), partitionId);
-      Assert.assertEquals(ingestionDelayTracker.getPartitionEndToEndIngestionDelayMs(partitionId), partitionId);
}
ingestionDelayTracker.shutdown();
@@ -305,34 +378,154 @@ public class IngestionDelayTrackerTest {
     final String segment1 = new LLCSegmentName(RAW_TABLE_NAME, partition1, 0, 234).getSegmentName();
     IngestionDelayTracker ingestionDelayTracker = createTracker();
+    Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap = new HashMap<>();
+    ((MockIngestionDelayTracker) ingestionDelayTracker).updatePartitionIdToLatestOffset(partitionMsgOffsetMap);
     // Test tracking offset lag for a single partition
     StreamPartitionMsgOffset msgOffset0 = new LongMsgOffset(50);
     StreamPartitionMsgOffset latestOffset0 = new LongMsgOffset(150);
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
-        latestOffset0);
+    partitionMsgOffsetMap.put(partition0, latestOffset0);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, Long.MIN_VALUE, msgOffset0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 100);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition0), 150);
+    Assert.assertEquals(ingestionDelayTracker.getLatestPartitionOffset(partition0), 150);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition0), 50);
     // Test tracking offset lag for another partition
     StreamPartitionMsgOffset msgOffset1 = new LongMsgOffset(50);
     StreamPartitionMsgOffset latestOffset1 = new LongMsgOffset(150);
-    ingestionDelayTracker.updateIngestionMetrics(segment1, partition1, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset1,
-        latestOffset1);
+    partitionMsgOffsetMap.put(partition1, latestOffset1);
+    ingestionDelayTracker.updateMetrics(segment1, partition1, Long.MIN_VALUE, msgOffset1);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition1), 100);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition1), 150);
+    Assert.assertEquals(ingestionDelayTracker.getLatestPartitionOffset(partition1), 150);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition1), 50);
     // Update offset lag for partition0
     msgOffset0 = new LongMsgOffset(150);
     latestOffset0 = new LongMsgOffset(200);
-    ingestionDelayTracker.updateIngestionMetrics(segment0, partition0, Long.MIN_VALUE, Long.MIN_VALUE, msgOffset0,
-        latestOffset0);
+    partitionMsgOffsetMap.put(partition0, latestOffset0);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, Long.MIN_VALUE, msgOffset0);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionOffsetLag(partition0), 50);
-    Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionUpstreamOffset(partition0), 200);
+    Assert.assertEquals(ingestionDelayTracker.getLatestPartitionOffset(partition0), 200);
     Assert.assertEquals(ingestionDelayTracker.getPartitionIngestionConsumingOffset(partition0), 150);
     ingestionDelayTracker.shutdown();
}
+
+ @Test
+ public void testUpdateLatestStreamOffset() {
+ IngestionDelayTracker ingestionDelayTracker = createTracker();
+ Set<Integer> partitionsHosted = new HashSet<>();
+ partitionsHosted.add(0);
+ partitionsHosted.add(1);
+
+ ingestionDelayTracker.updateLatestStreamOffset(partitionsHosted);
+    Assert.assertEquals(ingestionDelayTracker._partitionIdToLatestOffset.size(), 2);
+    Assert.assertEquals(((LongMsgOffset) (ingestionDelayTracker._partitionIdToLatestOffset.get(0))).getOffset(),
+        Integer.MAX_VALUE);
+    Assert.assertEquals(((LongMsgOffset) (ingestionDelayTracker._partitionIdToLatestOffset.get(1))).getOffset(),
+        Integer.MAX_VALUE);
+
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    List<Map<String, String>> streamConfigMaps = new ArrayList<>();
+    streamConfigMaps.add(getStreamConfigs());
+    streamConfigMaps.add(getStreamConfigs());
+    StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(streamConfigMaps);
+    ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+        .setTimeColumnName("ts")
+        .setNullHandlingEnabled(true)
+        .setIngestionConfig(ingestionConfig)
+        .setStreamConfigs(getStreamConfigs())
+        .build();
+    when(REALTIME_TABLE_DATA_MANAGER.getCachedTableConfigAndSchema()).thenReturn(Pair.of(tableConfig, null));
+    ingestionDelayTracker = createTracker();
+    partitionsHosted.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(0, 1));
+    ingestionDelayTracker.updateLatestStreamOffset(partitionsHosted);
+    Assert.assertEquals(ingestionDelayTracker._partitionIdToLatestOffset.size(), 3);
+    Assert.assertEquals(((LongMsgOffset) (ingestionDelayTracker._partitionIdToLatestOffset.get(0))).getOffset(),
+        Integer.MAX_VALUE);
+    Assert.assertEquals(((LongMsgOffset) (ingestionDelayTracker._partitionIdToLatestOffset.get(1))).getOffset(),
+        Integer.MAX_VALUE);
+    Assert.assertEquals(((LongMsgOffset) (ingestionDelayTracker._partitionIdToLatestOffset.get(
+        IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(0, 1)))).getOffset(), Integer.MAX_VALUE);
+  }
+
+ @Test
+ public void testIngestionDelay() {
+    MockIngestionDelayTracker ingestionDelayTracker =
+        new MockIngestionDelayTracker(_serverMetrics, REALTIME_TABLE_NAME, REALTIME_TABLE_DATA_MANAGER,
+            METRICS_CLEANUP_TICK_INTERVAL_MS, METRICS_TRACKING_TICK_INTERVAL_MS, () -> true);
+
+    final int partition0 = 0;
+    final String segment0 = new LLCSegmentName(RAW_TABLE_NAME, partition0, 0, 123).getSegmentName();
+    final int partition1 = 1;
+
+    ingestionDelayTracker._partitionsHostedByThisServer.add(partition0);
+    ingestionDelayTracker._partitionsHostedByThisServer.add(partition1);
+    ingestionDelayTracker.updateMetrics(segment0, partition0, System.currentTimeMillis(), new LongMsgOffset(50));
+
+    ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
+    Map<Integer, StreamPartitionMsgOffset> partitionIdVsLatestOffset = new HashMap<>();
+    partitionIdVsLatestOffset.put(partition0, new LongMsgOffset(200));
+    partitionIdVsLatestOffset.put(partition1, new LongMsgOffset(200));
+    ingestionDelayTracker.updatePartitionIdToLatestOffset(partitionIdVsLatestOffset);
+
+    scheduledExecutorService.scheduleWithFixedDelay(() -> {
+      partitionIdVsLatestOffset.put(partition0,
+          new LongMsgOffset(((LongMsgOffset) (partitionIdVsLatestOffset.get(partition0))).getOffset() + 50));
+      partitionIdVsLatestOffset.put(partition1,
+          new LongMsgOffset(((LongMsgOffset) (partitionIdVsLatestOffset.get(partition1))).getOffset() + 50));
+    }, 0, 10, TimeUnit.MILLISECONDS);
+
+    Map<Integer, Map<String, List<Long>>> partitionToMetricToValues = ingestionDelayTracker._partitionToMetricToValues;
+ TestUtils.waitForCondition((aVoid) -> {
+ try {
+ verifyMetrics(partitionToMetricToValues);
+ } catch (Error e) {
+ return false;
+ }
+ return true;
+ }, 10, 2000, "Failed to verify the ingestion delay metrics.");
+ scheduledExecutorService.shutdown();
+ ingestionDelayTracker.shutdown();
+ }
+
+  private void verifyMetrics(Map<Integer, Map<String, List<Long>>> partitionToMetricToValues) {
+    Assert.assertEquals(partitionToMetricToValues.size(), 2);
+    verifyPartition0(partitionToMetricToValues.get(0));
+    verifyPartition1(partitionToMetricToValues.get(1));
+  }
+
+  private void verifyPartition0(Map<String, List<Long>> metrics) {
+    assertMinMax(metrics, ServerGauge.REALTIME_INGESTION_OFFSET_LAG.getGaugeName(), 150L, 300L);
+    assertMinMax(metrics, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET.getGaugeName(), 200L, 350L);
+    assertEqualsFirstAndLast(metrics, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET.getGaugeName(), 50L);
+    assertIncreasing(metrics, ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName());
+    Assert.assertTrue(metrics.get(ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName()).get(0) > 0);
+  }
+
+  private void verifyPartition1(Map<String, List<Long>> metrics) {
+    assertMinMax(metrics, ServerGauge.REALTIME_INGESTION_OFFSET_LAG.getGaugeName(), 200L, 350L);
+    assertMinMax(metrics, ServerGauge.REALTIME_INGESTION_UPSTREAM_OFFSET.getGaugeName(), 200L, 350L);
+    assertEqualsFirstAndLast(metrics, ServerGauge.REALTIME_INGESTION_CONSUMING_OFFSET.getGaugeName(), 0L);
+    assertIncreasing(metrics, ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName());
+    Assert.assertTrue(metrics.get(ServerGauge.REALTIME_INGESTION_DELAY_MS.getGaugeName()).get(0) > 0);
+  }
+
+  private void assertMinMax(Map<String, List<Long>> metrics, String key, long minFirst, long minLast) {
+    List<Long> values = metrics.get(key);
+    Assert.assertTrue(values.get(0) >= minFirst, key + " first value too small");
+    Assert.assertTrue(values.get(values.size() - 1) >= minLast, key + " last value too small");
+  }
+
+  private void assertEqualsFirstAndLast(Map<String, List<Long>> metrics, String key, long expected) {
+    List<Long> values = metrics.get(key);
+    Assert.assertEquals(values.get(0), expected, key + " first value mismatch");
+    Assert.assertEquals(values.get(values.size() - 1), expected, key + " last value mismatch");
+  }
+
+  private void assertIncreasing(Map<String, List<Long>> metrics, String key) {
+    List<Long> values = metrics.get(key);
+    Assert.assertTrue(values.get(values.size() - 1) > values.get(0), key + " not increasing");
+  }
}
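
Note on the model exercised by the tests above: the consuming thread now reports only its own ingestion time and consuming offset through updateMetrics(), while a scheduled task periodically refreshes the latest upstream offsets (via StreamMetadataProvider.fetchLatestStreamOffset) and publishes the gauges. A minimal sketch of the resulting lag computation, with offsets simplified to long for brevity (an assumption; the tracker itself works with StreamPartitionMsgOffset and per-partition state):

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch only, not the actual IngestionDelayTracker implementation.
    class OffsetLagSketch {
      private final Map<Integer, Long> _consumedOffsets = new ConcurrentHashMap<>();
      // Swapped wholesale by the scheduled metadata poll.
      private volatile Map<Integer, Long> _latestOffsets = Map.of();

      // Called from the consuming thread (updateMetrics in the real tracker).
      void onConsume(int partitionId, long consumedOffset) {
        _consumedOffsets.put(partitionId, consumedOffset);
      }

      // Called from the scheduled task after fetchLatestStreamOffset().
      void onLatestOffsetsFetched(Map<Integer, Long> latestOffsets) {
        _latestOffsets = latestOffsets;
      }

      // Gauge value: latest upstream offset minus last consumed offset.
      long offsetLag(int partitionId) {
        Long consumed = _consumedOffsets.get(partitionId);
        Long latest = _latestOffsets.get(partitionId);
        return (consumed == null || latest == null) ? 0 : Math.max(0, latest - consumed);
      }
    }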
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
index c59c15a028a..99e3d90749d 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamMetadataProvider.java
@@ -19,9 +19,12 @@
package org.apache.pinot.core.realtime.impl.fakestream;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
@@ -57,8 +60,23 @@ public class FakeStreamMetadataProvider implements StreamMetadataProvider {
}
}
+ @Override
+ public boolean supportsOffsetLag() {
+ return true;
+ }
+
@Override
public void close()
throws IOException {
}
+
+ @Override
+  public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds,
+      long timeoutMillis) {
+    Map<Integer, StreamPartitionMsgOffset> partitionIdToLatestOffset = new HashMap<>();
+    for (Integer partitionId: partitionIds) {
+      partitionIdToLatestOffset.put(partitionId, new LongMsgOffset(Integer.MAX_VALUE));
+ }
+ return partitionIdToLatestOffset;
+ }
}
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
index c3fc185de09..694ca8972ac 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/main/java/org/apache/pinot/plugin/stream/kafka20/KafkaStreamMetadataProvider.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -95,6 +96,28 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
}
}
+  @Override
+  public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+    List<TopicPartition> topicPartitions = new ArrayList<>(partitionIds.size());
+    for (Integer streamPartition: partitionIds) {
+      topicPartitions.add(new TopicPartition(_topic, streamPartition));
+    }
+    try {
+      Map<TopicPartition, Long> topicPartitionToLatestOffsetMap =
+          _consumer.endOffsets(topicPartitions, Duration.ofMillis(timeoutMillis));
+
+      Map<Integer, StreamPartitionMsgOffset> partitionIdToLatestOffset =
+          new HashMap<>(topicPartitionToLatestOffsetMap.size());
+      for (Map.Entry<TopicPartition, Long> entry : topicPartitionToLatestOffsetMap.entrySet()) {
+        partitionIdToLatestOffset.put(entry.getKey().partition(), new LongMsgOffset(entry.getValue()));
+      }
+
+      return partitionIdToLatestOffset;
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
+  }
+
@Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
@@ -188,6 +211,11 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
}
}
+ @Override
+ public boolean supportsOffsetLag() {
+ return true;
+ }
+
@Override
   public StreamPartitionMsgOffset getOffsetAtTimestamp(int partitionId, long timestampMillis, long timeoutMillis) {
     return new LongMsgOffset(_consumer.offsetsForTimes(Map.of(_topicPartition, timestampMillis),
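
Note: Kafka's Consumer#endOffsets() returns the high watermark, i.e. the offset one past the last available message. For a partition holding offsets 0..999 it returns 1000, so a consumer whose next offset to consume is 1000 has a lag of 1000 - 1000 = 0; this is also why the consumer tests below expect the fetched offset to equal the number of messages produced per partition.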
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
index 6e4de8a1a03..664885f4082 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-2.0/src/test/java/org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumerTest.java
@@ -18,13 +18,16 @@
*/
package org.apache.pinot.plugin.stream.kafka20;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
@@ -424,6 +427,22 @@ public class KafkaPartitionLevelConsumerTest {
     assertTrue(topicNames.containsAll(List.of(TEST_TOPIC_1, TEST_TOPIC_2, TEST_TOPIC_3)));
}
+ @Test
+ public void testFetchLatestStreamOffset()
+ throws IOException {
+ StreamConfig streamConfig = getStreamConfig(TEST_TOPIC_2);
+    try (KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider("clientId",
+        streamConfig)) {
+      Set<Integer> partitions = new HashSet<>();
+      partitions.add(0);
+      partitions.add(1);
+      Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+          streamMetadataProvider.fetchLatestStreamOffset(partitions, 1000);
+      Assert.assertEquals(((LongMsgOffset) (partitionMsgOffsetMap.get(0))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+      Assert.assertEquals(((LongMsgOffset) (partitionMsgOffsetMap.get(1))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+ }
+ }
+
@Test
void testBatchSizeInBytesIsCalculatedCorrectly() {
TopicPartition topicPartition = new TopicPartition("test-topic", 0);
@@ -457,20 +476,19 @@ public class KafkaPartitionLevelConsumerTest {
}
FakeKafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
-        new FakeKafkaPartitionLevelConsumer("clientId-test", getStreamConfig(), 0);
+        new FakeKafkaPartitionLevelConsumer("clientId-test", getStreamConfig("test-topic"), 0);
     KafkaMessageBatch kafkaMessageBatch = kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
Assert.assertEquals(kafkaMessageBatch.getSizeInBytes(), 14);
}
- private StreamConfig getStreamConfig() {
+ private StreamConfig getStreamConfig(String topicName) {
String streamType = "kafka";
- String streamKafkaTopicName = "test-topic";
String streamKafkaBrokerList = _kafkaBrokerAddress;
String tableNameWithType = "tableName_REALTIME";
Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put("streamType", streamType);
- streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+ streamConfigMap.put("stream.kafka.topic.name", topicName);
streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index a0d58a244ac..36523c54e7a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -23,6 +23,7 @@ import com.google.common.collect.Sets;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@@ -95,6 +96,28 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
}
}
+  @Override
+  public Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMillis) {
+    List<TopicPartition> topicPartitions = new ArrayList<>(partitionIds.size());
+    for (Integer streamPartition: partitionIds) {
+      topicPartitions.add(new TopicPartition(_topic, streamPartition));
+    }
+    try {
+      Map<TopicPartition, Long> topicPartitionToLatestOffsetMap =
+          _consumer.endOffsets(topicPartitions, Duration.ofMillis(timeoutMillis));
+
+      Map<Integer, StreamPartitionMsgOffset> partitionIdToLatestOffset =
+          new HashMap<>(topicPartitionToLatestOffsetMap.size());
+      for (Map.Entry<TopicPartition, Long> entry : topicPartitionToLatestOffsetMap.entrySet()) {
+        partitionIdToLatestOffset.put(entry.getKey().partition(), new LongMsgOffset(entry.getValue()));
+      }
+
+      return partitionIdToLatestOffset;
+    } catch (TimeoutException e) {
+      throw new TransientConsumerException(e);
+    }
+  }
+
@Override
   public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria offsetCriteria, long timeoutMillis) {
Preconditions.checkNotNull(offsetCriteria);
@@ -188,6 +211,11 @@ public class KafkaStreamMetadataProvider extends KafkaPartitionLevelConnectionHa
}
}
+ @Override
+ public boolean supportsOffsetLag() {
+ return true;
+ }
+
public static class KafkaTopicMetadata implements TopicMetadata {
private String _name;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
index 4a744114398..e9ed3b7df8a 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/test/java/org/apache/pinot/plugin/stream/kafka30/KafkaPartitionLevelConsumerTest.java
@@ -18,13 +18,16 @@
*/
package org.apache.pinot.plugin.stream.kafka30;
+import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
@@ -457,20 +460,35 @@ public class KafkaPartitionLevelConsumerTest {
}
FakeKafkaPartitionLevelConsumer kafkaSimpleStreamConsumer =
-        new FakeKafkaPartitionLevelConsumer("clientId-test", getStreamConfig(), 0);
+        new FakeKafkaPartitionLevelConsumer("clientId-test", getStreamConfig("test-topic"), 0);
     KafkaMessageBatch kafkaMessageBatch = kafkaSimpleStreamConsumer.fetchMessages(new LongMsgOffset(12345L), 10000);
Assert.assertEquals(kafkaMessageBatch.getSizeInBytes(), 14);
}
- private StreamConfig getStreamConfig() {
+ @Test
+ public void testFetchLatestStreamOffset()
+ throws IOException {
+ StreamConfig streamConfig = getStreamConfig(TEST_TOPIC_2);
+    try (KafkaStreamMetadataProvider streamMetadataProvider = new KafkaStreamMetadataProvider("clientId",
+        streamConfig)) {
+      Set<Integer> partitions = new HashSet<>();
+      partitions.add(0);
+      partitions.add(1);
+      Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+          streamMetadataProvider.fetchLatestStreamOffset(partitions, 1000);
+      Assert.assertEquals(((LongMsgOffset) (partitionMsgOffsetMap.get(0))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+      Assert.assertEquals(((LongMsgOffset) (partitionMsgOffsetMap.get(1))).getOffset(), NUM_MSG_PRODUCED_PER_PARTITION);
+ }
+ }
+
+ private StreamConfig getStreamConfig(String topicName) {
String streamType = "kafka";
- String streamKafkaTopicName = "test-topic";
String streamKafkaBrokerList = _kafkaBrokerAddress;
String tableNameWithType = "tableName_REALTIME";
Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put("streamType", streamType);
- streamConfigMap.put("stream.kafka.topic.name", streamKafkaTopicName);
+ streamConfigMap.put("stream.kafka.topic.name", topicName);
streamConfigMap.put("stream.kafka.broker.list", streamKafkaBrokerList);
streamConfigMap.put("stream.kafka.consumer.factory.class.name",
getKafkaConsumerFactoryName());
streamConfigMap.put("stream.kafka.decoder.class.name", "decoderClass");
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
index fb24821910a..baa6eeaa610 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-kinesis/src/main/java/org/apache/pinot/plugin/stream/kinesis/KinesisStreamMetadataProvider.java
@@ -287,6 +287,11 @@ public class KinesisStreamMetadataProvider implements StreamMetadataProvider {
.collect(Collectors.toList());
}
+ @Override
+ public boolean supportsOffsetLag() {
+ return false;
+ }
+
public static class KinesisTopicMetadata implements TopicMetadata {
private String _name;
diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
index 5384ec71b1a..657921613bd 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarStreamMetadataProvider.java
@@ -202,6 +202,11 @@ public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnection
}
}
+ @Override
+ public boolean supportsOffsetLag() {
+ return false;
+ }
+
public static class PulsarTopicMetadata implements TopicMetadata {
private String _name;
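
Kinesis and Pulsar return false from supportsOffsetLag() above, presumably because their consumption positions are not dense numeric offsets: a Kinesis sequence number is an opaque, non-contiguous value, and a Pulsar MessageId is a ledger/entry pair, so a latest-minus-consumed subtraction is not meaningful there. Per the SPI javadoc below, returning false simply opts a stream out of the offset-lag computation.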
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 4f95198c033..0ef42b8dde1 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -53,6 +53,17 @@ public interface StreamMetadataProvider extends Closeable {
throw new UnsupportedOperationException();
}
+ /**
+ * Fetches the latest offset for a set of given partition Ids.
+ * @param partitionIds partition Ids of the stream
+ * @param timeoutMillis fetch timeout
+ * @return latest {@link StreamPartitionMsgOffset} for each partition Id.
+ */
+  default Map<Integer, StreamPartitionMsgOffset> fetchLatestStreamOffset(Set<Integer> partitionIds,
+ long timeoutMillis) {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Fetches the offset for a given partition and offset criteria
    * @param offsetCriteria offset criteria to fetch {@link StreamPartitionMsgOffset}.
@@ -152,6 +163,12 @@ public interface StreamMetadataProvider extends Closeable {
throw new UnsupportedOperationException();
}
+ /**
+   * @return true if the stream supports computing ingestion lag by subtracting the last consumed offset from the
+ * latest offset.
+ */
+ boolean supportsOffsetLag();
+
/**
    * Represents the metadata of a topic. This can be used to represent the topic name and other metadata in the future.
*/
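
A caller-side sketch of the new SPI contract (assumed wiring only; the actual polling loop lives in IngestionDelayTracker/RealtimeTableDataManager, and the helper name here is hypothetical):

    import java.util.Map;
    import java.util.Set;
    import org.apache.pinot.spi.stream.StreamMetadataProvider;
    import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;

    final class LatestOffsetPollSketch {
      private LatestOffsetPollSketch() {
      }

      // Returns null when the stream cannot report latest offsets; callers then
      // skip the offset-lag gauges for that stream.
      static Map<Integer, StreamPartitionMsgOffset> pollLatestOffsets(StreamMetadataProvider provider,
          Set<Integer> hostedPartitionIds, long timeoutMillis) {
        if (!provider.supportsOffsetLag()) {
          return null;
        }
        return provider.fetchLatestStreamOffset(hostedPartitionIds, timeoutMillis);
      }
    }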
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index a3841dde9a5..a5dfab2f5d2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -20,8 +20,10 @@ package org.apache.pinot.spi.utils;
import com.google.common.base.Preconditions;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -34,6 +36,7 @@ import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
/**
@@ -313,4 +316,25 @@ public final class IngestionConfigUtils {
}
return DEFAULT_PUSH_RETRY_INTERVAL_MILLIS;
}
+
+ /**
+ * Returns a unique client id which can be used for Stream providers
+ */
+  public static String getTableTopicUniqueClientId(String className, StreamConfig streamConfig) {
+    return StreamConsumerFactory.getUniqueClientId(
+        className + "-" + streamConfig.getTableNameWithType() + "-" + streamConfig.getTopicName());
+ }
+
+ /**
+ * Returns a Map of stream config index to Set of stream partition Ids.
+ * @param pinotPartitionIds Set of pinot partition ids.
+ */
+  public static Map<Integer, Set<Integer>> getStreamConfigIndexToStreamPartitions(Set<Integer> pinotPartitionIds) {
+    Map<Integer, Set<Integer>> streamIndexToPartitions = new HashMap<>();
+    for (Integer partition : pinotPartitionIds) {
+      streamIndexToPartitions.computeIfAbsent(getStreamConfigIndexFromPinotPartitionId(partition),
+          k -> new HashSet<>()).add(getStreamPartitionIdFromPinotPartitionId(partition));
+ }
+ return streamIndexToPartitions;
+ }
}
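
The helper above builds on the composite partition-id scheme that IngestionConfigUtils already uses for multi-stream tables. A worked example, assuming the encoding pinotPartitionId = streamConfigIndex * 10000 + streamPartitionId (the padding constant is an assumption; verify it in the class before relying on the literal):

    import org.apache.pinot.spi.utils.IngestionConfigUtils;

    public class PartitionIdRoundTrip {
      public static void main(String[] args) {
        // Stream partition 100 of the second stream config (index 1).
        int pinotPartitionId = IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(100, 1);
        // Under the 10000-padding assumption this prints "10100 -> index 1, partition 100".
        System.out.println(pinotPartitionId + " -> index "
            + IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(pinotPartitionId) + ", partition "
            + IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(pinotPartitionId));
      }
    }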
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
index 59aec8b3b91..e653ee4b9b1 100644
--- a/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/utils/IngestionConfigUtilsTest.java
@@ -22,8 +22,10 @@ import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.pinot.spi.config.table.IndexingConfig;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -158,4 +160,27 @@ public class IngestionConfigUtilsTest {
     Assert.assertEquals(2, IngestionConfigUtils.getConfigMapWithPrefix(testMap, "k1").size());
     Assert.assertEquals(2, IngestionConfigUtils.getConfigMapWithPrefix(testMap, "k1.").size());
}
+
+ @Test
+ public void testGetStreamConfigIndexToStreamPartitions() {
+ Set<Integer> pinotPartitionIds = new HashSet<>(2);
+ pinotPartitionIds.add(0);
+ pinotPartitionIds.add(1);
+    Map<Integer, Set<Integer>> streamConfigIndexToStreamPartitions =
+        IngestionConfigUtils.getStreamConfigIndexToStreamPartitions(pinotPartitionIds);
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.size(), 1);
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.get(0), new HashSet<>(Arrays.asList(0, 1)));
+
+    pinotPartitionIds = new HashSet<>(4);
+    pinotPartitionIds.add(2);
+    pinotPartitionIds.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(100, 1));
+    pinotPartitionIds.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(1, 1));
+    pinotPartitionIds.add(IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(400, 3));
+    streamConfigIndexToStreamPartitions =
+        IngestionConfigUtils.getStreamConfigIndexToStreamPartitions(pinotPartitionIds);
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.size(), 3);
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.get(0), new HashSet<>(Arrays.asList(2)));
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.get(1), new HashSet<>(Arrays.asList(100, 1)));
+    Assert.assertEquals(streamConfigIndexToStreamPartitions.get(3), new HashSet<>(Arrays.asList(400)));
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]