This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 51632a51ff5 Adding EmptyIndexBuffer for remote forward Index and 
simplify mapbufferEntries  (#17059)
51632a51ff5 is described below

commit 51632a51ff515b4cd84b226fca49a7b81b3884db
Author: RAGHVENDRA KUMAR YADAV <[email protected]>
AuthorDate: Sat Oct 25 20:00:48 2025 -0700

    Adding EmptyIndexBuffer for remote forward Index and simplify 
mapbufferEntries  (#17059)
    
    * Simplifying mapBufferEntries in SingleFileIndexDirectory and adding 
support for EmptyIndexBuffer for remote forward Index.
    
    * Update 
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/EmptyIndexBuffer.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * Update 
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/EmptyIndexBuffer.java
    
    Co-authored-by: Copilot <[email protected]>
    
    * incorporating the review comments.
    
    * incorporating the review comments.
    
    * Incorporating review comments
    
    ---------
    
    Co-authored-by: Copilot <[email protected]>
---
 .../segment/store/SingleFileIndexDirectory.java    | 101 +++++++-
 .../pinot/segment/spi/memory/EmptyIndexBuffer.java | 279 +++++++++++++++++++++
 2 files changed, 367 insertions(+), 13 deletions(-)

diff --git 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
index 49c303ac431..e727a4581af 100644
--- 
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
+++ 
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/store/SingleFileIndexDirectory.java
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
@@ -44,6 +45,7 @@ import org.apache.pinot.segment.spi.index.IndexType;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext;
+import org.apache.pinot.segment.spi.memory.EmptyIndexBuffer;
 import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
 import org.apache.pinot.segment.spi.store.ColumnIndexDirectory;
 import org.apache.pinot.segment.spi.store.ColumnIndexUtils;
@@ -259,23 +261,52 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
     }
   }
 
