ankitsultana commented on code in PR #13303:
URL: https://github.com/apache/pinot/pull/13303#discussion_r1722589466


##########
pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/memory/PagedPinotOutputStream.java:
##########
@@ -291,35 +290,8 @@ public void close()
   }
 
   public static abstract class PageAllocator {
-    public static final int MIN_RECOMMENDED_PAGE_SIZE;
-    public static final int MAX_RECOMMENDED_PAGE_SIZE;
-
-    static {
-      int minRecommendedPageSize = -1;
-      int maxRecommendedPageSize = -1;
-      try {
-        switch (ArchUtils.getProcessor().getType()) {
-          case AARCH_64:
-            // ARM processors support 4KB and 1MB pages
-            minRecommendedPageSize = 16 * 1024;
-            maxRecommendedPageSize = 1024 * 1024;
-            break;
-          case X86:
-          default:
-            // X86 processors support 4KB and 4MB pages
-            minRecommendedPageSize = 4 * 1024;
-            maxRecommendedPageSize = 4 * 1024 * 1024;
-            break;
-        }
-      } catch (Throwable t) {
-        LOGGER.warn("Could not determine processor architecture. Falling back to default values", t);
-        // Fallback to 4KB and 4MBs
-        minRecommendedPageSize = 4 * 1024;
-        maxRecommendedPageSize = 4 * 1024 * 1024;
-      }
-      MIN_RECOMMENDED_PAGE_SIZE = minRecommendedPageSize;
-      MAX_RECOMMENDED_PAGE_SIZE = maxRecommendedPageSize;
-    }
+    public static final int MIN_RECOMMENDED_PAGE_SIZE = 16 * 1024;
+    public static final int MAX_RECOMMENDED_PAGE_SIZE = 1024 * 1024;

Review Comment:
   @gortiz : I had missed reviewing the previous PR #13304 which introduced this Page output stream.
   
   Are we using DirectPageAllocator anywhere yet? I only see it used in the benchmarks so far.
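   For context, a minimal sketch of how a caller might pick a page size within these recommended bounds (hypothetical usage, not code from this PR; `choosePageSize` is an assumed helper name):
   
   ```java
   // Hypothetical helper (assumption, not part of this PR): round the expected
   // size up to a power of two, then clamp it into the recommended range.
   static int choosePageSize(long expectedBytes) {
     long nextPow2 = Long.highestOneBit(Math.max(1L, expectedBytes - 1)) << 1;
     long clamped = Math.min(nextPow2, PageAllocator.MAX_RECOMMENDED_PAGE_SIZE);
     return (int) Math.max(PageAllocator.MIN_RECOMMENDED_PAGE_SIZE, clamped);
   }
   ```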



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java:
##########
@@ -104,4 +117,44 @@ public static Type fromOrdinal(int ordinal) {
       }
     }
   }
+
+  /**
+   * A raw representation of the block.
+   * <p>
+   * Do not confuse this with the serialized form of the block. This is a representation of the block in memory and
+   * it is completely dependent on the current Pinot version. That means that this representation can change between
+   * Pinot versions.
+   * <p>
+   * The {@link DataBlockSerde} is responsible for serializing and deserializing this raw representation into a binary
+   * format that is compatible with the other Pinot versions.
+   */
+  interface Raw {

Review Comment:
   > Alternatively I can change DataBlockSerde to consume a BaseDataBlock, which won't break the encapsulation at the cost of imposing that every block is a BaseDataBlock. Do you prefer this alternative?
   
   I'd vote for this.
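   For illustration, a rough sketch of what that alternative could look like (method names and signatures here are assumptions, not the actual serde API):
   
   ```java
   import java.io.IOException;
   import java.nio.ByteBuffer;
   import java.util.List;
   
   // Sketch of the proposed direction: the serde consumes BaseDataBlock
   // directly, so the raw accessors no longer need a public Raw interface.
   public interface DataBlockSerde {
   
     // Serialize the in-memory block into a version-stable binary form.
     List<ByteBuffer> serialize(BaseDataBlock block) throws IOException;
   
     // Deserialize a binary payload back into a block.
     BaseDataBlock deserialize(ByteBuffer buffer) throws IOException;
   }
   ```
   
   The trade-off is as stated above: every block implementation must then extend BaseDataBlock.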



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlock.java:
##########
@@ -36,21 +39,25 @@ public interface DataBlock {
 
   int getNumberOfRows();
 
+  int getNumberOfColumns();
+
   void addException(ProcessingException processingException);
 
   void addException(int errCode, String errMsg);
 
   Map<Integer, String> getExceptions();
 
-  byte[] toBytes()
+  /**
+   * This is basically a wrap on top of {@link DataBlockUtils#serialize(DataBlock)} but implementations can catch

Review Comment:
   nit: "catch" should be "cache the result", and "wrap on top of" should be "wrapper on top of".



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -426,168 +390,202 @@ public Map<Integer, String> getExceptions() {
     return _errCodeToExceptionMap;
   }
 
-  /**
-   * Serialize this data block to a byte array.
-   * <p>
-   * In order to deserialize it, {@link DataBlockUtils#getDataBlock(ByteBuffer)} should be used.
-   */
   @Override
-  public byte[] toBytes()
+  public List<ByteBuffer> serialize()
       throws IOException {
-    ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
-
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new UnsynchronizedByteArrayOutputStream(8192);
-    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-    writeLeadingSections(dataOutputStream);
-
-    // Write metadata: length followed by actual metadata bytes.
-    // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while
-    // considering it will bring a lot code complexity.
-    serializeMetadata(dataOutputStream);
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  private void writeLeadingSections(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(getDataBlockVersionType());
-    dataOutputStream.writeInt(_numRows);
-    dataOutputStream.writeInt(_numColumns);
-    int dataOffset = HEADER_SIZE;
-
-    // Write exceptions section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] exceptionsBytes;
-    exceptionsBytes = serializeExceptions();
-    dataOutputStream.writeInt(exceptionsBytes.length);
-    dataOffset += exceptionsBytes.length;
-
-    // Write dictionary map section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dictionaryBytes = null;
-    if (_stringDictionary != null) {
-      dictionaryBytes = serializeStringDictionary();
-      dataOutputStream.writeInt(dictionaryBytes.length);
-      dataOffset += dictionaryBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write data schema section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dataSchemaBytes = null;
-    if (_dataSchema != null) {
-      dataSchemaBytes = _dataSchema.toBytes();
-      dataOutputStream.writeInt(dataSchemaBytes.length);
-      dataOffset += dataSchemaBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
+    if (_serialized == null) {
+      _serialized = DataBlockUtils.serialize(this);
     }
+    return _serialized;
+  }
 
-    // Write fixed size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.writeInt(_fixedSizeDataBytes.length);
-      dataOffset += _fixedSizeDataBytes.length;
+  @Override
+  public String toString() {
+    if (_dataSchema == null) {
+      return "{}";
     } else {
-      dataOutputStream.writeInt(0);
+      return "resultSchema:" + '\n' + _dataSchema + '\n' + "numRows: " + 
_numRows + '\n';
     }
+  }
 
-    // Write variable size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.writeInt(_variableSizeDataBytes.length);
-    } else {
-      dataOutputStream.writeInt(0);
-    }
+  @Nullable
+  @Override
+  public String[] getStringDictionary() {
+    return _stringDictionary;
+  }
 
-    // Write actual data.
-    // Write exceptions bytes.
-    dataOutputStream.write(exceptionsBytes);
-    // Write dictionary map bytes.
-    if (dictionaryBytes != null) {
-      dataOutputStream.write(dictionaryBytes);
-    }
-    // Write data schema bytes.
-    if (dataSchemaBytes != null) {
-      dataOutputStream.write(dataSchemaBytes);
-    }
-    // Write fixed size data bytes.
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.write(_fixedSizeDataBytes);
-    }
-    // Write variable size data bytes.
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.write(_variableSizeDataBytes);
-    }
+  @Nullable
+  @Override
+  public DataBuffer getFixedData() {
+    return _fixedSizeData;
   }
 
-  /**
-   * Writes the metadata section to the given data output stream.
-   */
-  protected void serializeMetadata(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(0);
+  @Nullable
+  @Override
+  public DataBuffer getVarSizeData() {
+    return _variableSizeData;
   }
 
   /**
-   * Deserializes the metadata section from the given byte buffer.
-   * <p>
-   * This is the counterpart of {@link #serializeMetadata(DataOutputStream)} and it is guaranteed that the buffer will
-   * be positioned at the start of the metadata section when this method is called.
+   * Returns the list of serialized stats.
    * <p>
-   * <strong>Important:</strong> It is mandatory for implementations to leave the cursor at the end of the metadata, in
-   * the exact same position as it was when {@link #serializeMetadata(DataOutputStream)} was called.
+   * The returned list may contain nulls, which would mean that no stats were available for that stage.
    * <p>
-   * <strong>Important:</strong> This method will be called at the end of the BaseDataConstructor constructor to read
-   * the metadata section. This means that it will be called <strong>before</strong> the subclass have been constructor
-   * have been called. Therefore it is not possible to use any subclass fields in this method.
+   * The list itself may also be null.
    */
-  protected void deserializeMetadata(ByteBuffer buffer)
-      throws IOException {
-    buffer.getInt();
+  @Nullable
+  @Override
+  public List<DataBuffer> getStatsByStage() {
+    return Collections.emptyList();
   }
 
-  private byte[] serializeExceptions()
-      throws IOException {
-    if (_errCodeToExceptionMap.isEmpty()) {
-      return new byte[4];
-    }
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new UnsynchronizedByteArrayOutputStream(1024);
-    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-
-    dataOutputStream.writeInt(_errCodeToExceptionMap.size());
-
-    for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet()) {
-      int key = entry.getKey();
-      String value = entry.getValue();
-      byte[] valueBytes = value.getBytes(UTF_8);
-      dataOutputStream.writeInt(key);
-      dataOutputStream.writeInt(valueBytes.length);
-      dataOutputStream.write(valueBytes);
-    }
-
-    return byteArrayOutputStream.toByteArray();
+  @Override
+  public Raw asRaw() {
+    return this;
   }
 
-  private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
-      throws IOException {
-    int numExceptions = buffer.getInt();
-    Map<Integer, String> exceptions = new HashMap<>(HashUtil.getHashMapCapacity(numExceptions));
-    for (int i = 0; i < numExceptions; i++) {
-      int errCode = buffer.getInt();
-      String errMessage = DataTableUtils.decodeString(buffer);
-      exceptions.put(errCode, errMessage);
+  @Override
+  public final boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof DataBlock)) {
+      return false;
+    }
+    DataBlock that = (DataBlock) o;
+    if (getDataBlockType() != that.getDataBlockType()) {
+      return false;
+    }
+    switch (getDataBlockType()) {
+      case ROW:
+      case COLUMNAR:
+        assert _dataSchema != null;
+        if (!_dataSchema.equals(that.getDataSchema())) {
+          return false;
+        }
+        if (_numRows != that.getNumberOfRows() || _numColumns != that.getNumberOfColumns()) {
+          return false;
+        }
+        DataSchema.ColumnDataType[] colTypes = _dataSchema.getColumnDataTypes();
+        String[] colNames = _dataSchema.getColumnNames();
+        for (int colId = 0; colId < colNames.length; colId++) {
+          switch (colTypes[colId]) {
+            case INT:
+            case BOOLEAN:
+              for (int did = 0; did < _numRows; did++) {
+                if (getInt(did, colId) != that.getInt(did, colId)) {
+                  return false;
+                }
+              }
+              break;
+            case LONG:
+              for (int did = 0; did < _numRows; did++) {
+                if (getLong(did, colId) != that.getLong(did, colId)) {
+                  return false;
+                }
+              }
+              break;
+            case FLOAT:
+              for (int did = 0; did < _numRows; did++) {
+                if (getFloat(did, colId) != that.getFloat(did, colId)) {
+                  return false;
+                }
+              }
+              break;
+            case DOUBLE:
+              for (int did = 0; did < _numRows; did++) {
+                if (getDouble(did, colId) != that.getDouble(did, colId)) {
+                  return false;
+                }
+              }
+              break;
+            case TIMESTAMP:
+              break;

Review Comment:
   Is this missing an implementation? The TIMESTAMP case currently falls through without comparing any values.
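   If it is simply missing, the branch would presumably mirror the LONG case (a sketch only, assuming TIMESTAMP values are read as epoch millis via `getLong`):
   
   ```java
   case TIMESTAMP:
     // Sketch of the possibly missing branch (assumption: TIMESTAMP is
     // stored as epoch millis, so it can be compared like LONG).
     for (int did = 0; did < _numRows; did++) {
       if (getLong(did, colId) != that.getLong(did, colId)) {
         return false;
       }
     }
     break;
   ```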



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -426,168 +401,202 @@ public Map<Integer, String> getExceptions() {
     return _errCodeToExceptionMap;
   }
 
-  /**
-   * Serialize this data block to a byte array.
-   * <p>
-   * In order to deserialize it, {@link DataBlockUtils#getDataBlock(ByteBuffer)} should be used.
-   */
   @Override
-  public byte[] toBytes()
+  public List<ByteBuffer> serialize()
       throws IOException {
-    ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
-
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new UnsynchronizedByteArrayOutputStream(8192);
-    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-    writeLeadingSections(dataOutputStream);
-
-    // Write metadata: length followed by actual metadata bytes.
-    // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while
-    // considering it will bring a lot code complexity.
-    serializeMetadata(dataOutputStream);
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  private void writeLeadingSections(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(getDataBlockVersionType());
-    dataOutputStream.writeInt(_numRows);
-    dataOutputStream.writeInt(_numColumns);
-    int dataOffset = HEADER_SIZE;
-
-    // Write exceptions section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] exceptionsBytes;
-    exceptionsBytes = serializeExceptions();
-    dataOutputStream.writeInt(exceptionsBytes.length);
-    dataOffset += exceptionsBytes.length;
-
-    // Write dictionary map section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dictionaryBytes = null;
-    if (_stringDictionary != null) {
-      dictionaryBytes = serializeStringDictionary();
-      dataOutputStream.writeInt(dictionaryBytes.length);
-      dataOffset += dictionaryBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write data schema section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dataSchemaBytes = null;
-    if (_dataSchema != null) {
-      dataSchemaBytes = _dataSchema.toBytes();
-      dataOutputStream.writeInt(dataSchemaBytes.length);
-      dataOffset += dataSchemaBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
+    if (_serialized == null) {
+      _serialized = DataBlockUtils.serialize(this);
     }
+    return _serialized;
+  }
 
-    // Write fixed size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.writeInt(_fixedSizeDataBytes.length);
-      dataOffset += _fixedSizeDataBytes.length;
+  @Override
+  public String toString() {
+    if (_dataSchema == null) {
+      return "{}";
     } else {
-      dataOutputStream.writeInt(0);
+      return "resultSchema:" + '\n' + _dataSchema + '\n' + "numRows: " + 
_numRows + '\n';
     }
+  }
 
-    // Write variable size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.writeInt(_variableSizeDataBytes.length);
-    } else {
-      dataOutputStream.writeInt(0);
-    }
+  @Nullable
+  @Override
+  public String[] getStringDictionary() {
+    return _stringDictionary;
+  }
 
-    // Write actual data.
-    // Write exceptions bytes.
-    dataOutputStream.write(exceptionsBytes);
-    // Write dictionary map bytes.
-    if (dictionaryBytes != null) {
-      dataOutputStream.write(dictionaryBytes);
-    }
-    // Write data schema bytes.
-    if (dataSchemaBytes != null) {
-      dataOutputStream.write(dataSchemaBytes);
-    }
-    // Write fixed size data bytes.
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.write(_fixedSizeDataBytes);
-    }
-    // Write variable size data bytes.
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.write(_variableSizeDataBytes);
-    }
+  @Nullable
+  @Override
+  public DataBuffer getFixedData() {
+    return _fixedSizeData;
   }
 
-  /**
-   * Writes the metadata section to the given data output stream.
-   */
-  protected void serializeMetadata(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(0);
+  @Nullable
+  @Override
+  public DataBuffer getVarSizeData() {
+    return _variableSizeData;
   }
 
   /**
-   * Deserializes the metadata section from the given byte buffer.
+   * Returns the list of serialized stats.
    * <p>
-   * This is the counterpart of {@link #serializeMetadata(DataOutputStream)} and it is guaranteed that the buffer will
-   * be positioned at the start of the metadata section when this method is called.
+   * The returned list may contain nulls, which would mean that no stats were available for that stage.
    * <p>
-   * <strong>Important:</strong> It is mandatory for implementations to leave the cursor at the end of the metadata, in
-   * the exact same position as it was when {@link #serializeMetadata(DataOutputStream)} was called.
-   * <p>
-   * <strong>Important:</strong> This method will be called at the end of the BaseDataConstructor constructor to read
-   * the metadata section. This means that it will be called <strong>before</strong> the subclass have been constructor
-   * have been called. Therefore it is not possible to use any subclass fields in this method.
+   * The list itself may also be null.
    */
-  protected void deserializeMetadata(ByteBuffer buffer)
-      throws IOException {
-    buffer.getInt();
+  @Nullable
+  @Override
+  public List<DataBuffer> getStatsByStage() {
+    return Collections.emptyList();
   }
 
-  private byte[] serializeExceptions()
-      throws IOException {
-    if (_errCodeToExceptionMap.isEmpty()) {
-      return new byte[4];
-    }
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new UnsynchronizedByteArrayOutputStream(1024);
-    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-
-    dataOutputStream.writeInt(_errCodeToExceptionMap.size());
-
-    for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet()) {
-      int key = entry.getKey();
-      String value = entry.getValue();
-      byte[] valueBytes = value.getBytes(UTF_8);
-      dataOutputStream.writeInt(key);
-      dataOutputStream.writeInt(valueBytes.length);
-      dataOutputStream.write(valueBytes);
-    }
-
-    return byteArrayOutputStream.toByteArray();
+  @Override
+  public Raw asRaw() {
+    return this;
   }
 
-  private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
-      throws IOException {
-    int numExceptions = buffer.getInt();
-    Map<Integer, String> exceptions = new HashMap<>(HashUtil.getHashMapCapacity(numExceptions));
-    for (int i = 0; i < numExceptions; i++) {
-      int errCode = buffer.getInt();
-      String errMessage = DataTableUtils.decodeString(buffer);
-      exceptions.put(errCode, errMessage);
+  @Override
+  public final boolean equals(Object o) {

Review Comment:
   I'd vote for code simplification for now (either relocate or remove this).
   
   If we are using it for tests, then it might make sense to build a utils 
class which can be used to compare the content across two data blocks, instead 
of implementing the equals method. The equals method could then take args on 
certain params (like whether to do an ordered comparison, whether to do a 
semantic comparison (data is same but in arbitrary formats), or exact 
comparison (data including layout is same)).
   
   From a logical perspective, I feel implementing equals method suggests that 
the DataBlock has an authoritative opinion on how it should be considered for 
comparison with other blocks. But there's no easy way to define that (e.g. 
should equals denote "same data" or data in "exact same format", etc.)
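   As a rough sketch of that direction (class, enum, and method names below are all hypothetical):
   
   ```java
   import java.util.Objects;
   
   // Hypothetical test utility replacing DataBlock#equals; not an existing API.
   public final class DataBlockTestUtils {
   
     public enum ComparisonMode {
       EXACT,    // same data and same physical layout
       ORDERED,  // same rows in the same order, layout may differ
       SEMANTIC  // same data, arbitrary format and ordering
     }
   
     private DataBlockTestUtils() {
     }
   
     public static boolean contentEquals(DataBlock left, DataBlock right, ComparisonMode mode) {
       if (left.getNumberOfRows() != right.getNumberOfRows()
           || !Objects.equals(left.getDataSchema(), right.getDataSchema())) {
         return false;
       }
       // The per-type cell comparison, dispatched on the requested mode,
       // would live here instead of inside DataBlock#equals.
       return true;
     }
   }
   ```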



##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -426,168 +390,202 @@ public Map<Integer, String> getExceptions() {
     return _errCodeToExceptionMap;
   }
 
-  /**
-   * Serialize this data block to a byte array.
-   * <p>
-   * In order to deserialize it, {@link DataBlockUtils#getDataBlock(ByteBuffer)} should be used.
-   */
   @Override
-  public byte[] toBytes()
+  public List<ByteBuffer> serialize()
       throws IOException {
-    ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
-
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new UnsynchronizedByteArrayOutputStream(8192);
-    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-    writeLeadingSections(dataOutputStream);
-
-    // Write metadata: length followed by actual metadata bytes.
-    // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while
-    // considering it will bring a lot code complexity.
-    serializeMetadata(dataOutputStream);
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  private void writeLeadingSections(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(getDataBlockVersionType());
-    dataOutputStream.writeInt(_numRows);
-    dataOutputStream.writeInt(_numColumns);
-    int dataOffset = HEADER_SIZE;
-
-    // Write exceptions section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] exceptionsBytes;
-    exceptionsBytes = serializeExceptions();
-    dataOutputStream.writeInt(exceptionsBytes.length);
-    dataOffset += exceptionsBytes.length;
-
-    // Write dictionary map section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dictionaryBytes = null;
-    if (_stringDictionary != null) {
-      dictionaryBytes = serializeStringDictionary();
-      dataOutputStream.writeInt(dictionaryBytes.length);
-      dataOffset += dictionaryBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write data schema section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dataSchemaBytes = null;
-    if (_dataSchema != null) {
-      dataSchemaBytes = _dataSchema.toBytes();
-      dataOutputStream.writeInt(dataSchemaBytes.length);
-      dataOffset += dataSchemaBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
+    if (_serialized == null) {
+      _serialized = DataBlockUtils.serialize(this);
     }
+    return _serialized;
+  }
 
-    // Write fixed size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.writeInt(_fixedSizeDataBytes.length);
-      dataOffset += _fixedSizeDataBytes.length;
+  @Override
+  public String toString() {
+    if (_dataSchema == null) {
+      return "{}";
     } else {
-      dataOutputStream.writeInt(0);
+      return "resultSchema:" + '\n' + _dataSchema + '\n' + "numRows: " + 
_numRows + '\n';
     }
+  }
 
-    // Write variable size data section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.writeInt(_variableSizeDataBytes.length);
-    } else {
-      dataOutputStream.writeInt(0);
-    }
+  @Nullable
+  @Override
+  public String[] getStringDictionary() {
+    return _stringDictionary;
+  }
 
-    // Write actual data.
-    // Write exceptions bytes.
-    dataOutputStream.write(exceptionsBytes);
-    // Write dictionary map bytes.
-    if (dictionaryBytes != null) {
-      dataOutputStream.write(dictionaryBytes);
-    }
-    // Write data schema bytes.
-    if (dataSchemaBytes != null) {
-      dataOutputStream.write(dataSchemaBytes);
-    }
-    // Write fixed size data bytes.
-    if (_fixedSizeDataBytes != null) {
-      dataOutputStream.write(_fixedSizeDataBytes);
-    }
-    // Write variable size data bytes.
-    if (_variableSizeDataBytes != null) {
-      dataOutputStream.write(_variableSizeDataBytes);
-    }
+  @Nullable
+  @Override
+  public DataBuffer getFixedData() {
+    return _fixedSizeData;
   }
 
-  /**
-   * Writes the metadata section to the given data output stream.
-   */
-  protected void serializeMetadata(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(0);
+  @Nullable
+  @Override
+  public DataBuffer getVarSizeData() {
+    return _variableSizeData;
   }
 
   /**
-   * Deserializes the metadata section from the given byte buffer.
-   * <p>
-   * This is the counterpart of {@link #serializeMetadata(DataOutputStream)} and it is guaranteed that the buffer will
-   * be positioned at the start of the metadata section when this method is called.
+   * Returns the list of serialized stats.
    * <p>
-   * <strong>Important:</strong> It is mandatory for implementations to leave the cursor at the end of the metadata, in
-   * the exact same position as it was when {@link #serializeMetadata(DataOutputStream)} was called.
+   * The returned list may contain nulls, which would mean that no stats were available for that stage.
    * <p>
-   * <strong>Important:</strong> This method will be called at the end of the BaseDataConstructor constructor to read
-   * the metadata section. This means that it will be called <strong>before</strong> the subclass have been constructor
-   * have been called. Therefore it is not possible to use any subclass fields in this method.
+   * The list itself may also be null.
    */
-  protected void deserializeMetadata(ByteBuffer buffer)
-      throws IOException {
-    buffer.getInt();
+  @Nullable
+  @Override
+  public List<DataBuffer> getStatsByStage() {
+    return Collections.emptyList();
   }
 
-  private byte[] serializeExceptions()
-      throws IOException {
-    if (_errCodeToExceptionMap.isEmpty()) {
-      return new byte[4];
-    }
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new UnsynchronizedByteArrayOutputStream(1024);
-    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-
-    dataOutputStream.writeInt(_errCodeToExceptionMap.size());
-
-    for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet()) {
-      int key = entry.getKey();
-      String value = entry.getValue();
-      byte[] valueBytes = value.getBytes(UTF_8);
-      dataOutputStream.writeInt(key);
-      dataOutputStream.writeInt(valueBytes.length);
-      dataOutputStream.write(valueBytes);
-    }
-
-    return byteArrayOutputStream.toByteArray();
+  @Override
+  public Raw asRaw() {
+    return this;
   }
 
-  private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
-      throws IOException {
-    int numExceptions = buffer.getInt();
-    Map<Integer, String> exceptions = new HashMap<>(HashUtil.getHashMapCapacity(numExceptions));
-    for (int i = 0; i < numExceptions; i++) {
-      int errCode = buffer.getInt();
-      String errMessage = DataTableUtils.decodeString(buffer);
-      exceptions.put(errCode, errMessage);
+  @Override
+  public final boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof DataBlock)) {
+      return false;
+    }
+    DataBlock that = (DataBlock) o;
+    if (getDataBlockType() != that.getDataBlockType()) {
+      return false;
+    }
+    switch (getDataBlockType()) {
+      case ROW:
+      case COLUMNAR:
+        assert _dataSchema != null;
+        if (!_dataSchema.equals(that.getDataSchema())) {
+          return false;
+        }
+        if (_numRows != that.getNumberOfRows() || _numColumns != that.getNumberOfColumns()) {
+          return false;
+        }
+        DataSchema.ColumnDataType[] colTypes = _dataSchema.getColumnDataTypes();
+        String[] colNames = _dataSchema.getColumnNames();
+        for (int colId = 0; colId < colNames.length; colId++) {
+          switch (colTypes[colId]) {
+            case INT:
+            case BOOLEAN:
+              for (int did = 0; did < _numRows; did++) {
+                if (getInt(did, colId) != that.getInt(did, colId)) {
+                  return false;
+                }
+              }
+              break;
+            case LONG:
+              for (int did = 0; did < _numRows; did++) {
+                if (getLong(did, colId) != that.getLong(did, colId)) {
+                  return false;
+                }
+              }
+              break;
+            case FLOAT:
+              for (int did = 0; did < _numRows; did++) {
+                if (getFloat(did, colId) != that.getFloat(did, colId)) {
+                  return false;
+                }
+              }
+              break;
+            case DOUBLE:
+              for (int did = 0; did < _numRows; did++) {
+                if (getDouble(did, colId) != that.getDouble(did, colId)) {
+                  return false;
+                }
+              }
+              break;
+            case TIMESTAMP:
+              break;
+            case STRING:
+            case JSON:
+              for (int did = 0; did < _numRows; did++) {
+                if (!getString(did, colId).equals(that.getString(did, colId))) {
+                  return false;
+                }
+              }
+              break;
+            case BYTES:
+              for (int did = 0; did < _numRows; did++) {
+                if (!getBytes(did, colId).equals(that.getBytes(did, colId))) {
+                  return false;
+                }
+              }
+              break;
+            case BIG_DECIMAL:
+              for (int did = 0; did < _numRows; did++) {
+                if (!getBigDecimal(did, colId).equals(that.getBigDecimal(did, colId))) {
+                  return false;
+                }
+              }
+              break;
+            case OBJECT:
+              for (int did = 0; did < _numRows; did++) {
+                if (!Objects.equals(getCustomObject(did, colId), that.getCustomObject(did, colId))) {
+                  return false;
+                }
+              }
+              break;
+            case INT_ARRAY:
+              for (int did = 0; did < _numRows; did++) {
+                if (!Arrays.equals(getIntArray(did, colId), that.getIntArray(did, colId))) {
+                  return false;
+                }
+              }
+              break;
+            case LONG_ARRAY:
+            case TIMESTAMP_ARRAY:
+              for (int did = 0; did < _numRows; did++) {
+                if (!Arrays.equals(getLongArray(did, colId), that.getLongArray(did, colId))) {
+                  return false;
+                }
+              }
+              break;
+            case FLOAT_ARRAY:
+              for (int did = 0; did < _numRows; did++) {
+                if (!Arrays.equals(getFloatArray(did, colId), that.getFloatArray(did, colId))) {
+                  return false;
+                }
+              }
+              break;
+            case DOUBLE_ARRAY:
+              for (int did = 0; did < _numRows; did++) {
+                if (!Arrays.equals(getDoubleArray(did, colId), that.getDoubleArray(did, colId))) {
+                  return false;
+                }
+              }
+              break;
+            case STRING_ARRAY:
+              for (int did = 0; did < _numRows; did++) {
+                if (!Arrays.equals(getStringArray(did, colId), that.getStringArray(did, colId))) {
+                  return false;
+                }
+              }
+              break;
+            case BYTES_ARRAY:
+            case BOOLEAN_ARRAY:
+            case UNKNOWN:
+              throw new UnsupportedOperationException("Check how to read " + 
colTypes[colId] + " from data block");
+            default:
+              throw new UnsupportedOperationException("Unsupported column 
type: " + colTypes[colId]);
+          }
+        }
+        return true;
+      case METADATA: {
+        return Objects.equals(_errCodeToExceptionMap, that.getExceptions())
+            && Objects.equals(getStatsByStage(), 
that.asRaw().getStatsByStage());
+      }
+      default:
+        throw new UnsupportedOperationException("Unsupported data block type: 
" + getDataBlockType());
     }
-    return exceptions;
   }
 
   @Override
-  public String toString() {
-    if (_dataSchema == null) {
-      return "{}";
-    } else {
-      return "resultSchema:" + '\n' + _dataSchema + '\n' + "numRows: " + 
_numRows + '\n';
-    }
+  public final int hashCode() {
+    return Objects.hash(_errCodeToExceptionMap, _numRows, _dataSchema);

Review Comment:
   Similar to the other comment, is this getting used anywhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

