This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 4f30ab2b89 Manual tracing (#8485) 4f30ab2b89 is described below commit 4f30ab2b89368610b0482252a7e3ca6a0665c82e Author: Richard Startin <rich...@startree.ai> AuthorDate: Wed Apr 13 01:34:06 2022 +0100 Manual tracing (#8485) * trace projections and transforms * hooks for combine operators and most filters * add num children for operators/tasks with more than one dependency * rework datatype tracking * record forward index reader data type * revert to stackless builtin tracer because stack maintenance adds no value * trace segment pruning * register trace earlier so segment pruning can be traced * refine JSON match tracing * don't record Lucene FST set size * ensure trace gets unregistered * move segment pruner scope * move JsonMatch tracing * review comments * remove useless scope --- .../org/apache/pinot/core/common/DataFetcher.java | 30 +++++++++ .../pinot/core/operator/ProjectionOperator.java | 2 + .../core/operator/combine/BaseCombineOperator.java | 3 +- .../operator/docvalsets/ProjectionBlockValSet.java | 77 ++++++++++++++++++---- .../operator/docvalsets/TransformBlockValSet.java | 77 ++++++++++++++++++---- .../core/operator/filter/AndFilterOperator.java | 2 + .../operator/filter/BitmapBasedFilterOperator.java | 9 +++ .../operator/filter/CombinedFilterOperator.java | 2 + .../operator/filter/JsonMatchFilterOperator.java | 17 ++++- .../core/operator/filter/OrFilterOperator.java | 2 + .../filter/RangeIndexBasedFilterOperator.java | 14 ++++ .../operator/filter/TextMatchFilterOperator.java | 17 ++++- .../core/operator/transform/TransformOperator.java | 2 + .../query/executor/ServerQueryExecutorV1Impl.java | 21 ++++-- .../core/query/pruner/SegmentPrunerService.java | 12 +++- .../pinot/core/util/trace/BuiltInTracer.java | 30 ++------- .../pinot/spi/trace/InvocationRecording.java | 32 +++++---- 17 files changed, 274 insertions(+), 75 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java index 1bc593fbdb..e366edc5a6 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/DataFetcher.java @@ -31,6 +31,8 @@ import org.apache.pinot.segment.spi.evaluator.TransformEvaluator; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader; import org.apache.pinot.segment.spi.index.reader.ForwardIndexReaderContext; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.trace.Tracing; import org.apache.pinot.spi.utils.BytesUtils; @@ -396,6 +398,8 @@ public class DataFetcher { private class ColumnValueReader implements Closeable { final ForwardIndexReader _reader; final Dictionary _dictionary; + final FieldSpec.DataType _dataType; + final boolean _singleValue; boolean _readerContextCreated; ForwardIndexReaderContext _readerContext; @@ -403,6 +407,8 @@ public class DataFetcher { ColumnValueReader(ForwardIndexReader reader, @Nullable Dictionary dictionary) { _reader = reader; _dictionary = dictionary; + _dataType = reader.getValueType(); + _singleValue = reader.isSingleValue(); } private ForwardIndexReaderContext getReaderContext() { @@ -415,10 +421,12 @@ public class DataFetcher { } void readDictIds(int[] docIds, int length, int[] dictIdBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); _reader.readDictIds(docIds, length, dictIdBuffer, getReaderContext()); } void readIntValues(int[] docIds, int length, int[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); ForwardIndexReaderContext readerContext = getReaderContext(); if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); @@ -430,11 +438,13 @@ public class DataFetcher { } void readIntValues(TransformEvaluator evaluator, int[] docIds, int length, int[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valueBuffer); } void readLongValues(int[] docIds, int length, long[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); ForwardIndexReaderContext readerContext = getReaderContext(); if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); @@ -446,11 +456,13 @@ public class DataFetcher { } void readLongValues(TransformEvaluator evaluator, int[] docIds, int length, long[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valueBuffer); } void readFloatValues(int[] docIds, int length, float[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); ForwardIndexReaderContext readerContext = getReaderContext(); if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); @@ -462,11 +474,13 @@ public class DataFetcher { } void readFloatValues(TransformEvaluator evaluator, int[] docIds, int length, float[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valueBuffer); } void readDoubleValues(int[] docIds, int length, double[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); ForwardIndexReaderContext readerContext = getReaderContext(); if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); @@ -478,11 +492,13 @@ public class DataFetcher { } void readDoubleValues(TransformEvaluator evaluator, int[] docIds, int length, double[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valueBuffer); } void readStringValues(int[] docIds, int length, String[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); ForwardIndexReaderContext readerContext = getReaderContext(); if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); @@ -527,11 +543,13 @@ public class DataFetcher { } void readStringValues(TransformEvaluator evaluator, int[] docIds, int length, String[] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valueBuffer); } void readBytesValues(int[] docIds, int length, byte[][] valueBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); ForwardIndexReaderContext readerContext = getReaderContext(); if (_dictionary != null) { int[] dictIdBuffer = THREAD_LOCAL_DICT_IDS.get(); @@ -556,6 +574,7 @@ public class DataFetcher { } void readDictIdsMV(int[] docIds, int length, int[][] dictIdsBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); for (int i = 0; i < length; i++) { int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); dictIdsBuffer[i] = Arrays.copyOfRange(_reusableMVDictIds, 0, numValues); @@ -563,6 +582,7 @@ public class DataFetcher { } void readIntValuesMV(int[] docIds, int length, int[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); assert _dictionary != null; for (int i = 0; i < length; i++) { int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); @@ -573,11 +593,13 @@ public class DataFetcher { } void readIntValuesMV(TransformEvaluator evaluator, int[] docIds, int length, int[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valuesBuffer); } void readLongValuesMV(int[] docIds, int length, long[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); assert _dictionary != null; for (int i = 0; i < length; i++) { int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); @@ -588,11 +610,13 @@ public class DataFetcher { } void readLongValuesMV(TransformEvaluator evaluator, int[] docIds, int length, long[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valuesBuffer); } void readFloatValuesMV(int[] docIds, int length, float[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); assert _dictionary != null; for (int i = 0; i < length; i++) { int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); @@ -603,11 +627,13 @@ public class DataFetcher { } void readFloatValuesMV(TransformEvaluator evaluator, int[] docIds, int length, float[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valuesBuffer); } void readDoubleValuesMV(int[] docIds, int length, double[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); assert _dictionary != null; for (int i = 0; i < length; i++) { int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); @@ -618,11 +644,13 @@ public class DataFetcher { } void readDoubleValuesMV(TransformEvaluator evaluator, int[] docIds, int length, double[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valuesBuffer); } void readStringValuesMV(int[] docIds, int length, String[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); assert _dictionary != null; for (int i = 0; i < length; i++) { int numValues = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); @@ -633,11 +661,13 @@ public class DataFetcher { } void readStringValuesMV(TransformEvaluator evaluator, int[] docIds, int length, String[][] valuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); evaluator.evaluateBlock(docIds, length, _reader, getReaderContext(), _dictionary, getSVDictIdsBuffer(), valuesBuffer); } public void readNumValuesMV(int[] docIds, int length, int[] numValuesBuffer) { + Tracing.activeRecording().setInputDataType(_dataType, _singleValue); for (int i = 0; i < length; i++) { numValuesBuffer[i] = _reader.getDictIdMV(docIds[i], _reusableMVDictIds, getReaderContext()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java index 573748abac..b5438858ca 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/ProjectionOperator.java @@ -28,6 +28,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.DocIdSetBlock; import org.apache.pinot.core.operator.blocks.ProjectionBlock; import org.apache.pinot.segment.spi.datasource.DataSource; +import org.apache.pinot.spi.trace.Tracing; public class ProjectionOperator extends BaseOperator<ProjectionBlock> { @@ -62,6 +63,7 @@ public class ProjectionOperator extends BaseOperator<ProjectionBlock> { if (docIdSetBlock == null) { return null; } else { + Tracing.activeRecording().setNumChildren(_dataSourceMap.size()); _dataBlockCache.initNewBlock(docIdSetBlock.getDocIdSet(), docIdSetBlock.getSearchableLength()); return new ProjectionBlock(_dataSourceMap, _dataBlockCache); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 2a3ac72699..44fdd7cdbf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -37,6 +37,7 @@ import org.apache.pinot.core.query.request.context.ThreadTimer; import org.apache.pinot.core.query.scheduler.resources.ResourceManager; import org.apache.pinot.core.util.trace.TraceRunnable; import org.apache.pinot.spi.exception.EarlyTerminationException; +import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +82,7 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul // deleted/refreshed, the segment will be released after the main thread returns, which would lead to undefined // behavior (even JVM crash) when processing queries against it. Phaser phaser = new Phaser(1); - + Tracing.activeRecording().setNumTasks(_numTasks); for (int i = 0; i < _numTasks; i++) { int taskIndex = i; _futures[i] = _executorService.submit(new TraceRunnable() { diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java index a70ebfe047..4963b779ad 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/ProjectionBlockValSet.java @@ -25,6 +25,9 @@ import org.apache.pinot.core.operator.ProjectionOperator; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.trace.InvocationRecording; +import org.apache.pinot.spi.trace.InvocationScope; +import org.apache.pinot.spi.trace.Tracing; /** @@ -66,71 +69,119 @@ public class ProjectionBlockValSet implements BlockValSet { @Override public int[] getDictionaryIdsSV() { - return _dataBlockCache.getDictIdsForSVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.INT, true); + return _dataBlockCache.getDictIdsForSVColumn(_column); + } } @Override public int[] getIntValuesSV() { - return _dataBlockCache.getIntValuesForSVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.INT, true); + return _dataBlockCache.getIntValuesForSVColumn(_column); + } } @Override public long[] getLongValuesSV() { - return _dataBlockCache.getLongValuesForSVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.LONG, true); + return _dataBlockCache.getLongValuesForSVColumn(_column); + } } @Override public float[] getFloatValuesSV() { - return _dataBlockCache.getFloatValuesForSVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.FLOAT, true); + return _dataBlockCache.getFloatValuesForSVColumn(_column); + } } @Override public double[] getDoubleValuesSV() { - return _dataBlockCache.getDoubleValuesForSVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.DOUBLE, true); + return _dataBlockCache.getDoubleValuesForSVColumn(_column); + } } @Override public String[] getStringValuesSV() { - return _dataBlockCache.getStringValuesForSVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.STRING, true); + return _dataBlockCache.getStringValuesForSVColumn(_column); + } } @Override public byte[][] getBytesValuesSV() { - return _dataBlockCache.getBytesValuesForSVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.BYTES, true); + return _dataBlockCache.getBytesValuesForSVColumn(_column); + } } @Override public int[][] getDictionaryIdsMV() { - return _dataBlockCache.getDictIdsForMVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.INT, false); + return _dataBlockCache.getDictIdsForMVColumn(_column); + } } @Override public int[][] getIntValuesMV() { - return _dataBlockCache.getIntValuesForMVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.INT, false); + return _dataBlockCache.getIntValuesForMVColumn(_column); + } } @Override public long[][] getLongValuesMV() { - return _dataBlockCache.getLongValuesForMVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.LONG, false); + return _dataBlockCache.getLongValuesForMVColumn(_column); + } } @Override public float[][] getFloatValuesMV() { - return _dataBlockCache.getFloatValuesForMVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.FLOAT, false); + return _dataBlockCache.getFloatValuesForMVColumn(_column); + } } @Override public double[][] getDoubleValuesMV() { - return _dataBlockCache.getDoubleValuesForMVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.DOUBLE, false); + return _dataBlockCache.getDoubleValuesForMVColumn(_column); + } } @Override public String[][] getStringValuesMV() { - return _dataBlockCache.getStringValuesForMVColumn(_column); + try (InvocationScope scope = Tracing.getTracer().createScope(ProjectionBlockValSet.class)) { + recordReadValues(scope, DataType.STRING, false); + return _dataBlockCache.getStringValuesForMVColumn(_column); + } } @Override public int[] getNumMVEntries() { return _dataBlockCache.getNumValuesForMVColumn(_column); } + + private void recordReadValues(InvocationRecording recording, DataType dataType, boolean singleValue) { + if (recording.isEnabled()) { + int numDocs = _dataBlockCache.getNumDocs(); + recording.setNumDocsScanned(numDocs); + recording.setColumnName(_column); + recording.setOutputDataType(dataType, singleValue); + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java index a664b256b5..9b71541d2a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/TransformBlockValSet.java @@ -26,6 +26,9 @@ import org.apache.pinot.core.operator.transform.function.TransformFunction; import org.apache.pinot.core.plan.DocIdSetPlanNode; import org.apache.pinot.segment.spi.index.reader.Dictionary; import org.apache.pinot.spi.data.FieldSpec.DataType; +import org.apache.pinot.spi.trace.InvocationRecording; +import org.apache.pinot.spi.trace.InvocationScope; +import org.apache.pinot.spi.trace.Tracing; /** @@ -62,67 +65,106 @@ public class TransformBlockValSet implements BlockValSet { @Override public int[] getDictionaryIdsSV() { - return _transformFunction.transformToDictIdsSV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.INT, true); + return _transformFunction.transformToDictIdsSV(_projectionBlock); + } } @Override public int[] getIntValuesSV() { - return _transformFunction.transformToIntValuesSV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.INT, true); + return _transformFunction.transformToIntValuesSV(_projectionBlock); + } } @Override public long[] getLongValuesSV() { - return _transformFunction.transformToLongValuesSV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.LONG, true); + return _transformFunction.transformToLongValuesSV(_projectionBlock); + } } @Override public float[] getFloatValuesSV() { - return _transformFunction.transformToFloatValuesSV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.FLOAT, true); + return _transformFunction.transformToFloatValuesSV(_projectionBlock); + } } @Override public double[] getDoubleValuesSV() { - return _transformFunction.transformToDoubleValuesSV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.DOUBLE, true); + return _transformFunction.transformToDoubleValuesSV(_projectionBlock); + } } @Override public String[] getStringValuesSV() { - return _transformFunction.transformToStringValuesSV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.STRING, true); + return _transformFunction.transformToStringValuesSV(_projectionBlock); + } } @Override public byte[][] getBytesValuesSV() { - return _transformFunction.transformToBytesValuesSV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.BYTES, true); + return _transformFunction.transformToBytesValuesSV(_projectionBlock); + } } @Override public int[][] getDictionaryIdsMV() { - return _transformFunction.transformToDictIdsMV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.INT, false); + return _transformFunction.transformToDictIdsMV(_projectionBlock); + } } @Override public int[][] getIntValuesMV() { - return _transformFunction.transformToIntValuesMV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.INT, false); + return _transformFunction.transformToIntValuesMV(_projectionBlock); + } } @Override public long[][] getLongValuesMV() { - return _transformFunction.transformToLongValuesMV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.LONG, false); + return _transformFunction.transformToLongValuesMV(_projectionBlock); + } } @Override public float[][] getFloatValuesMV() { - return _transformFunction.transformToFloatValuesMV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.FLOAT, false); + return _transformFunction.transformToFloatValuesMV(_projectionBlock); + } } @Override public double[][] getDoubleValuesMV() { - return _transformFunction.transformToDoubleValuesMV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.DOUBLE, false); + return _transformFunction.transformToDoubleValuesMV(_projectionBlock); + } } @Override public String[][] getStringValuesMV() { - return _transformFunction.transformToStringValuesMV(_projectionBlock); + try (InvocationScope scope = Tracing.getTracer().createScope(TransformBlockValSet.class)) { + recordTransformValues(scope, DataType.STRING, false); + return _transformFunction.transformToStringValuesMV(_projectionBlock); + } } @Override @@ -175,4 +217,13 @@ public class TransformBlockValSet implements BlockValSet { } } } + + private void recordTransformValues(InvocationRecording recording, DataType dataType, boolean singleValue) { + if (recording.isEnabled()) { + int numDocs = _projectionBlock.getNumDocs(); + recording.setNumDocsScanned(numDocs); + recording.setFunctionName(_transformFunction.getName()); + recording.setOutputDataType(dataType, singleValue); + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/AndFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/AndFilterOperator.java index dae8e1407c..e181fe5788 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/AndFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/AndFilterOperator.java @@ -24,6 +24,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.FilterBlock; import org.apache.pinot.core.operator.docidsets.AndDocIdSet; import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; +import org.apache.pinot.spi.trace.Tracing; import org.roaringbitmap.buffer.BufferFastAggregation; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @@ -40,6 +41,7 @@ public class AndFilterOperator extends BaseFilterOperator { @Override protected FilterBlock getNextBlock() { + Tracing.activeRecording().setNumChildren(_filterOperators.size()); List<FilterBlockDocIdSet> filterBlockDocIdSets = new ArrayList<>(_filterOperators.size()); for (BaseFilterOperator filterOperator : _filterOperators) { filterBlockDocIdSets.add(filterOperator.nextBlock().getBlockDocIdSet()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java index 4ce8410383..594307a964 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/BitmapBasedFilterOperator.java @@ -28,6 +28,9 @@ import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; import org.apache.pinot.core.operator.filter.predicate.PredicateEvaluator; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; +import org.apache.pinot.spi.trace.FilterType; +import org.apache.pinot.spi.trace.InvocationRecording; +import org.apache.pinot.spi.trace.Tracing; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -97,6 +100,12 @@ public class BitmapBasedFilterOperator extends BaseFilterOperator { if (_exclusive) { docIds.flip(0L, _numDocs); } + InvocationRecording recording = Tracing.activeRecording(); + if (recording.isEnabled()) { + recording.setColumnName(_predicateEvaluator.getPredicate().getLhs().getIdentifier()); + recording.setNumDocsMatchingAfterFilter(docIds.getCardinality()); + recording.setFilter(FilterType.INDEX, String.valueOf(_predicateEvaluator.getPredicateType())); + } return new FilterBlock(new BitmapDocIdSet(docIds, _numDocs)); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java index 54c26dc5e2..e6b96da6e4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/CombinedFilterOperator.java @@ -24,6 +24,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.FilterBlock; import org.apache.pinot.core.operator.docidsets.AndDocIdSet; import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; +import org.apache.pinot.spi.trace.Tracing; /** @@ -59,6 +60,7 @@ public class CombinedFilterOperator extends BaseFilterOperator { @Override protected FilterBlock getNextBlock() { + Tracing.activeRecording().setNumChildren(2); FilterBlockDocIdSet mainFilterDocIdSet = _mainFilterOperator.nextBlock().getNonScanFilterBLockDocIdSet(); FilterBlockDocIdSet subFilterDocIdSet = _subFilterOperator.nextBlock().getBlockDocIdSet(); return new FilterBlock(new AndDocIdSet(Arrays.asList(mainFilterDocIdSet, subFilterDocIdSet))); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java index efac5de9e0..f5c67468d7 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/JsonMatchFilterOperator.java @@ -25,6 +25,10 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.FilterBlock; import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; import org.apache.pinot.segment.spi.index.reader.JsonIndexReader; +import org.apache.pinot.spi.trace.FilterType; +import org.apache.pinot.spi.trace.InvocationRecording; +import org.apache.pinot.spi.trace.Tracing; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; /** @@ -47,7 +51,9 @@ public class JsonMatchFilterOperator extends BaseFilterOperator { @Override protected FilterBlock getNextBlock() { - return new FilterBlock(new BitmapDocIdSet(_jsonIndex.getMatchingDocIds(_predicate.getValue()), _numDocs)); + ImmutableRoaringBitmap bitmap = _jsonIndex.getMatchingDocIds(_predicate.getValue()); + record(bitmap); + return new FilterBlock(new BitmapDocIdSet(bitmap, _numDocs)); } @Override @@ -87,4 +93,13 @@ public class JsonMatchFilterOperator extends BaseFilterOperator { stringBuilder.append(",predicate:").append(_predicate.toString()); return stringBuilder.append(')').toString(); } + + private void record(ImmutableRoaringBitmap bitmap) { + InvocationRecording recording = Tracing.activeRecording(); + if (recording.isEnabled()) { + recording.setColumnName(_predicate.getLhs().getIdentifier()); + recording.setFilter(FilterType.INDEX, _predicate.getType().name()); + recording.setNumDocsMatchingAfterFilter(bitmap.getCardinality()); + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/OrFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/OrFilterOperator.java index 2e32d8fe50..5904b7c963 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/OrFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/OrFilterOperator.java @@ -24,6 +24,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.FilterBlock; import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; import org.apache.pinot.core.operator.docidsets.OrDocIdSet; +import org.apache.pinot.spi.trace.Tracing; import org.roaringbitmap.buffer.BufferFastAggregation; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; @@ -41,6 +42,7 @@ public class OrFilterOperator extends BaseFilterOperator { @Override protected FilterBlock getNextBlock() { + Tracing.activeRecording().setNumChildren(_filterOperators.size()); List<FilterBlockDocIdSet> filterBlockDocIdSets = new ArrayList<>(_filterOperators.size()); for (BaseFilterOperator filterOperator : _filterOperators) { filterBlockDocIdSets.add(filterOperator.nextBlock().getBlockDocIdSet()); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java index 032cd7a3f2..9f1ddb6c48 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/RangeIndexBasedFilterOperator.java @@ -33,6 +33,9 @@ import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFa import org.apache.pinot.core.operator.filter.predicate.RangePredicateEvaluatorFactory.SortedDictionaryBasedRangePredicateEvaluator; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.reader.RangeIndexReader; +import org.apache.pinot.spi.trace.FilterType; +import org.apache.pinot.spi.trace.InvocationRecording; +import org.apache.pinot.spi.trace.Tracing; import org.roaringbitmap.buffer.ImmutableRoaringBitmap; import org.roaringbitmap.buffer.MutableRoaringBitmap; @@ -125,6 +128,7 @@ public class RangeIndexBasedFilterOperator extends BaseFilterOperator { } }); } else { + recordFilter(matches); return new FilterBlock(new BitmapDocIdSet(matches == null ? new MutableRoaringBitmap() : matches, _numDocs)); } } @@ -146,4 +150,14 @@ public class RangeIndexBasedFilterOperator extends BaseFilterOperator { stringBuilder.append(",predicate:").append(_rangePredicateEvaluator.getPredicate().toString()); return stringBuilder.append(')').toString(); } + + private void recordFilter(ImmutableRoaringBitmap bitmap) { + InvocationRecording recording = Tracing.activeRecording(); + if (recording.isEnabled()) { + recording.setNumDocsMatchingAfterFilter(bitmap == null ? 0 : bitmap.getCardinality()); + recording.setColumnName(_dataSource.getDataSourceMetadata().getFieldSpec().getName()); + recording.setFilter(FilterType.INDEX, _rangePredicateEvaluator.getPredicateType().name()); + recording.setNumDocsScanned(_numDocs); + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/TextMatchFilterOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/TextMatchFilterOperator.java index 45b74bea65..641f3ce63b 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/TextMatchFilterOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/filter/TextMatchFilterOperator.java @@ -25,6 +25,10 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.blocks.FilterBlock; import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; import org.apache.pinot.segment.spi.index.reader.TextIndexReader; +import org.apache.pinot.spi.trace.FilterType; +import org.apache.pinot.spi.trace.InvocationRecording; +import org.apache.pinot.spi.trace.Tracing; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; /** @@ -67,7 +71,9 @@ public class TextMatchFilterOperator extends BaseFilterOperator { @Override public BitmapCollection getBitmaps() { - return new BitmapCollection(_numDocs, false, _textIndexReader.getDocIds(_predicate.getValue())); + ImmutableRoaringBitmap bitmap = _textIndexReader.getDocIds(_predicate.getValue()); + record(bitmap); + return new BitmapCollection(_numDocs, false, bitmap); } @Override @@ -87,4 +93,13 @@ public class TextMatchFilterOperator extends BaseFilterOperator { stringBuilder.append(",predicate:").append(_predicate.toString()); return stringBuilder.append(')').toString(); } + + private void record(ImmutableRoaringBitmap matches) { + InvocationRecording recording = Tracing.activeRecording(); + if (recording.isEnabled()) { + recording.setNumDocsMatchingAfterFilter(matches.getCardinality()); + recording.setColumnName(_predicate.getLhs().getIdentifier()); + recording.setFilter(FilterType.INDEX, "LUCENE_TEXT"); + } + } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java index d3b752e061..ded21e302e 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java @@ -38,6 +38,7 @@ import org.apache.pinot.core.operator.transform.function.TransformFunctionFactor import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.datasource.DataSource; import org.apache.pinot.segment.spi.index.reader.Dictionary; +import org.apache.pinot.spi.trace.Tracing; /** @@ -111,6 +112,7 @@ public class TransformOperator extends BaseOperator<TransformBlock> { if (projectionBlock == null) { return null; } else { + Tracing.activeRecording().setNumChildren(_dataSourceMap.size()); return new TransformBlock(projectionBlock, _transformFunctionMap); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java index a9ea7bad9e..4e5c0e998a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java @@ -119,6 +119,19 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { @Override public DataTable processQuery(ServerQueryRequest queryRequest, ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver) { + if (!queryRequest.isEnableTrace()) { + return processQueryInternal(queryRequest, executorService, responseObserver); + } + try { + Tracing.getTracer().register(queryRequest.getRequestId()); + return processQueryInternal(queryRequest, executorService, responseObserver); + } finally { + Tracing.getTracer().unregister(); + } + } + + private DataTable processQueryInternal(ServerQueryRequest queryRequest, ExecutorService executorService, + @Nullable StreamObserver<Server.ServerResponse> responseObserver) { TimerContext timerContext = queryRequest.getTimerContext(); TimerContext.Timer schedulerWaitTimer = timerContext.getPhaseTimer(ServerQueryPhase.SCHEDULER_WAIT); if (schedulerWaitTimer != null) { @@ -194,11 +207,6 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { } } - boolean enableTrace = queryRequest.isEnableTrace(); - if (enableTrace) { - Tracing.getTracer().register(requestId); - } - DataTable dataTable = null; try { dataTable = processQuery(indexSegments, queryContext, timerContext, executorService, responseObserver, @@ -219,11 +227,10 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor { for (SegmentDataManager segmentDataManager : segmentDataManagers) { tableDataManager.releaseSegment(segmentDataManager); } - if (enableTrace) { + if (queryRequest.isEnableTrace()) { if (TraceContext.traceEnabled() && dataTable != null) { dataTable.getMetadata().put(MetadataKey.TRACE_INFO.getName(), TraceContext.getTraceInfo()); } - Tracing.getTracer().unregister(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java index e6b9bf0e02..1fb88a1b27 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java @@ -24,6 +24,8 @@ import org.apache.pinot.core.query.config.SegmentPrunerConfig; import org.apache.pinot.core.query.request.context.QueryContext; import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.trace.InvocationScope; +import org.apache.pinot.spi.trace.Tracing; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -59,8 +61,14 @@ public class SegmentPrunerService { * Prunes the segments based on the query request, returns the segments that are not pruned. */ public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) { - for (SegmentPruner segmentPruner : _segmentPruners) { - segments = segmentPruner.prune(segments, query); + try (InvocationScope scope = Tracing.getTracer().createScope(SegmentPrunerService.class)) { + scope.setNumChildren(_segmentPruners.size()); + for (SegmentPruner segmentPruner : _segmentPruners) { + try (InvocationScope prunerScope = Tracing.getTracer().createScope(segmentPruner.getClass())) { + prunerScope.setNumSegments(segments.size()); + segments = segmentPruner.prune(segments, query); + } + } } return segments; } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java index 60a6f793f3..7bd93b1763 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/util/trace/BuiltInTracer.java @@ -18,13 +18,10 @@ */ package org.apache.pinot.core.util.trace; -import java.util.ArrayDeque; -import java.util.Deque; import org.apache.pinot.spi.trace.BaseRecording; import org.apache.pinot.spi.trace.InvocationRecording; import org.apache.pinot.spi.trace.InvocationScope; import org.apache.pinot.spi.trace.NoOpRecording; -import org.apache.pinot.spi.trace.TraceState; import org.apache.pinot.spi.trace.Tracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,18 +30,15 @@ import org.slf4j.LoggerFactory; public class BuiltInTracer implements Tracer { private static final Logger LOGGER = LoggerFactory.getLogger(BuiltInTracer.class); - private static final ThreadLocal<Deque<InvocationRecording>> STACK = ThreadLocal.withInitial(ArrayDeque::new); private static final class MilliTimeSpan extends BaseRecording implements InvocationScope { private final long _startTimeMillis = System.currentTimeMillis(); private final Class<?> _operator; - private final Runnable _onClose; - public MilliTimeSpan(Class<?> operator, Runnable onClose) { + public MilliTimeSpan(Class<?> operator) { super(true); _operator = operator; - _onClose = onClose; } @Override @@ -54,8 +48,7 @@ public class BuiltInTracer implements Tracer { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Time spent in {}: {}", operatorName, duration); } - org.apache.pinot.core.util.trace.TraceContext.logTime(operatorName, duration); - _onClose.run(); + TraceContext.logTime(operatorName, duration); } } @@ -71,26 +64,11 @@ public class BuiltInTracer implements Tracer { @Override public InvocationScope createScope(Class<?> operatorClass) { - if (TraceContext.traceEnabled()) { - Deque<InvocationRecording> stack = getStack(); - MilliTimeSpan execution = new MilliTimeSpan(operatorClass, stack::removeLast); - stack.addLast(execution); - return execution; - } - return NoOpRecording.INSTANCE; + return TraceContext.traceEnabled() ? new MilliTimeSpan(operatorClass) : NoOpRecording.INSTANCE; } @Override public InvocationRecording activeRecording() { - Deque<InvocationRecording> stack = getStack(); - return stack.isEmpty() ? NoOpRecording.INSTANCE : stack.peekLast(); - } - - private Deque<InvocationRecording> getStack() { - Thread thread = Thread.currentThread(); - if (thread instanceof TraceState) { - return ((TraceState) thread).getRecordings(); - } - return STACK.get(); + return NoOpRecording.INSTANCE; } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/InvocationRecording.java b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/InvocationRecording.java index a922c3b170..ad86e8b6db 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/trace/InvocationRecording.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/trace/InvocationRecording.java @@ -56,6 +56,18 @@ public interface InvocationRecording { default void setNumTasks(int numTasks) { } + /** + * @param numChildren the number of children operators/transforms/projections + */ + default void setNumChildren(int numChildren) { + } + + /** + * @param numSegments the number of segments + */ + default void setNumSegments(int numSegments) { + } + /** * If the operator is a filter, determines the filter type (scan or index) and the predicate type * @param filterType SCAN or INDEX @@ -64,23 +76,21 @@ public interface InvocationRecording { default void setFilter(FilterType filterType, String predicateType) { } + /** - * Records whether type transformation took place during the operator's invocation and what the types were - * @param inputDataType the input data type - * @param inputSV if the input data type is single-value - * @param outputDataType the output data type - * @param outputSV if the output data type is single-value + * Records the input datatype before a stage of query execution + * @param dataType the output data type + * @param singleValue if the output data type is single-value */ - default void setDataTypes(FieldSpec.DataType inputDataType, boolean inputSV, FieldSpec.DataType outputDataType, - boolean outputSV) { + default void setInputDataType(FieldSpec.DataType dataType, boolean singleValue) { } /** - * Records whether type transformation took place during the operator's invocation and what the types were - * @param inputDataType the input data type - * @param outputDataType the output data type + * Records the output datatype after a stage of query execution + * @param dataType the output data type + * @param singleValue if the output data type is single-value */ - default void setDataTypes(String inputDataType, String outputDataType) { + default void setOutputDataType(FieldSpec.DataType dataType, boolean singleValue) { } /** --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org