This is an automated email from the ASF dual-hosted git repository. atri pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new b699cdbf81 Push out-of-order events metrics for full upsert (#10944) b699cdbf81 is described below commit b699cdbf814d1a1a480d376faacdafd698c46ecc Author: Pratik Tibrewal <tibrewalpra...@uber.com> AuthorDate: Wed Jun 21 20:54:52 2023 +0530 Push out-of-order events metrics for full upsert (#10944) * Push out-of-order events metrics for full upsert * add new meter for full upsert --- .../main/java/org/apache/pinot/common/metrics/ServerMeter.java | 1 + .../local/upsert/BasePartitionUpsertMetadataManager.java | 10 ++++++---- .../upsert/ConcurrentMapPartitionUpsertMetadataManager.java | 1 + 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index d83a8aa0a3..4681b41659 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -46,6 +46,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false), PARTIAL_UPSERT_OUT_OF_ORDER("rows", false), PARTIAL_UPSERT_KEYS_NOT_REPLACED("rows", false), + UPSERT_OUT_OF_ORDER("rows", false), ROWS_WITH_ERRORS("rows", false), LLC_CONTROLLER_RESPONSE_NOT_SENT("messages", true), LLC_CONTROLLER_RESPONSE_COMMIT("messages", true), diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java index e979823c2e..6556ffce8a 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BasePartitionUpsertMetadataManager.java @@ -371,13 +371,15 @@ public abstract class BasePartitionUpsertMetadataManager implements PartitionUps protected abstract GenericRow doUpdateRecord(GenericRow record, RecordInfo recordInfo); protected void handleOutOfOrderEvent(Object currentComparisonValue, Object recordComparisonValue) { - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER, 1L); + boolean isPartialUpsertTable = (_partialUpsertHandler != null); + _serverMetrics.addMeteredTableValue(_tableNameWithType, + isPartialUpsertTable ? ServerMeter.PARTIAL_UPSERT_OUT_OF_ORDER : ServerMeter.UPSERT_OUT_OF_ORDER, 1L); _numOutOfOrderEvents++; long currentTimeNs = System.nanoTime(); if (currentTimeNs - _lastOutOfOrderEventReportTimeNs > OUT_OF_ORDER_EVENT_MIN_REPORT_INTERVAL_NS) { - _logger.warn("Skipped {} out-of-order events for partial-upsert table (the last event has current comparison " - + "value: {}, record comparison value: {})", _numOutOfOrderEvents, currentComparisonValue, - recordComparisonValue); + _logger.warn("Skipped {} out-of-order events for {} upsert table {} (the last event has current comparison " + + "value: {}, record comparison value: {})", _numOutOfOrderEvents, + (isPartialUpsertTable ? "partial" : ""), _tableNameWithType, currentComparisonValue, recordComparisonValue); _lastOutOfOrderEventReportTimeNs = currentTimeNs; _numOutOfOrderEvents = 0; } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java index f7529a0011..29b293cbaa 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/ConcurrentMapPartitionUpsertMetadataManager.java @@ -202,6 +202,7 @@ public class ConcurrentMapPartitionUpsertMetadataManager extends BasePartitionUp } return new RecordLocation(segment, recordInfo.getDocId(), recordInfo.getComparisonValue()); } else { + handleOutOfOrderEvent(currentRecordLocation.getComparisonValue(), recordInfo.getComparisonValue()); return currentRecordLocation; } } else { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org