Jackie-Jiang commented on code in PR #9994:
URL: https://github.com/apache/pinot/pull/9994#discussion_r1062020489


##########
pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java:
##########
@@ -45,8 +45,10 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   // Dedup metrics
   DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
   CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
-  JVM_HEAP_USED_BYTES("bytes", true);
-
+  JVM_HEAP_USED_BYTES("bytes", true),
+  // Lag metrics
+  TABLE_MAX_INGESTION_DELAY_MS("milliseconds", false),
+  TABLE_PER_PARTITION_INGESTION_DELAY_MS("milliseconds", false);

Review Comment:
   I think we can use the same gauge for both the table level and the partition level. For the partition level, we will suffix the metric name with `.partitionId`.
   ```suggestion
     MAX_INGESTION_DELAY_MS("milliseconds", false);
   ```
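A minimal sketch of the single-gauge idea. It assumes metric names are plain strings of the form `<gauge>.<table>` for the table-level value and `<gauge>.<table>.<partitionId>` for a partition. `IngestionDelayGauges`, `update` and `snapshot` are hypothetical stand-ins for the metrics registry, not the `ServerMetrics` API. Using the max across partitions as the table-level value is also an assumption.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: one ingestion-delay gauge reported at two granularities.
// The "<gauge>.<table>" / "<gauge>.<table>.<partitionId>" naming is an assumption.
class IngestionDelayGauges {
  static final String GAUGE = "maxIngestionDelayMs";

  // tableName -> (partitionId -> latest ingestion delay in ms)
  private final Map<String, Map<Integer, Long>> _partitionDelays = new ConcurrentHashMap<>();

  // Table-level metric name.
  static String metricName(String tableName) {
    return GAUGE + "." + tableName;
  }

  // Partition-level metric name: the same gauge, suffixed with ".<partitionId>".
  static String metricName(String tableName, int partitionId) {
    return metricName(tableName) + "." + partitionId;
  }

  void update(String tableName, int partitionId, long delayMs) {
    _partitionDelays.computeIfAbsent(tableName, t -> new ConcurrentHashMap<>()).put(partitionId, delayMs);
  }

  // One entry per partition, plus a table-level entry holding the max across partitions.
  Map<String, Long> snapshot() {
    Map<String, Long> out = new TreeMap<>();
    _partitionDelays.forEach((tableName, partitions) -> {
      long max = 0L;
      for (Map.Entry<Integer, Long> e : partitions.entrySet()) {
        out.put(metricName(tableName, e.getKey()), e.getValue());
        max = Math.max(max, e.getValue());
      }
      out.put(metricName(tableName), max);
    });
    return out;
  }
}
```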



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1554,6 +1557,25 @@ private void createPartitionMetadataProvider(String reason) {
     _partitionMetadataProvider = _streamConsumerFactory.createPartitionMetadataProvider(_clientId, _partitionGroupId);
   }
 
+  /*
+   * Updates the ingestion delay if messages were processed using the time stamp for the last consumed event.
+   *
+   * @param indexedMessagesCount
+   */
+  private void updateIngestionDelay(int indexedMessageCount) {
+    if (_catchingUpPhase) {

Review Comment:
   If this is `false` in the beginning, we also need to update the ingestion delay when it becomes `true`, or the lag will already be propagated.
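One reading of this comment, sketched under assumptions: keep recording the timestamp of the last consumed event while the delay is not being reported, and publish the delay as soon as reporting is switched on instead of waiting for the next batch. `IngestionDelayTracker`, `enableReporting` and the `LongConsumer` gauge are hypothetical. `updateIngestionDelay` only borrows the method name from the diff; its second parameter is assumed.

```java
import java.time.Clock;
import java.util.function.LongConsumer;

// Hypothetical sketch: the delay is always tracked, but only published while reporting is enabled.
// Enabling reporting publishes the current delay immediately rather than at the next batch.
class IngestionDelayTracker {
  private final Clock _clock;
  private final LongConsumer _delayGauge; // receives the delay in milliseconds
  private volatile boolean _reportingEnabled;
  private volatile long _lastEventTimeMs = -1L; // -1 means no event consumed yet

  IngestionDelayTracker(Clock clock, LongConsumer delayGauge, boolean reportingEnabled) {
    _clock = clock;
    _delayGauge = delayGauge;
    _reportingEnabled = reportingEnabled;
  }

  // Called after each consumed batch with the timestamp of the last event in it.
  void updateIngestionDelay(int indexedMessageCount, long lastEventTimeMs) {
    if (indexedMessageCount <= 0) {
      return; // nothing new was indexed; keep the previous timestamp
    }
    _lastEventTimeMs = lastEventTimeMs;
    if (_reportingEnabled) {
      publish();
    }
  }

  // Flip the flag and publish right away so the gauge does not stay stale.
  void enableReporting() {
    _reportingEnabled = true;
    publish();
  }

  private void publish() {
    long lastEventTimeMs = _lastEventTimeMs;
    if (lastEventTimeMs >= 0) {
      _delayGauge.accept(Math.max(0L, _clock.millis() - lastEventTimeMs));
    }
  }
}
```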



##########
pinot-server/src/main/java/org/apache/pinot/server/starter/helix/IngestionBasedConsumptionStatusChecker.java:
##########
@@ -77,6 +77,9 @@ public int getNumConsumingSegmentsNotReachedIngestionCriteria() {
         LLRealtimeSegmentDataManager rtSegmentDataManager = (LLRealtimeSegmentDataManager) segmentDataManager;
         if (isSegmentCaughtUp(segName, rtSegmentDataManager)) {
           _caughtUpSegments.add(segName);
+          rtSegmentDataManager.notifyConsumptionCaughtUp(false);

Review Comment:
   We want to skip reporting the delay during server startup because:
   1. The server needs to catch up first, which will very likely trigger the alert.
   2. The server is not actually serving queries yet, so the alert would be a false alarm.

   We do want to report the delay when a new consuming segment is created, because the server is serving queries from it.



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java:
##########
@@ -69,4 +69,13 @@ protected static PinotDataBufferMemoryManager getMemoryManager(String consumerDi
 
   public abstract Map<String, PartitionLagState> getPartitionToLagState(
       Map<String, ConsumerPartitionState> consumerPartitionStateMap);
+
+  /**
+   * The RT segment data manager can handle status change from external components like the ConsumptionStatusChecker
+   * etc. Currently, it acts as a way to signal the RT Segment data manager that the current partition has caught up.
+   *
+   * @param caughtUpWithUpstream Boolean indicating if the partition has caught up with upstream source or not based on
+   *                            the strategy used in the {@literal IngestionBasedConsumptionStatusChecker}
+   */
+  public abstract void notifyConsumptionCaughtUp(boolean caughtUpWithUpstream);

Review Comment:
   This is quite confusing. Per the implementation, it seems that passing `false` means the segment is caught up?
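A sketch of a less ambiguous signature. It assumes a hypothetical `ConsumptionPhase` enum in place of the `boolean` parameter, so the caller names the phase it means. `ConsumingSegmentState` is a stand-in for the data manager, not a Pinot class.

```java
// Hypothetical sketch: name the phase explicitly instead of passing a bare boolean.
enum ConsumptionPhase {
  CATCHING_UP,
  CAUGHT_UP
}

class ConsumingSegmentState {
  // Assumed default for a freshly created consuming segment: caught up.
  private volatile ConsumptionPhase _phase = ConsumptionPhase.CAUGHT_UP;

  // Callers state the phase they mean; there is no true/false convention to remember.
  void setConsumptionPhase(ConsumptionPhase phase) {
    _phase = phase;
  }

  boolean isCatchingUp() {
    return _phase == ConsumptionPhase.CATCHING_UP;
  }
}
```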



##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/LLRealtimeSegmentDataManager.java:
##########
@@ -1594,4 +1616,9 @@ public String getSegmentName() {
   public void forceCommit() {
     _forceCommitMessageReceived = true;
   }
+
+  @Override
+  public void notifyConsumptionCaughtUp(boolean catchingUpPhase) {

Review Comment:
   Another way to handle it is to assume the segment is caught up in the beginning, and let the status checker set it as not caught up yet. The status checker only monitors the initial consuming segments during restart. New consuming segments won't be tracked, and thus will be in the caught-up phase automatically.
   I think this PR is already taking this approach.
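A minimal sketch of this approach. `StartupCatchUpTracker` and `ConsumingSegment` are hypothetical stand-ins for the status checker and the segment data manager. Segments default to caught up; only the segments registered at startup are switched to catching up, and each is switched back once a caller-supplied predicate passes. That predicate is an assumption standing in for the real ingestion criteria.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch: segment managers default to "caught up"; the checker only
// touches the segments that were consuming when the server started.
class StartupCatchUpTracker {
  // Minimal stand-in for a consuming segment's data manager.
  static class ConsumingSegment {
    private volatile boolean _caughtUp = true; // default: delay is reported immediately

    boolean isCaughtUp() { return _caughtUp; }
    void markCatchingUp() { _caughtUp = false; }
    void markCaughtUp() { _caughtUp = true; }
  }

  private final Map<String, ConsumingSegment> _startupSegments;
  private final Set<String> _caughtUpSegments = new HashSet<>();

  // Only segments consuming at startup are registered; they begin in the catching-up state.
  StartupCatchUpTracker(Map<String, ConsumingSegment> startupSegments) {
    _startupSegments = startupSegments;
    startupSegments.values().forEach(ConsumingSegment::markCatchingUp);
  }

  // Periodic check: mark each startup segment caught up once it meets the criteria,
  // and return how many startup segments are still catching up.
  int numSegmentsNotCaughtUp(Predicate<String> meetsIngestionCriteria) {
    for (Map.Entry<String, ConsumingSegment> e : _startupSegments.entrySet()) {
      String segmentName = e.getKey();
      if (!_caughtUpSegments.contains(segmentName) && meetsIngestionCriteria.test(segmentName)) {
        _caughtUpSegments.add(segmentName);
        e.getValue().markCaughtUp();
      }
    }
    return _startupSegments.size() - _caughtUpSegments.size();
  }
}
```

A consuming segment created after startup is never registered with the tracker, so it keeps the default and its delay is reported from the start.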



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

