This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

The following commit(s) were added to refs/heads/master by this push:
     new fe2a63c use RoaringBitmapWriter and direct to ByteBuffer serialization in BitmapInvertedIndexCreators (#6320)
fe2a63c is described below

commit fe2a63ca6d21015cbb09500f00d98a702338d488
Author: Richard Startin <richard.star...@datadoghq.com>
AuthorDate: Tue Dec 8 05:03:45 2020 +0000

    use RoaringBitmapWriter and direct to ByteBuffer serialization in BitmapInvertedIndexCreators (#6320)

    This PR uses some more efficient APIs from RoaringBitmap.

    - `RoaringBitmapWriter` is optimized for writing row ids into the bitmap sequentially. This class has a few options to control memory usage, but I've used its defaults.
    - A `RoaringBitmap` can be serialized to a `ByteBuffer`, which is generally a lot faster than a `DataOutput`. However, this requires mapping a file outside of the control of `PinotByteBuffer`. The buffer also has to be big endian for backward compatibility with `DataOutput`.
---
 .../inv/OffHeapBitmapInvertedIndexCreator.java | 59 +++++++-----
 .../impl/inv/OnHeapBitmapInvertedIndexCreator.java | 63 ++++++++-----
 ...BenchmarkOffheapBitmapInvertedIndexCreator.java | 105 +++++++++++++++++++++
 3 files changed, 181 insertions(+), 46 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java index b43b330..1e72bc1 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OffHeapBitmapInvertedIndexCreator.java @@ -18,19 +18,22 @@ */ package org.apache.pinot.core.segment.creator.impl.inv; -import com.google.common.base.Preconditions; -import java.io.BufferedOutputStream; import java.io.Closeable; -import java.io.DataOutputStream; import java.io.File; -import
java.io.FileOutputStream; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; + import org.apache.commons.io.FileUtils; import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator; import org.apache.pinot.core.segment.creator.impl.V1Constants; import org.apache.pinot.core.segment.memory.PinotDataBuffer; +import org.apache.pinot.core.util.CleanerUtil; import org.apache.pinot.spi.data.FieldSpec; -import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.RoaringBitmapWriter; /** @@ -181,33 +184,45 @@ public final class OffHeapBitmapInvertedIndexCreator implements DictionaryBasedI } // Create bitmaps from inverted index buffers and serialize them to file - try (DataOutputStream offsetDataStream = new DataOutputStream( - new BufferedOutputStream(new FileOutputStream(_invertedIndexFile))); - FileOutputStream bitmapFileStream = new FileOutputStream(_invertedIndexFile); - DataOutputStream bitmapDataStream = new DataOutputStream(new BufferedOutputStream(bitmapFileStream))) { - int bitmapOffset = (_cardinality + 1) * Integer.BYTES; - offsetDataStream.writeInt(bitmapOffset); - bitmapFileStream.getChannel().position(bitmapOffset); - + ByteBuffer offsetBuffer = null; + ByteBuffer bitmapBuffer = null; + try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) { + // map the offsets buffer + int startOfBitmaps = (_cardinality + 1) * Integer.BYTES; + offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, startOfBitmaps) + .order(ByteOrder.BIG_ENDIAN); + offsetBuffer.putInt(startOfBitmaps); + bitmapBuffer = channel.map(FileChannel.MapMode.READ_WRITE, startOfBitmaps, Integer.MAX_VALUE - startOfBitmaps) + .order(ByteOrder.LITTLE_ENDIAN); + RoaringBitmapWriter<RoaringBitmap> writer = RoaringBitmapWriter.writer().runCompress(false).get(); int startIndex = 0; for 
(int dictId = 0; dictId < _cardinality; dictId++) { - MutableRoaringBitmap bitmap = new MutableRoaringBitmap(); int endIndex = getInt(_invertedIndexLengthBuffer, dictId); for (int i = startIndex; i < endIndex; i++) { - bitmap.add(getInt(_invertedIndexValueBuffer, i)); + writer.add(getInt(_invertedIndexValueBuffer, i)); } + RoaringBitmap bitmap = writer.get(); + bitmap.serialize(bitmapBuffer); + // write offset into file + offsetBuffer.putInt(startOfBitmaps + bitmapBuffer.position()); startIndex = endIndex; - - // Write offset and bitmap into file - bitmapOffset += bitmap.serializedSizeInBytes(); - // Check for int overflow - Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile); - offsetDataStream.writeInt(bitmapOffset); - bitmap.serialize(bitmapDataStream); + writer.reset(); } + // we know how long the file should be now, so truncate it + channel.truncate(startOfBitmaps + bitmapBuffer.position()); } catch (Exception e) { FileUtils.deleteQuietly(_invertedIndexFile); throw e; + } finally { + if (CleanerUtil.UNMAP_SUPPORTED) { + CleanerUtil.BufferCleaner cleaner = CleanerUtil.getCleaner(); + if (offsetBuffer != null) { + cleaner.freeBuffer(offsetBuffer); + } + if (bitmapBuffer != null) { + cleaner.freeBuffer(bitmapBuffer); + } + } } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java index 84c9929..44f4588 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/inv/OnHeapBitmapInvertedIndexCreator.java @@ -18,16 +18,20 @@ */ package org.apache.pinot.core.segment.creator.impl.inv; -import com.google.common.base.Preconditions; -import java.io.BufferedOutputStream; -import 
java.io.DataOutputStream; + import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.nio.channels.FileChannel; + import org.apache.commons.io.FileUtils; import org.apache.pinot.core.segment.creator.DictionaryBasedInvertedIndexCreator; import org.apache.pinot.core.segment.creator.impl.V1Constants; -import org.roaringbitmap.buffer.MutableRoaringBitmap; +import org.apache.pinot.core.util.CleanerUtil; +import org.roaringbitmap.RoaringBitmap; +import org.roaringbitmap.RoaringBitmapWriter; /** @@ -35,26 +39,27 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap; */ public final class OnHeapBitmapInvertedIndexCreator implements DictionaryBasedInvertedIndexCreator { private final File _invertedIndexFile; - private final MutableRoaringBitmap[] _bitmaps; + private final RoaringBitmapWriter<RoaringBitmap>[] _bitmapWriters; private int _nextDocId; + @SuppressWarnings("unchecked") public OnHeapBitmapInvertedIndexCreator(File indexDir, String columnName, int cardinality) { _invertedIndexFile = new File(indexDir, columnName + V1Constants.Indexes.BITMAP_INVERTED_INDEX_FILE_EXTENSION); - _bitmaps = new MutableRoaringBitmap[cardinality]; + _bitmapWriters = new RoaringBitmapWriter[cardinality]; for (int i = 0; i < cardinality; i++) { - _bitmaps[i] = new MutableRoaringBitmap(); + _bitmapWriters[i] = RoaringBitmapWriter.writer().runCompress(false).get(); } } @Override public void add(int dictId) { - _bitmaps[dictId].add(_nextDocId++); + _bitmapWriters[dictId].add(_nextDocId++); } @Override public void add(int[] dictIds, int length) { for (int i = 0; i < length; i++) { - _bitmaps[dictIds[i]].add(_nextDocId); + _bitmapWriters[dictIds[i]].add(_nextDocId); } _nextDocId++; } @@ -62,25 +67,35 @@ public final class OnHeapBitmapInvertedIndexCreator implements DictionaryBasedIn @Override public void seal() throws IOException { - try (DataOutputStream out = new 
DataOutputStream( - new BufferedOutputStream(new FileOutputStream(_invertedIndexFile)))) { + int startOfBitmaps = (_bitmapWriters.length + 1) * Integer.BYTES; + ByteBuffer bitmapBuffer = null; + ByteBuffer offsetBuffer = null; + try (FileChannel channel = new RandomAccessFile(_invertedIndexFile, "rw").getChannel()) { + // map the offsets buffer + offsetBuffer = channel.map(FileChannel.MapMode.READ_WRITE, 0, startOfBitmaps) + .order(ByteOrder.BIG_ENDIAN); + bitmapBuffer = channel.map(FileChannel.MapMode.READ_WRITE, startOfBitmaps, Integer.MAX_VALUE - startOfBitmaps) + .order(ByteOrder.LITTLE_ENDIAN); // Write bitmap offsets - int bitmapOffset = (_bitmaps.length + 1) * Integer.BYTES; - out.writeInt(bitmapOffset); - for (MutableRoaringBitmap bitmap : _bitmaps) { - bitmapOffset += bitmap.serializedSizeInBytes(); - // Check for int overflow - Preconditions.checkState(bitmapOffset > 0, "Inverted index file: %s exceeds 2GB limit", _invertedIndexFile); - out.writeInt(bitmapOffset); - } - - // Write bitmap data - for (MutableRoaringBitmap bitmap : _bitmaps) { - bitmap.serialize(out); + offsetBuffer.putInt(startOfBitmaps); + for (RoaringBitmapWriter<RoaringBitmap> writer : _bitmapWriters) { + writer.get().serialize(bitmapBuffer); + offsetBuffer.putInt(startOfBitmaps + bitmapBuffer.position()); } + channel.truncate(startOfBitmaps + bitmapBuffer.position()); } catch (Exception e) { FileUtils.deleteQuietly(_invertedIndexFile); throw e; + } finally { + if (CleanerUtil.UNMAP_SUPPORTED) { + CleanerUtil.BufferCleaner cleaner = CleanerUtil.getCleaner(); + if (bitmapBuffer != null) { + cleaner.freeBuffer(bitmapBuffer); + } + if (offsetBuffer != null) { + cleaner.freeBuffer(offsetBuffer); + } + } } } diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffheapBitmapInvertedIndexCreator.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffheapBitmapInvertedIndexCreator.java new file mode 100644 index 0000000..0979c98 --- /dev/null +++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkOffheapBitmapInvertedIndexCreator.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pinot.perf; + +import org.apache.commons.io.FileUtils; +import org.apache.pinot.core.segment.creator.impl.inv.OffHeapBitmapInvertedIndexCreator; +import org.apache.pinot.spi.data.DimensionFieldSpec; +import org.apache.pinot.spi.data.FieldSpec; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.ChainedOptionsBuilder; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +@State(Scope.Benchmark) +public class BenchmarkOffheapBitmapInvertedIndexCreator { + + public enum Assignment { + ROUND_ROBIN { + @Override + void assign(OffHeapBitmapInvertedIndexCreator creator, int docs, int cardinality) { + for (int i = 0; i < docs; ++i) { + creator.add(i % cardinality); + } + } + }, + SORTED_UNIFORM { + @Override + void assign(OffHeapBitmapInvertedIndexCreator creator, int docs, int cardinality) { + for (int i = 0; i < cardinality; ++i) { + for (int j = 0; j < docs / cardinality; ++j) 
{ + creator.add(i); + } + } + } + }; + + abstract void assign(OffHeapBitmapInvertedIndexCreator creator, int docs, int cardinality); + } + + private Path indexDir; + @Param({"10", "1000", "10000"}) + int cardinality; + + @Param({"1000000", "10000000", "100000000"}) + int numDocs; + + @Param + Assignment assignment; + + private OffHeapBitmapInvertedIndexCreator creator; + + @Setup(Level.Invocation) + public void setup() throws IOException { + indexDir = Files.createTempDirectory("index"); + creator = new OffHeapBitmapInvertedIndexCreator( + indexDir.toFile(), new DimensionFieldSpec("foo", FieldSpec.DataType.STRING, true), + cardinality, numDocs, -1); + assignment.assign(creator, numDocs, cardinality); + } + + @TearDown(Level.Invocation) + public void tearDown() throws IOException { + if (null != indexDir) { + FileUtils.deleteDirectory(indexDir.toFile()); + } + creator.close(); + } + + @Benchmark + public Object seal() throws IOException { + creator.seal(); + return creator; + } + + public static void main(String[] args) + throws Exception { + ChainedOptionsBuilder opt = + new OptionsBuilder().include(BenchmarkOffheapBitmapInvertedIndexCreator.class.getSimpleName()) + .mode(Mode.SingleShotTime) + .warmupIterations(8).measurementIterations(8).forks(5); + + new Runner(opt.build()).run(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org