gortiz commented on code in PR #13303: URL: https://github.com/apache/pinot/pull/13303#discussion_r1738802125
########## pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java: ########## @@ -78,20 +106,119 @@ public static DataBlock.Type getType(int versionType) { return DataBlock.Type.fromOrdinal(versionType >> VERSION_TYPE_SHIFT); } - public static DataBlock getDataBlock(ByteBuffer byteBuffer) + public static List<ByteBuffer> serialize(DataBlock dataBlock) + throws IOException { + return serialize(DataBlockSerde.Version.V1_V2, dataBlock); + } + + @VisibleForTesting + public static List<ByteBuffer> serialize(DataBlockSerde.Version version, DataBlock dataBlock) + throws IOException { + + DataBlockSerde dataBlockSerde = SERDES.get(version); + if (dataBlockSerde == null) { + throw new UnsupportedOperationException("Unsupported data block version: " + version); + } + + DataBlock.Type dataBlockType = dataBlock.getDataBlockType(); + int firstInt = version.getVersion() + (dataBlockType.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT); + + DataBuffer dataBuffer = dataBlockSerde.serialize(dataBlock, firstInt); + + int readFirstByte; + if (dataBuffer.order() != ByteOrder.BIG_ENDIAN) { + readFirstByte = dataBuffer.view(0, 4, ByteOrder.BIG_ENDIAN).getInt(0); + } else { + readFirstByte = dataBuffer.getInt(0); + } + Preconditions.checkState(readFirstByte == firstInt, "Illegal serialization by {}. " + + "The first integer should be {} but is {} instead", dataBuffer.getClass().getName(), firstInt, readFirstByte); + + ArrayList<ByteBuffer> result = new ArrayList<>(); + dataBuffer.appendAsByteBuffers(result); + return result; + } + + /** + * Reads a data block from the given byte buffer. + * @param buffer the buffer to read from. The data will be read at the buffer's current position. This position will + * be updated to point to the end of the data block. 
+ */ + public static DataBlock readFrom(ByteBuffer buffer) + throws IOException { + return deserialize(PinotByteBuffer.wrap(buffer), buffer.position(), newOffset -> { + if (newOffset > Integer.MAX_VALUE) { + throw new IllegalStateException("Data block is too large"); + } + buffer.position((int) newOffset); + }); + } + + /** + * Deserialize a list of byte buffers into a data block. + * Contrary to {@link #readFrom(ByteBuffer)}, the given buffers will not be modified. + */ + public static DataBlock deserialize(List<ByteBuffer> buffers) throws IOException { - int versionType = readVersionType(byteBuffer); - int version = getVersion(versionType); - DataBlock.Type type = getType(versionType); - switch (type) { - case COLUMNAR: - return new ColumnarDataBlock(byteBuffer); - case ROW: - return new RowDataBlock(byteBuffer); - case METADATA: - return MetadataBlock.deserialize(byteBuffer, version); - default: - throw new UnsupportedOperationException("Unsupported data table version: " + version + " with type: " + type); + List<DataBuffer> dataBuffers = buffers.stream() + .map(PinotByteBuffer::wrap) + .collect(Collectors.toList()); + try (CompoundDataBuffer compoundBuffer = new CompoundDataBuffer(dataBuffers, ByteOrder.BIG_ENDIAN, false)) { + return deserialize(compoundBuffer); + } + } + + /** + * Deserialize a list of byte buffers into a data block. + * Contrary to {@link #readFrom(ByteBuffer)}, the given buffers will not be modified. + */ + public static DataBlock deserialize(ByteBuffer[] buffers) + throws IOException { + try (CompoundDataBuffer compoundBuffer = new CompoundDataBuffer(buffers, ByteOrder.BIG_ENDIAN, false)) { + return deserialize(compoundBuffer); + } + } + + /** + * Deserialize a list of byte buffers into a data block. + * <p> + * Data will be read from the first byte of the buffer. Use {@link #deserialize(DataBuffer, long, LongConsumer)} + * in case it is needed to read from a different position. 
+ */ + public static DataBlock deserialize(DataBuffer buffer) + throws IOException { + return deserialize(buffer, 0, null); + } + + /** + * Deserialize a list of byte buffers into a data block. + * @param buffer the buffer to read from. + * @param offset the offset in the buffer where the data starts. + * @param finalOffsetConsumer An optional consumer that will be called after the data block is deserialized. + * The consumer will receive the offset where the data block ends. + */ + public static DataBlock deserialize(DataBuffer buffer, long offset, @Nullable LongConsumer finalOffsetConsumer) + throws IOException { + int versionAndSubVersion = buffer.getInt(offset); + int version = getVersion(versionAndSubVersion); Review Comment: I think this was another poor decision we made in the past. It would be much cleaner to just have a single version integer and assign one version to EOS and another version to errors. Instead, we made the mistake of merging our implementation concept (EOS and errors are represented by the same object) with the network protocol. These kinds of decisions are very common in the DataBlock codebase. That is why I wanted to create a clear separation between DataBlock (the atomic element of information used to communicate between operators in the same opChain) and the DataBlockSerde (how DataBlocks are serialized when different opChains communicate with each other). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org