gortiz commented on code in PR #13303:
URL: https://github.com/apache/pinot/pull/13303#discussion_r1738802125


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java:
##########
@@ -78,20 +106,119 @@ public static DataBlock.Type getType(int versionType) {
     return DataBlock.Type.fromOrdinal(versionType >> VERSION_TYPE_SHIFT);
   }
 
-  public static DataBlock getDataBlock(ByteBuffer byteBuffer)
+  public static List<ByteBuffer> serialize(DataBlock dataBlock)
+      throws IOException {
+    return serialize(DataBlockSerde.Version.V1_V2, dataBlock);
+  }
+
+  @VisibleForTesting
+  public static List<ByteBuffer> serialize(DataBlockSerde.Version version, 
DataBlock dataBlock)
+      throws IOException {
+
+    DataBlockSerde dataBlockSerde = SERDES.get(version);
+    if (dataBlockSerde == null) {
+      throw new UnsupportedOperationException("Unsupported data block version: 
" + version);
+    }
+
+    DataBlock.Type dataBlockType = dataBlock.getDataBlockType();
+    int firstInt = version.getVersion() + (dataBlockType.ordinal() << 
DataBlockUtils.VERSION_TYPE_SHIFT);
+
+    DataBuffer dataBuffer = dataBlockSerde.serialize(dataBlock, firstInt);
+
+    int readFirstByte;
+    if (dataBuffer.order() != ByteOrder.BIG_ENDIAN) {
+      readFirstByte = dataBuffer.view(0, 4, ByteOrder.BIG_ENDIAN).getInt(0);
+    } else {
+      readFirstByte = dataBuffer.getInt(0);
+    }
+    Preconditions.checkState(readFirstByte == firstInt, "Illegal serialization 
by {}. "
+        + "The first integer should be {} but is {} instead", 
dataBuffer.getClass().getName(), firstInt, readFirstByte);
+
+    ArrayList<ByteBuffer> result = new ArrayList<>();
+    dataBuffer.appendAsByteBuffers(result);
+    return result;
+  }
+
+  /**
+   * Reads a data block from the given byte buffer.
+   * @param buffer the buffer to read from. The data will be read at the 
buffer's current position. This position will
+   *               be updated to point to the end of the data block.
+   */
+  public static DataBlock readFrom(ByteBuffer buffer)
+      throws IOException {
+    return deserialize(PinotByteBuffer.wrap(buffer), buffer.position(), 
newOffset -> {
+      if (newOffset > Integer.MAX_VALUE) {
+        throw new IllegalStateException("Data block is too large");
+      }
+      buffer.position((int) newOffset);
+    });
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * Contrary to {@link #readFrom(ByteBuffer)}, the given buffers will not be 
modified.
+   */
+  public static DataBlock deserialize(List<ByteBuffer> buffers)
       throws IOException {
-    int versionType = readVersionType(byteBuffer);
-    int version = getVersion(versionType);
-    DataBlock.Type type = getType(versionType);
-    switch (type) {
-      case COLUMNAR:
-        return new ColumnarDataBlock(byteBuffer);
-      case ROW:
-        return new RowDataBlock(byteBuffer);
-      case METADATA:
-        return MetadataBlock.deserialize(byteBuffer, version);
-      default:
-        throw new UnsupportedOperationException("Unsupported data table 
version: " + version + " with type: " + type);
+    List<DataBuffer> dataBuffers = buffers.stream()
+        .map(PinotByteBuffer::wrap)
+        .collect(Collectors.toList());
+    try (CompoundDataBuffer compoundBuffer = new 
CompoundDataBuffer(dataBuffers, ByteOrder.BIG_ENDIAN, false)) {
+      return deserialize(compoundBuffer);
+    }
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * Contrary to {@link #readFrom(ByteBuffer)}, the given buffers will not be 
modified.
+   */
+  public static DataBlock deserialize(ByteBuffer[] buffers)
+      throws IOException {
+    try (CompoundDataBuffer compoundBuffer = new CompoundDataBuffer(buffers, 
ByteOrder.BIG_ENDIAN, false)) {
+      return deserialize(compoundBuffer);
+    }
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * <p>
+   * Data will be read from the first byte of the buffer. Use {@link 
#deserialize(DataBuffer, long, LongConsumer)}
+   * in case it is needed to read from a different position.
+   */
+  public static DataBlock deserialize(DataBuffer buffer)
+      throws IOException {
+    return deserialize(buffer, 0, null);
+  }
+
+  /**
+   * Deserialize a list of byte buffers into a data block.
+   * @param buffer the buffer to read from.
+   * @param offset the offset in the buffer where the data starts.
+   * @param finalOffsetConsumer An optional consumer that will be called after 
the data block is deserialized.
+   *                            The consumer will receive the offset where the 
data block ends.
+   */
+  public static DataBlock deserialize(DataBuffer buffer, long offset, 
@Nullable LongConsumer finalOffsetConsumer)
+      throws IOException {
+    int versionAndSubVersion = buffer.getInt(offset);
+    int version = getVersion(versionAndSubVersion);

Review Comment:
   I think this was another poor decision we made in the past. It would be much
cleaner to just have a single version integer and assign one version to EOS and
another version for errors. Instead, we made the mistake of merging our
implementation concept (EOS and errors are represented by the same object) with
the network protocol.
   
   Decisions of this kind are very common in the DataBlock codebase. That is
why I wanted to create a clear separation between DataBlock (the atomic unit
of information exchanged between Operators in the same opChain) and
DataBlockSerde (how DataBlocks are serialized when different opChains
communicate with each other).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to