gortiz commented on code in PR #13304: URL: https://github.com/apache/pinot/pull/13304#discussion_r1650575662
########## pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/CompoundDataBuffer.java: ########## @@ -0,0 +1,672 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.segment.spi.memory; + +import java.io.File; +import java.io.IOException; +import java.nio.BufferUnderflowException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import org.roaringbitmap.buffer.ImmutableRoaringBitmap; + + +/** + * A {@link DataBuffer} that is composed of multiple {@link DataBuffer}s that define a single contiguous buffer. + * <p> + * While reads and writes can span multiple buffers, there may be a performance impact when doing so. + * Therefore it is recommended to try to wrap independent buffers. + * <p> + * Once this class is built, buffers cannot be added or removed. + */ +public class CompoundDataBuffer implements DataBuffer { + + private final DataBuffer[] _buffers; + private final long[] _bufferOffsets; + private int _lastBufferIndex = 0; + private final ByteOrder _order; + private final long _size; + private final boolean _owner; + + /** + * Creates a compound buffer from the given buffers. 
+ * + * @param buffers The buffers that will be concatenated to form the compound buffer. + * @param order The byte order of the buffer. Buffers in the array that have a different byte order will be converted. + * @param owner Whether this buffer owns the underlying buffers. If true, the underlying buffers will be released when + * this buffer is closed. + */ + public CompoundDataBuffer(DataBuffer[] buffers, ByteOrder order, boolean owner) { + _owner = owner; + _buffers = buffers; + _bufferOffsets = new long[buffers.length]; + _order = order; + long offset = 0; + for (int i = 0; i < buffers.length; i++) { + if (buffers[i].size() == 0) { + throw new IllegalArgumentException("Buffer at index " + i + " is empty"); + } + if (buffers[i].order() != _order) { + buffers[i] = buffers[i].view(0, buffers[i].size(), _order); + } + } + for (int i = 0; i < buffers.length; i++) { + _bufferOffsets[i] = offset; + long size = buffers[i].size(); + offset += size; + } + _size = offset; + } + + public CompoundDataBuffer(ByteBuffer[] buffers, ByteOrder order, boolean owner) { + this(asDataBufferArray(buffers), order, owner); + } + + /** + * Creates a compound buffer from the given buffers. + * @param buffers The buffers that will be concatenated to form the compound buffer. + * @param order The byte order of the buffer. Buffers in the list that have a different byte order will be converted. + * @param owner Whether this buffer owns the underlying buffers. If true, the underlying buffers will be released when + * this buffer is closed. 
+ */ + public CompoundDataBuffer(List<DataBuffer> buffers, ByteOrder order, boolean owner) { + this(buffers.toArray(new DataBuffer[0]), order, owner); + } + + private static DataBuffer[] asDataBufferArray(ByteBuffer[] buffers) { + DataBuffer[] result = new DataBuffer[buffers.length]; + for (int i = 0; i < buffers.length; i++) { + result[i] = PinotByteBuffer.wrap(buffers[i]); + } + return result; + } + + private int getBufferIndex(long offset) { + // this optimistically assumes that lookups are going to be in ascending order + // we don't care about concurrency here given that this is only used to speed up the lookup + int lastBufferIndex = _lastBufferIndex; + if (_bufferOffsets[lastBufferIndex] > offset) { + lastBufferIndex = 0; + } + + for (int i = lastBufferIndex; i < _bufferOffsets.length; i++) { + if (offset < _bufferOffsets[i]) { + int result = i - 1; + _lastBufferIndex = result; + return result; + } + } + return _bufferOffsets.length - 1; + } + + private ByteBuffer copy(long offset, int length) { + if (offset + length > _size) { + throw new BufferUnderflowException(); + } + byte[] result = new byte[length]; + + int bufferIndex = getBufferIndex(offset); + long inBufferIndex = offset - _bufferOffsets[bufferIndex]; + + DataBuffer buffer = _buffers[bufferIndex]; + int toCopy = (int) Math.min(length, buffer.size() - inBufferIndex); + buffer.copyTo(inBufferIndex, result, 0, toCopy); + + int remaining = length - toCopy; + while (remaining > 0) { + bufferIndex++; + buffer = _buffers[bufferIndex]; + toCopy = (int) Math.min(remaining, buffer.size()); + buffer.copyTo(0, result, length - remaining, toCopy); + + remaining -= toCopy; + } + return ByteBuffer.wrap(result) + .order(_order); + } + + @Override + public void readFrom(long offset, byte[] input, int srcOffset, int size) { + if (offset + size > _size) { + throw new BufferUnderflowException(); Review Comment: Yeah! 
I always think `readFrom` is a read operation, when it is actually a write operation. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org