Jackie-Jiang commented on code in PR #13303:
URL: https://github.com/apache/pinot/pull/13303#discussion_r1669493712


##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java:
##########
@@ -104,4 +117,44 @@ public static Type fromOrdinal(int ordinal) {
       }
     }
   }
+
+  /**
+   * A raw representation of the block.
+   * <p>
+   * Do not confuse this with the serialized form of the block. This is a 
representation of the block in memory and
+   * it is completely dependent on the current Pinot version. That means that 
this representation can change between
+   * Pinot versions.
+   * <p>
+   * The {@link DataBlockSerde} is responsible for serializing and 
deserializing this raw representation into a binary
+   * format that is compatible with the other Pinot versions.
+   */
+  interface Raw {

Review Comment:
   I'm a little confused by this `Raw` format. We don't really have a 
serialized form of the block (the serialized form is a `ByteBuffer`), and we 
could equally say we don't have a raw format, because the block is currently 
partially serialized (e.g. the fixed-size and var-size data).
   This `Raw` interface just exposes some implementation details of 
`BaseDataBlock`. I'd suggest making them public in `BaseDataBlock` rather than 
modeling them as an interface, because they are all implementation details, 
not a general abstraction.
   In the future, we may want to introduce a real raw format, which could be 
very useful when ser-de is not needed (e.g. colocated join).



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockSerde.java:
##########
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import java.io.IOException;
+import java.util.function.LongConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.segment.spi.memory.DataBuffer;
+
+
+/**
+ * An interface that can be implemented to support different types of data 
block serialization and deserialization.
+ * <p>
+ * It is important to distinguish between the serialization format and the 
data block raw representation.
+ * The raw representation is the in-memory representation of the data block 
and is completely dependent on the
+ * runtime Pinot version while the serialization format is the format used to 
write the data block through the network.
+ * Two Pinot nodes in different versions may represent the same data block in 
different raw representations but there
+ * should be at least one common serialization format (defined by the {@link 
Version} used) that can be used to
+ * serialize and deserialize the data block between the two nodes.
+ */
+public interface DataBlockSerde {
+
+  /**
+   * Serialize the data block into a buffer.
+   * @param dataBlock The data block to serialize.
+   * @param firstInt The first integer, which is used to codify the version 
and type of the data block in a protocol
+   *                 defined way. This integer must be written in the first 4 
positions of the buffer in BIG_ENDIAN
+   *                 order.
+   */
+  DataBuffer serialize(DataBlock.Raw dataBlock, int firstInt)
+      throws IOException;
+
+  /**
+   * Serialize the data block into the given output stream.
+   *
+   * @param buffer The buffer that contains the data. It will always use 
{@link java.nio.ByteOrder#BIG_ENDIAN} order.
+   * @param offset the offset in the buffer where the data starts. The first 
integer is reserved to store version and
+   *               type and should not be trusted by the implementation. Use 
the type parameter instead.
+   * @param type   the type of data block.
+   * @param finalOffsetConsumer A consumer that will be called after the data 
block is deserialized. The consumer will
+   *                            receive the offset where the data block ends.
+   */
+  DataBlock deserialize(DataBuffer buffer, long offset, DataBlock.Type type, 
@Nullable LongConsumer finalOffsetConsumer)
+      throws IOException;
+
+  default DataBlock deserialize(DataBuffer buffer, long offset, DataBlock.Type 
type)
+      throws IOException {
+    return deserialize(buffer, offset, type, null);
+  }
+
+  Version getVersion();
+
+  /**
+   * The version used by this implementation.
+   * <p>
+   * The version should be incremented whenever the serialization format 
changes in a way that is not backwards
+   * compatible in both serialization and deserialization ways.
+   */
+  enum Version {

Review Comment:
   Why do we need a ser/de version? Shouldn't the ser/de always handle all 
possible data block versions?
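
   For context, the Javadoc quoted above says the first integer encodes both 
the version and the type of the block. A minimal sketch of one possible 
packing, assuming a hypothetical 5-bit version field; the class name, method 
names, and shift width are illustrative and not taken from this PR:

```java
public class VersionTypeSketch {
  // Assumption: the low 5 bits hold the version, the remaining bits the type ordinal.
  static final int VERSION_TYPE_SHIFT = 5;

  /** Packs a version and a type ordinal into a single int. */
  public static int pack(int version, int typeOrdinal) {
    return version + (typeOrdinal << VERSION_TYPE_SHIFT);
  }

  /** Extracts the version from the packed int. */
  public static int version(int firstInt) {
    return firstInt & ((1 << VERSION_TYPE_SHIFT) - 1);
  }

  /** Extracts the type ordinal from the packed int. */
  public static int typeOrdinal(int firstInt) {
    return firstInt >> VERSION_TYPE_SHIFT;
  }
}
```

   With a packing like this, a reader could pick the matching ser/de 
implementation from the version bits before touching the rest of the buffer.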



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -426,168 +401,202 @@ public Map<Integer, String> getExceptions() {
     return _errCodeToExceptionMap;
   }
 
-  /**
-   * Serialize this data block to a byte array.
-   * <p>
-   * In order to deserialize it, {@link 
DataBlockUtils#getDataBlock(ByteBuffer)} should be used.
-   */
   @Override
-  public byte[] toBytes()
+  public List<ByteBuffer> serialize()
       throws IOException {
-    ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
-
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new 
UnsynchronizedByteArrayOutputStream(8192);
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-    writeLeadingSections(dataOutputStream);
-
-    // Write metadata: length followed by actual metadata bytes.
-    // NOTE: We ignore metadata serialization time in 
"responseSerializationCpuTimeNs" as it's negligible while
-    // considering it will bring a lot code complexity.
-    serializeMetadata(dataOutputStream);
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  private void writeLeadingSections(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(getDataBlockVersionType());
-    dataOutputStream.writeInt(_numRows);
-    dataOutputStream.writeInt(_numColumns);
-    int dataOffset = HEADER_SIZE;
-
-    // Write exceptions section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] exceptionsBytes;
-    exceptionsBytes = serializeExceptions();
-    dataOutputStream.writeInt(exceptionsBytes.length);
-    dataOffset += exceptionsBytes.length;
-
-    // Write dictionary map section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dictionaryBytes = null;
-    if (_stringDictionary != null) {
-      dictionaryBytes = serializeStringDictionary();
-      dataOutputStream.writeInt(dictionaryBytes.length);
-      dataOffset += dictionaryBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write data schema section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dataSchemaBytes = null;
-    if (_dataSchema != null) {
-      dataSchemaBytes = _dataSchema.toBytes();
-      dataOutputStream.writeInt(dataSchemaBytes.length);
-      dataOffset += dataSchemaBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
+    if (_serialized == null) {
+      _serialized = DataBlockUtils.serialize(this);

Review Comment:
   What is the benefit of taking it out into a separate ser/de module? I think 
keeping it within the class is easier for version control.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -239,7 +169,11 @@ public int getNumberOfRows() {
     return _numRows;
   }
 
-  // --------------------------------------------------------------------------
+  @Override
+  public int getNumberOfColumns() {
+    return _numColumns;
+  }
+// --------------------------------------------------------------------------

Review Comment:
   (minor) Reformat



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java:
##########
@@ -36,13 +39,19 @@ public interface DataBlock {
 
   int getNumberOfRows();
 
+  int getNumberOfColumns();
+
   void addException(ProcessingException processingException);
 
   void addException(int errCode, String errMsg);
 
   Map<Integer, String> getExceptions();
 
-  byte[] toBytes()

Review Comment:
   Just calling out that this will be backward incompatible if there is 
external code using this method. I guess it should be fine, because this class 
is supposed to be used only for broker-server or inter-server communication.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -109,92 +114,27 @@ public BaseDataBlock(int numRows, @Nullable DataSchema 
dataSchema, String[] stri
     _numColumns = dataSchema == null ? 0 : dataSchema.size();
     _fixDataSize = 0;
     _stringDictionary = stringDictionary;
-    _fixedSizeDataBytes = fixedSizeDataBytes;
-    _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes);
-    _variableSizeDataBytes = variableSizeDataBytes;
-    _variableSizeData = ByteBuffer.wrap(variableSizeDataBytes);
+    _fixedSizeData = PinotByteBuffer.wrap(ByteBuffer.wrap(fixedSizeDataBytes));
+    _variableSizeData = 
PinotByteBuffer.wrap(ByteBuffer.wrap(variableSizeDataBytes));
     _errCodeToExceptionMap = new HashMap<>();
   }
 
-  /**
-   * Construct empty data table.
-   */
-  public BaseDataBlock() {
-    _numRows = 0;
-    _numColumns = 0;
+  public BaseDataBlock(int numRows, DataSchema dataSchema, String[] 
stringDictionary,
+      DataBuffer fixedSizeData, DataBuffer variableSizeData) {
+    Preconditions.checkArgument(fixedSizeData.size() <= Integer.MAX_VALUE, 
"Fixed size data too large ({} bytes",
+        fixedSizeData.size());
+    Preconditions.checkArgument(variableSizeData.size() <= Integer.MAX_VALUE, 
"Variable size data too large ({} bytes",
+        variableSizeData.size());
+    _numRows = numRows;
+    _dataSchema = dataSchema;
+    _numColumns = dataSchema.size();
     _fixDataSize = 0;

Review Comment:
   Not introduced in this PR, but having a 0 `_fixDataSize` is super confusing. 
Consider making it `final` and always passing it in. It seems several other 
fields can also be made `final`.



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -290,84 +232,117 @@ public ByteArray getBytes(int rowId, int colId) {
 
   @Override
   public int[] getIntArray(int rowId, int colId) {
-    int length = positionOffsetInVariableBufferAndGetLength(rowId, colId);
-    int[] ints = new int[length];
-    for (int i = 0; i < length; i++) {
-      ints[i] = _variableSizeData.getInt();
+    int offsetInFixed = getOffsetInFixedBuffer(rowId, colId);
+    int size = _fixedSizeData.getInt(offsetInFixed + 4);

Review Comment:
   Is this `size` the array length rather than the size in bytes?



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -426,168 +401,202 @@ public Map<Integer, String> getExceptions() {
     return _errCodeToExceptionMap;
   }
 
-  /**
-   * Serialize this data block to a byte array.
-   * <p>
-   * In order to deserialize it, {@link 
DataBlockUtils#getDataBlock(ByteBuffer)} should be used.
-   */
   @Override
-  public byte[] toBytes()
+  public List<ByteBuffer> serialize()
       throws IOException {
-    ThreadResourceUsageProvider threadResourceUsageProvider = new 
ThreadResourceUsageProvider();
-
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new 
UnsynchronizedByteArrayOutputStream(8192);
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-    writeLeadingSections(dataOutputStream);
-
-    // Write metadata: length followed by actual metadata bytes.
-    // NOTE: We ignore metadata serialization time in 
"responseSerializationCpuTimeNs" as it's negligible while
-    // considering it will bring a lot code complexity.
-    serializeMetadata(dataOutputStream);
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  private void writeLeadingSections(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(getDataBlockVersionType());
-    dataOutputStream.writeInt(_numRows);
-    dataOutputStream.writeInt(_numColumns);
-    int dataOffset = HEADER_SIZE;
-
-    // Write exceptions section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] exceptionsBytes;
-    exceptionsBytes = serializeExceptions();
-    dataOutputStream.writeInt(exceptionsBytes.length);
-    dataOffset += exceptionsBytes.length;
-
-    // Write dictionary map section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dictionaryBytes = null;
-    if (_stringDictionary != null) {
-      dictionaryBytes = serializeStringDictionary();
-      dataOutputStream.writeInt(dictionaryBytes.length);
-      dataOffset += dictionaryBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write data schema section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dataSchemaBytes = null;
-    if (_dataSchema != null) {
-      dataSchemaBytes = _dataSchema.toBytes();
-      dataOutputStream.writeInt(dataSchemaBytes.length);
-      dataOffset += dataSchemaBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
+    if (_serialized == null) {
+      _serialized = DataBlockUtils.serialize(this);
     }
+    return _serialized;
+  }
 
-    // Write fixed size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.writeInt(_fixedSizeDataBytes.length);
-      dataOffset += _fixedSizeDataBytes.length;
+  @Override
+  public String toString() {
+    if (_dataSchema == null) {
+      return "{}";
     } else {
-      dataOutputStream.writeInt(0);
+      return "resultSchema:" + '\n' + _dataSchema + '\n' + "numRows: " + 
_numRows + '\n';
     }
+  }
 
-    // Write variable size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.writeInt(_variableSizeDataBytes.length);
-    } else {
-      dataOutputStream.writeInt(0);
-    }
+  @Nullable
+  @Override
+  public String[] getStringDictionary() {
+    return _stringDictionary;
+  }
 
-    // Write actual data.
-    // Write exceptions bytes.
-    dataOutputStream.write(exceptionsBytes);
-    // Write dictionary map bytes.
-    if (dictionaryBytes != null) {
-      dataOutputStream.write(dictionaryBytes);
-    }
-    // Write data schema bytes.
-    if (dataSchemaBytes != null) {
-      dataOutputStream.write(dataSchemaBytes);
-    }
-    // Write fixed size data bytes.
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.write(_fixedSizeDataBytes);
-    }
-    // Write variable size data bytes.
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.write(_variableSizeDataBytes);
-    }
+  @Nullable
+  @Override
+  public DataBuffer getFixedData() {
+    return _fixedSizeData;
   }
 
-  /**
-   * Writes the metadata section to the given data output stream.
-   */
-  protected void serializeMetadata(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(0);
+  @Nullable
+  @Override
+  public DataBuffer getVarSizeData() {
+    return _variableSizeData;
   }
 
   /**
-   * Deserializes the metadata section from the given byte buffer.
+   * Returns the list of serialized stats.
    * <p>
-   * This is the counterpart of {@link #serializeMetadata(DataOutputStream)} 
and it is guaranteed that the buffer will
-   * be positioned at the start of the metadata section when this method is 
called.
+   * The returned list may contain nulls, which would mean that no stats were 
available for that stage.
    * <p>
-   * <strong>Important:</strong> It is mandatory for implementations to leave 
the cursor at the end of the metadata, in
-   * the exact same position as it was when {@link 
#serializeMetadata(DataOutputStream)} was called.
-   * <p>
-   * <strong>Important:</strong> This method will be called at the end of the 
BaseDataConstructor constructor to read
-   * the metadata section. This means that it will be called 
<strong>before</strong> the subclass have been constructor
-   * have been called. Therefore it is not possible to use any subclass fields 
in this method.
+   * The list itself may also be null.
    */
-  protected void deserializeMetadata(ByteBuffer buffer)
-      throws IOException {
-    buffer.getInt();
+  @Nullable
+  @Override
+  public List<DataBuffer> getStatsByStage() {
+    return Collections.emptyList();
   }
 
-  private byte[] serializeExceptions()
-      throws IOException {
-    if (_errCodeToExceptionMap.isEmpty()) {
-      return new byte[4];
-    }
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new 
UnsynchronizedByteArrayOutputStream(1024);
-    DataOutputStream dataOutputStream = new 
DataOutputStream(byteArrayOutputStream);
-
-    dataOutputStream.writeInt(_errCodeToExceptionMap.size());
-
-    for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet()) 
{
-      int key = entry.getKey();
-      String value = entry.getValue();
-      byte[] valueBytes = value.getBytes(UTF_8);
-      dataOutputStream.writeInt(key);
-      dataOutputStream.writeInt(valueBytes.length);
-      dataOutputStream.write(valueBytes);
-    }
-
-    return byteArrayOutputStream.toByteArray();
+  @Override
+  public Raw asRaw() {
+    return this;
   }
 
-  private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
-      throws IOException {
-    int numExceptions = buffer.getInt();
-    Map<Integer, String> exceptions = new 
HashMap<>(HashUtil.getHashMapCapacity(numExceptions));
-    for (int i = 0; i < numExceptions; i++) {
-      int errCode = buffer.getInt();
-      String errMessage = DataTableUtils.decodeString(buffer);
-      exceptions.put(errCode, errMessage);
+  @Override
+  public final boolean equals(Object o) {

Review Comment:
   Are we using this for tests only? If so, would it be easier to just compare 
the serialized bytes?
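
   A minimal sketch of what such a comparison could look like in a test, 
assuming `serialize()` returns a `List<ByteBuffer>` as in this diff. The helper 
class and method names are hypothetical. It flattens both lists first so that 
two serializations chunked differently still compare equal:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

public class SerializedBytesEquality {
  /** Copies the remaining bytes of every buffer into one array without moving their positions. */
  public static byte[] flatten(List<ByteBuffer> buffers) {
    int total = 0;
    for (ByteBuffer buffer : buffers) {
      total += buffer.remaining();
    }
    byte[] out = new byte[total];
    int pos = 0;
    for (ByteBuffer buffer : buffers) {
      // duplicate() shares the content but has an independent position and limit.
      ByteBuffer copy = buffer.duplicate();
      int len = copy.remaining();
      copy.get(out, pos, len);
      pos += len;
    }
    return out;
  }

  /** Returns true when both lists hold the same bytes, regardless of how they are chunked. */
  public static boolean sameBytes(List<ByteBuffer> a, List<ByteBuffer> b) {
    return Arrays.equals(flatten(a), flatten(b));
  }
}
```

   In a test this would be called as 
`sameBytes(expected.serialize(), actual.serialize())`.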



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -290,84 +232,117 @@ public ByteArray getBytes(int rowId, int colId) {
 
   @Override
   public int[] getIntArray(int rowId, int colId) {
-    int length = positionOffsetInVariableBufferAndGetLength(rowId, colId);
-    int[] ints = new int[length];
-    for (int i = 0; i < length; i++) {
-      ints[i] = _variableSizeData.getInt();
+    int offsetInFixed = getOffsetInFixedBuffer(rowId, colId);
+    int size = _fixedSizeData.getInt(offsetInFixed + 4);
+    int offsetInVar = _fixedSizeData.getInt(offsetInFixed);
+
+    int[] ints = new int[size];
+    try (PinotInputStream stream = 
_variableSizeData.openInputStream(offsetInVar)) {

Review Comment:
   Is an input stream better than reading directly at the offset? I think it 
adds an extra object and quite a few extra method invocations.
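
   A minimal sketch of the direct-read alternative, using plain 
`java.nio.ByteBuffer` in place of Pinot's `DataBuffer`. It assumes the layout 
shown in the diff (the offset into the variable-size buffer at 
`offsetInFixed`, then the element count at `offsetInFixed + 4`) and 
big-endian ints. The class and method names are hypothetical:

```java
import java.nio.ByteBuffer;

public class IntArrayReadSketch {
  /** Reads an int array with absolute gets only, so no stream object is allocated. */
  public static int[] readIntArray(ByteBuffer fixedSizeData, ByteBuffer variableSizeData, int offsetInFixed) {
    int offsetInVar = fixedSizeData.getInt(offsetInFixed);
    // Assumption: this value is the number of elements, not a size in bytes.
    int length = fixedSizeData.getInt(offsetInFixed + 4);
    int[] ints = new int[length];
    for (int i = 0; i < length; i++) {
      // Absolute read: neither buffer's position is modified.
      ints[i] = variableSizeData.getInt(offsetInVar + i * Integer.BYTES);
    }
    return ints;
  }
}
```

   Absolute reads also leave the buffer position untouched, so no per-call 
stream state is needed.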



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

