This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 09fcdd59cc Add metric for out-of-order partial-upsert events (#8925) 09fcdd59cc is described below commit 09fcdd59cc07e2faa154858075473cdec3518b60 Author: Xiaotian (Jackie) Jiang <17555551+jackie-ji...@users.noreply.github.com> AuthorDate: Sun Jun 19 12:10:39 2022 -0700 Add metric for out-of-order partial-upsert events (#8925) --- .../apache/pinot/common/metrics/ServerMeter.java | 1 + .../upsert/PartitionUpsertMetadataManager.java | 22 ++++++++++++++++++---- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index 19ae43f382..0cbd12c537 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -41,6 +41,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false), REALTIME_PARTITION_MISMATCH("mismatch", false), REALTIME_DEDUP_DROPPED("rows", false), + PARTIAL_UPSERT_OUT_OF_ORDER("rows", false), ROWS_WITH_ERRORS("rows", false), LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true), LLC_CONTROLLER_RESPONSE_COMMIT("messages", true), diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java index 041a86443a..8214162fe9 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/PartitionUpsertMetadataManager.java @@ -22,9 +22,11 @@ import com.google.common.annotations.VisibleForTesting; import java.util.Iterator; import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; import org.apache.pinot.common.metrics.ServerGauge; +import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.segment.local.utils.HashUtils; @@ -66,6 +68,8 @@ import org.slf4j.LoggerFactory; public class PartitionUpsertMetadataManager { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionUpsertMetadataManager.class); + private static final long OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS = TimeUnit.MINUTES.toNanos(1); + private final String _tableNameWithType; private final int _partitionId; private final ServerMetrics _serverMetrics; @@ -79,6 +83,9 @@ public class PartitionUpsertMetadataManager { // Reused for reading previous record during partial upsert private final GenericRow _reuse = new GenericRow(); + private long _lastOutOfOrderEventReportTimeNs = Long.MIN_VALUE; + private int _numOutOfOrderEvents = 0; + public PartitionUpsertMetadataManager(String tableNameWithType, int partitionId, ServerMetrics serverMetrics, @Nullable PartialUpsertHandler partialUpsertHandler, HashFunction hashFunction) { _tableNameWithType = tableNameWithType; @@ -214,10 +221,17 @@ public class PartitionUpsertMetadataManager { currentRecordLocation.getSegment().getRecord(currentRecordLocation.getDocId(), _reuse); return _partialUpsertHandler.merge(previousRecord, record); } else { - LOGGER.warn( - "Got late event for partial-upsert: {} (current comparison value: {}, record comparison value: {}), " - + "skipping updating the record", record, currentRecordLocation.getComparisonValue(), - recordInfo.getComparisonValue()); + _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L); + _numOutOfOrderEvents++; + long currentTimeNs = System.nanoTime(); + if (currentTimeNs - _lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) { + LOGGER.warn("Skipped {} out-of-order events for partial-upsert table: {} " + + "(the last event has current comparison value: {}, record comparison value: {})", + _numOutOfOrderEvents, + _tableNameWithType, currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue()); + _lastOutOfOrderEventReportTimeNs = currentTimeNs; + _numOutOfOrderEvents = 0; + } return record; } } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org