This is an automated email from the ASF dual-hosted git repository. sajjad pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 8feec414cb Fix issue with realtime partition mismatch metric (#11871) 8feec414cb is described below commit 8feec414cbe10d97573740b73cd61715931f05da Author: Sajjad Moradi <moradi.saj...@gmail.com> AuthorDate: Thu Oct 26 18:21:43 2023 -0700 Fix issue with realtime partition mismatch metric (#11871) --- .../org/apache/pinot/common/metrics/ServerMeter.java | 1 + .../manager/realtime/RealtimeSegmentDataManager.java | 3 +-- .../local/indexsegment/mutable/MutableSegmentImpl.java | 16 +++++++++++----- 3 files changed, 13 insertions(+), 7 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java index f77194d37b..5b995d6e6a 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMeter.java @@ -41,6 +41,7 @@ public enum ServerMeter implements AbstractMetrics.Meter { REALTIME_CONSUMPTION_EXCEPTIONS("exceptions", true), REALTIME_OFFSET_COMMITS("commits", true), REALTIME_OFFSET_COMMIT_EXCEPTIONS("exceptions", false), + // number of times partition of a record did not match the partition of the stream REALTIME_PARTITION_MISMATCH("mismatch", false), REALTIME_DEDUP_DROPPED("rows", false), UPSERT_KEYS_IN_WRONG_SEGMENT("rows", false), diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index f954eff0e1..087f8c2cd2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1574,10 +1574,9 @@ public class RealtimeSegmentDataManager extends SegmentDataManager { Collections.emptyList(), /*maxWaitTimeMs=*/5000).size(); if (numPartitionGroups != numPartitions) { - _segmentLogger.warn( + _segmentLogger.info( "Number of stream partitions: {} does not match number of partitions in the partition config: {}, " + "using number of stream " + "partitions", numPartitionGroups, numPartitions); - _serverMetrics.addMeteredTableValue(_tableNameWithType, ServerMeter.REALTIME_PARTITION_MISMATCH, 1); numPartitions = numPartitionGroups; } } catch (Exception e) { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index a99b3c75f4..7adeae3d7b 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -138,6 +138,7 @@ public class MutableSegmentImpl implements MutableSegment { private final RealtimeSegmentStatsHistory _statsHistory; private final String _partitionColumn; private final PartitionFunction _partitionFunction; + private final int _mainPartitionId; // partition id designated for this consuming segment private final boolean _nullHandlingEnabled; private final Map<String, IndexContainer> _indexContainerMap = new HashMap<>(); @@ -211,6 +212,7 @@ public class MutableSegmentImpl implements MutableSegment { _statsHistory = config.getStatsHistory(); _partitionColumn = config.getPartitionColumn(); _partitionFunction = config.getPartitionFunction(); + _mainPartitionId = config.getPartitionId(); _nullHandlingEnabled = config.isNullHandlingEnabled(); Collection<FieldSpec> allFieldSpecs = _schema.getAllFieldSpecs(); @@ -290,10 +292,10 @@ public class MutableSegmentImpl implements MutableSegment { // NOTE: Use a concurrent set because the partitions can be updated when the partition of the ingested record // does not match the stream partition. This could happen when stream partition changes, or the records - // are not properly partitioned from the stream. Log an warning and emit a metric if it happens, then add + // are not properly partitioned from the stream. Log a warning and emit a metric if it happens, then add // the new partition into this set. partitions = ConcurrentHashMap.newKeySet(); - partitions.add(config.getPartitionId()); + partitions.add(_mainPartitionId); } // TODO (mutable-index-spi): The comment above was here, but no check was done. @@ -666,9 +668,13 @@ public class MutableSegmentImpl implements MutableSegment { if (column.equals(_partitionColumn)) { Object valueToPartition = (dataType == BYTES) ? new ByteArray((byte[]) value) : value; int partition = _partitionFunction.getPartition(valueToPartition); - if (indexContainer._partitions.add(partition)) { - _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column, - valueToPartition); + if (partition != _mainPartitionId) { + if (indexContainer._partitions.add(partition)) { + // for every partition other than mainPartitionId, log a warning once + _logger.warn("Found new partition: {} from partition column: {}, value: {}", partition, column, + valueToPartition); + } + // always emit a metric when a partition other than mainPartitionId is detected if (_serverMetrics != null) { _serverMetrics.addMeteredTableValue(_realtimeTableName, ServerMeter.REALTIME_PARTITION_MISMATCH, 1); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org