This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 09fcdd59cc Add metric for out-of-order partial-upsert events (#8925)
09fcdd59cc is described below

commit 09fcdd59cc07e2faa154858075473cdec3518b60
Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com>
AuthorDate: Sun Jun 19 12:10:39 2022 -0700

    Add metric for out-of-order partial-upsert events (#8925)
---
 .../apache/pinot/common/metrics/ServerMeter.java   |  1 +
 .../upsert/PartitionUpsertMetadataManager.java     | 22 ++++++++++++++++++----
 2 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index 19ae43f382..0cbd12c537 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -41,6 +41,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
   REALTIME_PARTITION_MISMATCH("mismatch", false),
   REALTIME_DEDUP_DROPPED("rows", false),
+  PARTIAL_UPSERT_OUT_OF_ORDER("rows", false),
   ROWS_WITH_ERRORS("rows", false),
   LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true),
   LLC_CONTROLLER_RESPONSE_COMMIT("messages", true),
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
index 041a86443a..8214162fe9 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java
@@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting;
 import java.util.Iterator;
 import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.segment.local.utils.HashUtils;
@@ -66,6 +68,8 @@ import org.slf4j.LoggerFactory;
 public class PartitionUpsertMetadataManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class);
 
+  private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1);
+
   private final String _tableNameWithType;
   private final int _partitionId;
   private final ServerMetrics _serverMetrics;
@@ -79,6 +83,9 @@ public class PartitionUpsertMetadataManager {
   // Reused for reading previous record during partial upsert
   private final GenericRow _reuse = new GenericRow();
 
+  private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE;
+  private int _numOutOfOrderEvents = 0;
+
   public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics,
       @Nullable PartialUpsertHandler partialUpsertHandler, HashFunction hashFunction) {
     _tableNameWithType = tableNameWithType;
@@ -214,10 +221,17 @@ public class PartitionUpsertMetadataManager {
             currentRecordLocation.getSegment().getRecord(currentRecordLocation.getDocId(), _reuse);
         return _partialUpsertHandler.merge(previousRecord, record);
       } else {
-        LOGGER.warn(
-            "Got late event for partial-upsert: {} (current comparison value: {}, record comparison value: {}), "
-                + "skipping updating the record", record, currentRecordLocation.getComparisonValue(),
-            recordInfo.getComparisonValue());
+        _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L);
+        _numOutOfOrderEvents++;
+        long currentTimeNs = System.nanoTime();
+        if (currentTimeNs - _lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) {
+          LOGGER.warn("Skipped {} out-of-order events for partial-upsert table: {} "
+                  + "(the last event has current comparison value: {}, record comparison value: {})",
+              _numOutOfOrderEvents,
+              _tableNameWithType, currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue());
+          _lastOutOfOrderEventReportTimeNs = currentTimeNs;
+          _numOutOfOrderEvents = 0;
+        }
         return record;
       }
     } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to