yashmayya commented on code in PR #13303:
URL: https://github.com/apache/pinot/pull/13303#discussion_r1735901348


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockSerde.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import java.io.IOException;
+import java.util.function.LongConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.memory.DataBuffer;
+
+
+/**
+ * An interface that can be implemented to support different types of data 
block serialization and deserialization.
+ * <p>
+ * It is important to distinguish between the serialization format and the 
data block raw representation.

Review Comment:
   I think this might be an outdated description now that the "raw" interface 
was removed? It's a little confusing to read currently.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -78,20 +106,119 @@ public static DataBlock.Type getType(int versionType) {
     return DataBlock.Type.fromOrdinal(versionType >> VERSION_TYPE_SHIFT);
   }
 
-  public static DataBlock getDataBlock(ByteBuffer byteBuffer)
+  public static List<ByteBuffer> serialize(DataBlock dataBlock)
+      throws IOException {
+    return serialize(DataBlockSerde.Version.V1_V2, dataBlock);
+  }
+
+  @VisibleForTesting
+  public static List<ByteBuffer> serialize(DataBlockSerde.Version version, 
DataBlock dataBlock)
+      throws IOException {
+
+    DataBlockSerde dataBlockSerde = SERDES.get(version);
+    if (dataBlockSerde == null) {
+      throw new UnsupportedOperationException("Unsupported data block version: 
" + version);
+    }
+
+    DataBlock.Type dataBlockType = dataBlock.getDataBlockType();
+    int firstInt = version.getVersion() + (dataBlockType.ordinal() << 
DataBlockUtils.VERSION_TYPE_SHIFT);
+
+    DataBuffer dataBuffer = dataBlockSerde.serialize(dataBlock, firstInt);
+
+    int readFirstByte;
+    if (dataBuffer.order() != ByteOrder.BIG_ENDIAN) {
+      readFirstByte = dataBuffer.view(0, 4, ByteOrder.BIG_ENDIAN).getInt(0);
+    } else {
+      readFirstByte = dataBuffer.getInt(0);
+    }
+    Preconditions.checkState(readFirstByte == firstInt, "Illegal serialization 
by {}. "
+        + "The first integer should be {} but is {} instead", 
dataBuffer.getClass().getName(), firstInt, readFirstByte);
+
+    ArrayList<ByteBuffer> result = new ArrayList<>();
+    dataBuffer.appendAsByteBuffers(result);
+    return result;
+  }
+
+  /**
+   * Reads a data block from the given byte buffer.
+   * @param buffer the buffer to read from. The data will be read at the 
buffer's current position. This position will
+   *               be updated to point to the end of the data block.
+   */
+  public static DataBlock readFrom(ByteBuffer buffer)
+      throws IOException {
+    return deserialize(PinotByteBuffer.wrap(buffer), buffer.position(), 
newOffset -> {
+      if (newOffset > Integer.MAX_VALUE) {
+        throw new IllegalStateException("Data block is too large");
+      }
+      buffer.position((int) newOffset);
+    });
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * Contrary to {@link #readFrom(ByteBuffer)}, the given buffers will not be 
modified.
+   */
+  public static DataBlock deserialize(List<ByteBuffer> buffers)
       throws IOException {
-    int versionType = readVersionType(byteBuffer);
-    int version = getVersion(versionType);
-    DataBlock.Type type = getType(versionType);
-    switch (type) {
-      case COLUMNAR:
-        return new ColumnarDataBlock(byteBuffer);
-      case ROW:
-        return new RowDataBlock(byteBuffer);
-      case METADATA:
-        return MetadataBlock.deserialize(byteBuffer, version);
-      default:
-        throw new UnsupportedOperationException("Unsupported data table 
version: " + version + " with type: " + type);
+    List<DataBuffer> dataBuffers = buffers.stream()
+        .map(PinotByteBuffer::wrap)
+        .collect(Collectors.toList());
+    try (CompoundDataBuffer compoundBuffer = new 
CompoundDataBuffer(dataBuffers, ByteOrder.BIG_ENDIAN, false)) {
+      return deserialize(compoundBuffer);
+    }
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * Contrary to {@link #readFrom(ByteBuffer)}, the given buffers will not be 
modified.
+   */
+  public static DataBlock deserialize(ByteBuffer[] buffers)
+      throws IOException {
+    try (CompoundDataBuffer compoundBuffer = new CompoundDataBuffer(buffers, 
ByteOrder.BIG_ENDIAN, false)) {
+      return deserialize(compoundBuffer);
+    }
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * <p>
+   * Data will be read from the first byte of the buffer. Use {@link 
#deserialize(DataBuffer, long, LongConsumer)}
+   * in case it is needed to read from a different position.
+   */
+  public static DataBlock deserialize(DataBuffer buffer)
+      throws IOException {
+    return deserialize(buffer, 0, null);
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * @param buffer the buffer to read from.
+   * @param offset the offset in the buffer where the data starts.
+   * @param finalOffsetConsumer An optional consumer that will be called after 
the data block is deserialized.
+   *                            The consumer will receive the offset where the 
data block ends.
+   */
+  public static DataBlock deserialize(DataBuffer buffer, long offset, 
@Nullable LongConsumer finalOffsetConsumer)
+      throws IOException {
+    int versionAndSubVersion = buffer.getInt(offset);
+    int version = getVersion(versionAndSubVersion);

Review Comment:
   I didn't quite follow what we're doing here - what's the subversion and how 
does the "version type shift" work?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/RowDataBlock.java:
##########
@@ -58,41 +53,29 @@ protected void computeBlockObjectConstants() {
   }
 
   @Override
-  protected int getDataBlockVersionType() {
-    return VERSION + (Type.ROW.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT);
+  protected int getFixDataSize() {
+    return _fixDataSize;
   }
 
   @Override
   protected int getOffsetInFixedBuffer(int rowId, int colId) {
     return rowId * _rowSizeInBytes + _columnOffsets[colId];
   }
 
-  @Override
-  protected int positionOffsetInVariableBufferAndGetLength(int rowId, int 
colId) {
-    int offset = getOffsetInFixedBuffer(rowId, colId);
-    _variableSizeData.position(_fixedSizeData.getInt(offset));
-    return _fixedSizeData.getInt(offset + 4);
-  }
-
   public int getRowSizeInBytes() {
     return _rowSizeInBytes;
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof RowDataBlock)) {
-      return false;
-    }
-    RowDataBlock that = (RowDataBlock) o;
-    return _rowSizeInBytes == that._rowSizeInBytes && 
Objects.deepEquals(_columnOffsets, that._columnOffsets);
+  public Type getDataBlockType() {
+    return Type.ROW;
   }
 
+  @NotNull // the method is override just to override its nullability 
annotation

Review Comment:
   nit: This is the first usage of `javax.validation.constraints.NotNull` in 
this project. Seems like we use `javax.annotation.Nonnull` and 
`javax.annotation.Nullable` everywhere else so it might be good to keep using 
the same ones unless there's a specific reason to use this one here.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java:
##########
@@ -104,4 +113,39 @@ public static Type fromOrdinal(int ordinal) {
       }
     }
   }
+
+  /**
+   * Returns the dictionary for the given column.
+   *
+   * This is a break in the interface abstraction that assumes all 
implementations will use a dictionary only for
+   * string columns. This may change in the future.
+   */
+  @Nullable
+  String[] getStringDictionary();
+
+  /**
+   * The actual content is different depending on whether this is a row-based 
or columnar data block.
+   *
+   * This is an abstraction leak that assumes all implementations derive from 
{@link BaseDataBlock}.

Review Comment:
   I don't fully follow the abstractions and their goals here. Currently, all 
implementations of `DataBlock` in this project are also implementations of 
`BaseDataBlock`. It doesn't look like it is meant to be a pluggable interface 
either? Was there a separate implementation in the past or are there some 
intended future implementations of `DataBlock`? Feels like I'm missing some 
context here.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/ZeroCopyDataBlockSerde.java:
##########
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.LongConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.segment.spi.memory.CompoundDataBuffer;
+import org.apache.pinot.segment.spi.memory.DataBuffer;
+import org.apache.pinot.segment.spi.memory.PagedPinotOutputStream;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.apache.pinot.segment.spi.memory.PinotInputStream;
+import org.apache.pinot.segment.spi.memory.PinotOutputStream;
+
+
+/**
+ * An efficient serde that implements {@link DataBlockSerde.Version#V1_V2} 
using trying to make as fewer copies as
+ * possible.
+ */
+public class ZeroCopyDataBlockSerde implements DataBlockSerde {
+
+  private final PagedPinotOutputStream.PageAllocator _allocator;
+
+  public ZeroCopyDataBlockSerde() {
+    _allocator = PagedPinotOutputStream.HeapPageAllocator.createSmall();
+  }
+
+  public ZeroCopyDataBlockSerde(PagedPinotOutputStream.PageAllocator 
allocator) {
+    _allocator = allocator;
+  }
+
+  public DataBuffer serialize(DataBlock dataBlock, int firstInt)
+      throws IOException {
+    Header header = new Header(firstInt, dataBlock.getNumberOfRows(), 
dataBlock.getNumberOfColumns());
+
+    CompoundDataBuffer.Builder builder = new 
CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, false);
+
+    ByteBuffer headerBuffer = ByteBuffer.allocate(Header.BYTES);
+    builder.addBuffer(PinotByteBuffer.wrap(headerBuffer));
+
+    try (PagedPinotOutputStream into = new PagedPinotOutputStream(_allocator)) 
{
+      serializeExceptions(dataBlock, into);
+      header._exceptionsLength = (int) into.getCurrentOffset();
+      serializeDictionary(dataBlock, into);
+      header._dictionaryLength = (int) into.getCurrentOffset() - 
header._exceptionsLength;
+      serializeDataSchema(dataBlock, into);
+      header._dataSchemaLength = (int) into.getCurrentOffset() - 
header._exceptionsLength - header._dictionaryLength;
+
+      builder.addPagedOutputStream(into);
+    }
+
+    DataBuffer fixedData = dataBlock.getFixedData();
+    if (fixedData != null) {
+      builder.addBuffer(fixedData);
+      header._fixedSizeDataLength = (int) fixedData.size();
+    }
+
+    DataBuffer varSizeData = dataBlock.getVarSizeData();
+    if (varSizeData != null) {
+      builder.addBuffer(varSizeData);
+      header._variableSizeDataLength = (int) varSizeData.size();
+    }
+
+    try (PagedPinotOutputStream into = new PagedPinotOutputStream(_allocator)) 
{
+      serializeMetadata(dataBlock, into, header);
+      builder.addPagedOutputStream(into);
+    }
+
+    header.updateStarts();
+    header.serialize(headerBuffer);
+    headerBuffer.flip();
+
+    return builder.build();
+  }
+
+  /**
+   * Serialize the exceptions map into the {@link PinotOutputStream} current 
position.
+   * <p>
+   * the header object is modified to update {@link Header#_exceptionsLength}
+   */
+  private void serializeExceptions(DataBlock dataBlock, PinotOutputStream into)
+      throws IOException {
+
+    Map<Integer, String> errCodeToExceptionMap = dataBlock.getExceptions();
+    if (errCodeToExceptionMap == null || errCodeToExceptionMap.isEmpty()) {
+      return;
+    }
+
+    into.writeInt(errCodeToExceptionMap.size());
+    for (Map.Entry<Integer, String> entry : errCodeToExceptionMap.entrySet()) {
+      into.writeInt(entry.getKey());
+      into.writeInt4String(entry.getValue());
+    }
+  }
+
+  private static void serializeDictionary(DataBlock dataBlock, 
PinotOutputStream into)
+      throws IOException {
+    String[] stringDictionary = dataBlock.getStringDictionary();
+    if (stringDictionary == null) {
+      // TODO: probably we can also do this when length is 0
+      return;
+    }
+
+    into.writeInt(stringDictionary.length);
+    for (String entry : stringDictionary) {
+      into.writeInt4String(entry);
+    }
+  }
+
+  private static void serializeDataSchema(DataBlock dataBlock, 
PinotOutputStream into)
+      throws IOException {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    if (dataSchema == null) {
+      return;
+    }
+
+    // TODO: This is actually allocating, we can improve it in the future if 
it is problematic
+    byte[] bytes = dataSchema.toBytes();
+    into.write(bytes);
+  }
+
+  private static void serializeMetadata(DataBlock dataBlock, PinotOutputStream 
into, Header header)
+      throws IOException {
+    header._metadataStart = (int) into.getCurrentOffset();
+    if (!(dataBlock instanceof MetadataBlock)) {
+      into.writeInt(0);
+      return;
+    }
+    List<DataBuffer> statsByStage = dataBlock.getStatsByStage();
+    if (statsByStage == null) {
+      into.writeInt(0);
+      return;
+    }
+    int size = statsByStage.size();
+    into.writeInt(size);
+    if (size > 0) {
+      for (DataBuffer stat : statsByStage) {
+        if (stat == null) {
+          into.writeBoolean(false);
+        } else {
+          into.writeBoolean(true);
+          if (stat.size() > Integer.MAX_VALUE) {
+            throw new IOException("Stat size is too large to serialize");
+          }
+          into.writeInt((int) stat.size());
+          into.write(stat);
+        }
+      }
+    }
+  }
+
+  @Override
+  public DataBlock deserialize(DataBuffer buffer, long offset, DataBlock.Type 
type,
+      @Nullable LongConsumer finalOffsetConsumer)
+      throws IOException {
+    try (PinotInputStream stream = buffer.openInputStream(offset)) {
+      Header header = Header.deserialize(stream);
+
+      if (finalOffsetConsumer != null) {
+        finalOffsetConsumer.accept(offset + calculateEndOffset(buffer, 
header));
+      }
+
+      switch (type) {
+        case COLUMNAR:
+          return new ColumnarDataBlock(header._numRows, 
deserializeDataSchema(stream, header),
+              deserializeDictionary(stream, header),
+              bufferView(buffer, header._fixedSizeDataStart + offset, 
header._fixedSizeDataLength),
+              bufferView(buffer, header._variableSizeDataStart + offset, 
header._variableSizeDataLength));
+        case ROW:
+          return new RowDataBlock(header._numRows, 
deserializeDataSchema(stream, header),
+              deserializeDictionary(stream, header),
+              bufferView(buffer, header._fixedSizeDataStart + offset, 
header._fixedSizeDataLength),
+              bufferView(buffer, header._variableSizeDataStart + offset, 
header._variableSizeDataLength));
+        case METADATA: {
+          Map<Integer, String> exceptions = deserializeExceptions(stream, 
header);
+          if (!exceptions.isEmpty()) {
+            return MetadataBlock.newError(exceptions);
+          } else {
+            List<DataBuffer> metadata = deserializeMetadata(buffer, header);
+            return new MetadataBlock(metadata);
+          }
+        }
+        default:
+          throw new UnsupportedOperationException("Unsupported data table 
version: " + getVersion() + " with type: "
+              + type);
+      }
+    }
+  }
+
+  private static List<DataBuffer> deserializeMetadata(DataBuffer buffer, 
Header header) {
+    long currentOffset = header._metadataStart;
+    int statsSize = buffer.getInt(currentOffset);
+    currentOffset += Integer.BYTES;
+
+    List<DataBuffer> stats = new ArrayList<>(statsSize);
+
+    for (int i = 0; i < statsSize; i++) {
+      boolean isPresent = buffer.getByte(currentOffset) != 0;
+      currentOffset += Byte.BYTES;
+      if (isPresent) {
+        int length = buffer.getInt(currentOffset);
+        currentOffset += Integer.BYTES;
+        stats.add(buffer.view(currentOffset, currentOffset + length));
+        currentOffset += length;
+      } else {
+        stats.add(null);
+      }
+    }
+    return stats;
+  }
+
+  private long calculateEndOffset(DataBuffer buffer, Header header) {
+    long currentOffset = header._metadataStart;
+    int statsSize = buffer.getInt(currentOffset);
+    currentOffset += Integer.BYTES;
+
+    for (int i = 0; i < statsSize; i++) {
+      boolean isPresent = buffer.getByte(currentOffset) != 0;
+      currentOffset += Byte.BYTES;
+      if (isPresent) {
+        int length = buffer.getInt(currentOffset);
+        currentOffset += Integer.BYTES;
+        currentOffset += length;
+      }
+    }
+    return currentOffset;
+  }
+
+  @VisibleForTesting
+  static Map<Integer, String> deserializeExceptions(PinotInputStream stream, 
Header header)
+      throws IOException {
+    if (header._exceptionsLength == 0) {
+      return new HashMap<>();
+    }
+    stream.seek(header.getExceptionsStart());
+    int numExceptions = stream.readInt();
+    Map<Integer, String> exceptions = new 
HashMap<>(HashUtil.getHashMapCapacity(numExceptions));
+    for (int i = 0; i < numExceptions; i++) {
+      int errCode = stream.readInt();
+      String errMessage = stream.readInt4UTF();
+      exceptions.put(errCode, errMessage);
+    }
+    return exceptions;
+  }
+
+  private DataBuffer bufferView(DataBuffer buffer, long offset, int length) {
+    if (length == 0) {
+      return PinotByteBuffer.empty();
+    }
+    return buffer.view(offset, offset + length, ByteOrder.BIG_ENDIAN);
+  }
+
+  private static String[] deserializeDictionary(PinotInputStream stream, 
Header header)
+      throws IOException {
+    if (header._dictionaryLength == 0) {
+      return new String[0];
+    }
+    stream.seek(header._dictionaryStart);
+
+    int dictionarySize = stream.readInt();
+    String[] stringDictionary = new String[dictionarySize];
+    for (int i = 0; i < dictionarySize; i++) {
+      stringDictionary[i] = stream.readInt4UTF();
+    }
+    return stringDictionary;
+  }
+
+  private static DataSchema deserializeDataSchema(PinotInputStream stream, 
Header header)
+      throws IOException {
+    if (header._dataSchemaLength == 0) {
+      return null;
+    }
+    stream.seek(header._dataSchemaStart);
+    return DataSchema.fromBytes(stream);
+  }
+
+  @Override
+  public Version getVersion() {
+    return Version.V1_V2;
+  }
+
+  static class Header {
+    static final int BYTES = 13 * Integer.BYTES;
+
+    // first int is used for versioning
+    int _firstInt;
+    int _numRows;
+    int _numColumns;
+    int _exceptionsLength;
+    int _dictionaryStart;
+    int _dictionaryLength;
+    int _dataSchemaStart;
+    int _dataSchemaLength;
+    int _fixedSizeDataStart;
+    int _fixedSizeDataLength;
+    int _variableSizeDataStart;
+    int _variableSizeDataLength;
+    int _metadataStart;
+
+    public Header(int firstInt, int numRows, int numColumns) {
+      _firstInt = firstInt;
+      _numRows = numRows;
+      _numColumns = numColumns;
+    }
+
+    public Header(int firstInt, int numRows, int numColumns, int 
exceptionsLength, int dictionaryStart,
+        int dictionaryLength, int dataSchemaStart, int dataSchemaLength, int 
fixedSizeDataStart,
+        int fixedSizeDataLength, int variableSizeDataStart, int 
variableSizeDataLength, int metadataStart) {
+      _firstInt = firstInt;
+      _numRows = numRows;
+      _numColumns = numColumns;
+      _exceptionsLength = exceptionsLength;
+      _dictionaryStart = dictionaryStart;
+      _dictionaryLength = dictionaryLength;
+      _dataSchemaStart = dataSchemaStart;
+      _dataSchemaLength = dataSchemaLength;
+      _fixedSizeDataStart = fixedSizeDataStart;
+      _fixedSizeDataLength = fixedSizeDataLength;
+      _variableSizeDataStart = variableSizeDataStart;
+      _variableSizeDataLength = variableSizeDataLength;
+      _metadataStart = metadataStart;
+    }
+
+    public void serialize(ByteBuffer into) {
+      Preconditions.checkState(into.remaining() >= BYTES, "Buffer does not 
have enough space to "
+          + "serialize the header");
+      into.putInt(_firstInt);
+      into.putInt(_numRows);
+      into.putInt(_numColumns);
+
+      // offsets are stored first and we need to add the header offset
+      into.putInt(BYTES); // exceptions start

Review Comment:
   What's the purpose of adding this to the serialized buffer? On the deser 
side we're simply skipping over this value?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -265,10 +186,15 @@ public double getDouble(int rowId, int colId) {
 
   @Override
   public BigDecimal getBigDecimal(int rowId, int colId) {
-    int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
-    ByteBuffer byteBuffer = _variableSizeData.slice();
-    byteBuffer.limit(size);
-    return BigDecimalUtils.deserialize(byteBuffer);
+    // TODO: Add a allocation free deserialization mechanism.
+    int offsetInFixed = getOffsetInFixedBuffer(rowId, colId);
+    int size = _fixedSizeData.getInt(offsetInFixed + 4);
+    int offsetInVar = _fixedSizeData.getInt(offsetInFixed);
+
+    byte[] buffer = new byte[size];
+    _variableSizeData.copyTo(offsetInVar, buffer, 0, size);

Review Comment:
   Never mind, looks like the `BigDecimalUtils::deserialize` method would 
allocate a byte array in that case anyway so this needs more changes.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java:
##########
@@ -36,21 +39,25 @@ public interface DataBlock {
 
   int getNumberOfRows();
 
+  int getNumberOfColumns();
+
   void addException(ProcessingException processingException);
 
   void addException(int errCode, String errMsg);
 
   Map<Integer, String> getExceptions();
 
-  byte[] toBytes()
+  /**
+   * This is a wrapper on top of {@link DataBlockUtils#serialize(DataBlock)} 
but implementations can cache
+   * the result so messages that are sent to more than one receiving mailbox 
don't need to be serialized as many times.
+   */
+  List<ByteBuffer> serialize()

Review Comment:
   What's the benefit of using a list of `ByteBuffer`s here rather than a 
`DataBuffer` / `CompoundDataBuffer` directly?



##########
pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkDataBlock.java:
##########
@@ -74,22 +77,26 @@ public static void main(String[] args)
         .addProfiler(GCProfiler.class));
   }
 
-  @Param(value = {"INT", "LONG", "STRING", "BYTES", "BIG_DECIMAL", 
"LONG_ARRAY", "STRING_ARRAY"})
+//  @Param(value = {"INT", "LONG", "STRING", "BYTES", "BIG_DECIMAL", 
"LONG_ARRAY", "STRING_ARRAY"})
+  @Param(value = {"INT", "LONG", "STRING", "BYTES", "LONG_ARRAY"})
   DataSchema.ColumnDataType _dataType;
   @Param(value = {"COLUMNAR", "ROW"})
   DataBlock.Type _blockType = DataBlock.Type.COLUMNAR;
   //    @Param(value = {"0", "10", "90"})
   int _nullPerCent = 10;
 
-  @Param(value = {"10000", "1000000"})
-  int _rows;
+  @Param(value = {"direct_small", "heap_small", "direct_large"})
+  String _version = "heap_small";
+
+//  @Param(value = {"10000", "1000000"})
+  int _rows = 10000;

Review Comment:
   I think these need to be cleaned up?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockSerde.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import java.io.IOException;
+import java.util.function.LongConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.memory.DataBuffer;
+
+
+/**
+ * An interface that can be implemented to support different types of data 
block serialization and deserialization.
+ * <p>
+ * It is important to distinguish between the serialization format and the 
data block raw representation.
+ * The raw representation is the in-memory representation of the data block 
and is completely dependent on the
+ * runtime Pinot version while the serialization format is the format used to 
write the data block through the network.
+ * Two Pinot nodes in different versions may represent the same data block in 
different raw representations but there
+ * should be at least one common serialization format (defined by the {@link 
Version} used) that can be used to
+ * serialize and deserialize the data block between the two nodes.
+ */
+public interface DataBlockSerde {
+
+  /**
+   * Serialize the data block into a buffer.
+   * @param dataBlock The data block to serialize.
+   * @param firstInt The first integer, which is used to codify the version 
and type of the data block in a protocol
+   *                 defined way. This integer must be written in the first 4 
positions of the buffer in BIG_ENDIAN
+   *                 order.
+   */
+  DataBuffer serialize(DataBlock dataBlock, int firstInt)
+      throws IOException;
+
+  /**
+   * Serialize the data block into the given output stream.

Review Comment:
   nit: Should this be something like: "Deserialize the data buffer into a 
DataBlock" instead?



##########
pinot-core/src/test/java/org/apache/pinot/core/common/datablock/DataBlockTest.java:
##########
@@ -83,11 +83,19 @@ public void testAllDataTypes(int nullPercentile)
     for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) {
       ColumnDataType columnDataType = dataSchema.getColumnDataType(colId);
       for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) {
-        Object rowVal = DataBlockTestUtils.getElement(rowBlock, rowId, colId, 
columnDataType);
-        Object colVal = DataBlockTestUtils.getElement(columnarBlock, rowId, 
colId, columnDataType);
-        Assert.assertEquals(rowVal, colVal,
-            "Error comparing Row/Column Block at (" + rowId + "," + colId + 
")" + " of Type: " + columnDataType
-                + "! rowValue: [" + rowVal + "], columnarValue: [" + colVal + 
"]");
+        try {
+          Object rowVal = DataBlockTestUtils.getElement(rowBlock, rowId, 
colId, columnDataType);
+          Object colVal = DataBlockTestUtils.getElement(columnarBlock, rowId, 
colId, columnDataType);
+          Assert.assertEquals(rowVal, colVal,
+              "Error comparing Row/Column Block at (" + rowId + "," + colId + 
")" + " of Type: " + columnDataType
+                  + "! rowValue: [" + rowVal + "], columnarValue: [" + colVal 
+ "]");
+        } catch (AssertionError e) {
+          throw new AssertionError(
+              "Error comparing Row/Column Block at (" + rowId + "," + colId + 
") of Type: " + columnDataType + "!", e);

Review Comment:
   What's the purpose of this catch block?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockEquals.java:
##########
@@ -0,0 +1,401 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.pinot.common.utils.DataSchema;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * A utility class used to compare two DataBlocks.
+ */
+public class DataBlockEquals {
+
+  private DataBlockEquals() {
+  }
+
+  /**
+   * Returns true if the two DataBlocks are of the same type, the same schema 
and contains the same data in the same
+   * data according to the {@link DefaultContentComparator default 
contentComparator}.
+   *
+   * This means that a DataBlock of type {@link DataBlock.Type#COLUMNAR} and a 
DataBlock of type
+   * {@link DataBlock.Type#ROW} will not be considered equal even if they 
contain the same data in the same order.
+   */
+  public static boolean equals(DataBlock left, DataBlock right) {

Review Comment:
   Are all of these utility methods intended only for usage in unit tests or 
will they be used in a later PR? If it's the former, would it make sense to 
move these to some test utils class? If it's the latter, what exactly is the 
intended usage?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -265,10 +186,15 @@ public double getDouble(int rowId, int colId) {
 
   @Override
   public BigDecimal getBigDecimal(int rowId, int colId) {
-    int size = positionOffsetInVariableBufferAndGetLength(rowId, colId);
-    ByteBuffer byteBuffer = _variableSizeData.slice();
-    byteBuffer.limit(size);
-    return BigDecimalUtils.deserialize(byteBuffer);
+    // TODO: Add a allocation free deserialization mechanism.
+    int offsetInFixed = getOffsetInFixedBuffer(rowId, colId);
+    int size = _fixedSizeData.getInt(offsetInFixed + 4);
+    int offsetInVar = _fixedSizeData.getInt(offsetInFixed);
+
+    byte[] buffer = new byte[size];
+    _variableSizeData.copyTo(offsetInVar, buffer, 0, size);

Review Comment:
   Can the `DataBuffer::copyOrView` API be used instead?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/ZeroCopyDataBlockSerde.java:
##########
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.LongConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.segment.spi.memory.CompoundDataBuffer;
+import org.apache.pinot.segment.spi.memory.DataBuffer;
+import org.apache.pinot.segment.spi.memory.PagedPinotOutputStream;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.apache.pinot.segment.spi.memory.PinotInputStream;
+import org.apache.pinot.segment.spi.memory.PinotOutputStream;
+
+
+/**
+ * An efficient serde that implements {@link DataBlockSerde.Version#V1_V2} 
using trying to make as fewer copies as
+ * possible.
+ */
+public class ZeroCopyDataBlockSerde implements DataBlockSerde {
+
+  private final PagedPinotOutputStream.PageAllocator _allocator;
+
+  public ZeroCopyDataBlockSerde() {
+    _allocator = PagedPinotOutputStream.HeapPageAllocator.createSmall();
+  }
+
+  public ZeroCopyDataBlockSerde(PagedPinotOutputStream.PageAllocator 
allocator) {
+    _allocator = allocator;
+  }
+
+  public DataBuffer serialize(DataBlock dataBlock, int firstInt)

Review Comment:
   nit: missing `@Override` annotation



##########
pinot-common/src/test/java/org/apache/pinot/common/datablock/ZeroCopyDataBlockSerdeTest.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import java.util.Collections;
+import java.util.Random;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.apache.pinot.segment.spi.memory.PinotDataBuffer;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeSuite;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class ZeroCopyDataBlockSerdeTest {
+
+  private DataBlockSerde _originalSerde = null;
+
+  @BeforeSuite
+  public void setUp() {
+    _originalSerde = DataBlockUtils.getSerde(DataBlockSerde.Version.V1_V2);
+    DataBlockUtils.setSerde(DataBlockSerde.Version.V1_V2, new 
ZeroCopyDataBlockSerde());
+  }
+
+  @AfterSuite
+  public void tearDown() {
+    if (_originalSerde != null) {
+      DataBlockUtils.setSerde(DataBlockSerde.Version.V1_V2, _originalSerde);
+    }
+  }
+
+  @DataProvider(name = "blocks")
+  public Object[][] blocks() {
+
+    Random r = new Random();
+    byte[] bytes1 = new byte[128];
+    r.nextBytes(bytes1);
+    byte[] bytes2 = new byte[128];
+    r.nextBytes(bytes2);
+
+    return new Object[][] {
+        {"empty error", MetadataBlock.newError(Collections.emptyMap())},
+        {"error with single message", 
MetadataBlock.newError(ImmutableMap.<Integer, String>builder()
+            .put(123, "error")
+            .build())},
+        {"error with two messages", 
MetadataBlock.newError(ImmutableMap.<Integer, String>builder()
+            .put(123, "error")
+            .put(1234, "another error")
+            .build())},
+        {"eos empty", MetadataBlock.newEos()},
+        {"eos with empty stat", new 
MetadataBlock(Collections.singletonList(PinotDataBuffer.empty()))},
+        {"eos with several empty stats",
+            new MetadataBlock(Lists.newArrayList(PinotDataBuffer.empty(), 
PinotDataBuffer.empty()))},
+        {"eos with one not empty stat", new 
MetadataBlock(Lists.newArrayList(PinotByteBuffer.wrap(bytes1)))},
+        {"eos with two not empty stat",
+            new MetadataBlock(Lists.newArrayList(PinotByteBuffer.wrap(bytes1), 
PinotByteBuffer.wrap(bytes2)))}
+    };
+  }
+
+  @Test(dataProvider = "blocks")
+  void testSerde(String desc, DataBlock block) {

Review Comment:
   Could we also add tests for `RowDataBlock` / `ColumnarDataBlock` and if 
possible, including some edge cases like null dictionary, null data schema etc.?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -78,20 +106,119 @@ public static DataBlock.Type getType(int versionType) {
     return DataBlock.Type.fromOrdinal(versionType >> VERSION_TYPE_SHIFT);
   }
 
-  public static DataBlock getDataBlock(ByteBuffer byteBuffer)
+  public static List<ByteBuffer> serialize(DataBlock dataBlock)
+      throws IOException {
+    return serialize(DataBlockSerde.Version.V1_V2, dataBlock);
+  }
+
+  @VisibleForTesting
+  public static List<ByteBuffer> serialize(DataBlockSerde.Version version, 
DataBlock dataBlock)
+      throws IOException {
+
+    DataBlockSerde dataBlockSerde = SERDES.get(version);
+    if (dataBlockSerde == null) {
+      throw new UnsupportedOperationException("Unsupported data block version: 
" + version);
+    }
+
+    DataBlock.Type dataBlockType = dataBlock.getDataBlockType();
+    int firstInt = version.getVersion() + (dataBlockType.ordinal() << 
DataBlockUtils.VERSION_TYPE_SHIFT);
+
+    DataBuffer dataBuffer = dataBlockSerde.serialize(dataBlock, firstInt);
+
+    int readFirstByte;
+    if (dataBuffer.order() != ByteOrder.BIG_ENDIAN) {
+      readFirstByte = dataBuffer.view(0, 4, ByteOrder.BIG_ENDIAN).getInt(0);
+    } else {
+      readFirstByte = dataBuffer.getInt(0);
+    }
+    Preconditions.checkState(readFirstByte == firstInt, "Illegal serialization 
by {}. "
+        + "The first integer should be {} but is {} instead", 
dataBuffer.getClass().getName(), firstInt, readFirstByte);
+
+    ArrayList<ByteBuffer> result = new ArrayList<>();
+    dataBuffer.appendAsByteBuffers(result);
+    return result;
+  }
+
+  /**
+   * Reads a data block from the given byte buffer.
+   * @param buffer the buffer to read from. The data will be read at the 
buffer's current position. This position will
+   *               be updated to point to the end of the data block.
+   */
+  public static DataBlock readFrom(ByteBuffer buffer)
+      throws IOException {
+    return deserialize(PinotByteBuffer.wrap(buffer), buffer.position(), 
newOffset -> {
+      if (newOffset > Integer.MAX_VALUE) {
+        throw new IllegalStateException("Data block is too large");
+      }
+      buffer.position((int) newOffset);
+    });
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * Contrary to {@link #readFrom(ByteBuffer)}, the given buffers will not be 
modified.
+   */
+  public static DataBlock deserialize(List<ByteBuffer> buffers)
       throws IOException {
-    int versionType = readVersionType(byteBuffer);
-    int version = getVersion(versionType);
-    DataBlock.Type type = getType(versionType);
-    switch (type) {
-      case COLUMNAR:
-        return new ColumnarDataBlock(byteBuffer);
-      case ROW:
-        return new RowDataBlock(byteBuffer);
-      case METADATA:
-        return MetadataBlock.deserialize(byteBuffer, version);
-      default:
-        throw new UnsupportedOperationException("Unsupported data table 
version: " + version + " with type: " + type);
+    List<DataBuffer> dataBuffers = buffers.stream()
+        .map(PinotByteBuffer::wrap)
+        .collect(Collectors.toList());
+    try (CompoundDataBuffer compoundBuffer = new 
CompoundDataBuffer(dataBuffers, ByteOrder.BIG_ENDIAN, false)) {
+      return deserialize(compoundBuffer);
+    }
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * Contrary to {@link #readFrom(ByteBuffer)}, the given buffers will not be 
modified.
+   */
+  public static DataBlock deserialize(ByteBuffer[] buffers)
+      throws IOException {
+    try (CompoundDataBuffer compoundBuffer = new CompoundDataBuffer(buffers, 
ByteOrder.BIG_ENDIAN, false)) {
+      return deserialize(compoundBuffer);
+    }
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * <p>
+   * Data will be read from the first byte of the buffer. Use {@link 
#deserialize(DataBuffer, long, LongConsumer)}
+   * in case it is needed to read from a different position.
+   */
+  public static DataBlock deserialize(DataBuffer buffer)
+      throws IOException {
+    return deserialize(buffer, 0, null);
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * @param buffer the buffer to read from.
+   * @param offset the offset in the buffer where the data starts.
+   * @param finalOffsetConsumer An optional consumer that will be called after 
the data block is deserialized.
+   *                            The consumer will receive the offset where the 
data block ends.
+   */
+  public static DataBlock deserialize(DataBuffer buffer, long offset, 
@Nullable LongConsumer finalOffsetConsumer)
+      throws IOException {
+    int versionAndSubVersion = buffer.getInt(offset);
+    int version = getVersion(versionAndSubVersion);

Review Comment:
   Never mind, I just saw the corresponding serialization logic - I assume this 
is to avoid using an additional int since both the version and type should be 
small integers. I think `versionAndSubVersion` could be renamed to 
`versionAndType`?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/ColumnarDataBlock.java:
##########
@@ -64,39 +53,26 @@ protected void computeBlockObjectConstants() {
     }
   }
 
-  @Override
-  protected int getDataBlockVersionType() {
-    return VERSION + (Type.COLUMNAR.ordinal() << 
DataBlockUtils.VERSION_TYPE_SHIFT);
-  }
-
   @Override
   protected int getOffsetInFixedBuffer(int rowId, int colId) {
     return _cumulativeColumnOffsetSizeInBytes[colId] + 
_columnSizeInBytes[colId] * rowId;
   }
 
   @Override
-  protected int positionOffsetInVariableBufferAndGetLength(int rowId, int 
colId) {
-    int offset = getOffsetInFixedBuffer(rowId, colId);
-    _variableSizeData.position(_fixedSizeData.getInt(offset));
-    return _fixedSizeData.getInt(offset + 4);
+  public Type getDataBlockType() {
+    return Type.COLUMNAR;
   }
 
   @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof ColumnarDataBlock)) {
-      return false;
-    }
-    ColumnarDataBlock that = (ColumnarDataBlock) o;
-    return Objects.deepEquals(_cumulativeColumnOffsetSizeInBytes, 
that._cumulativeColumnOffsetSizeInBytes)
-        && Objects.deepEquals(_columnSizeInBytes, that._columnSizeInBytes);
+  protected int getFixDataSize() {
+    return _fixDataSize;
   }
 
+  @NotNull // the method is override just to override its nullability 
annotation

Review Comment:
   Same issue here regarding `javax.annotation.Nonnull` vs 
`javax.validation.constraints.NotNull`.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/MetadataBlock.java:
##########
@@ -147,41 +71,24 @@ public MetadataBlockType getType() {
    * The returned list may contain nulls, which would mean that no stats were 
available for that stage.
    */
   @Nullable
-  public List<ByteBuffer> getStatsByStage() {
+  @Override
+  public List<DataBuffer> getStatsByStage() {
     return _statsByStage;
   }
 
   @Override
-  public int getDataBlockVersionType() {
-    return VERSION + (Type.METADATA.ordinal() << 
DataBlockUtils.VERSION_TYPE_SHIFT);
+  public Type getDataBlockType() {
+    return Type.METADATA;
   }
 
   @Override
   protected int getOffsetInFixedBuffer(int rowId, int colId) {
-    throw new UnsupportedOperationException("Metadata block uses JSON encoding 
for field access");
-  }
-
-  @Override
-  protected int positionOffsetInVariableBufferAndGetLength(int rowId, int 
colId) {
-    throw new UnsupportedOperationException("Metadata block uses JSON encoding 
for field access");
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof MetadataBlock)) {
-      return false;
-    }
-    MetadataBlock that = (MetadataBlock) o;
-    return Objects.equals(_statsByStage, that._statsByStage)
-        && _errCodeToExceptionMap.equals(that._errCodeToExceptionMap);
+    throw new UnsupportedOperationException("Not supported in metadata block");
   }
 
   @Override
-  public int hashCode() {
-    return Objects.hash(_statsByStage, _errCodeToExceptionMap);

Review Comment:
   Why has this been removed?



##########
pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java:
##########
@@ -187,200 +147,321 @@ public static RowDataBlock buildFromRows(List<Object[]> 
rows, DataSchema dataSch
                     dataSchema.getColumnName(colId)));
         }
       }
-      rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 
0, byteBuffer.position());
     }
+
+    CompoundDataBuffer.Builder varBufferBuilder = new 
CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, true)
+        .addPagedOutputStream(varSize);
+
     // Write null bitmaps after writing data.
-    for (RoaringBitmap nullBitmap : nullBitmaps) {
-      rowBuilder.setNullRowIds(nullBitmap);
-    }
-    return buildRowBlock(rowBuilder);
+    setNullRowIds(nullBitmaps, fixedSize, varBufferBuilder);
+    return buildRowBlock(numRows, dataSchema, 
getReverseDictionary(dictionary), fixedSize, varBufferBuilder);
   }
 
+
   public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, 
DataSchema dataSchema)
       throws IOException {
+    return buildFromColumns(columns, dataSchema, 
PagedPinotOutputStream.HeapPageAllocator.createSmall());
+  }
+
+  public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, 
DataSchema dataSchema,
+      PagedPinotOutputStream.PageAllocator allocator)
+      throws IOException {
     int numRows = columns.isEmpty() ? 0 : columns.get(0).length;
-    DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, 
DataBlock.Type.COLUMNAR, numRows);
+
+    int fixedBytesPerRow = calculateBytesPerRow(dataSchema);
+    int nullFixedBytes = dataSchema.size() * Integer.BYTES * 2;
+    int fixedBytesRequired = fixedBytesPerRow * numRows + nullFixedBytes;
+
+    Object2IntOpenHashMap<String> dictionary = new Object2IntOpenHashMap<>();
+
     // TODO: consolidate these null utils into data table utils.
     // Selection / Agg / Distinct all have similar code.
-    ColumnDataType[] storedTypes = dataSchema.getStoredColumnDataTypes();
-    int numColumns = storedTypes.length;
+    int numColumns = dataSchema.size();
+
     RoaringBitmap[] nullBitmaps = new RoaringBitmap[numColumns];
-    Object[] nullPlaceholders = new Object[numColumns];
-    for (int colId = 0; colId < numColumns; colId++) {
-      nullBitmaps[colId] = new RoaringBitmap();
-      nullPlaceholders[colId] = storedTypes[colId].getNullPlaceholder();
+    ByteBuffer fixedSize = ByteBuffer.allocate(fixedBytesRequired);
+    CompoundDataBuffer.Builder varBufferBuilder = new 
CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, true);
+
+    try (PagedPinotOutputStream varSize = new 
PagedPinotOutputStream(allocator)) {
+      for (int colId = 0; colId < numColumns; colId++) {
+        RoaringBitmap nullBitmap = new RoaringBitmap();
+        nullBitmaps[colId] = nullBitmap;
+        serializeColumnData(columns, dataSchema, colId, fixedSize, varSize, 
nullBitmap, dictionary);
+      }
+      varBufferBuilder.addPagedOutputStream(varSize);
     }
-    for (int colId = 0; colId < numColumns; colId++) {
-      Object[] column = columns.get(colId);
-      ByteBuffer byteBuffer = ByteBuffer.allocate(numRows * 
columnarBuilder._columnSizeInBytes[colId]);
-      Object value;
-
-      // NOTE:
-      // We intentionally make the type casting very strict here (e.g. only 
accepting Integer for INT) to ensure the
-      // rows conform to the data schema. This can help catch the unexpected 
data type issues early.
-      switch (storedTypes[colId]) {
-        // Single-value column
-        case INT:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putInt((int) value);
+    // Write null bitmaps after writing data.
+    setNullRowIds(nullBitmaps, fixedSize, varBufferBuilder);
+    return buildColumnarBlock(numRows, dataSchema, 
getReverseDictionary(dictionary), fixedSize, varBufferBuilder);
+  }
+
+  private static void serializeColumnData(List<Object[]> columns, DataSchema 
dataSchema, int colId,
+      ByteBuffer fixedSize, PagedPinotOutputStream varSize, RoaringBitmap 
nullBitmap,
+      Object2IntOpenHashMap<String> dictionary)
+      throws IOException {
+    ColumnDataType storedType = 
dataSchema.getColumnDataType(colId).getStoredType();
+    int numRows = columns.get(colId).length;
+
+    Object[] column = columns.get(colId);
+
+    // NOTE:
+    // We intentionally make the type casting very strict here (e.g. only 
accepting Integer for INT) to ensure the
+    // rows conform to the data schema. This can help catch the unexpected 
data type issues early.
+    switch (storedType) {
+      // Single-value column
+      case INT: {
+        int nullPlaceholder = (int) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putInt(nullPlaceholder);
+          } else {
+            fixedSize.putInt((int) value);
           }
-          break;
-        case LONG:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putLong((long) value);
+        }
+        break;
+      }
+      case LONG: {
+        long nullPlaceholder = (long) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putLong(nullPlaceholder);
+          } else {
+            fixedSize.putLong((long) value);
           }
-          break;
-        case FLOAT:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putFloat((float) value);
+        }
+        break;
+      }
+      case FLOAT: {
+        float nullPlaceholder = (float) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putFloat(nullPlaceholder);
+          } else {
+            fixedSize.putFloat((float) value);
           }
-          break;
-        case DOUBLE:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            byteBuffer.putDouble((double) value);
+        }
+        break;
+      }
+      case DOUBLE: {
+        double nullPlaceholder = (double) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putDouble(nullPlaceholder);
+          } else {
+            fixedSize.putDouble((double) value);
           }
-          break;
-        case BIG_DECIMAL:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (BigDecimal) value);
+        }
+        break;
+      }
+      case BIG_DECIMAL: {
+        BigDecimal nullPlaceholder = (BigDecimal) 
storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (BigDecimal) value);
           }
-          break;
-        case STRING:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (String) value);
+        }
+        break;
+      }
+      case STRING: {
+        ToIntFunction<String> didSupplier = k -> dictionary.size();
+        int nullPlaceHolder = dictionary.computeIfAbsent((String) 
storedType.getNullPlaceholder(), didSupplier);
+
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            fixedSize.putInt(nullPlaceHolder);
+          } else {
+            int dictId = dictionary.computeIfAbsent((String) value, 
didSupplier);
+            fixedSize.putInt(dictId);
           }
-          break;
-        case BYTES:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (ByteArray) value);
+        }
+        break;
+      }
+      case BYTES: {
+        ByteArray nullPlaceholder = (ByteArray) 
storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (ByteArray) value);
           }
-          break;
+        }
+        break;
+      }
 
-        // Multi-value column
-        case INT_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (int[]) value);
+      // Multi-value column
+      case INT_ARRAY: {
+        int[] nullPlaceholder = (int[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (int[]) value);
           }
-          break;
-        case LONG_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (long[]) value);
+        }
+        break;
+      }
+      case LONG_ARRAY: {
+        long[] nullPlaceholder = (long[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (long[]) value);
           }
-          break;
-        case FLOAT_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (float[]) value);
+        }
+        break;
+      }
+      case FLOAT_ARRAY: {
+        float[] nullPlaceholder = (float[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (float[]) value);
           }
-          break;
-        case DOUBLE_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (double[]) value);
+        }
+        break;
+      }
+      case DOUBLE_ARRAY: {
+        double[] nullPlaceholder = (double[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder);
+          } else {
+            setColumn(fixedSize, varSize, (double[]) value);
           }
-          break;
-        case STRING_ARRAY:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            value = column[rowId];
-            if (value == null) {
-              nullBitmaps[colId].add(rowId);
-              value = nullPlaceholders[colId];
-            }
-            setColumn(columnarBuilder, byteBuffer, (String[]) value);
+        }
+        break;
+      }
+      case STRING_ARRAY: {
+        String[] nullPlaceholder = (String[]) storedType.getNullPlaceholder();
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          Object value = column[rowId];
+          if (value == null) {
+            nullBitmap.add(rowId);
+            setColumn(fixedSize, varSize, nullPlaceholder, dictionary);
+          } else {
+            setColumn(fixedSize, varSize, (String[]) value, dictionary);
           }
-          break;
+        }
+        break;
+      }
 
