This is an automated email from the ASF dual-hosted git repository.

atri pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new b699cdbf81 Push out-of-order events metrics for full upsert (#10944)
b699cdbf81 is described below

commit b699cdbf814d1a1a480d376faacdafd698c46ecc
Author: Pratik Tibrewal <tibrewalpra...@uber.com>
AuthorDate: Wed Jun 21 20:54:52 2023 +0530

    Push out-of-order events metrics for full upsert (#10944)
    
    * Push out-of-order events metrics for full upsert
    
    * add new meter for full upsert
---
 .../main/java/org/apache/pinot/common/metrics/ServerMeter.java |  1 +
 .../local/upsert/BasePartitionUpsertMetadataManager.java       | 10 ++++++----
 .../upsert/ConcurrentMapPartitionUpsertMetadataManager.java    |  1 +
 3 files changed, 8 insertions(+), 4 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index d83a8aa0a3..4681b41659 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -46,6 +46,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false),
   PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
   PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false),
+  UPSERT_OUT_OF_ORDER("rows", false),
   ROWS_WITH_ERRORS("rows", false),
   LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
   LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
index e979823c2e..6556ffce8a 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java
@@ -371,13 +371,15 @@ public abstract class BasePartitionUpsertMetadataManager 
implements PartitionUps
   protected abstract GenericRow doUpdateRecord(GenericRow record, RecordInfo 
recordInfo);
 
   protected void handleOutOfOrderEvent(Object currentComparisonValue, Object 
recordComparisonValue) {
-    _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
+    boolean isPartialUpsertTable = (_partialUpsertHandler != null);
+    _serverMetrics.addMeteredTableValue(_tableNameWithType,
+        isPartialUpsertTable ? ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER : 
ServerMeter.UPSERT_OUT_OF_ORDER, 1L);
     _numOutOfOrderEvents++;
     long currentTimeNs = System.nanoTime();
     if (currentTimeNs - _lastOutOfOrderEventReportTimeNs > 
OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
-      _logger.warn("Skipped {} out-of-order events for partial-upsert table 
(the last event has current comparison "
-              + "value: {}, record comparison value: {})", 
_numOutOfOrderEvents, currentComparisonValue,
-          recordComparisonValue);
+      _logger.warn("Skipped {} out-of-order events for {} upsert table {} (the 
last event has current comparison "
+              + "value: {}, record comparison value: {})", 
_numOutOfOrderEvents,
+          (isPartialUpsertTable ? "partial" : ""), _tableNameWithType, 
currentComparisonValue, recordComparisonValue);
       _lastOutOfOrderEventReportTimeNs = currentTimeNs;
       _numOutOfOrderEvents = 0;
     }
diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
index f7529a0011..29b293cbaa 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java
@@ -202,6 +202,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager 
extends BasePartitionUp
               }
               return new RecordLocation(segment, recordInfo.getDocId(), 
recordInfo.getComparisonValue());
             } else {
+              
handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), 
recordInfo.getComparisonValue());
               return currentRecordLocation;
             }
           } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to