priyen-stripe commented on PR #16783:
URL: https://github.com/apache/pinot/pull/16783#issuecomment-3498367846
Hi @swaminathanmanish, I believe this PR broke (or removed) a feature that used to work.
`_firstStreamIngestionTimeMs` was a legitimate metric input. It can be used to track ingestion lag for tables whose data passes through multiple stream hops before it reaches Pinot, and we offer it as an option to our users at Stripe.
I see there was some discussion about the code looking unused, but please see the flow below, which explains how it actually worked:
**Based on the early July 2025 codebase**
`IngestionDelayTracker.java`
**1. Origin: Stream Message Metadata**
`firstStreamIngestionTimeMs` originates from `RowMetadata` when consuming
messages from a stream (like Kafka). It represents the ingestion timestamp of
the message in the first stream of a multi-stream setup.
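How the first-stream timestamp ends up in the message metadata is pipeline-specific and not covered here. The following is a minimal sketch, assuming a relay copies each message from the first stream into the second and carries the original timestamp forward. `StreamMessage`, `produce`, `relay` and the `UNSET` sentinel are hypothetical names, not Pinot or Kafka APIs.

```java
// Hypothetical model of how a first-stream timestamp survives extra hops.
// StreamMessage, produce(...) and relay(...) are illustrative, not Pinot or Kafka APIs.
class FirstStreamTimestampDemo {

  // Assumed sentinel meaning "no first-stream timestamp recorded".
  static final long UNSET = Long.MIN_VALUE;

  record StreamMessage(String payload, long ingestionTimeMs, long firstStreamIngestionTimeMs) {}

  // A message produced directly into the first stream only has its own append time.
  static StreamMessage produce(String payload, long appendTimeMs) {
    return new StreamMessage(payload, appendTimeMs, UNSET);
  }

  // Copy a message into the next stream: the new append time becomes ingestionTimeMs,
  // while the earliest known ingestion time is carried forward unchanged.
  static StreamMessage relay(StreamMessage in, long appendTimeMs) {
    long first = in.firstStreamIngestionTimeMs() > 0
        ? in.firstStreamIngestionTimeMs()
        : in.ingestionTimeMs();
    return new StreamMessage(in.payload(), appendTimeMs, first);
  }

  public static void main(String[] args) {
    StreamMessage inStream1 = produce("order-42", 1_000L);
    StreamMessage inStream2 = relay(inStream1, 1_500L);
    System.out.println(inStream2.ingestionTimeMs() + " " + inStream2.firstStreamIngestionTimeMs());
  }
}
```

Relaying `inStream2` into a third stream would still keep `firstStreamIngestionTimeMs` at 1000, which is what lets an end-to-end delay cover every hop rather than only the last one.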
**2. Storage in `IngestionInfo`**: this object is created by each call to `updateIngestionMetrics`, i.e. for every message.
```java
private static class IngestionInfo {
  final long _ingestionTimeMs;
  final long _firstStreamIngestionTimeMs; // Stored here
  final StreamPartitionMsgOffset _currentOffset;
  final StreamPartitionMsgOffset _latestOffset;

  IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs, ...) {
    _ingestionTimeMs = ingestionTimeMs;
    _firstStreamIngestionTimeMs = firstStreamIngestionTimeMs; // Captured
    // ...
  }
}
```
**3. Update Flow: RealtimeSegmentDataManager → IngestionDelayTracker**
When a segment consumes messages, it calls:
```java
public void updateIngestionMetrics(String segmentName, int partitionId,
    long ingestionTimeMs,
    long firstStreamIngestionTimeMs, // Passed in from RowMetadata
    @Nullable StreamPartitionMsgOffset currentOffset,
    @Nullable StreamPartitionMsgOffset latestOffset)
```
This method stores a new `IngestionInfo` object for the partition, carrying `firstStreamIngestionTimeMs` along with the other fields.
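A minimal sketch of this update path, under a simplified model: offsets are plain `long`s instead of `StreamPartitionMsgOffset`, the segment name is dropped, and no metrics are emitted. The tracker keeps one immutable snapshot per partition and replaces it on every update. `IngestionTrackerSketch` and `latest` are hypothetical names, not Pinot code.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical tracker: one immutable snapshot per partition.
class IngestionTrackerSketch {

  // Replaced wholesale on every update, so a reader never sees a half-updated
  // pair of timestamps.
  record IngestionInfo(long ingestionTimeMs, long firstStreamIngestionTimeMs, long currentOffset) {}

  private final Map<Integer, IngestionInfo> ingestionInfoMap = new ConcurrentHashMap<>();

  // Mirrors the shape of updateIngestionMetrics above, minus segmentName and latestOffset.
  void updateIngestionMetrics(int partitionId, long ingestionTimeMs,
      long firstStreamIngestionTimeMs, long currentOffset) {
    ingestionInfoMap.put(partitionId,
        new IngestionInfo(ingestionTimeMs, firstStreamIngestionTimeMs, currentOffset));
  }

  // Returns the most recent snapshot for the partition, or null if none was recorded.
  IngestionInfo latest(int partitionId) {
    return ingestionInfoMap.get(partitionId);
  }

  public static void main(String[] args) {
    IngestionTrackerSketch tracker = new IngestionTrackerSketch();
    tracker.updateIngestionMetrics(0, 1_500L, 1_000L, 7L);
    tracker.updateIngestionMetrics(0, 1_600L, 1_050L, 8L);
    System.out.println(tracker.latest(0));
  }
}
```

Storing an immutable object per partition, rather than mutating individual fields, keeps `_firstStreamIngestionTimeMs` and `_ingestionTimeMs` consistent with each other for any concurrent reader.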
**4. Delay Calculation**
The end-to-end ingestion delay is calculated as follows, and this calculation is only invoked when `firstStreamIngestionTimeMs` > 0:
```java
public long getPartitionEndToEndIngestionDelayMs(int partitionId) {
  IngestionInfo ingestionInfo = _ingestionInfoMap.get(partitionId);
  return ingestionInfo != null
      ? getIngestionDelayMs(ingestionInfo._firstStreamIngestionTimeMs)
      : 0;
}

private long getIngestionDelayMs(long ingestionTimeMs) {
  if (ingestionTimeMs < 0) {
    return 0;
  }
  // Current time minus the given timestamp; for the end-to-end delay this is
  // the first-stream ingestion time
  long agedIngestionDelayMs = _clock.millis() - ingestionTimeMs;
  return Math.max(agedIngestionDelayMs, 0);
}
```
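The arithmetic above can be checked deterministically by injecting a fixed `java.time.Clock`. This sketch reproduces the `getIngestionDelayMs` rule and adds a hypothetical `shouldReportEndToEnd` helper for the `> 0` condition described above; `DelaySketch` and that helper are assumptions, not Pinot code.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;

// Hypothetical harness for the delay rule, driven by an injectable Clock.
class DelaySketch {

  private final Clock clock;

  DelaySketch(Clock clock) {
    this.clock = clock;
  }

  // Same rule as getIngestionDelayMs above: a negative timestamp yields 0, and a
  // timestamp ahead of the clock (e.g. clock skew) is clamped to 0.
  long getIngestionDelayMs(long ingestionTimeMs) {
    if (ingestionTimeMs < 0) {
      return 0;
    }
    return Math.max(clock.millis() - ingestionTimeMs, 0);
  }

  // Assumed gating rule: only report an end-to-end delay when a first-stream
  // timestamp was actually provided.
  boolean shouldReportEndToEnd(long firstStreamIngestionTimeMs) {
    return firstStreamIngestionTimeMs > 0;
  }

  public static void main(String[] args) {
    Clock fixed = Clock.fixed(Instant.ofEpochMilli(10_000L), ZoneOffset.UTC);
    DelaySketch d = new DelaySketch(fixed);
    System.out.println(d.getIngestionDelayMs(9_400L));  // 600
    System.out.println(d.getIngestionDelayMs(12_000L)); // 0 (timestamp in the future)
    System.out.println(d.getIngestionDelayMs(-1L));     // 0 (no timestamp)
  }
}
```

Note that the `< 0` check alone lets a timestamp of exactly 0 through, which would produce a delay equal to the full epoch time; the `> 0` gate on the end-to-end path avoids that.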
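The distinction between `ingestionTimeMs` and `firstStreamIngestionTimeMs` is important for multi-stream ingestion pipelines: `ingestionTimeMs` only measures lag from the last stream into Pinot, while `firstStreamIngestionTimeMs` measures lag from the first stream, including every intermediate hop.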
Example scenario:
```
Data Producer → Stream 1 (Kafka) → Stream 2 (Another Kafka) → Pinot
                        ↑                      ↑
           firstStreamIngestionTimeMs   ingestionTimeMs
```
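To make the difference concrete, here is a worked example with made-up timestamps, assuming the relay between the two streams is running 3 seconds behind. Measured from `ingestionTimeMs`, Pinot looks only 200 ms behind; measured from `firstStreamIngestionTimeMs`, the real end-to-end lag of 3200 ms is visible. `TwoHopExample` and `delayMs` are hypothetical names.

```java
// Worked example for the two-hop scenario, using made-up timestamps in ms.
class TwoHopExample {

  // Delay measured against a given ingestion timestamp, clamped at zero.
  static long delayMs(long nowMs, long ingestionTimeMs) {
    return Math.max(nowMs - ingestionTimeMs, 0);
  }

  public static void main(String[] args) {
    long firstStreamIngestionTimeMs = 1_000L; // appended to Stream 1
    long ingestionTimeMs = 4_000L;            // appended to Stream 2 after a slow relay
    long nowMs = 4_200L;                      // Pinot consumes the row

    long ingestionDelay = delayMs(nowMs, ingestionTimeMs);           // 200
    long endToEndDelay = delayMs(nowMs, firstStreamIngestionTimeMs); // 3200
    System.out.println("ingestion delay:  " + ingestionDelay + " ms");
    System.out.println("end-to-end delay: " + endToEndDelay + " ms");
  }
}
```

Without the first-stream timestamp, the 3000 ms backlog in the relay would not show up in any Pinot ingestion metric.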
--
This is an automated message from the Apache Git Service.