jugomezv commented on code in PR #9994:
URL: https://github.com/apache/pinot/pull/9994#discussion_r1060227882


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -212,6 +217,85 @@ protected void doShutdown() {
     if (_leaseExtender != null) {
       _leaseExtender.shutDown();
     }
+    // Make sure we do metric cleanup when we shut down the table.
+    if (_consumptionDelayTracker != null) {
+      _consumptionDelayTracker.shutdown();
+    }
+  }
+
+  /*
+   * Method to handle CONSUMING->DROPPED transition.
+   *
+   * @param partitionGroupId Partition id that we must stop tracking on this 
server.
+   */
+  private void stopTrackingPartitionDelay(int partitionGroupId) {
+    if (_consumptionDelayTracker != null) {
+      
_consumptionDelayTracker.stopTrackingPartitionConsumptionDelay(partitionGroupId);
+    }
+  }
+
+  /*
+   * Method to handle CONSUMING->ONLINE transition.
+   * If no new consumption is noticed for this segment in some timeout, we 
will read
+   * ideal state to verify the partition is still hosted in this server.
+   *
+   * @param partitionGroupId partition id of partition to be verified as 
hosted by this server.
+   */
+  private void markPartitionForVerification(int partitionGroupId) {
+    if (_consumptionDelayTracker != null) {
+      _consumptionDelayTracker.markPartitionForConfirmation(partitionGroupId);
+    }
+  }
+
+  /*
+   * Method used by LLRealtimeSegmentManagers to update their partition delays
+   *
+   * @param ingestionDelayMillis Ingestion delay being reported.
+   * @param currentTimeMillis Timestamp of the measure being provided, i.e. 
when this delay was computed.
+   * @param partitionGroupId Partition ID for which delay is being updated.
+   */
+  public void updateIngestionDelay(long ingestionDelayMillis, long 
currenTimeMillis, int partitionGroupId) {
+    if (_consumptionDelayTracker != null) {
+      _consumptionDelayTracker.storeConsumptionDelay(ingestionDelayMillis, 
currenTimeMillis, partitionGroupId);
+    }
+  }
+
+  /*
+   * Method ta handle CONSUMING to ONLINE transitions of segments in this 
table.
+   * We mark partitions for verification with ideal state when we do not see a 
consuming segment for some time
+   * for that partition. The idea is to remove the related metrics when the 
partition moves from the current server.
+   *
+   * @param segmentNameStr name of segment which is transitioning state.
+   */
+  @Override
+  public void onConsumingToOnline(String segmentNameStr) {
+    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+    markPartitionForVerification(segmentName.getPartitionGroupId());
+  }
+
+  /*
+   * Method ta handle CONSUMING to DROPPED transitions of segments in this 
table.
+   *
+   * @param segmentNameStr name of segment which is transitioning state.
+   */
+  @Override
+  public void onConsumingToDropped(String segmentNameStr) {
+    LLCSegmentName segmentName = new LLCSegmentName(segmentNameStr);
+    // We stop tracking ingestion delay partitions for which their segments go 
into DROPPED state.
+    stopTrackingPartitionDelay(segmentName.getPartitionGroupId());
+  }
+
+  /**
+   * Returns all partitionGroupIds for the partitions hosted by this server 
for current table.

Review Comment:
   Added a note to warn users about this



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to