jugomezv commented on code in PR #9994: URL: https://github.com/apache/pinot/pull/9994#discussion_r1060227882
########## pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java: ########## @@ -212,6 +217,85 @@ protected void doShutdown() { if (_leaseExtender != null) { _leaseExtender.shutDown(); } + // Make sure we do metric cleanup when we shut down the table. + if (_consumptionDelayTracker != null) { + _consumptionDelayTracker.shutdown(); + } + } + + /* + * Method to handle CONSUMING->DROPPED transition. + * + * @param partitionGroupId Partition id that we must stop tracking on this server. + */ + private void stopTrackingPartitionDelay(int partitionGroupId) { + if (_consumptionDelayTracker != null) { + _consumptionDelayTracker.stopTrackingPartitionConsumptionDelay(partitionGroupId); + } + } + + /* + * Method to handle CONSUMING->ONLINE transition. + * If no new consumption is noticed for this segment in some timeout, we will read + * ideal state to verify the partition is still hosted in this server. + * + * @param partitionGroupId partition id of partition to be verified as hosted by this server. + */ + private void markPartitionForVerification(int partitionGroupId) { + if (_consumptionDelayTracker != null) { + _consumptionDelayTracker.markPartitionForConfirmation(partitionGroupId); + } + } + + /* + * Method used by LLRealtimeSegmentManagers to update their partition delays + * + * @param ingestionDelayMillis Ingestion delay being reported. + * @param currentTimeMillis Timestamp of the measure being provided, i.e. when this delay was computed. + * @param partitionGroupId Partition ID for which delay is being updated. + */ + public void updateIngestionDelay(long ingestionDelayMillis, long currenTimeMillis, int partitionGroupId) { + if (_consumptionDelayTracker != null) { + _consumptionDelayTracker.storeConsumptionDelay(ingestionDelayMillis, currenTimeMillis, partitionGroupId); + } + } + + /* + * Method to handle CONSUMING to ONLINE transitions of segments in this table. 
+ * We mark partitions for verification with ideal state when we do not see a consuming segment for some time + * for that partition. The idea is to remove the related metrics when the partition moves from the current server. + * + * @param segmentNameStr name of segment which is transitioning state. + */ + @Override + public void onConsumingToOnline(String segmentNameStr) { + LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); + markPartitionForVerification(segmentName.getPartitionGroupId()); + } + + /* + * Method to handle CONSUMING to DROPPED transitions of segments in this table. + * + * @param segmentNameStr name of segment which is transitioning state. + */ + @Override + public void onConsumingToDropped(String segmentNameStr) { + LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr); + // We stop tracking ingestion delay partitions for which their segments go into DROPPED state. + stopTrackingPartitionDelay(segmentName.getPartitionGroupId()); + } + + /** + * Returns all partitionGroupIds for the partitions hosted by this server for current table. Review Comment: Added a note to warn users about this -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org