This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch hotfix-minmax in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/hotfix-minmax by this push: new 801af45 Lazy initialization of ForwardIndexReaderContext. (#5781) 801af45 is described below commit 801af45494831c507476819f2f0061d676b2d64c Author: Mayank Shrivastava <maya...@apache.org> AuthorDate: Fri Jul 31 09:08:57 2020 -0700 Lazy initialization of ForwardIndexReaderContext. (#5781) The reader context is currently eagerly initialized during operator initialization. In certain cases this leads to direct-memory OOM (out-of-memory) errors, as follows: - Consider a case where we have hundreds of segments, but the rows of interest typically are within a much smaller subset of segments. - For cases with large byte blobs (theta-sketches), the ForwardIndexReaderContext can initialize chunks (from direct memory) of large size (numRowsPerChunk * lengthOfLargestEntry). This can run into several tens of MBs for theta-sketches. - In such cases, even though the rows of interest are in a small set of segments, the ForwardIndexReader initializes huge chunks for hundreds of segments, which is wasteful. This PR fixes the problem by lazily initializing the ForwardIndexReaderContext, so that it only gets initialized for segments that have the data. Existing tests are expected to provide coverage for this code change. 
--- .../org/apache/pinot/core/common/DataFetcher.java | 94 ++++++++++++---------- 1 file changed, 50 insertions(+), 44 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java index 0e447ed..0a9013e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java @@ -253,49 +253,55 @@ public class DataFetcher { */ private class ColumnValueReader implements Closeable { final ForwardIndexReader _reader; - final ForwardIndexReaderContext _readerContext; final Dictionary _dictionary; + ForwardIndexReaderContext _readerContext = null; ColumnValueReader(ForwardIndexReader reader, @Nullable Dictionary dictionary) { _reader = reader; - _readerContext = reader.createContext(); _dictionary = dictionary; } + private ForwardIndexReaderContext getReaderContext() { + if (_readerContext == null) { + _readerContext = _reader.createContext(); + } + return _readerContext; + } + void readDictIds(int[] docIds, int length, int[] dictIdBuffer) { - _reader.readDictIds(docIds, length, dictIdBuffer, _readerContext); + _reader.readDictIds(docIds, length, dictIdBuffer, getReaderContext()); } void readIntValues(int[] docIds, int length, int[] valueBuffer) { if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); - _reader.readDictIds(docIds, length, dictIdBuffer, _readerContext); + _reader.readDictIds(docIds, length, dictIdBuffer, getReaderContext()); _dictionary.readIntValues(dictIdBuffer, length, valueBuffer); } else { switch (_reader.getValueType()) { case INT: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getInt(docIds[i], _readerContext); + valueBuffer[i] = _reader.getInt(docIds[i], getReaderContext()); } break; case LONG: for (int i = 0; i < length; i++) { - valueBuffer[i] = (int) _reader.getLong(docIds[i], _readerContext); + valueBuffer[i] = 
(int) _reader.getLong(docIds[i], getReaderContext()); } break; case FLOAT: for (int i = 0; i < length; i++) { - valueBuffer[i] = (int) _reader.getFloat(docIds[i], _readerContext); + valueBuffer[i] = (int) _reader.getFloat(docIds[i], getReaderContext()); } break; case DOUBLE: for (int i = 0; i < length; i++) { - valueBuffer[i] = (int) _reader.getDouble(docIds[i], _readerContext); + valueBuffer[i] = (int) _reader.getDouble(docIds[i], getReaderContext()); } break; case STRING: for (int i = 0; i < length; i++) { - valueBuffer[i] = Integer.parseInt(_reader.getString(docIds[i], _readerContext)); + valueBuffer[i] = Integer.parseInt(_reader.getString(docIds[i], getReaderContext())); } break; default: @@ -307,33 +313,33 @@ public class DataFetcher { void readLongValues(int[] docIds, int length, long[] valueBuffer) { if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); - _reader.readDictIds(docIds, length, dictIdBuffer, _readerContext); + _reader.readDictIds(docIds, length, dictIdBuffer, getReaderContext()); _dictionary.readLongValues(dictIdBuffer, length, valueBuffer); } else { switch (_reader.getValueType()) { case INT: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getInt(docIds[i], _readerContext); + valueBuffer[i] = _reader.getInt(docIds[i], getReaderContext()); } break; case LONG: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getLong(docIds[i], _readerContext); + valueBuffer[i] = _reader.getLong(docIds[i], getReaderContext()); } break; case FLOAT: for (int i = 0; i < length; i++) { - valueBuffer[i] = (long) _reader.getFloat(docIds[i], _readerContext); + valueBuffer[i] = (long) _reader.getFloat(docIds[i], getReaderContext()); } break; case DOUBLE: for (int i = 0; i < length; i++) { - valueBuffer[i] = (long) _reader.getDouble(docIds[i], _readerContext); + valueBuffer[i] = (long) _reader.getDouble(docIds[i], getReaderContext()); } break; case STRING: for (int i = 0; i < length; i++) { - valueBuffer[i] = 
Long.parseLong(_reader.getString(docIds[i], _readerContext)); + valueBuffer[i] = Long.parseLong(_reader.getString(docIds[i], getReaderContext())); } break; default: @@ -345,33 +351,33 @@ public class DataFetcher { void readFloatValues(int[] docIds, int length, float[] valueBuffer) { if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); - _reader.readDictIds(docIds, length, dictIdBuffer, _readerContext); + _reader.readDictIds(docIds, length, dictIdBuffer, getReaderContext()); _dictionary.readFloatValues(dictIdBuffer, length, valueBuffer); } else { switch (_reader.getValueType()) { case INT: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getInt(docIds[i], _readerContext); + valueBuffer[i] = _reader.getInt(docIds[i], getReaderContext()); } break; case LONG: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getLong(docIds[i], _readerContext); + valueBuffer[i] = _reader.getLong(docIds[i], getReaderContext()); } break; case FLOAT: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getFloat(docIds[i], _readerContext); + valueBuffer[i] = _reader.getFloat(docIds[i], getReaderContext()); } break; case DOUBLE: for (int i = 0; i < length; i++) { - valueBuffer[i] = (float) _reader.getDouble(docIds[i], _readerContext); + valueBuffer[i] = (float) _reader.getDouble(docIds[i], getReaderContext()); } break; case STRING: for (int i = 0; i < length; i++) { - valueBuffer[i] = Float.parseFloat(_reader.getString(docIds[i], _readerContext)); + valueBuffer[i] = Float.parseFloat(_reader.getString(docIds[i], getReaderContext())); } break; default: @@ -383,33 +389,33 @@ public class DataFetcher { void readDoubleValues(int[] docIds, int length, double[] valueBuffer) { if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); - _reader.readDictIds(docIds, length, dictIdBuffer, _readerContext); + _reader.readDictIds(docIds, length, dictIdBuffer, getReaderContext()); _dictionary.readDoubleValues(dictIdBuffer, length, 
valueBuffer); } else { switch (_reader.getValueType()) { case INT: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getInt(docIds[i], _readerContext); + valueBuffer[i] = _reader.getInt(docIds[i], getReaderContext()); } break; case LONG: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getLong(docIds[i], _readerContext); + valueBuffer[i] = _reader.getLong(docIds[i], getReaderContext()); } break; case FLOAT: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getFloat(docIds[i], _readerContext); + valueBuffer[i] = _reader.getFloat(docIds[i], getReaderContext()); } break; case DOUBLE: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getDouble(docIds[i], _readerContext); + valueBuffer[i] = _reader.getDouble(docIds[i], getReaderContext()); } break; case STRING: for (int i = 0; i < length; i++) { - valueBuffer[i] = Double.parseDouble(_reader.getString(docIds[i], _readerContext)); + valueBuffer[i] = Double.parseDouble(_reader.getString(docIds[i], getReaderContext())); } break; default: @@ -421,38 +427,38 @@ public class DataFetcher { void readStringValues(int[] docIds, int length, String[] valueBuffer) { if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); - _reader.readDictIds(docIds, length, dictIdBuffer, _readerContext); + _reader.readDictIds(docIds, length, dictIdBuffer, getReaderContext()); _dictionary.readStringValues(dictIdBuffer, length, valueBuffer); } else { switch (_reader.getValueType()) { case INT: for (int i = 0; i < length; i++) { - valueBuffer[i] = Integer.toString(_reader.getInt(docIds[i], _readerContext)); + valueBuffer[i] = Integer.toString(_reader.getInt(docIds[i], getReaderContext())); } break; case LONG: for (int i = 0; i < length; i++) { - valueBuffer[i] = Long.toString(_reader.getLong(docIds[i], _readerContext)); + valueBuffer[i] = Long.toString(_reader.getLong(docIds[i], getReaderContext())); } break; case FLOAT: for (int i = 0; i < length; i++) { - valueBuffer[i] = 
Float.toString(_reader.getFloat(docIds[i], _readerContext)); + valueBuffer[i] = Float.toString(_reader.getFloat(docIds[i], getReaderContext())); } break; case DOUBLE: for (int i = 0; i < length; i++) { - valueBuffer[i] = Double.toString(_reader.getDouble(docIds[i], _readerContext)); + valueBuffer[i] = Double.toString(_reader.getDouble(docIds[i], getReaderContext())); } break; case STRING: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getString(docIds[i], _readerContext); + valueBuffer[i] = _reader.getString(docIds[i], getReaderContext()); } break; case BYTES: for (int i = 0; i < length; i++) { - valueBuffer[i] = BytesUtils.toHexString(_reader.getBytes(docIds[i], _readerContext)); + valueBuffer[i] = BytesUtils.toHexString(_reader.getBytes(docIds[i], getReaderContext())); } break; default: @@ -464,18 +470,18 @@ public class DataFetcher { void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) { if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); - _reader.readDictIds(docIds, length, dictIdBuffer, _readerContext); + _reader.readDictIds(docIds, length, dictIdBuffer, getReaderContext()); _dictionary.readBytesValues(dictIdBuffer, length, valueBuffer); } else { switch (_reader.getValueType()) { case STRING: for (int i = 0; i < length; i++) { - valueBuffer[i] = BytesUtils.toBytes(_reader.getString(docIds[i], _readerContext)); + valueBuffer[i] = BytesUtils.toBytes(_reader.getString(docIds[i], getReaderContext())); } break; case BYTES: for (int i = 0; i < length; i++) { - valueBuffer[i] = _reader.getBytes(docIds[i], _readerContext); + valueBuffer[i] = _reader.getBytes(docIds[i], getReaderContext()); } break; default: @@ -486,14 +492,14 @@ public class DataFetcher { void readDictIdsMV(int[] docIds, int length, int[][] dictIdsBuffer) { for (int i = 0; i < length; i++) { - int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, _readerContext); + int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, 
getReaderContext()); dictIdsBuffer[i] = Arrays.copyOfRange(_reusableMVDictIds, 0, numValues); } } void readIntValuesMV(int[] docIds, int length, int[][] valuesBuffer) { for (int i = 0; i < length; i++) { - int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, _readerContext); + int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); int[] values = new int[numValues]; _dictionary.readIntValues(_reusableMVDictIds, numValues, values); valuesBuffer[i] = values; @@ -502,7 +508,7 @@ public class DataFetcher { void readLongValuesMV(int[] docIds, int length, long[][] valuesBuffer) { for (int i = 0; i < length; i++) { - int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, _readerContext); + int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); long[] values = new long[numValues]; _dictionary.readLongValues(_reusableMVDictIds, numValues, values); valuesBuffer[i] = values; @@ -511,7 +517,7 @@ public class DataFetcher { void readFloatValuesMV(int[] docIds, int length, float[][] valuesBuffer) { for (int i = 0; i < length; i++) { - int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, _readerContext); + int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); float[] values = new float[numValues]; _dictionary.readFloatValues(_reusableMVDictIds, numValues, values); valuesBuffer[i] = values; @@ -520,7 +526,7 @@ public class DataFetcher { void readDoubleValuesMV(int[] docIds, int length, double[][] valuesBuffer) { for (int i = 0; i < length; i++) { - int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, _readerContext); + int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); double[] values = new double[numValues]; _dictionary.readDoubleValues(_reusableMVDictIds, numValues, values); valuesBuffer[i] = values; @@ -529,7 +535,7 @@ public class DataFetcher { void readStringValuesMV(int[] docIds, int 
length, String[][] valuesBuffer) { for (int i = 0; i < length; i++) { - int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, _readerContext); + int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); String[] values = new String[numValues]; _dictionary.readStringValues(_reusableMVDictIds, numValues, values); valuesBuffer[i] = values; @@ -538,7 +544,7 @@ public class DataFetcher { public void readNumValuesMV(int[] docIds, int length, int[] numValuesBuffer) { for (int i = 0; i < length; i++) { - numValuesBuffer[i] = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, _readerContext); + numValuesBuffer[i] = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org