This is an automated email from the ASF dual-hosted git repository. nehapawar pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new d638444 Move acquire/release to right before/after execution call (#7493) d638444 is described below commit d638444e7254bdc881a299f37ac0909a5664f0ca Author: Neha Pawar <neha.pawa...@gmail.com> AuthorDate: Wed Sep 29 12:56:48 2021 -0700 Move acquire/release to right before/after execution call (#7493) --- .../AcquireReleaseColumnsSegmentOperator.java | 43 ++-- .../core/operator/combine/BaseCombineOperator.java | 14 +- .../combine/GroupByOrderByCombineOperator.java | 96 +++++---- .../plan/AcquireReleaseColumnsSegmentPlanNode.java | 13 +- .../core/plan/maker/InstancePlanMakerImplV2.java | 2 +- .../pinot/perf/BenchmarkRoaringBitmapCreation.java | 217 +++++++++++++++++++++ .../index/readers/BitmapInvertedIndexReader.java | 29 --- .../index/readers/NullValueVectorReaderImpl.java | 8 +- .../index/readers/RangeIndexReaderImpl.java | 36 ---- .../org/apache/pinot/segment/spi/FetchContext.java | 11 +- 10 files changed, 339 insertions(+), 130 deletions(-) diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java index 127f38f..422374f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/AcquireReleaseColumnsSegmentOperator.java @@ -20,41 +20,56 @@ package org.apache.pinot.core.operator; import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.plan.PlanNode; import org.apache.pinot.segment.spi.FetchContext; import org.apache.pinot.segment.spi.IndexSegment; /** * A common wrapper around the segment-level operator. - * Provides an opportunity to acquire and release column buffers before reading data + * NOTE: This is only used if <code>pinot.server.query.executor.enable.prefetch</code> is true + * This creates a mechanism to acquire and release column buffers before reading data. + * This Operator is different from others in the following way: + * It expects the PlanNode of the execution, instead of the Operator. + * It runs the plan to get the operator, before it begins execution. + * The reason this is done is the planners access segment buffers, + * and we need to acquire the segment before any access is made to the buffers. */ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator { private static final String OPERATOR_NAME = "AcquireReleaseColumnsSegmentOperator"; - private final Operator _childOperator; + private final PlanNode _planNode; private final IndexSegment _indexSegment; private final FetchContext _fetchContext; + private Operator _childOperator; - public AcquireReleaseColumnsSegmentOperator(Operator childOperator, IndexSegment indexSegment, - FetchContext fetchContext) { - _childOperator = childOperator; + public AcquireReleaseColumnsSegmentOperator(PlanNode planNode, IndexSegment indexSegment, FetchContext fetchContext) { + _planNode = planNode; _indexSegment = indexSegment; _fetchContext = fetchContext; } /** - * Makes a call to acquire column buffers from {@link IndexSegment} before getting nextBlock from childOperator, - * and - * a call to release the column buffers from {@link IndexSegment} after. + * Runs the planNode to get the childOperator, and then proceeds with execution. */ @Override protected Block getNextBlock() { + _childOperator = _planNode.run(); + return _childOperator.nextBlock(); + } + + /** + * Acquires the indexSegment using the provided fetchContext + */ + public void acquire() { _indexSegment.acquire(_fetchContext); - try { - return _childOperator.nextBlock(); - } finally { - _indexSegment.release(_fetchContext); - } + } + + /** + * Releases the indexSegment using the provided fetchContext + */ + public void release() { + _indexSegment.release(_fetchContext); } @Override @@ -64,6 +79,6 @@ public class AcquireReleaseColumnsSegmentOperator extends BaseOperator { @Override public ExecutionStatistics getExecutionStatistics() { - return _childOperator.getExecutionStatistics(); + return _childOperator == null ? new ExecutionStatistics(0, 0, 0, 0) : _childOperator.getExecutionStatistics(); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java index 4d93c02..eb77329 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/BaseCombineOperator.java @@ -29,6 +29,7 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.core.common.Operator; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.request.context.QueryContext; @@ -151,7 +152,18 @@ public abstract class BaseCombineOperator extends BaseOperator<IntermediateResul */ protected void processSegments(int taskIndex) { for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) { - IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock(); + Operator operator = _operators.get(operatorIndex); + IntermediateResultsBlock resultsBlock; + try { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + resultsBlock = (IntermediateResultsBlock) operator.nextBlock(); + } finally { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).release(); + } + } if (isQuerySatisfied(resultsBlock)) { // Query is satisfied, skip processing the remaining segments _blockingQueue.offer(resultsBlock); diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java index 2ece042..e4203bc 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java @@ -39,6 +39,7 @@ import org.apache.pinot.core.data.table.IntermediateRecord; import org.apache.pinot.core.data.table.Key; import org.apache.pinot.core.data.table.Record; import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable; +import org.apache.pinot.core.operator.AcquireReleaseColumnsSegmentOperator; import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock; import org.apache.pinot.core.query.aggregation.function.AggregationFunction; import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult; @@ -121,58 +122,67 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator { @Override protected void processSegments(int taskIndex) { for (int operatorIndex = taskIndex; operatorIndex < _numOperators; operatorIndex += _numTasks) { - IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) _operators.get(operatorIndex).nextBlock(); - - if (_indexedTable == null) { - synchronized (this) { - if (_indexedTable == null) { - DataSchema dataSchema = resultsBlock.getDataSchema(); - // NOTE: Use trimSize as resultSize on server size. - if (_trimThreshold >= MAX_TRIM_THRESHOLD) { - // special case of trim threshold where it is set to max value. - // there won't be any trimming during upsert in this case. - // thus we can avoid the overhead of read-lock and write-lock - // in the upsert method. - _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize); - } else { - _indexedTable = - new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + Operator operator = _operators.get(operatorIndex); + try { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).acquire(); + } + IntermediateResultsBlock resultsBlock = (IntermediateResultsBlock) operator.nextBlock(); + if (_indexedTable == null) { + synchronized (this) { + if (_indexedTable == null) { + DataSchema dataSchema = resultsBlock.getDataSchema(); + // NOTE: Use trimSize as resultSize on server size. + if (_trimThreshold >= MAX_TRIM_THRESHOLD) { + // special case of trim threshold where it is set to max value. + // there won't be any trimming during upsert in this case. + // thus we can avoid the overhead of read-lock and write-lock + // in the upsert method. + _indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, _trimSize); + } else { + _indexedTable = + new ConcurrentIndexedTable(dataSchema, _queryContext, _trimSize, _trimSize, _trimThreshold); + } } } } - } - // Merge processing exceptions. - List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions(); - if (processingExceptionsToMerge != null) { - _mergedProcessingExceptions.addAll(processingExceptionsToMerge); - } + // Merge processing exceptions. + List<ProcessingException> processingExceptionsToMerge = resultsBlock.getProcessingExceptions(); + if (processingExceptionsToMerge != null) { + _mergedProcessingExceptions.addAll(processingExceptionsToMerge); + } - // Merge aggregation group-by result. - // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable - Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords(); - // For now, only GroupBy OrderBy query has pre-constructed intermediate records - if (intermediateRecords == null) { // Merge aggregation group-by result. - AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); - if (aggregationGroupByResult != null) { - // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable - Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator = aggregationGroupByResult.getGroupKeyIterator(); - while (dicGroupKeyIterator.hasNext()) { - GroupKeyGenerator.GroupKey groupKey = dicGroupKeyIterator.next(); - Object[] keys = groupKey._keys; - Object[] values = Arrays.copyOf(keys, _numColumns); - int groupId = groupKey._groupId; - for (int i = 0; i < _numAggregationFunctions; i++) { - values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId); + // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable + Collection<IntermediateRecord> intermediateRecords = resultsBlock.getIntermediateRecords(); + // For now, only GroupBy OrderBy query has pre-constructed intermediate records + if (intermediateRecords == null) { + // Merge aggregation group-by result. + AggregationGroupByResult aggregationGroupByResult = resultsBlock.getAggregationGroupByResult(); + if (aggregationGroupByResult != null) { + // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable + Iterator<GroupKeyGenerator.GroupKey> dicGroupKeyIterator = aggregationGroupByResult.getGroupKeyIterator(); + while (dicGroupKeyIterator.hasNext()) { + GroupKeyGenerator.GroupKey groupKey = dicGroupKeyIterator.next(); + Object[] keys = groupKey._keys; + Object[] values = Arrays.copyOf(keys, _numColumns); + int groupId = groupKey._groupId; + for (int i = 0; i < _numAggregationFunctions; i++) { + values[_numGroupByExpressions + i] = aggregationGroupByResult.getResultForGroupId(i, groupId); + } + _indexedTable.upsert(new Key(keys), new Record(values)); } - _indexedTable.upsert(new Key(keys), new Record(values)); + } + } else { + for (IntermediateRecord intermediateResult : intermediateRecords) { + //TODO: change upsert api so that it accepts intermediateRecord directly + _indexedTable.upsert(intermediateResult._key, intermediateResult._record); } } - } else { - for (IntermediateRecord intermediateResult : intermediateRecords) { - //TODO: change upsert api so that it accepts intermediateRecord directly - _indexedTable.upsert(intermediateResult._key, intermediateResult._record); + } finally { + if (operator instanceof AcquireReleaseColumnsSegmentOperator) { + ((AcquireReleaseColumnsSegmentOperator) operator).release(); } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java index 5a9f506..73406c2 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AcquireReleaseColumnsSegmentPlanNode.java @@ -25,6 +25,13 @@ import org.apache.pinot.segment.spi.IndexSegment; /** * A common wrapper for the segment-level plan node. + * NOTE: This is only used if <code>pinot.server.query.executor.enable.prefetch</code> is true + * This PlanNode differs from the other PlanNodes in the following way: + * This PlanNode does not invoke a <code>run</code> on the childOperator in its run method. + * Instead, it passes the childPlanNode as is, to the {@link AcquireReleaseColumnsSegmentOperator}, + * and it is that operator's responsibility to run the childPlanNode and get the childOperator before execution. + * The reason this is done is the planners access segment buffers, + * and we need to acquire the segment before any access is made to the buffers. */ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode { @@ -39,8 +46,12 @@ public class AcquireReleaseColumnsSegmentPlanNode implements PlanNode { _fetchContext = fetchContext; } + /** + * Doesn't run the childPlan, + * but instead just creates a {@link AcquireReleaseColumnsSegmentOperator} and passes the plan to it + */ @Override public AcquireReleaseColumnsSegmentOperator run() { - return new AcquireReleaseColumnsSegmentOperator(_childPlanNode.run(), _indexSegment, _fetchContext); + return new AcquireReleaseColumnsSegmentOperator(_childPlanNode, _indexSegment, _fetchContext); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java index ea54327..f5567a4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java @@ -175,7 +175,7 @@ public class InstancePlanMakerImplV2 implements PlanMaker { } else { columns = queryContext.getColumns(); } - FetchContext fetchContext = new FetchContext(UUID.randomUUID(), columns); + FetchContext fetchContext = new FetchContext(UUID.randomUUID(), indexSegment.getSegmentName(), columns); fetchContexts.add(fetchContext); planNodes.add( new AcquireReleaseColumnsSegmentPlanNode(makeSegmentPlanNode(indexSegment, queryContext), indexSegment, diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java new file mode 100644 index 0000000..2d85652 --- /dev/null +++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkRoaringBitmapCreation.java @@ -0,0 +1,217 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.perf; + +import java.io.File; +import java.io.IOException; +import java.lang.ref.SoftReference; +import java.nio.ByteOrder; +import java.util.concurrent.TimeUnit; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang.math.RandomUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter; +import org.apache.pinot.segment.spi.memory.PinotByteBuffer; +import org.apache.pinot.segment.spi.memory.PinotDataBuffer; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.profile.GCProfiler; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; +import org.openjdk.jmh.runner.options.OptionsBuilder; +import org.openjdk.jmh.runner.options.TimeValue; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + + +/** + * Benchmark created to test the impact of removing the SoftReference array cache for ImmutableRoaringBitmap + */ +@State(Scope.Benchmark) +@Fork(1) +public class BenchmarkRoaringBitmapCreation { + + private static final int NUM_DOCS = 1_000_000; + private static final int CARDINALITY = 100_000; + private static final File TEMP_DIR = + new File(FileUtils.getTempDirectory(), "bitmap_creation_benchmark_" + System.currentTimeMillis()); + + @Param({"100", "10000", "99999"}) // higher this is, lesser the cache access + public int _dictIdsToRead; + + private int _numBitmaps; + private BitmapInvertedIndexWriter _bitmapInvertedIndexWriter; + private SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmapsArrayReference = null; + private SoftReference<SoftReference<Pair<Integer, Integer>>[]> _offsetLengthPairsArrayReference = null; + private PinotDataBuffer _offsetLengthBuffer; + private PinotDataBuffer _bitmapBuffer; + private int _firstOffset; + + @Setup + public void setup() + throws IllegalAccessException, InstantiationException, IOException { + _numBitmaps = CARDINALITY; + + File bufferDir = new File(TEMP_DIR, "cardinality_" + CARDINALITY); + FileUtils.forceMkdir(bufferDir); + File bufferFile = new File(bufferDir, "buffer"); + _bitmapInvertedIndexWriter = new BitmapInvertedIndexWriter(bufferFile, _numBitmaps); + // Insert between 10-1000 values per bitmap + for (int i = 0; i < _numBitmaps; i++) { + int size = 10 + RandomUtils.nextInt(990); + int[] data = new int[size]; + for (int j = 0; j < size; j++) { + data[j] = RandomUtils + .nextInt(NUM_DOCS); // docIds will repeat across bitmaps, but doesn't matter for purpose of this benchmark + } + RoaringBitmap bitmap = RoaringBitmap.bitmapOf(data); + _bitmapInvertedIndexWriter.add(bitmap); + } + + PinotDataBuffer dataBuffer = PinotByteBuffer.mapReadOnlyBigEndianFile(bufferFile); + long offsetBufferEndOffset = (long) (_numBitmaps + 1) * Integer.BYTES; + _offsetLengthBuffer = dataBuffer.view(0, offsetBufferEndOffset, ByteOrder.BIG_ENDIAN); + _bitmapBuffer = dataBuffer.view(offsetBufferEndOffset, dataBuffer.size()); + _firstOffset = _offsetLengthBuffer.getInt(0); + } + + @TearDown + public void teardown() + throws IOException { + _bitmapInvertedIndexWriter.close(); + FileUtils.deleteQuietly(TEMP_DIR); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public boolean cacheReferences() { + int dictId = RandomUtils.nextInt(_dictIdsToRead); + ImmutableRoaringBitmap roaringBitmapFromCache = getRoaringBitmapFromCache(dictId); + return roaringBitmapFromCache.isEmpty(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public boolean alwaysBuild() { + int dictId = RandomUtils.nextInt(_dictIdsToRead); + ImmutableRoaringBitmap immutableRoaringBitmap = buildRoaringBitmap(dictId); + return immutableRoaringBitmap.isEmpty(); + } + + @Benchmark + @BenchmarkMode(Mode.AverageTime) + @OutputTimeUnit(TimeUnit.MICROSECONDS) + public boolean alwaysBuildCachedOffsetAndLength() { + int dictId = RandomUtils.nextInt(_dictIdsToRead); + ImmutableRoaringBitmap immutableRoaringBitmap = buildRoaringBitmapUsingOffsetPairFromCache(dictId); + return immutableRoaringBitmap.isEmpty(); + } + + /** + * Code as of before this commit, using an array of SoftReference for the ImmutableRoaringBitmap + */ + private ImmutableRoaringBitmap getRoaringBitmapFromCache(int dictId) { + SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = + (_bitmapsArrayReference != null) ? _bitmapsArrayReference.get() : null; + if (bitmapArrayReference != null) { + SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId]; + ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null; + if (bitmap != null) { + return bitmap; + } + } else { + bitmapArrayReference = new SoftReference[_numBitmaps]; + _bitmapsArrayReference = new SoftReference<>(bitmapArrayReference); + } + synchronized (this) { + SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId]; + ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null; + if (bitmap == null) { + bitmap = buildRoaringBitmap(dictId); + bitmapArrayReference[dictId] = new SoftReference<>(bitmap); + } + return bitmap; + } + } + + private ImmutableRoaringBitmap buildRoaringBitmap(int dictId) { + Pair<Integer, Integer> offsetLengthPair = buildOffsetLengthPair(dictId); + return buildRoaringBitmap(offsetLengthPair); + } + + private Pair<Integer, Integer> buildOffsetLengthPair(int dictId) { + int offset = _offsetLengthBuffer.getInt(dictId * Integer.BYTES); + int length = _offsetLengthBuffer.getInt((dictId + 1) * Integer.BYTES) - offset; + return Pair.of(offset, length); + } + + private ImmutableRoaringBitmap buildRoaringBitmap(Pair<Integer, Integer> offsetLengthPair) { + return new ImmutableRoaringBitmap( + _bitmapBuffer.toDirectByteBuffer(offsetLengthPair.getLeft() - _firstOffset, offsetLengthPair.getRight())); + } + + private ImmutableRoaringBitmap buildRoaringBitmapUsingOffsetPairFromCache(int dictId) { + return buildRoaringBitmap(getOffsetLengthPairFromCache(dictId)); + } + + private Pair<Integer, Integer> getOffsetLengthPairFromCache(int dictId) { + + SoftReference<Pair<Integer, Integer>>[] offsetLengthPairArrayReference = + (_offsetLengthPairsArrayReference != null) ? _offsetLengthPairsArrayReference.get() : null; + if (offsetLengthPairArrayReference != null) { + SoftReference<Pair<Integer, Integer>> offsetLengthPairReference = offsetLengthPairArrayReference[dictId]; + Pair<Integer, Integer> offsetLengthPair = + (offsetLengthPairReference != null) ? offsetLengthPairReference.get() : null; + if (offsetLengthPair != null) { + return offsetLengthPair; + } + } else { + offsetLengthPairArrayReference = new SoftReference[_numBitmaps]; + _offsetLengthPairsArrayReference = new SoftReference<>(offsetLengthPairArrayReference); + } + synchronized (this) { + SoftReference<Pair<Integer, Integer>> offsetLengthPairReference = offsetLengthPairArrayReference[dictId]; + Pair<Integer, Integer> offsetLengthPair = + (offsetLengthPairReference != null) ? offsetLengthPairReference.get() : null; + if (offsetLengthPair == null) { + offsetLengthPair = buildOffsetLengthPair(dictId); + offsetLengthPairArrayReference[dictId] = new SoftReference<>(offsetLengthPair); + } + return offsetLengthPair; + } + } + + public static void main(String[] args) + throws Exception { + ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkRoaringBitmapCreation.class.getSimpleName()) + .warmupTime(TimeValue.seconds(10)).warmupIterations(1).measurementTime(TimeValue.seconds(60)) + .measurementIterations(1).forks(1).addProfiler(GCProfiler.class); + new Runner(opt.build()).run(); + } +} diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java index 491fe67..48905a7 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/BitmapInvertedIndexReader.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.segment.local.segment.index.readers; -import java.lang.ref.SoftReference; import java.nio.ByteOrder; import org.apache.pinot.segment.local.segment.creator.impl.inv.BitmapInvertedIndexWriter; import org.apache.pinot.segment.spi.index.reader.InvertedIndexReader; @@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory; public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableRoaringBitmap> { public static final Logger LOGGER = LoggerFactory.getLogger(BitmapInvertedIndexReader.class); - private final int _numBitmaps; private final PinotDataBuffer _offsetBuffer; private final PinotDataBuffer _bitmapBuffer; @@ -44,11 +42,7 @@ public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableR // 2. Offset buffer stores the offsets within the bitmap buffer private final int _firstOffset; - private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps; - public BitmapInvertedIndexReader(PinotDataBuffer dataBuffer, int numBitmaps) { - _numBitmaps = numBitmaps; - long offsetBufferEndOffset = (long) (numBitmaps + 1) * Integer.BYTES; _offsetBuffer = dataBuffer.view(0, offsetBufferEndOffset, ByteOrder.BIG_ENDIAN); _bitmapBuffer = dataBuffer.view(offsetBufferEndOffset, dataBuffer.size()); @@ -59,29 +53,6 @@ public class BitmapInvertedIndexReader implements InvertedIndexReader<ImmutableR @SuppressWarnings("unchecked") @Override public ImmutableRoaringBitmap getDocIds(int dictId) { - SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = (_bitmaps != null) ? _bitmaps.get() : null; - if (bitmapArrayReference != null) { - SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId]; - ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null; - if (bitmap != null) { - return bitmap; - } - } else { - bitmapArrayReference = new SoftReference[_numBitmaps]; - _bitmaps = new SoftReference<>(bitmapArrayReference); - } - synchronized (this) { - SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId]; - ImmutableRoaringBitmap bitmap = (bitmapReference != null) ? bitmapReference.get() : null; - if (bitmap == null) { - bitmap = buildRoaringBitmap(dictId); - bitmapArrayReference[dictId] = new SoftReference<>(bitmap); - } - return bitmap; - } - } - - private ImmutableRoaringBitmap buildRoaringBitmap(int dictId) { int offset = _offsetBuffer.getInt(dictId * Integer.BYTES); int length = _offsetBuffer.getInt((dictId + 1) * Integer.BYTES) - offset; return new ImmutableRoaringBitmap(_bitmapBuffer.toDirectByteBuffer(offset - _firstOffset, length)); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java index 7a60cd8..ccd79a2 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/NullValueVectorReaderImpl.java @@ -25,18 +25,18 @@ import org.roaringbitmap.buffer.ImmutableRoaringBitmap; public class NullValueVectorReaderImpl implements NullValueVectorReader { - private final ImmutableRoaringBitmap _nullBitmap; + private final PinotDataBuffer _dataBuffer; public NullValueVectorReaderImpl(PinotDataBuffer dataBuffer) { - _nullBitmap = new ImmutableRoaringBitmap(dataBuffer.toDirectByteBuffer(0, (int) dataBuffer.size())); + _dataBuffer = dataBuffer; } public boolean isNull(int docId) { - return _nullBitmap.contains(docId); + return getNullBitmap().contains(docId); } @Override public ImmutableRoaringBitmap getNullBitmap() { - return _nullBitmap; + return new ImmutableRoaringBitmap(_dataBuffer.toDirectByteBuffer(0, (int) _dataBuffer.size())); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java index 09bb619..510fa80 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/index/readers/RangeIndexReaderImpl.java @@ -19,7 +19,6 @@ package org.apache.pinot.segment.local.segment.index.readers; import com.google.common.base.Preconditions; -import java.lang.ref.SoftReference; import java.nio.ByteBuffer; import javax.annotation.Nullable; import org.apache.pinot.segment.local.segment.creator.impl.inv.RangeIndexCreator; @@ -44,8 +43,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi private final Number[] _rangeStartArray; private final Number _lastRangeEnd; - private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps; - public RangeIndexReaderImpl(PinotDataBuffer dataBuffer) { _dataBuffer = dataBuffer; long offset = 0; @@ -179,39 +176,6 @@ public class RangeIndexReaderImpl implements RangeIndexReader<ImmutableRoaringBi } private ImmutableRoaringBitmap getDocIds(int rangeId) { - SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = null; - // Return the bitmap if it's still on heap - if (_bitmaps != null) { - bitmapArrayReference = _bitmaps.get(); - if (bitmapArrayReference != null) { - SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[rangeId]; - if (bitmapReference != null) { - ImmutableRoaringBitmap value = bitmapReference.get(); - if (value != null) { - return value; - } - } - } else { - bitmapArrayReference = new SoftReference[_numRanges]; - _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference); - } - } else { - bitmapArrayReference = new SoftReference[_numRanges]; - _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference); - } - synchronized (this) { - ImmutableRoaringBitmap value; - if (bitmapArrayReference[rangeId] == null || bitmapArrayReference[rangeId].get() == null) { - value = buildRoaringBitmapForIndex(rangeId); - bitmapArrayReference[rangeId] = new SoftReference<ImmutableRoaringBitmap>(value); - } else { - value = bitmapArrayReference[rangeId].get(); - } - return value; - } - } - - private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int rangeId) { final long currentOffset = getOffset(rangeId); final long nextOffset = getOffset(rangeId + 1); final int bufferLength = (int) (nextOffset - currentOffset); diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java index 03e0c68..e00ed08 100644 --- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java +++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/FetchContext.java @@ -27,10 +27,12 @@ import java.util.UUID; */ public class FetchContext { private final UUID _fetchId; + private final String _segmentName; private final Set<String> _columns; - public FetchContext(UUID fetchId, Set<String> columns) { + public FetchContext(UUID fetchId, String segmentName, Set<String> columns) { _fetchId = fetchId; + _segmentName = segmentName; _columns = columns; } @@ -43,6 +45,13 @@ public class FetchContext { } /** + * Segment name associated with this fetch context + */ + public String getSegmentName() { + return _segmentName; + } + + /** * Columns to be fetched as part of this request */ public Set<String> getColumns() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org