gortiz commented on code in PR #13304:
URL: https://github.com/apache/pinot/pull/13304#discussion_r1639621299


##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/DataBufferPinotInputStream.java:
##########
@@ -0,0 +1,243 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+
+/**
+ * An adaptor that allows a {@link DataBuffer} to be read as a {@link 
PinotInputStream}.
+ */
+public class DataBufferPinotInputStream extends PinotInputStream {
+  /** Big-endian view over the readable range; offset 0 of this view is the first byte of the stream. */
+  private final DataBuffer _dataBuffer;
+  /** Read position, relative to the start of {@link #_dataBuffer}. */
+  private long _currentOffset;
+
+  /**
+   * Creates a stream that reads the whole content of {@code dataBuffer}, from offset 0 up to its size.
+   */
+  public DataBufferPinotInputStream(DataBuffer dataBuffer) {
+    this(dataBuffer, 0L, dataBuffer.size());
+  }
+
+  /**
+   * Creates a stream over the range of {@code dataBuffer} delimited by {@code startOffset} and {@code endOffset},
+   * read as big-endian. The stream starts positioned at the beginning of that range.
+   */
+  public DataBufferPinotInputStream(DataBuffer dataBuffer, long startOffset, long endOffset) {
+    this._dataBuffer = dataBuffer.view(startOffset, endOffset, ByteOrder.BIG_ENDIAN);
+    this._currentOffset = 0L;
+  }
+
+  /**
+   * Returns the current read position, relative to the start of the range this stream was created over.
+   */
+  @Override
+  public long getCurrentOffset() {
+    return this._currentOffset;
+  }
+
+  @Override
+  public void seek(long newPos) {
+    if (newPos < 0 || newPos > _dataBuffer.size()) {
+      throw new IllegalArgumentException("Invalid new position: " + newPos);
+    }
+    _currentOffset = newPos;
+  }
+
+  /**
+   * Copies up to {@code buf.remaining()} bytes into {@code buf}, starting at {@code buf.position()}, and advances the
+   * buffer's position by the number of bytes copied (the same convention as
+   * {@link java.nio.channels.ReadableByteChannel#read(ByteBuffer)}).
+   *
+   * @return the number of bytes copied (0 if {@code buf} has no space left), or -1 at end of stream
+   */
+  @Override
+  public int read(ByteBuffer buf) {
+    int remaining = available();
+    if (remaining == 0) {
+      return -1;
+    }
+    int toRead = Math.min(remaining, buf.remaining());
+    if (toRead > 0) {
+      // copyTo takes an explicit destination offset, so copy into a slice whose index 0 is buf.position();
+      // otherwise bytes would land at the start of buf regardless of its position.
+      ByteBuffer target = buf.slice();
+      _dataBuffer.copyTo(_currentOffset, PinotByteBuffer.wrap(target), 0, toRead);
+      // Report progress to the caller through the buffer position, as NIO-style reads do.
+      buf.position(buf.position() + toRead);
+      _currentOffset += toRead;
+    }
+    return toRead;
+  }
+
+  @Override
+  public int read() {
+    if (_currentOffset >= _dataBuffer.size()) {
+      return -1;
+    } else {
+      return _dataBuffer.getByte(_currentOffset++) & 0xFF;
+    }
+  }
+
+  /**
+   * Reads up to {@code len} bytes into {@code b}, starting at index {@code off}.
+   *
+   * @return the number of bytes read, 0 if {@code len} is 0, or -1 if the end of the stream has been reached
+   * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or {@code len > b.length - off}
+   */
+  @Override
+  public int read(byte[] b, int off, int len) {
+    if (off < 0 || len < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+    }
+    // InputStream contract: a zero-length read returns 0, even at end of stream.
+    if (len == 0) {
+      return 0;
+    }
+    int available = available();
+    if (available == 0) {
+      return -1;
+    }
+    int result = Math.min(available, len);
+    _dataBuffer.copyTo(_currentOffset, b, off, result);
+    _currentOffset += result;
+    return result;
+  }
+
+  /**
+   * Skips up to {@code n} bytes, stopping at the end of the stream.
+   * <p>
+   * As in {@link java.io.InputStream#skip(long)}, a non-positive {@code n} skips nothing and returns 0. Moving
+   * backwards is not supported here because it could push the position below zero; use {@link #seek(long)} instead.
+   *
+   * @return the number of bytes actually skipped
+   */
+  @Override
+  public long skip(long n) {
+    if (n <= 0) {
+      return 0;
+    }
+    long increase = Math.min(n, availableLong());
+    _currentOffset += increase;
+    return increase;
+  }
+
+  public long availableLong() {
+    return _dataBuffer.size() - _currentOffset;
+  }
+
+  /**
+   * Returns the number of remaining bytes, clamped to {@link Integer#MAX_VALUE} because the return type is an
+   * {@code int} while the underlying buffer size is a {@code long}.
+   */
+  @Override
+  public int available() {
+    return (int) Math.min(_dataBuffer.size() - _currentOffset, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Reads exactly {@code len} bytes into {@code b}, starting at index {@code off}.
+   * <p>
+   * {@link java.io.DataInput#readFully(byte[], int, int)} describes a read that proceeds until the requested bytes
+   * are read or end of file is detected. Therefore, if fewer than {@code len} bytes remain, the bytes that are
+   * available are still copied into {@code b} (and consumed) before {@link EOFException} is thrown.
+   *
+   * @throws IndexOutOfBoundsException if {@code off} or {@code len} is negative, or {@code off + len > b.length}
+   * @throws EOFException if the end of the stream is reached before {@code len} bytes were read
+   */
+  @Override
+  public void readFully(byte[] b, int off, int len)
+      throws EOFException {
+    if (len < 0) {
+      throw new IndexOutOfBoundsException("len is negative: " + len);
+    }
+    // Written as len > b.length - off (instead of off + len > b.length) so the check cannot overflow.
+    if (off < 0 || len > b.length - off) {
+      throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", b.length=" + b.length);
+    }
+    // Copy only what is left: copying len bytes here would access bytes beyond the end of the view and leave
+    // _currentOffset past the end of the stream.
+    int toCopy = (int) Math.min(availableLong(), len);
+    if (toCopy > 0) {
+      _dataBuffer.copyTo(_currentOffset, b, off, toCopy);
+      _currentOffset += toCopy;
+    }
+    if (toCopy < len) {
+      throw new EOFException();
+    }
+  }
+
+  @Override
+  public boolean readBoolean() throws EOFException {

Review Comment:
   Done



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/SeekableInputStream.java:
##########
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+
+/**
+ * SeekableInputStream is a class with the methods needed by Pinot to read 
data efficiently.
+ * <p>
+ * This class is based on Parquet's SeekableInputStream.
+ */
+public abstract class SeekableInputStream extends InputStream {

Review Comment:
   Done



##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CompoundDataBuffer.java:
##########
@@ -0,0 +1,665 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.spi.memory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.BufferUnderflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.roaringbitmap.buffer.ImmutableRoaringBitmap;
+
+
+/**
+ * A {@link DataBuffer} that is composed of multiple {@link DataBuffer}s that 
define a single contiguous buffer.
+ * <p>
+ * While reads and writes can span multiple buffers, there may be a 
performance impact when doing so.
+ * Therefore it is recommended to try to wrap independent buffers.
+ * <p>
+ * Once this class is built, buffers cannot be added or removed.
+ */
+public class CompoundDataBuffer implements DataBuffer {
+
+  /** The concatenated buffers, all in {@link #_order}. */
+  private final DataBuffer[] _buffers;
+  /** {@code _bufferOffsets[i]} is the offset, within this compound buffer, of the first byte of {@code _buffers[i]}. */
+  private final long[] _bufferOffsets;
+  // NOTE(review): presumably caches the index of the most recently accessed buffer for lookups; confirm at use sites.
+  private int _lastBufferIndex = 0;
+  private final ByteOrder _order;
+  /** Sum of the sizes of all buffers. */
+  private final long _size;
+  private final boolean _owner;
+
+  /**
+   * Creates a compound buffer from the given buffers.
+   * <p>
+   * The array is kept as-is (not copied). Any element whose byte order differs from {@code order} is replaced in
+   * place with a view in {@code order}.
+   *
+   * @param buffers The buffers that will be concatenated to form the compound buffer. None of them may be empty.
+   * @param order The byte order of the buffer. Buffers in the array that have a different byte order will be converted.
+   * @param owner Whether this buffer owns the underlying buffers. If true, the underlying buffers will be released when
+   *              this buffer is closed.
+   * @throws IllegalArgumentException if any of the buffers has size 0
+   */
+  public CompoundDataBuffer(DataBuffer[] buffers, ByteOrder order, boolean owner) {
+    _order = order;
+    _owner = owner;
+    _buffers = buffers;
+    _bufferOffsets = new long[buffers.length];
+    long runningOffset = 0;
+    // Single pass: validate, normalize the byte order, and record where each buffer starts.
+    for (int idx = 0; idx < buffers.length; idx++) {
+      DataBuffer current = buffers[idx];
+      long currentSize = current.size();
+      if (currentSize == 0) {
+        throw new IllegalArgumentException("Buffer at index " + idx + " is empty");
+      }
+      if (current.order() != _order) {
+        current = current.view(0, currentSize, _order);
+        buffers[idx] = current;
+      }
+      _bufferOffsets[idx] = runningOffset;
+      runningOffset += current.size();
+    }
+    _size = runningOffset;
+  }
+
+  /**
+   * Creates a compound buffer that concatenates the given NIO buffers, each wrapped with {@code PinotByteBuffer.wrap}.
+   *
+   * @param buffers The buffers that will be concatenated to form the compound buffer.
+   * @param order The byte order of the buffer. Buffers with a different byte order will be converted.
+   * @param owner Whether this buffer owns the underlying buffers.
+   */
+  public CompoundDataBuffer(ByteBuffer[] buffers, ByteOrder order, boolean owner) {
+    this(wrapByteBuffers(buffers), order, owner);
+  }
+
+  /** Wraps each element of {@code buffers} with {@code PinotByteBuffer.wrap}, preserving their order. */
+  private static DataBuffer[] wrapByteBuffers(ByteBuffer[] buffers) {
+    DataBuffer[] wrapped = new DataBuffer[buffers.length];
+    Arrays.setAll(wrapped, i -> PinotByteBuffer.wrap(buffers[i]));
+    return wrapped;
+  }
+
+  /**
+   * Creates a compound buffer from the given buffers.
+   * @param buffers The buffers that will be concatenated to form the compound 
buffer.
+   * @param order The byte order of the buffer. Buffers in the list that have 
a different byte order will be converted.
+   * @param owner Whether this buffer owns the underlying buffers. If true, 
the underlying buffers will be released when
+   *              this buffer is closed.
+   */
+  public CompoundDataBuffer(List<DataBuffer> buffers, ByteOrder order, boolean 
owner) {
+    this(buffers.toArray(DataBuffer[]::new), order, owner);

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to