-        // Special intermediate result for aggregation function
-        case OBJECT:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            setColumn(columnarBuilder, byteBuffer, column[rowId]);
-          }
-          break;
+      // Special intermediate result for aggregation function
+      case OBJECT: {
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          setColumn(fixedSize, varSize, column[rowId]);
+        }
+        break;
+      }
+      // Null
+      case UNKNOWN:
+        for (int rowId = 0; rowId < numRows; rowId++) {
+          setColumn(fixedSize, varSize, (Object) null);
+        }
+        break;
 
-        // Null
-        case UNKNOWN:
-          for (int rowId = 0; rowId < numRows; rowId++) {
-            setColumn(columnarBuilder, byteBuffer, (Object) null);
-          }
-          break;
+      default:
+        throw new IllegalStateException(
+            String.format("Unsupported stored type: %s for column: %s", 
storedType,
+                dataSchema.getColumnName(colId)));
+    }
+  }
 
+  private static int calculateBytesPerRow(DataSchema dataSchema) {
+    int rowSizeInBytes = 0;
+    for (ColumnDataType columnDataType : dataSchema.getColumnDataTypes()) {
+      switch (columnDataType) {
+        case INT:
+          rowSizeInBytes += 4;
+          break;
+        case LONG:
+          rowSizeInBytes += 8;
+          break;
+        case FLOAT:
+          rowSizeInBytes += 4;
+          break;
+        case DOUBLE:
+          rowSizeInBytes += 8;
+          break;
+        case STRING:
+          rowSizeInBytes += 4;
+          break;
+        // Object and array. (POSITION|LENGTH)
         default:
-          throw new IllegalStateException(
-              String.format("Unsupported stored type: %s for column: %s", 
storedTypes[colId],
-                  dataSchema.getColumnName(colId)));
+          rowSizeInBytes += 8;
+          break;
       }
-      
columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 
0, byteBuffer.position());
     }
