Jackie-Jiang commented on a change in pull request #6409:
URL: https://github.com/apache/incubator-pinot/pull/6409#discussion_r553605768



##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java
##########
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.uber.h3core.H3Core;
+import com.uber.h3core.LengthUnit;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution;
+import org.apache.pinot.core.segment.index.readers.H3IndexReader;
+import 
org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.locationtech.jts.geom.Geometry;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * A filter operator that uses H3 index for geospatial data retrieval
+ */
+public class H3IndexFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "H3IndexFilterOperator";
+
+  private Predicate _predicate;
+  private final DataSource _dataSource;
+  private final int _numDocs;
+  private final H3Core _h3Core;
+  private final IndexSegment _segment;
+  private final H3IndexReader _h3IndexReader;
+  private final H3IndexResolution _resolution;
+  private Geometry _geometry;
+  private double _distance;
+
+  public H3IndexFilterOperator(Predicate predicate, IndexSegment indexSegment, 
int numDocs) {
+    _predicate = predicate;
+    _segment = indexSegment;
+    FunctionContext function = predicate.getLhs().getFunction();
+    String columnName;
+
+    // TODO: handle composite function that contains ST_DISTANCE
+    if (function.getArguments().get(0).getType() == 
ExpressionContext.Type.IDENTIFIER) {
+      columnName = function.getArguments().get(0).getIdentifier();
+      byte[] bytes = 
BytesUtils.toBytes(function.getArguments().get(1).getLiteral());
+      _geometry = GeometrySerializer.deserialize(bytes);
+    } else if (function.getArguments().get(1).getType() == 
ExpressionContext.Type.IDENTIFIER) {

Review comment:
       This branch does not match the check in `canApplyH3Index()` in `FilterPlanNode`

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImpl.java
##########
@@ -312,6 +316,13 @@ public long getLatestIngestionTimestamp() {
       // Inverted index
       RealtimeInvertedIndexReader invertedIndexReader =
           invertedIndexColumns.contains(column) ? new 
RealtimeInvertedIndexReader() : null;
+      RealtimeH3IndexReader h3IndexReader;
+      try {
+        h3IndexReader = h3IndexColumnMap.containsKey(column) ? new 
RealtimeH3IndexReader(
+            new 
H3IndexResolution(h3IndexColumnMap.get(column).getResolutions())) : null;
+      } catch (IOException e) {
+        throw new RuntimeException(String.format("Failed to initiate H3 index 
reader for column %s", column), e);

Review comment:
       ```suggestion
           throw new RuntimeException(String.format("Failed to initiate H3 
index reader for column: %s", column), e);
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
##########
@@ -63,7 +68,7 @@ private RealtimeSegmentConfig(String tableNameWithType, 
String segmentName, Stri
       boolean offHeap, PinotDataBufferMemoryManager memoryManager, 
RealtimeSegmentStatsHistory statsHistory,
       String partitionColumn, PartitionFunction partitionFunction, int 
partitionId, boolean aggregateMetrics,
       boolean nullHandlingEnabled, String consumerDir, UpsertConfig.Mode 
upsertMode,
-      PartitionUpsertMetadataManager partitionUpsertMetadataManager) {
+      PartitionUpsertMetadataManager partitionUpsertMetadataManager, 
List<H3IndexColumn> h3IndexColumns) {

Review comment:
       Move this argument to right after `jsonIndexColumns`

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java
##########
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.uber.h3core.H3Core;
+import com.uber.h3core.LengthUnit;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution;
+import org.apache.pinot.core.segment.index.readers.H3IndexReader;
+import 
org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.locationtech.jts.geom.Geometry;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * A filter operator that uses H3 index for geospatial data retrieval
+ */
+public class H3IndexFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "H3IndexFilterOperator";
+
+  private Predicate _predicate;
+  private final DataSource _dataSource;
+  private final int _numDocs;
+  private final H3Core _h3Core;
+  private final IndexSegment _segment;
+  private final H3IndexReader _h3IndexReader;
+  private final H3IndexResolution _resolution;
+  private Geometry _geometry;
+  private double _distance;
+
+  public H3IndexFilterOperator(Predicate predicate, IndexSegment indexSegment, 
int numDocs) {
+    _predicate = predicate;
+    _segment = indexSegment;
+    FunctionContext function = predicate.getLhs().getFunction();
+    String columnName;
+
+    // TODO: handle composite function that contains ST_DISTANCE
+    if (function.getArguments().get(0).getType() == 
ExpressionContext.Type.IDENTIFIER) {
+      columnName = function.getArguments().get(0).getIdentifier();
+      byte[] bytes = 
BytesUtils.toBytes(function.getArguments().get(1).getLiteral());
+      _geometry = GeometrySerializer.deserialize(bytes);
+    } else if (function.getArguments().get(1).getType() == 
ExpressionContext.Type.IDENTIFIER) {
+      columnName = function.getArguments().get(1).getIdentifier();
+      byte[] bytes = 
BytesUtils.toBytes(function.getArguments().get(0).getLiteral());
+      _geometry = GeometrySerializer.deserialize(bytes);
+    } else {
+      throw new RuntimeException("Expecting one of the arguments of 
ST_DISTANCE to be an identifier");
+    }
+    _dataSource = indexSegment.getDataSource(columnName);
+    _h3IndexReader = _dataSource.getH3Index();
+    _resolution = _h3IndexReader.getH3IndexResolution();
+    switch (predicate.getType()) {
+      case RANGE:
+        RangePredicate rangePredicate = (RangePredicate) predicate;
+        _distance = Double.parseDouble(rangePredicate.getUpperBound());
+        break;
+      default:
+        throw new RuntimeException(String.format("H3 index does not support 
predicate type %s", predicate.getType()));
+    }
+    _numDocs = numDocs;
+    try {
+      _h3Core = H3Core.newInstance();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to instantiate H3 instance", e);
+    }
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    int resolution = _resolution.getLowestResolution();
+    long h3Id = _h3Core.geoToH3(_geometry.getCoordinate().x, 
_geometry.getCoordinate().y, resolution);
+    assert _h3IndexReader != null;
+
+    // find the number of rings based on distance for full match
+    // use the edge of the hexagon to determine the rings are within the 
distance. This is calculated by (1) divide the
+    // distance by edge length of the solution to get the number of contained 
rings (2) use the (floor of number - 1)
+    // for fetching the rings since ring0 is the original hexagon
+    double edgeLength = _h3Core.edgeLength(resolution, LengthUnit.m);
+    int numFullMatchedRings = (int) Math.floor(_distance / edgeLength);
+    MutableRoaringBitmap fullMatchedDocIds = new MutableRoaringBitmap();
+    List<Long> fullMatchRings = new ArrayList<>();
+    if (numFullMatchedRings > 0) {
+      fullMatchRings = _h3Core.kRing(h3Id, numFullMatchedRings - 1);

Review comment:
       Is this algorithm proven correct and documented somewhere? I suspect it 
won't give the correct result.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/creator/GeoSpatialIndexCreator.java
##########
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.creator;
+
+import java.io.Closeable;
+
+
+/**
+ * An index creator for geospatial data
+ */
+public interface GeoSpatialIndexCreator extends Closeable {
+
+  /**
+   * Adds a doc associated with a geospatial point
+   * @param docId the document id
+   * @param lat the latitude
+   * @param lon the longitude
+   */
+  void add(int docId, double lat, double lon);

Review comment:
       We don't usually pass docId explicitly. Each value is always appended as 
the next doc.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java
##########
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.uber.h3core.H3Core;
+import com.uber.h3core.LengthUnit;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution;
+import org.apache.pinot.core.segment.index.readers.H3IndexReader;
+import 
org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.locationtech.jts.geom.Geometry;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * A filter operator that uses H3 index for geospatial data retrieval
+ */
+public class H3IndexFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "H3IndexFilterOperator";
+
+  private Predicate _predicate;
+  private final DataSource _dataSource;
+  private final int _numDocs;
+  private final H3Core _h3Core;
+  private final IndexSegment _segment;
+  private final H3IndexReader _h3IndexReader;
+  private final H3IndexResolution _resolution;
+  private Geometry _geometry;

Review comment:
       `_geometry` and `_distance` should also be final

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java
##########
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.uber.h3core.H3Core;
+import com.uber.h3core.LengthUnit;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution;
+import org.apache.pinot.core.segment.index.readers.H3IndexReader;
+import 
org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.locationtech.jts.geom.Geometry;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * A filter operator that uses H3 index for geospatial data retrieval
+ */
+public class H3IndexFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "H3IndexFilterOperator";
+
+  private Predicate _predicate;

Review comment:
       final

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java
##########
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.uber.h3core.H3Core;
+import com.uber.h3core.LengthUnit;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution;
+import org.apache.pinot.core.segment.index.readers.H3IndexReader;
+import 
org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.locationtech.jts.geom.Geometry;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * A filter operator that uses H3 index for geospatial data retrieval
+ */
+public class H3IndexFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "H3IndexFilterOperator";
+
+  private Predicate _predicate;
+  private final DataSource _dataSource;
+  private final int _numDocs;
+  private final H3Core _h3Core;
+  private final IndexSegment _segment;
+  private final H3IndexReader _h3IndexReader;
+  private final H3IndexResolution _resolution;
+  private Geometry _geometry;
+  private double _distance;
+
+  public H3IndexFilterOperator(Predicate predicate, IndexSegment indexSegment, 
int numDocs) {
+    _predicate = predicate;
+    _segment = indexSegment;
+    FunctionContext function = predicate.getLhs().getFunction();
+    String columnName;
+
+    // TODO: handle composite function that contains ST_DISTANCE
+    if (function.getArguments().get(0).getType() == 
ExpressionContext.Type.IDENTIFIER) {
+      columnName = function.getArguments().get(0).getIdentifier();
+      byte[] bytes = 
BytesUtils.toBytes(function.getArguments().get(1).getLiteral());
+      _geometry = GeometrySerializer.deserialize(bytes);
+    } else if (function.getArguments().get(1).getType() == 
ExpressionContext.Type.IDENTIFIER) {
+      columnName = function.getArguments().get(1).getIdentifier();
+      byte[] bytes = 
BytesUtils.toBytes(function.getArguments().get(0).getLiteral());
+      _geometry = GeometrySerializer.deserialize(bytes);
+    } else {
+      throw new RuntimeException("Expecting one of the arguments of 
ST_DISTANCE to be an identifier");
+    }
+    _dataSource = indexSegment.getDataSource(columnName);
+    _h3IndexReader = _dataSource.getH3Index();
+    _resolution = _h3IndexReader.getH3IndexResolution();
+    switch (predicate.getType()) {
+      case RANGE:
+        RangePredicate rangePredicate = (RangePredicate) predicate;
+        _distance = Double.parseDouble(rangePredicate.getUpperBound());
+        break;
+      default:
+        throw new RuntimeException(String.format("H3 index does not support 
predicate type %s", predicate.getType()));
+    }
+    _numDocs = numDocs;
+    try {
+      _h3Core = H3Core.newInstance();
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to instantiate H3 instance", e);
+    }
+  }
+
+  @Override
+  protected FilterBlock getNextBlock() {
+    int resolution = _resolution.getLowestResolution();
+    long h3Id = _h3Core.geoToH3(_geometry.getCoordinate().x, 
_geometry.getCoordinate().y, resolution);
+    assert _h3IndexReader != null;
+
+    // find the number of rings based on distance for full match
+    // use the edge of the hexagon to determine the rings are within the 
distance. This is calculated by (1) divide the
+    // distance by edge length of the solution to get the number of contained 
rings (2) use the (floor of number - 1)

Review comment:
       ```suggestion
       // distance by edge length of the resolution to get the number of 
contained rings (2) use the (floor of number - 1)
   ```

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/geospatial/RealtimeH3IndexReader.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.geospatial;
+
+import com.uber.h3core.H3Core;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.core.segment.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution;
+import org.apache.pinot.core.segment.index.readers.H3IndexReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * A H3 index reader for the real-time H3 index values on the fly.
+ * <p>This class is thread-safe for single writer multiple readers.
+ */
+public class RealtimeH3IndexReader implements GeoSpatialIndexCreator, 
H3IndexReader {
+  private final H3Core _h3Core;
+  private final Map<Long, ThreadSafeMutableRoaringBitmap> _h3IndexMap = new 
ConcurrentHashMap<>();
+  private final H3IndexResolution _resolution;
+  private int _lowestResolution;
+
+  public RealtimeH3IndexReader(H3IndexResolution resolution)
+      throws IOException {
+    _resolution = resolution;
+    _lowestResolution = resolution.getLowestResolution();
+    _h3Core = H3Core.newInstance();
+  }
+
+  @Override
+  public void add(int docId, double lat, double lon) {
+    // TODO support multiple resolutions
+    Long h3Id = _h3Core.geoToH3(lat, lon, _lowestResolution);
+    _h3IndexMap.computeIfAbsent(h3Id, k -> new 
ThreadSafeMutableRoaringBitmap());
+    synchronized (this) {

Review comment:
       No need to synchronize here. Access is already guarded by `ConcurrentHashMap` and 
`ThreadSafeMutableRoaringBitmap`.

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/segment/index/datasource/BaseDataSource.java
##########
@@ -45,13 +47,14 @@
   public BaseDataSource(DataSourceMetadata dataSourceMetadata, 
ForwardIndexReader<?> forwardIndex,
       @Nullable Dictionary dictionary, @Nullable InvertedIndexReader<?> 
invertedIndex,
       @Nullable InvertedIndexReader<?> rangeIndex, @Nullable TextIndexReader 
textIndex,
-      @Nullable TextIndexReader fstIndex, @Nullable JsonIndexReader jsonIndex, 
@Nullable BloomFilterReader bloomFilter,
-      @Nullable NullValueVectorReader nullValueVector) {
+      @Nullable TextIndexReader fstIndex, @Nullable JsonIndexReader jsonIndex, 
@Nullable H3IndexReader h3Index,

Review comment:
       Keep the interface, constructor and member variables in the same order. 
I personally suggest putting `H3Index` right after `JsonIndex` (matching 
the constructor order).

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/operator/filter/H3IndexFilterOperator.java
##########
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.filter;
+
+import com.uber.h3core.H3Core;
+import com.uber.h3core.LengthUnit;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.common.DataSource;
+import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.blocks.FilterBlock;
+import org.apache.pinot.core.operator.dociditerators.ScanBasedDocIdIterator;
+import org.apache.pinot.core.operator.docidsets.BitmapDocIdSet;
+import org.apache.pinot.core.operator.docidsets.FilterBlockDocIdSet;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
+import org.apache.pinot.core.query.request.context.predicate.Predicate;
+import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution;
+import org.apache.pinot.core.segment.index.readers.H3IndexReader;
+import 
org.apache.pinot.core.segment.index.readers.geospatial.ImmutableH3IndexReader;
+import org.apache.pinot.spi.utils.BytesUtils;
+import org.locationtech.jts.geom.Geometry;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * A filter operator that uses H3 index for geospatial data retrieval
+ */
+public class H3IndexFilterOperator extends BaseFilterOperator {
+  private static final String OPERATOR_NAME = "H3IndexFilterOperator";
+
+  private Predicate _predicate;
+  private final DataSource _dataSource;
+  private final int _numDocs;
+  private final H3Core _h3Core;
+  private final IndexSegment _segment;
+  private final H3IndexReader _h3IndexReader;
+  private final H3IndexResolution _resolution;
+  private Geometry _geometry;
+  private double _distance;
+
+  public H3IndexFilterOperator(Predicate predicate, IndexSegment indexSegment, 
int numDocs) {
+    _predicate = predicate;
+    _segment = indexSegment;
+    FunctionContext function = predicate.getLhs().getFunction();
+    String columnName;
+
+    // TODO: handle composite function that contains ST_DISTANCE
+    if (function.getArguments().get(0).getType() == 
ExpressionContext.Type.IDENTIFIER) {
+      columnName = function.getArguments().get(0).getIdentifier();
+      byte[] bytes = 
BytesUtils.toBytes(function.getArguments().get(1).getLiteral());
+      _geometry = GeometrySerializer.deserialize(bytes);
+    } else if (function.getArguments().get(1).getType() == 
ExpressionContext.Type.IDENTIFIER) {
+      columnName = function.getArguments().get(1).getIdentifier();
+      byte[] bytes = 
BytesUtils.toBytes(function.getArguments().get(0).getLiteral());
+      _geometry = GeometrySerializer.deserialize(bytes);
+    } else {
+      throw new RuntimeException("Expecting one of the arguments of 
ST_DISTANCE to be an identifier");
+    }
+    _dataSource = indexSegment.getDataSource(columnName);
+    _h3IndexReader = _dataSource.getH3Index();
+    _resolution = _h3IndexReader.getH3IndexResolution();
+    switch (predicate.getType()) {

Review comment:
       Use a precondition check instead of a `switch` here

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/geospatial/RealtimeH3IndexReader.java
##########
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.realtime.impl.geospatial;
+
+import com.uber.h3core.H3Core;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.pinot.core.realtime.impl.ThreadSafeMutableRoaringBitmap;
+import org.apache.pinot.core.segment.creator.GeoSpatialIndexCreator;
+import org.apache.pinot.core.segment.creator.impl.geospatial.H3IndexResolution;
+import org.apache.pinot.core.segment.index.readers.H3IndexReader;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+
+/**
+ * A H3 index reader for the real-time H3 index values on the fly.
+ * <p>This class is thread-safe for single writer multiple readers.
+ */
+public class RealtimeH3IndexReader implements GeoSpatialIndexCreator, 
H3IndexReader {
+  private final H3Core _h3Core;
+  private final Map<Long, ThreadSafeMutableRoaringBitmap> _h3IndexMap = new 
ConcurrentHashMap<>();
+  private final H3IndexResolution _resolution;
+  private int _lowestResolution;

Review comment:
       final

##########
File path: 
pinot-core/src/main/java/org/apache/pinot/core/realtime/impl/RealtimeSegmentConfig.java
##########
@@ -359,12 +375,17 @@ public Builder 
setPartitionUpsertMetadataManager(PartitionUpsertMetadataManager
       return this;
     }
 
+    public Builder setH3IndexColumns(List<H3IndexColumn> h3IndexColumns) {

Review comment:
       Move it after `setJsonIndexColumns`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to