9aman commented on code in PR #17089:
URL: https://github.com/apache/pinot/pull/17089#discussion_r2526461893
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java:
##########
@@ -209,17 +216,34 @@ private long getSegmentSize(SegmentDataManager segmentDataManager) {
.getSegment()).getSegmentSizeBytes() : 0;
}
- private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager segmentDataManager, TableType tableType) {
+ private SegmentConsumerInfo getSegmentConsumerInfo(TableDataManager tableDataManager,
+ SegmentDataManager segmentDataManager, TableType tableType) {
SegmentConsumerInfo segmentConsumerInfo = null;
if (tableType == TableType.REALTIME) {
RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
- Map<String, ConsumerPartitionState> partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState();
+ StreamMetadataProvider streamMetadataProvider =
Review Comment:
So are we good with the multi-stream scenario?
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java:
##########
@@ -87,6 +92,7 @@
@Path("/debug/")
public class DebugResource {
private static final Logger LOGGER = LoggerFactory.getLogger(DebugResource.class);
+ private static final long STREAM_METADATA_FETCH_TIMEOUT_MS = 5000;
Review Comment:
Maybe move this to `RealtimeTableDataManager` and have `IngestionDelayTracker`
use it as well.
The value is the same across all of these files, and all of them rely on
`RealtimeTableDataManager` to fetch the `StreamMetadataProvider`.
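A minimal sketch of what this could look like, using simplified stand-in classes
rather than the real Pinot types. The constant name and value come from the diff
above; `RealtimeTableDataManagerSketch`, `DebugResourceSketch`, and
`IngestionDelayTrackerSketch` are hypothetical names used only for illustration.

```java
// Hypothetical stand-ins for illustration; these are NOT the real Pinot classes.
final class RealtimeTableDataManagerSketch {
  // Single definition of the fetch timeout (name and value taken from the diff above).
  static final long STREAM_METADATA_FETCH_TIMEOUT_MS = 5000;

  private RealtimeTableDataManagerSketch() {
  }
}

final class DebugResourceSketch {
  // References the shared constant instead of declaring its own copy.
  static long fetchTimeoutMs() {
    return RealtimeTableDataManagerSketch.STREAM_METADATA_FETCH_TIMEOUT_MS;
  }
}

final class IngestionDelayTrackerSketch {
  // Same shared constant, so the two callers cannot drift apart.
  static long fetchTimeoutMs() {
    return RealtimeTableDataManagerSketch.STREAM_METADATA_FETCH_TIMEOUT_MS;
  }
}
```

With one definition, changing the timeout later touches a single line instead of
every file that fetches stream metadata.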
##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/DebugResource.java:
##########
@@ -209,17 +216,34 @@ private long getSegmentSize(SegmentDataManager segmentDataManager) {
.getSegment()).getSegmentSizeBytes() : 0;
}
- private SegmentConsumerInfo getSegmentConsumerInfo(SegmentDataManager segmentDataManager, TableType tableType) {
+ private SegmentConsumerInfo getSegmentConsumerInfo(TableDataManager tableDataManager,
+ SegmentDataManager segmentDataManager, TableType tableType) {
SegmentConsumerInfo segmentConsumerInfo = null;
if (tableType == TableType.REALTIME) {
RealtimeSegmentDataManager realtimeSegmentDataManager = (RealtimeSegmentDataManager) segmentDataManager;
- Map<String, ConsumerPartitionState> partitionStateMap = realtimeSegmentDataManager.getConsumerPartitionState();
+ StreamMetadataProvider streamMetadataProvider =
+ ((RealtimeTableDataManager) (tableDataManager)).getStreamMetadataProvider(realtimeSegmentDataManager);
+ StreamPartitionMsgOffset latestMsgOffset;
+ try {
+ int partitionId = realtimeSegmentDataManager.getStreamPartitionId();
+ Map<Integer, StreamPartitionMsgOffset> partitionMsgOffsetMap =
+ streamMetadataProvider.fetchLatestStreamOffset(Collections.singleton(partitionId),
+ STREAM_METADATA_FETCH_TIMEOUT_MS);
+ latestMsgOffset = partitionMsgOffsetMap.get(partitionId);
+ } catch (Exception e) {
+ LOGGER.error("Failed to fetch latest stream offset.", e);
+ throw new RuntimeException(e);
+ }
+ Map<String, ConsumerPartitionState> partitionIdToStateMap =
+ realtimeSegmentDataManager.getConsumerPartitionState(latestMsgOffset);
Review Comment:
Nit: This logic is common to all the APIs and other users of this synchronized
consumer.
Can we move it to `RealtimeTableDataManager` or a util? All of them seem to
fetch the metadata provider from `RealtimeTableDataManager` and follow exactly
the same steps to get the offset.
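A rough sketch of such a helper, assuming a simplified stand-in for the provider.
`OffsetProviderSketch` and `LatestOffsetFetcherSketch` are hypothetical names; the
stand-in models only the `fetchLatestStreamOffset(partitionIds, timeoutMs)` call
as it appears in the diff, and the real code would use `StreamMetadataProvider`
and `StreamPartitionMsgOffset` instead of the generic type.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in: models only the single call used in the diff above.
interface OffsetProviderSketch<O> {
  Map<Integer, O> fetchLatestStreamOffset(Set<Integer> partitionIds, long timeoutMs)
      throws Exception;
}

// Hypothetical shared helper; where it should live is the open question in this review.
final class LatestOffsetFetcherSketch {
  static final long STREAM_METADATA_FETCH_TIMEOUT_MS = 5000;

  private LatestOffsetFetcherSketch() {
  }

  // Fetches the latest offset for one partition.
  // Returns null if the provider's result has no entry for that partition.
  static <O> O fetchLatestOffset(OffsetProviderSketch<O> provider, int partitionId) {
    try {
      Map<Integer, O> offsets = provider.fetchLatestStreamOffset(
          Collections.singleton(partitionId), STREAM_METADATA_FETCH_TIMEOUT_MS);
      return offsets.get(partitionId);
    } catch (Exception e) {
      // Keeps the partition id in the message so the failure is easier to trace.
      throw new RuntimeException(
          "Failed to fetch latest stream offset for partition " + partitionId, e);
    }
  }
}
```

Each caller would then reduce to one line, for example
`fetchLatestOffset(provider, realtimeSegmentDataManager.getStreamPartitionId())`,
instead of repeating the try/catch block.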
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]