-    // Write null bitmaps after writing data.
-    for (RoaringBitmap nullBitmap : nullBitmaps) {
-      columnarBuilder.setNullRowIds(nullBitmap);
+    return rowSizeInBytes;
+  }
+
+  private static void writeVarOffsetInFixed(ByteBuffer fixedSize, 
PagedPinotOutputStream varSize) {
+    long offsetInVar = varSize.getCurrentOffset();
+    Preconditions.checkState(offsetInVar <= Integer.MAX_VALUE,
+        "Cannot handle variable size output stream larger than 2GB");
+    fixedSize.putInt((int) offsetInVar);
+  }
+
+  private static void setNullRowIds(RoaringBitmap[] nullVectors, ByteBuffer 
fixedSize,
+      CompoundDataBuffer.Builder varBufferBuilder)
+      throws IOException {
+    int varBufSize = Arrays.stream(nullVectors)
+        .mapToInt(bitmap -> bitmap == null ? 0 : 
bitmap.serializedSizeInBytes())
+        .sum();
+    ByteBuffer variableSize = ByteBuffer.allocate(varBufSize)
+        .order(ByteOrder.BIG_ENDIAN);
+
+    long varWrittenBytes = varBufferBuilder.getWrittenBytes();
+    Preconditions.checkArgument(varWrittenBytes < Integer.MAX_VALUE,
+        "Cannot handle variable size output stream larger than 2GB but found 
{} written bytes", varWrittenBytes);
+    int startVariableOffset = (int) varWrittenBytes;
+    for (RoaringBitmap nullRowIds : nullVectors) {
+      int writtenVarBytes = variableSize.position();
+      fixedSize.putInt(startVariableOffset + writtenVarBytes);
+      if (nullRowIds == null || nullRowIds.isEmpty()) {
+        fixedSize.putInt(0);
+      } else {
+        RoaringBitmapUtils.serialize(nullRowIds, variableSize);
+        fixedSize.putInt(variableSize.position() - writtenVarBytes);
+      }
     }
-    return buildColumnarBlock(columnarBuilder);
+    varBufferBuilder.addBuffer(variableSize);
   }
 
-  private static RowDataBlock buildRowBlock(DataBlockBuilder builder) {
-    return new RowDataBlock(builder._numRows, builder._dataSchema, 
getReverseDictionary(builder._dictionary),
-        builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
-        builder._variableSizeDataByteArrayOutputStream.toByteArray());
+  private static RowDataBlock buildRowBlock(int numRows, DataSchema 
dataSchema, String[] dictionary,
+      ByteBuffer fixedSize, CompoundDataBuffer.Builder varBufferBuilder) {
+    return new RowDataBlock(numRows, dataSchema, dictionary, 
PinotByteBuffer.wrap(fixedSize), varBufferBuilder.build());
   }
 
-  private static ColumnarDataBlock buildColumnarBlock(DataBlockBuilder 
builder) {
-    return new ColumnarDataBlock(builder._numRows, builder._dataSchema, 
getReverseDictionary(builder._dictionary),
-        builder._fixedSizeDataByteArrayOutputStream.toByteArray(),
-        builder._variableSizeDataByteArrayOutputStream.toByteArray());
+  private static ColumnarDataBlock buildColumnarBlock(int numRows, DataSchema 
dataSchema, String[] dictionary,
+      ByteBuffer fixedSize, CompoundDataBuffer.Builder varBufferBuilder) {
+    return new ColumnarDataBlock(numRows, dataSchema, dictionary,
+        PinotByteBuffer.wrap(fixedSize), varBufferBuilder.build());

Review Comment:
   nit: These can probably be simply in-lined now?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to