Jackie-Jiang commented on a change in pull request #6409: URL: https://github.com/apache/incubator-pinot/pull/6409#discussion_r553605768
########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.filter; + +import com.uber.h3core.H3Core; +import com.uber.h3core.LengthUnit; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator; +import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; +import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.FunctionContext; +import org.apache.pinot.core.query.request.context.predicate.Predicate; +import org.apache.pinot.core.query.request.context.predicate.RangePredicate; +import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution; +import org.apache.pinot.core.segment.index.readers.H3IndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader; +import org.apache.pinot.spi.utils.BytesUtils; +import org.locationtech.jts.geom.Geometry; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * A filter operator that uses H3 index for geospatial data retrieval + */ +public class H3IndexFilterOperator extends BaseFilterOperator { + private static final String OPERATOR_NAME = "H3IndexFilterOperator"; + + private Predicate _predicate; + private final DataSource _dataSource; + private final int _numDocs; + private final H3Core _h3Core; + private final IndexSegment _segment; + private final H3IndexReader _h3IndexReader; + private final H3IndexResolution _resolution; + private Geometry _geometry; + private double _distance; + + public H3IndexFilterOperator(Predicate predicate, IndexSegment indexSegment, int numDocs) { + _predicate = predicate; + _segment = indexSegment; + FunctionContext function = predicate.getLhs().getFunction(); + String columnName; + + // TODO: handle composite function that contains ST_DISTANCE + if (function.getArguments().get(0).getType() == ExpressionContext.Type.IDENTIFIER) { + columnName = function.getArguments().get(0).getIdentifier(); + byte[] bytes = BytesUtils.toBytes(function.getArguments().get(1).getLiteral()); + _geometry = GeometrySerializer.deserialize(bytes); + } else if (function.getArguments().get(1).getType() == ExpressionContext.Type.IDENTIFIER) { Review comment: This one does not match the `canApplyH3Index()` in `FilterPlanNode` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java ########## @@ -312,6 +316,13 @@ public long getLatestIngestionTimestamp() { // Inverted index RealtimeInvertedIndexReader invertedIndexReader = invertedIndexColumns.contains(column) ? new RealtimeInvertedIndexReader() : null; + RealtimeH3IndexReader h3IndexReader; + try { + h3IndexReader = h3IndexColumnMap.containsKey(column) ? new RealtimeH3IndexReader( + new H3IndexResolution(h3IndexColumnMap.get(column).getResolutions())) : null; + } catch (IOException e) { + throw new RuntimeException(String.format("Failed to initiate H3 index reader for column %s", column), e); Review comment: ```suggestion throw new RuntimeException(String.format("Failed to initiate H3 index reader for column: %s", column), e); ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java ########## @@ -63,7 +68,7 @@ private RealtimeSegmentConfig(String tableNameWithType, String segmentName, Stri boolean offHeap, PinotDataBufferMemoryManager memoryManager, RealtimeSegmentStatsHistory statsHistory, String partitionColumn, PartitionFunction partitionFunction, int partitionId, boolean aggregateMetrics, boolean nullHandlingEnabled, String consumerDir, UpsertConfig.Mode upsertMode, - PartitionUpsertMetadataManager partitionUpsertMetadataManager) { + PartitionUpsertMetadataManager partitionUpsertMetadataManager, List<H3IndexColumn> h3IndexColumns) { Review comment: Move the argument after `jsonIndexColumns` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.filter; + +import com.uber.h3core.H3Core; +import com.uber.h3core.LengthUnit; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator; +import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; +import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.FunctionContext; +import org.apache.pinot.core.query.request.context.predicate.Predicate; +import org.apache.pinot.core.query.request.context.predicate.RangePredicate; +import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution; +import org.apache.pinot.core.segment.index.readers.H3IndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader; +import org.apache.pinot.spi.utils.BytesUtils; +import org.locationtech.jts.geom.Geometry; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * A filter operator that uses H3 index for geospatial data retrieval + */ +public class H3IndexFilterOperator extends BaseFilterOperator { + private static final String OPERATOR_NAME = "H3IndexFilterOperator"; + + private Predicate _predicate; + private final DataSource _dataSource; + private final int _numDocs; + private final H3Core _h3Core; + private final IndexSegment _segment; + private final H3IndexReader _h3IndexReader; + private final H3IndexResolution _resolution; + private Geometry _geometry; + private double _distance; + + public H3IndexFilterOperator(Predicate predicate, IndexSegment indexSegment, int numDocs) { + _predicate = predicate; + _segment = indexSegment; + FunctionContext function = predicate.getLhs().getFunction(); + String columnName; + + // TODO: handle composite function that contains ST_DISTANCE + if (function.getArguments().get(0).getType() == ExpressionContext.Type.IDENTIFIER) { + columnName = function.getArguments().get(0).getIdentifier(); + byte[] bytes = BytesUtils.toBytes(function.getArguments().get(1).getLiteral()); + _geometry = GeometrySerializer.deserialize(bytes); + } else if (function.getArguments().get(1).getType() == ExpressionContext.Type.IDENTIFIER) { + columnName = function.getArguments().get(1).getIdentifier(); + byte[] bytes = BytesUtils.toBytes(function.getArguments().get(0).getLiteral()); + _geometry = GeometrySerializer.deserialize(bytes); + } else { + throw new RuntimeException("Expecting one of the arguments of ST_DISTANCE to be an identifier"); + } + _dataSource = indexSegment.getDataSource(columnName); + _h3IndexReader = _dataSource.getH3Index(); + _resolution = _h3IndexReader.getH3IndexResolution(); + switch (predicate.getType()) { + case RANGE: + RangePredicate rangePredicate = (RangePredicate) predicate; + _distance = Double.parseDouble(rangePredicate.getUpperBound()); + break; + default: + throw new RuntimeException(String.format("H3 index does not support predicate type %s", predicate.getType())); + } + _numDocs = numDocs; + try { + _h3Core = H3Core.newInstance(); + } catch (IOException e) { + throw new RuntimeException("Unable to instantiate H3 instance", e); + } + } + + @Override + protected FilterBlock getNextBlock() { + int resolution = _resolution.getLowestResolution(); + long h3Id = _h3Core.geoToH3(_geometry.getCoordinate().x, _geometry.getCoordinate().y, resolution); + assert _h3IndexReader != null; + + // find the number of rings based on distance for full match + // use the edge of the hexagon to determine the rings are within the distance. This is calculated by (1) divide the + // distance by edge length of the solution to get the number of contained rings (2) use the (floor of number - 1) + // for fetching the rings since ring0 is the original hexagon + double edgeLength = _h3Core.edgeLength(resolution, LengthUnit.m); + int numFullMatchedRings = (int) Math.floor(_distance / edgeLength); + MutableRoaringBitmap fullMatchedDocIds = new MutableRoaringBitmap(); + List<Long> fullMatchRings = new ArrayList<>(); + if (numFullMatchedRings > 0) { + fullMatchRings = _h3Core.kRing(h3Id, numFullMatchedRings - 1); Review comment: Is the algorithm proved and documented somewhere? I somehow feel it won't give the correct result ########## File path: pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java ########## @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.segment.creator; + +import java.io.Closeable; + + +/** + * An index creator for geospatial data + */ +public interface GeoSpatialIndexCreator extends Closeable { + + /** + * Adds a doc associated with a geospatial point + * @param docId the document id + * @param lat the latitude + * @param lon the longitude + */ + void add(int docId, double lat, double lon); Review comment: We don't usually put docId explicitly. It is always appended as the next doc ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.filter; + +import com.uber.h3core.H3Core; +import com.uber.h3core.LengthUnit; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator; +import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; +import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.FunctionContext; +import org.apache.pinot.core.query.request.context.predicate.Predicate; +import org.apache.pinot.core.query.request.context.predicate.RangePredicate; +import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution; +import org.apache.pinot.core.segment.index.readers.H3IndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader; +import org.apache.pinot.spi.utils.BytesUtils; +import org.locationtech.jts.geom.Geometry; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * A filter operator that uses H3 index for geospatial data retrieval + */ +public class H3IndexFilterOperator extends BaseFilterOperator { + private static final String OPERATOR_NAME = "H3IndexFilterOperator"; + + private Predicate _predicate; + private final DataSource _dataSource; + private final int _numDocs; + private final H3Core _h3Core; + private final IndexSegment _segment; + private final H3IndexReader _h3IndexReader; + private final H3IndexResolution _resolution; + private Geometry _geometry; Review comment: `_geometry` and `_distance` should also be final ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.filter; + +import com.uber.h3core.H3Core; +import com.uber.h3core.LengthUnit; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator; +import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; +import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.FunctionContext; +import org.apache.pinot.core.query.request.context.predicate.Predicate; +import org.apache.pinot.core.query.request.context.predicate.RangePredicate; +import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution; +import org.apache.pinot.core.segment.index.readers.H3IndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader; +import org.apache.pinot.spi.utils.BytesUtils; +import org.locationtech.jts.geom.Geometry; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * A filter operator that uses H3 index for geospatial data retrieval + */ +public class H3IndexFilterOperator extends BaseFilterOperator { + private static final String OPERATOR_NAME = "H3IndexFilterOperator"; + + private Predicate _predicate; Review comment: final ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.filter; + +import com.uber.h3core.H3Core; +import com.uber.h3core.LengthUnit; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator; +import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; +import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.FunctionContext; +import org.apache.pinot.core.query.request.context.predicate.Predicate; +import org.apache.pinot.core.query.request.context.predicate.RangePredicate; +import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution; +import org.apache.pinot.core.segment.index.readers.H3IndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader; +import org.apache.pinot.spi.utils.BytesUtils; +import org.locationtech.jts.geom.Geometry; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * A filter operator that uses H3 index for geospatial data retrieval + */ +public class H3IndexFilterOperator extends BaseFilterOperator { + private static final String OPERATOR_NAME = "H3IndexFilterOperator"; + + private Predicate _predicate; + private final DataSource _dataSource; + private final int _numDocs; + private final H3Core _h3Core; + private final IndexSegment _segment; + private final H3IndexReader _h3IndexReader; + private final H3IndexResolution _resolution; + private Geometry _geometry; + private double _distance; + + public H3IndexFilterOperator(Predicate predicate, IndexSegment indexSegment, int numDocs) { + _predicate = predicate; + _segment = indexSegment; + FunctionContext function = predicate.getLhs().getFunction(); + String columnName; + + // TODO: handle composite function that contains ST_DISTANCE + if (function.getArguments().get(0).getType() == ExpressionContext.Type.IDENTIFIER) { + columnName = function.getArguments().get(0).getIdentifier(); + byte[] bytes = BytesUtils.toBytes(function.getArguments().get(1).getLiteral()); + _geometry = GeometrySerializer.deserialize(bytes); + } else if (function.getArguments().get(1).getType() == ExpressionContext.Type.IDENTIFIER) { + columnName = function.getArguments().get(1).getIdentifier(); + byte[] bytes = BytesUtils.toBytes(function.getArguments().get(0).getLiteral()); + _geometry = GeometrySerializer.deserialize(bytes); + } else { + throw new RuntimeException("Expecting one of the arguments of ST_DISTANCE to be an identifier"); + } + _dataSource = indexSegment.getDataSource(columnName); + _h3IndexReader = _dataSource.getH3Index(); + _resolution = _h3IndexReader.getH3IndexResolution(); + switch (predicate.getType()) { + case RANGE: + RangePredicate rangePredicate = (RangePredicate) predicate; + _distance = Double.parseDouble(rangePredicate.getUpperBound()); + break; + default: + throw new RuntimeException(String.format("H3 index does not support predicate type %s", predicate.getType())); + } + _numDocs = numDocs; + try { + _h3Core = H3Core.newInstance(); + } catch (IOException e) { + throw new RuntimeException("Unable to instantiate H3 instance", e); + } + } + + @Override + protected FilterBlock getNextBlock() { + int resolution = _resolution.getLowestResolution(); + long h3Id = _h3Core.geoToH3(_geometry.getCoordinate().x, _geometry.getCoordinate().y, resolution); + assert _h3IndexReader != null; + + // find the number of rings based on distance for full match + // use the edge of the hexagon to determine the rings are within the distance. This is calculated by (1) divide the + // distance by edge length of the solution to get the number of contained rings (2) use the (floor of number - 1) Review comment: ```suggestion // distance by edge length of the resolution to get the number of contained rings (2) use the (floor of number - 1) ``` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/geospatial/RealtimeH3IndexReader.java ########## @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.impl.geospatial; + +import com.uber.h3core.H3Core; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.core.segment.creator.GeoSpatialIndexCreator; +import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution; +import org.apache.pinot.core.segment.index.readers.H3IndexReader; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * A H3 index reader for the real-time H3 index values on the fly. + * <p>This class is thread-safe for single writer multiple readers. + */ +public class RealtimeH3IndexReader implements GeoSpatialIndexCreator, H3IndexReader { + private final H3Core _h3Core; + private final Map<Long, ThreadSafeMutableRoaringBitmap> _h3IndexMap = new ConcurrentHashMap<>(); + private final H3IndexResolution _resolution; + private int _lowestResolution; + + public RealtimeH3IndexReader(H3IndexResolution resolution) + throws IOException { + _resolution = resolution; + _lowestResolution = resolution.getLowestResolution(); + _h3Core = H3Core.newInstance(); + } + + @Override + public void add(int docId, double lat, double lon) { + // TODO support multiple resolutions + Long h3Id = _h3Core.geoToH3(lat, lon, _lowestResolution); + _h3IndexMap.computeIfAbsent(h3Id, k -> new ThreadSafeMutableRoaringBitmap()); + synchronized (this) { Review comment: No need to synchronize it. It is guarded by `ConcurrentHashMap` and `ThreadSafeMutableRoaringBitmap` ########## File path: pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java ########## @@ -45,13 +47,14 @@ public BaseDataSource(DataSourceMetadata dataSourceMetadata, ForwardIndexReader<?> forwardIndex, @Nullable Dictionary dictionary, @Nullable InvertedIndexReader<?> invertedIndex, @Nullable InvertedIndexReader<?> rangeIndex, @Nullable TextIndexReader textIndex, - @Nullable TextIndexReader fstIndex, @Nullable JsonIndexReader jsonIndex, @Nullable BloomFilterReader bloomFilter, - @Nullable NullValueVectorReader nullValueVector) { + @Nullable TextIndexReader fstIndex, @Nullable JsonIndexReader jsonIndex, @Nullable H3IndexReader h3Index, Review comment: Keep the interface, constructor and member variables in the same order. I personally suggest putting `H3Index` right after `JsonIndex` (the order of the constructor) ########## File path: pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java ########## @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.operator.filter; + +import com.uber.h3core.H3Core; +import com.uber.h3core.LengthUnit; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.pinot.core.common.DataSource; +import org.apache.pinot.core.geospatial.serde.GeometrySerializer; +import org.apache.pinot.core.indexsegment.IndexSegment; +import org.apache.pinot.core.operator.blocks.FilterBlock; +import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator; +import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet; +import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet; +import org.apache.pinot.core.query.request.context.ExpressionContext; +import org.apache.pinot.core.query.request.context.FunctionContext; +import org.apache.pinot.core.query.request.context.predicate.Predicate; +import org.apache.pinot.core.query.request.context.predicate.RangePredicate; +import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution; +import org.apache.pinot.core.segment.index.readers.H3IndexReader; +import org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader; +import org.apache.pinot.spi.utils.BytesUtils; +import org.locationtech.jts.geom.Geometry; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * A filter operator that uses H3 index for geospatial data retrieval + */ +public class H3IndexFilterOperator extends BaseFilterOperator { + private static final String OPERATOR_NAME = "H3IndexFilterOperator"; + + private Predicate _predicate; + private final DataSource _dataSource; + private final int _numDocs; + private final H3Core _h3Core; + private final IndexSegment _segment; + private final H3IndexReader _h3IndexReader; + private final H3IndexResolution _resolution; + private Geometry _geometry; + private double _distance; + + public H3IndexFilterOperator(Predicate predicate, IndexSegment indexSegment, int numDocs) { + _predicate = predicate; + _segment = indexSegment; + FunctionContext function = predicate.getLhs().getFunction(); + String columnName; + + // TODO: handle composite function that contains ST_DISTANCE + if (function.getArguments().get(0).getType() == ExpressionContext.Type.IDENTIFIER) { + columnName = function.getArguments().get(0).getIdentifier(); + byte[] bytes = BytesUtils.toBytes(function.getArguments().get(1).getLiteral()); + _geometry = GeometrySerializer.deserialize(bytes); + } else if (function.getArguments().get(1).getType() == ExpressionContext.Type.IDENTIFIER) { + columnName = function.getArguments().get(1).getIdentifier(); + byte[] bytes = BytesUtils.toBytes(function.getArguments().get(0).getLiteral()); + _geometry = GeometrySerializer.deserialize(bytes); + } else { + throw new RuntimeException("Expecting one of the arguments of ST_DISTANCE to be an identifier"); + } + _dataSource = indexSegment.getDataSource(columnName); + _h3IndexReader = _dataSource.getH3Index(); + _resolution = _h3IndexReader.getH3IndexResolution(); + switch (predicate.getType()) { Review comment: Do a precondition instead of `switch` here ########## File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/geospatial/RealtimeH3IndexReader.java ########## @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.core.realtime.impl.geospatial; + +import com.uber.h3core.H3Core; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap; +import org.apache.pinot.core.segment.creator.GeoSpatialIndexCreator; +import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution; +import org.apache.pinot.core.segment.index.readers.H3IndexReader; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; +import org.roaringbitmap.buffer.MutableRoaringBitmap; + + +/** + * A H3 index reader for the real-time H3 index values on the fly. + * <p>This class is thread-safe for single writer multiple readers. + */ +public class RealtimeH3IndexReader implements GeoSpatialIndexCreator, H3IndexReader { + private final H3Core _h3Core; + private final Map<Long, ThreadSafeMutableRoaringBitmap> _h3IndexMap = new ConcurrentHashMap<>(); + private final H3IndexResolution _resolution; + private int _lowestResolution; Review comment: final ########## File path: pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java ########## @@ -359,12 +375,17 @@ public Builder setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager return this; } + public Builder setH3IndexColumns(List<H3IndexColumn> h3IndexColumns) { Review comment: Move it after `setJsonIndexColumns` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org