This is an automated email from the ASF dual-hosted git repository.

sajjad pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 8feec414cb Fix issue with realtime partition mismatch metric (#11871)
8feec414cb is described below

commit 8feec414cbe10d97573740b73cd61715931f05da
Author: Sajjad Moradi <moradi.saj...@gmail.com>
AuthorDate: Thu Oct 26 18:21:43 2023 -0700

    Fix issue with realtime partition mismatch metric (#11871)
---
 .../org/apache/pinot/common/metrics/ServerMeter.java     |  1 +
 .../manager/realtime/RealtimeSegmentDataManager.java     |  3 +--
 .../local/indexsegment/mutable/MutableSegmentImpl.java   | 16 +++++++++++-----
 3 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
index f77194d37b..5b995d6e6a 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java
@@ -41,6 +41,7 @@ public enum ServerMeter implements AbstractMetrics.Meter {
   REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true),
   REALTIME_OFFSET_COMMITS("commits", true),
   REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false),
+  // number of times partition of a record did not match the partition of the stream
   REALTIME_PARTITION_MISMATCH("mismatch", false),
   REALTIME_DEDUP_DROPPED("rows", false),
   UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false),
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index f954eff0e1..087f8c2cd2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -1574,10 +1574,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager {
               Collections.emptyList(), /*maxWaitTimeMs=*/5000).size();
 
           if (numPartitionGroups != numPartitions) {
-            _segmentLogger.warn(
+            _segmentLogger.info(
                 "Number of stream partitions: {} does not match number of 
partitions in the partition config: {}, "
                     + "using number of stream " + "partitions", 
numPartitionGroups, numPartitions);
-            _serverMetrics.addMeteredTableValue(_tableNameWithType, 
ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
             numPartitions = numPartitionGroups;
           }
         } catch (Exception e) {
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
index a99b3c75f4..7adeae3d7b 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java
@@ -138,6 +138,7 @@ public class MutableSegmentImpl implements MutableSegment {
   private final RealtimeSegmentStatsHistory _statsHistory;
   private final String _partitionColumn;
   private final PartitionFunction _partitionFunction;
+  private final int _mainPartitionId; // partition id designated for this consuming segment
   private final boolean _nullHandlingEnabled;
 
   private final Map<String, IndexContainer> _indexContainerMap = new 
HashMap<>();
@@ -211,6 +212,7 @@ public class MutableSegmentImpl implements MutableSegment {
     _statsHistory = config.getStatsHistory();
     _partitionColumn = config.getPartitionColumn();
     _partitionFunction = config.getPartitionFunction();
+    _mainPartitionId = config.getPartitionId();
     _nullHandlingEnabled = config.isNullHandlingEnabled();
 
     Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs();
@@ -290,10 +292,10 @@ public class MutableSegmentImpl implements MutableSegment {
 
         // NOTE: Use a concurrent set because the partitions can be updated when the partition of the ingested record
         //       does not match the stream partition. This could happen when stream partition changes, or the records
-        //       are not properly partitioned from the stream. Log an warning and emit a metric if it happens, then add
+        //       are not properly partitioned from the stream. Log a warning and emit a metric if it happens, then add
         //       the new partition into this set.
         partitions = ConcurrentHashMap.newKeySet();
-        partitions.add(config.getPartitionId());
+        partitions.add(_mainPartitionId);
       }
 
       // TODO (mutable-index-spi): The comment above was here, but no check was done.
@@ -666,9 +668,13 @@ public class MutableSegmentImpl implements MutableSegment {
         if (column.equals(_partitionColumn)) {
           Object valueToPartition = (dataType == BYTES) ? new 
ByteArray((byte[]) value) : value;
           int partition = _partitionFunction.getPartition(valueToPartition);
-          if (indexContainer._partitions.add(partition)) {
-            _logger.warn("Found new partition: {} from partition column: {}, 
value: {}", partition, column,
-                valueToPartition);
+          if (partition != _mainPartitionId) {
+            if (indexContainer._partitions.add(partition)) {
+              // for every partition other than mainPartitionId, log a warning once
+              _logger.warn("Found new partition: {} from partition column: {}, 
value: {}", partition, column,
+                  valueToPartition);
+            }
+            // always emit a metric when a partition other than mainPartitionId is detected
             if (_serverMetrics != null) {
               _serverMetrics.addMeteredTableValue(_realtimeTableName, 
ServerMeter.REALTIME_PARTITION_MISMATCH, 1);
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to