-  private void mapBufferEntries()
-      throws IOException {
-    SortedMap<Long, IndexEntry> indexStartMap = new TreeMap<>();
+  private void mapBufferEntries() throws IOException {
+    // Partition entries by size: non-zero-size entries are mapped from the index file by createPinotBuffers, while
+    // zero-size entries (e.g. a remote forward index) are given an EmptyIndexBuffer by createRemoteBuffers
+    List<IndexEntry> pinotBufferEntries = new ArrayList<>();
+    List<IndexEntry> zeroSizeEntries = new ArrayList<>();
+
+    for (IndexEntry entry : _columnEntries.values()) {
+      if (entry._size == 0) {
+        zeroSizeEntries.add(entry);
+      } else {
+        pinotBufferEntries.add(entry);
+      }
+    }

-    for (Map.Entry<IndexKey, IndexEntry> columnEntry : 
_columnEntries.entrySet()) {
-      long startOffset = columnEntry.getValue()._startOffset;
-      indexStartMap.put(startOffset, columnEntry.getValue());
+    if (!pinotBufferEntries.isEmpty()) {
+      createPinotBuffers(pinotBufferEntries);
+    }
+    if (!zeroSizeEntries.isEmpty()) {
+      createRemoteBuffers(zeroSizeEntries);
+    }
+  }
+
+  /**
+   * Maps non-zero-size entries from the index file in offset order, in chunks kept under MAX_ALLOCATION_SIZE when possible.
+   */
+  private void createPinotBuffers(List<IndexEntry> regularEntries) throws 
IOException {
+    // A TreeMap keyed by start offset iterates entries in file order, so each chunk maps a single file range
+    SortedMap<Long, IndexEntry> indexStartMap = new TreeMap<>();
+    for (IndexEntry entry : regularEntries) {
+      indexStartMap.put(entry._startOffset, entry);
     }

+    // Close the current chunk once adding the next entry brings its size to MAX_ALLOCATION_SIZE or more
     long runningSize = 0;
     List<Long> offsetAccum = new ArrayList<>();
+
     for (Map.Entry<Long, IndexEntry> offsetEntry : indexStartMap.entrySet()) {
       IndexEntry entry = offsetEntry.getValue();
       runningSize += entry._size;

       if (runningSize >= MAX_ALLOCATION_SIZE && !offsetAccum.isEmpty()) {
-        mapAndSliceFile(indexStartMap, offsetAccum, offsetEntry.getKey());
+        // The chunk ends where its last accumulated entry ends; the current entry starts the next chunk
+        long lastOffset = offsetAccum.get(offsetAccum.size() - 1);
+        IndexEntry lastEntry = indexStartMap.get(lastOffset);
+        long endOffset = lastOffset + lastEntry._size;
+        mapAndSliceFile(indexStartMap, offsetAccum, endOffset);
         runningSize = entry._size;
         offsetAccum.clear();
       }
@@ -283,7 +314,30 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
     }

     if (!offsetAccum.isEmpty()) {
-      mapAndSliceFile(indexStartMap, offsetAccum, offsetAccum.get(0) + 
runningSize);
+      long lastOffset = offsetAccum.get(offsetAccum.size() - 1);
+      IndexEntry lastEntry = indexStartMap.get(lastOffset);
+      long endOffset = lastOffset + lastEntry._size;
+      mapAndSliceFile(indexStartMap, offsetAccum, endOffset);
+    }
+  }
+
+  /**
+   * Assigns an {@link EmptyIndexBuffer} to each zero-size entry (e.g. a remote forward index).
+   * They are not backed by the index file and all share one Properties built from the segment custom configs.
+   */
+  private void createRemoteBuffers(List<IndexEntry> zeroSizeEntries) {
+    // Built once and shared by every buffer below; stays empty if there is no loader context or no custom configs
+    Properties properties = new Properties();
+    if (_segmentDirectoryLoaderContext != null
+        && _segmentDirectoryLoaderContext.getSegmentCustomConfigs() != null) {
+      
properties.putAll(_segmentDirectoryLoaderContext.getSegmentCustomConfigs());
+    }
+
+    // NOTE(review): EmptyIndexBuffer's third argument is tableNameWithType; confirm getTableName() includes the type
+    for (IndexEntry entry : zeroSizeEntries) {
+      entry._buffer = new EmptyIndexBuffer(properties,
+          _segmentMetadata.getName(),
+          _segmentMetadata.getTableName());
     }
   }
 
@@ -296,6 +350,9 @@ class SingleFileIndexDirectory extends ColumnIndexDirectory 
{
     long fromFilePos = offsetAccum.get(0);
     long size = endOffset - fromFilePos;
 
+    LOGGER.debug("Creating buffer: fromFilePos={}, endOffset={}, size={}, 
offsetAccum={}",
+        fromFilePos, endOffset, size, offsetAccum);
+
     String context = allocationContext(_indexFile,
         "single_file_index.rw." + "." + String.valueOf(fromFilePos) + "." + 
String.valueOf(size));
 
@@ -308,13 +365,31 @@ class SingleFileIndexDirectory extends 
ColumnIndexDirectory {
     }
     _allocBuffers.add(buffer);
 
-    long prevSlicePoint = 0;
     for (Long fileOffset : offsetAccum) {
       IndexEntry entry = startOffsets.get(fileOffset);
-      long endSlicePoint = prevSlicePoint + entry._size;
-      validateMagicMarker(buffer, prevSlicePoint);
-      entry._buffer = buffer.view(prevSlicePoint + MAGIC_MARKER_SIZE_BYTES, 
endSlicePoint);
-      prevSlicePoint = endSlicePoint;
+      if (entry._size == 0) {
+        continue;
+      }
+      long baseOffset = entry._startOffset + MAGIC_MARKER_SIZE_BYTES;
+      long sliceSize = entry._size - MAGIC_MARKER_SIZE_BYTES;
+      LOGGER.debug("Processing entry: key={}, startOffset={}, size={}, 
baseOffset={}, sliceSize={}",
+          entry._key, entry._startOffset, entry._size, baseOffset, sliceSize);
+
+      // Convert absolute file offset to buffer-relative offset
+      long bufferRelativeOffset = entry._startOffset - fromFilePos;
+      // Add bounds checking to prevent IndexOutOfBoundsException
+      if (bufferRelativeOffset < 0
+          || bufferRelativeOffset + MAGIC_MARKER_SIZE_BYTES > buffer.size()) {
+        LOGGER.error("Buffer offset out of bounds: bufferRelativeOffset={}, 
buffer.size()={}, "
+            + "entry._startOffset={}, fromFilePos={}",
+            bufferRelativeOffset, buffer.size(), entry._startOffset, 
fromFilePos);
+        throw new RuntimeException("Buffer offset out of bounds for entry: " + 
entry._key);
+      }
+      validateMagicMarker(buffer, bufferRelativeOffset);
+      // Calculate the correct start and end positions for the view
+      long start = baseOffset - fromFilePos;
+      long end = start + sliceSize;
+      entry._buffer = buffer.view(start, end);
     }
   }
 
diff --git 
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/EmptyIndexBuffer.java
 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/EmptyIndexBuffer.java
new file mode 100644
index 00000000000..f760e0515b5
--- /dev/null
+++ 
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/EmptyIndexBuffer.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+/**
+ * A zero-size {@link PinotDataBuffer} for index entries that occupy no bytes in the segment's index file, such as a
+ * remote forward index.
+ *
+ * <p>All reads, writes and copies throw {@link UnsupportedOperationException}; only zero-length views and
+ * zero-length byte buffers can be created. The buffer carries configuration {@link Properties} plus the segment and
+ * table names, which are exposed through getters and included in error messages for debugging.
+ */
+public class EmptyIndexBuffer extends PinotDataBuffer {
+  private final Properties _properties;
+  private final String _segmentName;
+  private final String _tableNameWithType;
+
+  /**
+   * Creates a new EmptyIndexBuffer for a zero-size index entry.
+   *
+   * @param properties properties containing configuration; stored by reference, not copied
+   * @param segmentName the name of the segment
+   * @param tableNameWithType the table name with type
+   */
+  public EmptyIndexBuffer(Properties properties, String segmentName, String tableNameWithType) {
+    super(false); // Not closeable: there is no underlying memory or file to release
+    _properties = properties;
+    _segmentName = segmentName;
+    _tableNameWithType = tableNameWithType;
+  }
+
+  /**
+   * Gets the properties containing configuration information.
+   *
+   * @return the properties passed to the constructor (the same instance, not a copy)
+   */
+  public Properties getProperties() {
+    return _properties;
+  }
+
+  @Override
+  public byte getByte(long offset) {
+    throw unsupported("read from");
+  }
+
+  @Override
+  public void putByte(long offset, byte value) {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public char getChar(long offset) {
+    throw unsupported("read from");
+  }
+
+  @Override
+  public void putChar(long offset, char value) {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public short getShort(long offset) {
+    throw unsupported("read from");
+  }
+
+  @Override
+  public void putShort(long offset, short value) {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public int getInt(long offset) {
+    throw unsupported("read from");
+  }
+
+  @Override
+  public void putInt(long offset, int value) {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public long getLong(long offset) {
+    throw unsupported("read from");
+  }
+
+  @Override
+  public void putLong(long offset, long value) {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public float getFloat(long offset) {
+    throw unsupported("read from");
+  }
+
+  @Override
+  public void putFloat(long offset, float value) {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public double getDouble(long offset) {
+    throw unsupported("read from");
+  }
+
+  @Override
+  public void putDouble(long offset, double value) {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public void copyTo(long offset, byte[] buffer, int destOffset, int size) {
+    throw unsupported("copy from");
+  }
+
+  @Override
+  public void readFrom(long offset, byte[] buffer, int srcOffset, int size) {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public void readFrom(long offset, ByteBuffer buffer) {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public void readFrom(long offset, File file, long srcOffset, long size)
+      throws IOException {
+    throw unsupported("write to");
+  }
+
+  @Override
+  public long size() {
+    return 0; // Zero-size buffer
+  }
+
+  @Override
+  public ByteOrder order() {
+    return ByteOrder.BIG_ENDIAN; // Fixed default; there are no bytes whose order matters
+  }
+
+  @Override
+  public PinotDataBuffer view(long start, long end, ByteOrder byteOrder) {
+    if (start == 0 && end == 0) {
+      return this; // The empty range [0, 0) is the only valid view of this buffer
+    }
+    throw new IllegalArgumentException(
+        String.format("Invalid view range [%d, %d) for empty buffer. Segment: %s, table: %s", start, end,
+            _segmentName, _tableNameWithType));
+  }
+
+  @Override
+  public void flush() {
+    // No-op: nothing has been written
+  }
+
+  @Override
+  public void release()
+      throws IOException {
+    // No-op: no memory or file handle is held
+  }
+
+  @Override
+  public ByteBuffer toDirectByteBuffer(long offset, int size, ByteOrder byteOrder) {
+    if (size == 0) {
+      // allocateDirect so the returned buffer is direct, as the method name promises; it holds no bytes
+      return ByteBuffer.allocateDirect(0).order(byteOrder);
+    }
+    throw new IllegalArgumentException(
+        String.format("Cannot create ByteBuffer of size %d from empty buffer. Segment: %s, table: %s", size,
+            _segmentName, _tableNameWithType));
+  }
+
+  @Override
+  public ImmutableRoaringBitmap viewAsRoaringBitmap(long offset, int length) {
+    throw new IllegalArgumentException(
+        String.format("Cannot create RoaringBitmap of length %d from empty buffer. Segment: %s, table: %s", length,
+            _segmentName, _tableNameWithType));
+  }
+
+  @Override
+  public void appendAsByteBuffers(List<ByteBuffer> appendTo) {
+    // No-op: there are no bytes to append
+  }
+
+  /**
+   * Gets the segment name for this empty buffer.
+   *
+   * @return the segment name
+   */
+  public String getSegmentName() {
+    return _segmentName;
+  }
+
+  /**
+   * Gets the table name with type for this empty buffer.
+   *
+   * @return the table name with type
+   */
+  public String getTableNameWithType() {
+    return _tableNameWithType;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("EmptyIndexBuffer{segmentName=%s, tableNameWithType=%s, size=0}", _segmentName,
+        _tableNameWithType);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof EmptyIndexBuffer)) {
+      return false;
+    }
+    EmptyIndexBuffer other = (EmptyIndexBuffer) obj;
+    // Objects.equals keeps this null-safe, consistent with Objects.hash in hashCode()
+    return Objects.equals(_segmentName, other._segmentName)
+        && Objects.equals(_tableNameWithType, other._tableNameWithType);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(_segmentName, _tableNameWithType);
+  }
+
+  /**
+   * Builds the exception thrown by every unsupported read, write, or copy.
+   *
+   * @param operation the attempted operation, e.g. "read from" or "write to"
+   * @return an exception whose message names the segment and table
+   */
+  private UnsupportedOperationException unsupported(String operation) {
+    return new UnsupportedOperationException(
+        String.format("Cannot %s empty buffer for segment: %s, table: %s", operation, _segmentName,
+            _tableNameWithType));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to