This is an automated email from the ASF dual-hosted git repository.

jlli pushed a commit to branch mark-resize-as-NumGroupsLimitReached
in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 2c749dfc5945c9b27b3acdc3e6b50026c61a06a0
Author: jlli_LinkedIn <j...@linkedin.com>
AuthorDate: Fri May 24 17:35:53 2024 -0700

    Mark resize as NumGroupsLimitReached in broker reduce phase
---
 .../main/java/org/apache/pinot/core/data/table/IndexedTable.java | 7 ++++++-
 .../main/java/org/apache/pinot/core/data/table/TableResizer.java | 5 +++--
 .../apache/pinot/core/query/reduce/GroupByDataTableReducer.java | 3 +++
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index 012fdc1170..5dec9a78f3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -48,6 +48,7 @@ public abstract class IndexedTable extends BaseTable {
 
   protected Collection<Record> _topRecords;
   private int _numResizes;
+  private boolean _isResized = false;
   private long _resizeTimeNs;
 
   /**
@@ -138,7 +139,7 @@ public abstract class IndexedTable extends BaseTable {
   protected void resize() {
     assert _hasOrderBy;
     long startTimeNs = System.nanoTime();
-    _tableResizer.resizeRecordsMap(_lookupMap, _trimSize);
+    _isResized |= _tableResizer.resizeRecordsMap(_lookupMap, _trimSize);
     long resizeTimeNs = System.nanoTime() - startTimeNs;
     _numResizes++;
     _resizeTimeNs += resizeTimeNs;
@@ -186,6 +187,10 @@ public abstract class IndexedTable extends BaseTable {
     return _numResizes;
   }
 
+  public boolean isResized() {
+    return _isResized;
+  }
+
   public long getResizeTimeMs() {
     return TimeUnit.NANOSECONDS.toMillis(_resizeTimeNs);
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
index 4299e5665e..b9acabab36 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
@@ -174,10 +174,10 @@ public class TableResizer {
   /**
    * Resizes the recordsMap to the given size.
    */
-  public void resizeRecordsMap(Map<Key, Record> recordsMap, int size) {
+  public boolean resizeRecordsMap(Map<Key, Record> recordsMap, int size) {
     int numRecordsToEvict = recordsMap.size() - size;
     if (numRecordsToEvict <= 0) {
-      return;
+      return false;
     }
     if (numRecordsToEvict <= size) {
       // Fewer records to evict than retain, make a heap of records to evict
@@ -195,6 +195,7 @@ public class TableResizer {
         recordsMap.put(recordToRetain._key, recordToRetain._record);
       }
     }
+    return true;
   }
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index c0a109f7e4..4e76b9c4f1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -148,6 +148,9 @@ public class GroupByDataTableReducer implements DataTableReducer {
       brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
       brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
     }
+    // Take resizing in broker as one of the factors for NumGroupsLimitReached
+    brokerResponseNative.setNumGroupsLimitReached(
+        brokerResponseNative.isNumGroupsLimitReached() || indexedTable.isResized());
     int numRecords = indexedTable.size();
     Iterator<Record> sortedIterator = indexedTable.iterator();
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org