gortiz commented on code in PR #13303:
URL: https://github.com/apache/pinot/pull/13303#discussion_r1706790601

##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/BaseDataBlock.java:
##########
@@ -426,168 +401,202 @@ public Map<Integer, String> getExceptions() {
     return _errCodeToExceptionMap;
   }
-  /**
-   * Serialize this data block to a byte array.
-   * <p>
-   * In order to deserialize it, {@link DataBlockUtils#getDataBlock(ByteBuffer)} should be used.
-   */
   @Override
-  public byte[] toBytes()
+  public List<ByteBuffer> serialize()
       throws IOException {
-    ThreadResourceUsageProvider threadResourceUsageProvider = new ThreadResourceUsageProvider();
-
-    UnsynchronizedByteArrayOutputStream byteArrayOutputStream = new UnsynchronizedByteArrayOutputStream(8192);
-    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
-    writeLeadingSections(dataOutputStream);
-
-    // Write metadata: length followed by actual metadata bytes.
-    // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while
-    // considering it will bring a lot code complexity.
-    serializeMetadata(dataOutputStream);
-
-    return byteArrayOutputStream.toByteArray();
-  }
-
-  private void writeLeadingSections(DataOutputStream dataOutputStream)
-      throws IOException {
-    dataOutputStream.writeInt(getDataBlockVersionType());
-    dataOutputStream.writeInt(_numRows);
-    dataOutputStream.writeInt(_numColumns);
-    int dataOffset = HEADER_SIZE;
-
-    // Write exceptions section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] exceptionsBytes;
-    exceptionsBytes = serializeExceptions();
-    dataOutputStream.writeInt(exceptionsBytes.length);
-    dataOffset += exceptionsBytes.length;
-
-    // Write dictionary map section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dictionaryBytes = null;
-    if (_stringDictionary != null) {
-      dictionaryBytes = serializeStringDictionary();
-      dataOutputStream.writeInt(dictionaryBytes.length);
-      dataOffset += dictionaryBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
-    }
-
-    // Write data schema section offset(START|SIZE).
-    dataOutputStream.writeInt(dataOffset);
-    byte[] dataSchemaBytes = null;
-    if (_dataSchema != null) {
-      dataSchemaBytes = _dataSchema.toBytes();
-      dataOutputStream.writeInt(dataSchemaBytes.length);
-      dataOffset += dataSchemaBytes.length;
-    } else {
-      dataOutputStream.writeInt(0);
+    if (_serialized == null) {
+      _serialized = DataBlockUtils.serialize(this);

Review Comment:
> Seems like the idea in this PR is to decouple the DataBlock version and DataBlockSerDe version, which is counter intuitive to me.

`DataBlock` has been used both as the way data is represented in memory during `OpChain` execution AND as the layout used to send data from one server to another. That is what I'm trying to break here. My proposal is that `DataBlock` should be the object we use to represent data in memory in an `OpChain`. How we serialize that memory layout is independent, and it is defined by the `DataBlockSerDe` version. Two servers running different Pinot versions can have different in-memory representations of the same information. The only serde we have right now mostly reuses the same bytes as in memory, so the memory and network representations are almost the same. That may not be the case in the future.

Another use case is testing a new serialization implementation that is binary compatible with the older one. We can have two `DataBlockSerDe` implementations for the same version and swap between them whenever we want. This makes it possible to change the implementation through a Pinot configuration. Probably more useful, it also makes it easier to write benchmarks and tests that verify both implementations are compatible (for example, a unit test that serializes with one implementation and deserializes with the other).

> Even if we want to maintain multiple ser-de versions so that we can try out new implementations, they should be under the same DataBlock version, where the serialized bytes remains the same.

My suggestion is to forget about the DataBlock version. If in the future we have new ways to represent a DataBlock (e.g. using Apache Arrow), we will need to define how to serialize and deserialize that new block, but we won't need to modify the data block version. In fact, `DataBlock.getVersion()` and `BaseDataBlock.getDataBlockVersionType()` have been removed.

> Say if we introduce a new DataBlock version with different bytes representation on the wire, I don't see how we can reuse the same DataBlockSerDe for it.

That is not possible under this definition, because a `DataBlock` does not define the wire representation. We could:

1. Create a new `DataBlock` that is serialized as one of the older ones in protocol version `V1_V2`. This will probably be expensive, but it is required to support older versions.
2. Create a new `DataBlock` that is serialized and deserialized by a new `SerDe` in version `V3`. This will be more efficient, but it can only be used for communication between two servers that both understand this version.
3. Create a new `SerDe` that generates a different wire representation for the `DataBlocks` we already have. This new `SerDe` should use a new version.
4. Create a new `SerDe` that generates the same wire representation for the `DataBlocks` we already have. This new `SerDe` could use the current `V1_V2` version.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
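The review comment proposes a compatibility test: serialize with one `DataBlockSerDe` implementation and deserialize with the other. A rough, self-contained Java sketch of that check follows, under heavy simplifying assumptions. Every name in it is hypothetical and not Pinot's API:

- `IntBlock` stands in for a `DataBlock`. It is only a dense int matrix, with no dictionary, schema, exceptions or metadata.
- `BlockSerDe` stands in for `DataBlockSerDe`.
- `StreamSerDe` and `BufferSerDe` are two invented implementations that produce the same big-endian bytes in different ways, so both report the same wire version.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory block: a dense, row-major int matrix with no notion of a wire format.
final class IntBlock {
  final int numRows;
  final int numColumns;
  final int[] values; // expected length: numRows * numColumns
  IntBlock(int numRows, int numColumns, int[] values) {
    this.numRows = numRows;
    this.numColumns = numColumns;
    this.values = values;
  }
}

// Hypothetical serde contract: the wire version belongs to the serde, not to the block.
interface BlockSerDe {
  int wireVersion();
  List<ByteBuffer> serialize(IntBlock block);
  IntBlock deserialize(List<ByteBuffer> buffers);

  // Copies the readable bytes of all buffers into one heap buffer; the inputs are not consumed.
  static ByteBuffer concat(List<ByteBuffer> buffers) {
    ByteBuffer out = ByteBuffer.allocate(buffers.stream().mapToInt(ByteBuffer::remaining).sum());
    buffers.forEach(b -> out.put(b.duplicate()));
    out.flip();
    return out;
  }

  // The check suggested in the review: serialize with one implementation, deserialize with another.
  static boolean crossCompatible(BlockSerDe writer, BlockSerDe reader, IntBlock block) {
    IntBlock copy = reader.deserialize(writer.serialize(block));
    return writer.wireVersion() == reader.wireVersion() && copy.numRows == block.numRows
        && copy.numColumns == block.numColumns && Arrays.equals(copy.values, block.values);
  }
}

// Implementation A: big-endian ints via DataOutputStream/DataInputStream, one buffer on the wire.
final class StreamSerDe implements BlockSerDe {
  public int wireVersion() { return 1; }

  public List<ByteBuffer> serialize(IntBlock block) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(block.numRows);
      out.writeInt(block.numColumns);
      for (int v : block.values) {
        out.writeInt(v);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return List.of(ByteBuffer.wrap(bytes.toByteArray()));
  }

  public IntBlock deserialize(List<ByteBuffer> buffers) {
    ByteBuffer all = BlockSerDe.concat(buffers);
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(all.array()))) {
      int rows = in.readInt();
      int cols = in.readInt();
      int[] values = new int[rows * cols];
      for (int i = 0; i < values.length; i++) {
        values[i] = in.readInt();
      }
      return new IntBlock(rows, cols, values);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}

// Implementation B: the same bytes, written directly into two ByteBuffers (header + payload).
final class BufferSerDe implements BlockSerDe {
  public int wireVersion() { return 1; } // same wire bytes as StreamSerDe, hence the same version

  public List<ByteBuffer> serialize(IntBlock block) {
    ByteBuffer header = ByteBuffer.allocate(2 * Integer.BYTES);
    header.putInt(block.numRows).putInt(block.numColumns).flip();
    ByteBuffer payload = ByteBuffer.allocate(block.values.length * Integer.BYTES);
    payload.asIntBuffer().put(block.values); // ByteBuffer defaults to big-endian, like DataOutputStream
    return List.of(header, payload);
  }

  public IntBlock deserialize(List<ByteBuffer> buffers) {
    ByteBuffer all = BlockSerDe.concat(buffers);
    int rows = all.getInt();
    int cols = all.getInt();
    int[] values = new int[rows * cols];
    all.asIntBuffer().get(values);
    return new IntBlock(rows, cols, values);
  }
}
```

In a real test suite, `crossCompatible` should hold in both directions, `(a, b)` and `(b, a)`. That property is what would let one implementation replace the other behind a configuration switch without changing the wire version.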
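The four options listed in the review comment can be read as a lookup keyed by wire version. Swapping the implementation behind an existing version is fine when the bytes stay the same; a new wire layout gets a new version. The Java sketch below illustrates that idea under stated assumptions:

- A hypothetical `SerDeRegistry` holds one serde per wire version.
- The `WireVersion` enum reuses the `V1_V2` and `V3` names from the comment.
- In a hypothetical handshake, the peer advertises the set of versions it understands.

The comment does not say how servers agree on a version, so this is not Pinot's negotiation mechanism, only an illustration.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical wire versions, declared oldest first and named after the ones in the review comment.
enum WireVersion { V1_V2, V3 }

// Hypothetical serde handle; a real one would also serialize and deserialize blocks.
final class VersionedSerDe {
  final WireVersion version;
  final String implementation;

  VersionedSerDe(WireVersion version, String implementation) {
    this.version = version;
    this.implementation = implementation;
  }
}

// Hypothetical registry holding at most one active serde per wire version.
final class SerDeRegistry {
  private final Map<WireVersion, VersionedSerDe> _byVersion = new EnumMap<>(WireVersion.class);

  // Replaces whatever was registered for the same version. That is only safe when the new
  // implementation writes the same bytes (option 4); a different layout needs a new version (option 3).
  void register(VersionedSerDe serDe) {
    _byVersion.put(serDe.version, serDe);
  }

  // The versions this server would advertise to a peer.
  Set<WireVersion> supportedVersions() {
    Set<WireVersion> versions = EnumSet.noneOf(WireVersion.class);
    versions.addAll(_byVersion.keySet());
    return versions;
  }

  // Picks the newest version both sides understand, so a server that knows V3 still uses V1_V2
  // with an older peer (options 1 and 2). Empty when there is no common version.
  Optional<VersionedSerDe> negotiate(Set<WireVersion> peerVersions) {
    WireVersion[] oldestFirst = WireVersion.values();
    for (int i = oldestFirst.length - 1; i >= 0; i--) {
      WireVersion candidate = oldestFirst[i];
      if (peerVersions.contains(candidate) && _byVersion.containsKey(candidate)) {
        return Optional.of(_byVersion.get(candidate));
      }
    }
    return Optional.empty();
  }
}
```

Suppose both `V1_V2` and `V3` are registered. A peer that advertises only `V1_V2` then gets the `V1_V2` serde, and a peer that advertises both gets `V3`.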