This is an automated email from the ASF dual-hosted git repository. kishoreg pushed a commit to branch h3-index in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 4bb66774f94528b6ae6607f16c8089ad4f8adfc6 Author: kishoreg <g.kish...@gmail.com> AuthorDate: Mon Nov 30 09:58:32 2020 -0800 Initial commit for H3 based geospatial indexing --- pinot-core/pom.xml | 5 + .../segment/creator/GeoSpatialIndexCreator.java | 9 + .../creator/impl/geospatial/H3IndexCreator.java | 312 +++++++++++++++++++++ .../index/readers/BaseImmutableDictionary.java | 2 +- .../index/readers/geospatial/H3IndexReader.java | 102 +++++++ 5 files changed, 429 insertions(+), 1 deletion(-) diff --git a/pinot-core/pom.xml b/pinot-core/pom.xml index d4938a7..9a93eae 100644 --- a/pinot-core/pom.xml +++ b/pinot-core/pom.xml @@ -54,6 +54,11 @@ </build> <dependencies> <dependency> + <groupId>com.uber</groupId> + <artifactId>h3</artifactId> + <version>3.0.3</version> + </dependency> + <dependency> <groupId>me.lemire.integercompression</groupId> <artifactId>JavaFastPFOR</artifactId> </dependency> diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java new file mode 100644 index 0000000..8fcd3bd --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java @@ -0,0 +1,9 @@ +package org.apache.pinot.core.segment.creator; + +import java.io.Closeable; + +/** Creator of a geospatial index that is fed one (docId, lat, lon) point per {@link #add} call. */ +public interface GeoSpatialIndexCreator extends Closeable { +  /** Adds the point at latitude {@code lat} and longitude {@code lon} for the document {@code docId}. */ + void add(int docId, double lat, double lon); +} diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java new file mode 100644 index 0000000..e2ec6a2 --- /dev/null +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/geospatial/H3IndexCreator.java @@ -0,0 +1,312 @@ +package org.apache.pinot.core.segment.creator.impl.geospatial; + +import com.uber.h3core.H3Core; +import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Random;
+import java.util.TreeMap;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.core.segment.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.core.segment.index.readers.geospatial.H3IndexReader;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.apache.pinot.spi.data.DimensionFieldSpec;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * Creates an H3 index: each point is mapped to its H3 cell at a fixed resolution and the per-cell doc id bitmaps
+ * are spilled to sorted chunk files, which {@link #seal()} merges into {@code <column>.h3.index}.
+ */
+public class H3IndexCreator implements GeoSpatialIndexCreator {
+
+  private static final int VERSION = 1;
+  private static final int FLUSH_THRESHOLD = 100_000;
+  private final H3Core _h3Core;
+  private final File _indexDir;
+  private final FieldSpec _fieldSpec;
+  private final int _resolution;
+
+  // Buffered h3Id -> doc ids; a TreeMap so that every flushed chunk is already sorted by h3Id
+  final TreeMap<Long, MutableRoaringBitmap> _h3IndexMap;
+
+  int numChunks = 0;
+  List<Integer> chunkLengths = new ArrayList<>();
+
+  public H3IndexCreator(File indexDir, FieldSpec fieldSpec, int resolution)
+      throws IOException {
+    _indexDir = indexDir;
+    _fieldSpec = fieldSpec;
+    _resolution = resolution;
+    _h3Core = H3Core.newInstance();
+    _h3IndexMap = new TreeMap<>();
+  }
+
+  @Override
+  public void add(int docId, double lat, double lon) {
+    long h3Id = _h3Core.geoToH3(lat, lon, _resolution);
+    _h3IndexMap.computeIfAbsent(h3Id, k -> new MutableRoaringBitmap()).add(docId);
+    if (_h3IndexMap.size() > FLUSH_THRESHOLD) {
+      flush();
+    }
+  }
+
+  /** Writes the buffered (h3Id, bitmap) pairs, sorted by h3Id, to a new chunk file and clears the buffer. */
+  private void flush() {
+    File tempChunkFile = new File(_indexDir, "chunk-" + numChunks);
+    try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(tempChunkFile))) {
+      for (Map.Entry<Long, MutableRoaringBitmap> entry : _h3IndexMap.entrySet()) {
+        MutableRoaringBitmap bitmap = entry.getValue();
+        // Chunk record layout: h3Id (long), serialized bitmap size (int), serialized bitmap bytes
+        dos.writeLong(entry.getKey());
+        int serializedSizeInBytes = bitmap.serializedSizeInBytes();
+        byte[] byteArray = new byte[serializedSizeInBytes];
+        bitmap.serialize(ByteBuffer.wrap(byteArray));
+        dos.writeInt(serializedSizeInBytes);
+        dos.write(byteArray);
+      }
+    } catch (IOException e) {
+      throw new UncheckedIOException(e);
+    }
+    chunkLengths.add(_h3IndexMap.size());
+    _h3IndexMap.clear();
+    numChunks++;
+  }
+
+  /**
+   * Flushes the buffer, k-way merges the sorted chunk files (OR-ing the bitmaps of equal h3Ids) and writes
+   * {@code <column>.h3.index}: header (version, unique id count), h3Id dictionary, bitmap offsets, bitmaps.
+   */
+  public void seal()
+      throws Exception {
+    flush();
+
+    //merge all the chunk files, since they are sorted we can write the dictionary as well
+    PriorityQueue<Entry> queue = new PriorityQueue<>();
+
+    ChunkReader[] chunkReaders = new ChunkReader[numChunks];
+    for (int chunkId = 0; chunkId < numChunks; chunkId++) {
+      File chunkFile = new File(_indexDir, "chunk-" + chunkId);
+      chunkReaders[chunkId] = new ChunkReader(chunkId, chunkLengths.get(chunkId), chunkFile);
+      // A chunk may be empty (e.g. the final flush() right after a threshold flush); PriorityQueue rejects null
+      Entry e = chunkReaders[chunkId].getNextEntry();
+      if (e != null) {
+        queue.add(e);
+      }
+    }
+    long prevH3Id = -1;
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+
+    File headerFile = new File(_indexDir, _fieldSpec.getName() + "-h3index-header.buffer");
+    DataOutputStream headerStream = new DataOutputStream(new FileOutputStream(headerFile));
+
+    File dictionaryFile = new File(_indexDir, _fieldSpec.getName() + "-h3index-dictionary.buffer");
+    DataOutputStream dictionaryStream = new DataOutputStream(new FileOutputStream(dictionaryFile));
+
+    File offsetFile = new File(_indexDir, _fieldSpec.getName() + "-h3index-offset.buffer");
+    DataOutputStream offsetStream = new DataOutputStream(new FileOutputStream(offsetFile));
+
+    File bitmapFile = new File(_indexDir, _fieldSpec.getName() + "-h3index-bitmap.buffer");
+    DataOutputStream bitmapStream = new DataOutputStream(new FileOutputStream(bitmapFile));
+
+    Writer writer = new Writer(dictionaryStream, offsetStream, bitmapStream);
+    while (!queue.isEmpty()) {
+      Entry poll = queue.poll();
+      long currH3Id = poll.h3Id;
+      if (prevH3Id != -1 && currH3Id != prevH3Id) {
+        writer.add(prevH3Id, bitmap);
+        bitmap.clear();
+      }
+      bitmap.or(poll.bitmap);
+
+      prevH3Id = currH3Id;
+
+      Entry e = chunkReaders[poll.chunkId].getNextEntry();
+      if (e != null) {
+        queue.add(e);
+      }
+    }
+    if (prevH3Id != -1) {
+      writer.add(prevH3Id, bitmap);
+    }
+
+    //write header file
+    headerStream.writeInt(VERSION);
+    headerStream.writeInt(writer.getNumUniqueIds());
+    headerStream.close();
+    dictionaryStream.close();
+    offsetStream.close();
+    bitmapStream.close();
+
+    File outputFile = new File(_indexDir, _fieldSpec.getName() + ".h3.index");
+    long length = headerStream.size() + dictionaryStream.size() + offsetStream.size() + bitmapStream.size();
+    //write the actual file
+    PinotDataBuffer h3IndexBuffer =
+        PinotDataBuffer.mapFile(outputFile, false, 0, length, ByteOrder.BIG_ENDIAN, "H3 Index Buffer");
+
+    long writtenBytes = 0;
+    h3IndexBuffer.readFrom(writtenBytes, headerFile, 0, headerFile.length());
+    writtenBytes += headerFile.length();
+
+    h3IndexBuffer.readFrom(writtenBytes, dictionaryFile, 0, dictionaryFile.length());
+    writtenBytes += dictionaryFile.length();
+
+    h3IndexBuffer.readFrom(writtenBytes, offsetFile, 0, offsetFile.length());
+    writtenBytes += offsetFile.length();
+
+    h3IndexBuffer.readFrom(writtenBytes, bitmapFile, 0, bitmapFile.length());
+    h3IndexBuffer.close();
+  }
+
+  /** Deletes the temporary chunk files written by flush(); best effort, deletion failures are ignored. */
+  @Override
+  public void close()
+      throws IOException {
+    for (int chunkId = 0; chunkId < numChunks; chunkId++) {
+      FileUtils.deleteQuietly(new File(_indexDir, "chunk-" + chunkId));
+    }
+  }
+
+  /** Sequentially reads the (h3Id, bitmap) records of one chunk file written by flush(). */
+  static class ChunkReader {
+    private final int _chunkId;
+    private final int _chunkLength;
+    private final DataInputStream dataInputStream;
+    int index = 0;
+
+    ChunkReader(int chunkId, Integer chunkLength, File chunkFile)
+        throws Exception {
+      _chunkId = chunkId;
+      _chunkLength = chunkLength;
+      dataInputStream = new DataInputStream(new FileInputStream(chunkFile));
+    }
+
+    /** Returns the next record, or null once the chunk is exhausted (its file is closed at that point). */
+    private Entry getNextEntry()
+        throws IOException {
+      if (index >= _chunkLength) {
+        dataInputStream.close();
+        return null;
+      }
+      long h3Id = dataInputStream.readLong();
+      int size = dataInputStream.readInt();
+      byte[] serializedBytes = new byte[size];
+      // readFully(): a single read() call may return fewer bytes than requested
+      dataInputStream.readFully(serializedBytes);
+      ImmutableRoaringBitmap bitmap = new ImmutableRoaringBitmap(ByteBuffer.wrap(serializedBytes));
+      index++;
+      return new Entry(_chunkId, h3Id, bitmap);
+    }
+  }
+
+  /** One chunk record in the merge; ordered (and compared for equality) by h3Id only. */
+  static class Entry implements Comparable<Entry> {
+    final int chunkId;
+    final long h3Id;
+    final ImmutableRoaringBitmap bitmap;
+
+    public Entry(int chunkId, long h3Id, ImmutableRoaringBitmap bitmap) {
+      this.chunkId = chunkId;
+      this.h3Id = h3Id;
+      this.bitmap = bitmap;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      return o instanceof Entry && h3Id == ((Entry) o).h3Id;
+    }
+
+    @Override
+    public int hashCode() {
+      return Long.hashCode(h3Id);
+    }
+
+    @Override
+    public int compareTo(Entry o) {
+      return Long.compare(h3Id, o.h3Id);
+    }
+  }
+
+  /** Appends (h3Id, bitmap) pairs, which seal() supplies in ascending h3Id order, to the index section streams. */
+  private static class Writer {
+    private final DataOutputStream _dictionaryStream;
+    private final DataOutputStream _offsetStream;
+    private final DataOutputStream _bitmapStream;
+    private int _offset = 0;
+    private int _numUniqueIds = 0;
+
+    public Writer(DataOutputStream dictionaryStream, DataOutputStream offsetStream, DataOutputStream bitmapStream) {
+      _dictionaryStream = dictionaryStream;
+      _offsetStream = offsetStream;
+      _bitmapStream = bitmapStream;
+    }
+
+    public int getNumUniqueIds() {
+      return _numUniqueIds;
+    }
+
+    public void add(long h3Id, ImmutableRoaringBitmap bitmap)
+        throws IOException {
+      _dictionaryStream.writeLong(h3Id);
+      // Offset of this bitmap relative to the start of the bitmap section
+      _offsetStream.writeInt(_offset);
+      int serializedSizeInBytes = bitmap.serializedSizeInBytes();
+      byte[] byteArray = new byte[serializedSizeInBytes];
+      bitmap.serialize(ByteBuffer.wrap(byteArray));
+      _bitmapStream.write(byteArray);
+      _offset += serializedSizeInBytes;
+      _numUniqueIds++;
+    }
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    File indexDir = new File(System.getProperty("java.io.tmpdir"), "h3IndexDir");
+    FileUtils.deleteDirectory(indexDir);
+    indexDir.mkdirs();
+    FieldSpec spec = new DimensionFieldSpec("geo_col", FieldSpec.DataType.STRING, true);
+    int resolution = 5;
+    H3IndexCreator creator = new H3IndexCreator(indexDir, spec, resolution);
+    Random rand = new Random();
+    H3Core h3Core = H3Core.newInstance();
+    Map<Long, Integer> map = new HashMap<>();
+    for (int i = 0; i < 10000; i++) {
+      int lat = rand.nextInt(10);
+      int lon = rand.nextInt(10);
+      creator.add(i, lat, lon);
+      long h3 = h3Core.geoToH3(lat, lon, resolution);
+      map.merge(h3, 1, Integer::sum);
+    }
+    creator.seal();
+    creator.close();
+    System.out.println(
+        "Contents of IndexDir \n " + FileUtils.listFiles(indexDir, null, true).toString().replaceAll(",", "\n"));
+    File h3IndexFile = new File(indexDir, "geo_col.h3.index");
+    PinotDataBuffer h3IndexBuffer =
+        PinotDataBuffer.mapFile(h3IndexFile, true, 0, h3IndexFile.length(), ByteOrder.BIG_ENDIAN, "H3 index file");
+    H3IndexReader reader = new H3IndexReader(h3IndexBuffer);
+    for (Map.Entry<Long, Integer> entry : map.entrySet()) {
+      Long h3 = entry.getKey();
+      ImmutableRoaringBitmap docIds = reader.getDocIds(h3);
+      if (docIds.getCardinality() != map.get(h3)) {
+        System.out.printf("Failed: expected: %d actual: %d for h3:%d \n", map.get(h3), docIds.getCardinality(), h3);
+      } else {
+        System.out.printf("Matched: expected: %d actual: %d for h3:%d \n", map.get(h3), docIds.getCardinality(), h3);
+      }
+    }
+    h3IndexBuffer.close();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java
index b9f5f53..ff24016 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/BaseImmutableDictionary.java
@@ -45,7 +45,7 @@ public abstract class BaseImmutableDictionary implements Dictionary {
       _valueReader = new VarLengthBytesValueReaderWriter(dataBuffer);
     } else {
       Preconditions.checkState(dataBuffer.size() == length * numBytesPerValue,
-          "Buffer size mismatch: bufferSize = %s, numValues = %s, numByesPerValue = %s", dataBuffer.size(), length,
+          "Buffer size mismatch: bufferSize = %s, numValues = %s, numBytesPerValue = %s", dataBuffer.size(), length,
           numBytesPerValue);
       _valueReader = new FixedByteValueReaderWriter(dataBuffer);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
new file mode 100644
index 0000000..96fa586
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/readers/geospatial/H3IndexReader.java
@@ -0,0 +1,102 @@
+package org.apache.pinot.core.segment.index.readers.geospatial;
+
+import java.io.Closeable;
+import java.lang.ref.SoftReference;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.segment.index.readers.LongDictionary;
+import org.apache.pinot.core.segment.memory.PinotDataBuffer;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/** Reads the H3 index written by H3IndexCreator: maps an H3 cell id to the bitmap of doc ids in that cell. */
+public class H3IndexReader implements Closeable {
+
+  public static final Logger LOGGER = LoggerFactory.getLogger(H3IndexReader.class);
+
+  private final PinotDataBuffer _bitmapBuffer;
+  private final PinotDataBuffer _offsetBuffer;
+  private final int _numBitmaps;
+  private final int _bitmapBufferSize;
+
+  // Lazily built per-dictId bitmap cache; soft references let the GC reclaim it under memory pressure
+  private volatile SoftReference<SoftReference<ImmutableRoaringBitmap>[]> _bitmaps;
+
+  private final Dictionary _dictionary;
+
+  /**
+   * Creates a reader over a buffer laid out as: version (int, currently unchecked), unique H3 id count N (int),
+   * N ascending h3Ids (long), N bitmap offsets (int) relative to the bitmap section, then the serialized bitmaps.
+   * @param dataBuffer buffer holding the whole H3 index; owned and closed by the caller
+   */
+  public H3IndexReader(PinotDataBuffer dataBuffer) {
+    int version = dataBuffer.getInt(0 * Integer.BYTES);
+    _numBitmaps = dataBuffer.getInt(1 * Integer.BYTES);
+
+    int headerSize = 2 * Integer.BYTES;
+    // Sizes in bytes of the dictionary and offset sections
+    int dictionarySize = _numBitmaps * Long.BYTES;
+    int offsetsSize = _numBitmaps * Integer.BYTES;
+    PinotDataBuffer dictionaryBuffer = dataBuffer.view(headerSize, headerSize + dictionarySize);
+    _offsetBuffer = dataBuffer.view(headerSize + dictionarySize, headerSize + dictionarySize + offsetsSize);
+    _bitmapBuffer = dataBuffer.view(headerSize + dictionarySize + offsetsSize, dataBuffer.size());
+    _dictionary = new LongDictionary(dictionaryBuffer, _numBitmaps);
+    _bitmapBufferSize = (int) _bitmapBuffer.size();
+  }
+
+  /** Returns the doc ids located in the given H3 cell, or an empty bitmap if the cell is not in this index. */
+  public ImmutableRoaringBitmap getDocIds(long h3IndexId) {
+    int dictId = _dictionary.indexOf(String.valueOf(h3IndexId));
+    if (dictId < 0) {
+      // Dictionary miss: no indexed document falls in that cell
+      return new MutableRoaringBitmap();
+    }
+    SoftReference<ImmutableRoaringBitmap>[] bitmapArrayReference = _bitmaps != null ? _bitmaps.get() : null;
+    if (bitmapArrayReference == null) {
+      // Cache never built or already reclaimed by the GC: start a fresh one
+      bitmapArrayReference = new SoftReference[_numBitmaps];
+      _bitmaps = new SoftReference<SoftReference<ImmutableRoaringBitmap>[]>(bitmapArrayReference);
+    } else {
+      // Fast path: return the bitmap without locking if it's still on heap
+      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+      ImmutableRoaringBitmap value = bitmapReference != null ? bitmapReference.get() : null;
+      if (value != null) {
+        return value;
+      }
+    }
+    synchronized (this) {
+      // Dereference only once: a soft reference may be cleared between two get() calls
+      SoftReference<ImmutableRoaringBitmap> bitmapReference = bitmapArrayReference[dictId];
+      ImmutableRoaringBitmap value = bitmapReference != null ? bitmapReference.get() : null;
+      if (value == null) {
+        value = buildRoaringBitmapForIndex(dictId);
+        bitmapArrayReference[dictId] = new SoftReference<>(value);
+      }
+      return value;
+    }
+  }
+
+  // Bitmap i spans [offset(i), offset(i + 1)) of the bitmap section; the last one runs to the end of the section
+  private synchronized ImmutableRoaringBitmap buildRoaringBitmapForIndex(final int index) {
+    int currentOffset = getOffset(index);
+    int bufferLength;
+    if (index == _numBitmaps - 1) {
+      bufferLength = _bitmapBufferSize - currentOffset;
+    } else {
+      bufferLength = getOffset(index + 1) - currentOffset;
+    }
+    return new ImmutableRoaringBitmap(_bitmapBuffer.toDirectByteBuffer(currentOffset, bufferLength));
+  }
+
+  private int getOffset(final int index) {
+    return _offsetBuffer.getInt(index * Integer.BYTES);
+  }
+
+  @Override
+  public void close() {
+    // NOTE: DO NOT close the PinotDataBuffer here because it is tracked by the caller and might be reused later. The
+    // caller is responsible of closing the PinotDataBuffer.
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org