gortiz commented on code in PR #13303:
URL: https://github.com/apache/pinot/pull/13303#discussion_r1738735458
##########
pinot-common/src/main/java/org/apache/pinot/common/datablock/ZeroCopyDataBlockSerde.java:
##########
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.datablock;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.io.DataInput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.LongConsumer;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.HashUtil;
+import org.apache.pinot.segment.spi.memory.CompoundDataBuffer;
+import org.apache.pinot.segment.spi.memory.DataBuffer;
+import org.apache.pinot.segment.spi.memory.PagedPinotOutputStream;
+import org.apache.pinot.segment.spi.memory.PinotByteBuffer;
+import org.apache.pinot.segment.spi.memory.PinotInputStream;
+import org.apache.pinot.segment.spi.memory.PinotOutputStream;
+
+
+/**
+ * An efficient serde that implements {@link DataBlockSerde.Version#V1_V2} using trying to make as fewer copies as
+ * possible.
+ */
+public class ZeroCopyDataBlockSerde implements DataBlockSerde {
+
+  private final PagedPinotOutputStream.PageAllocator _allocator;
+
+  public ZeroCopyDataBlockSerde() {
+    _allocator = PagedPinotOutputStream.HeapPageAllocator.createSmall();
+  }
+
+  public ZeroCopyDataBlockSerde(PagedPinotOutputStream.PageAllocator allocator) {
+    _allocator = allocator;
+  }
+
+  public DataBuffer serialize(DataBlock dataBlock, int firstInt)
+      throws IOException {
+    Header header = new Header(firstInt, dataBlock.getNumberOfRows(), dataBlock.getNumberOfColumns());
+
+    CompoundDataBuffer.Builder builder = new CompoundDataBuffer.Builder(ByteOrder.BIG_ENDIAN, false);
+
+    ByteBuffer headerBuffer = ByteBuffer.allocate(Header.BYTES);
+    builder.addBuffer(PinotByteBuffer.wrap(headerBuffer));
+
+    try (PagedPinotOutputStream into = new PagedPinotOutputStream(_allocator)) {
+      serializeExceptions(dataBlock, into);
+      header._exceptionsLength = (int) into.getCurrentOffset();
+      serializeDictionary(dataBlock, into);
+      header._dictionaryLength = (int) into.getCurrentOffset() - header._exceptionsLength;
+      serializeDataSchema(dataBlock, into);
+      header._dataSchemaLength = (int) into.getCurrentOffset() - header._exceptionsLength - header._dictionaryLength;
+
+      builder.addPagedOutputStream(into);
+    }
+
+    DataBuffer fixedData = dataBlock.getFixedData();
+    if (fixedData != null) {
+      builder.addBuffer(fixedData);
+      header._fixedSizeDataLength = (int) fixedData.size();
+    }
+
+    DataBuffer varSizeData = dataBlock.getVarSizeData();
+    if (varSizeData != null) {
+      builder.addBuffer(varSizeData);
+      header._variableSizeDataLength = (int) varSizeData.size();
+    }
+
+    try (PagedPinotOutputStream into = new PagedPinotOutputStream(_allocator)) {
+      serializeMetadata(dataBlock, into, header);
+      builder.addPagedOutputStream(into);
+    }
+
+    header.updateStarts();
+    header.serialize(headerBuffer);
+    headerBuffer.flip();
+
+    return builder.build();
+  }
+
+  /**
+   * Serialize the exceptions map into the {@link PinotOutputStream} current position.
+   * <p>
+   * the header object is modified to update {@link Header#_exceptionsLength}
+   */
+  private void serializeExceptions(DataBlock dataBlock, PinotOutputStream into)
+      throws IOException {
+
+    Map<Integer, String> errCodeToExceptionMap = dataBlock.getExceptions();
+    if (errCodeToExceptionMap == null || errCodeToExceptionMap.isEmpty()) {
+      return;
+    }
+
+    into.writeInt(errCodeToExceptionMap.size());
+    for (Map.Entry<Integer, String> entry : errCodeToExceptionMap.entrySet()) {
+      into.writeInt(entry.getKey());
+      into.writeInt4String(entry.getValue());
+    }
+  }
+
+  private static void serializeDictionary(DataBlock dataBlock, PinotOutputStream into)
+      throws IOException {
+    String[] stringDictionary = dataBlock.getStringDictionary();
+    if (stringDictionary == null) {
+      // TODO: probably we can also do this when length is 0
+      return;
+    }
+
+    into.writeInt(stringDictionary.length);
+    for (String entry : stringDictionary) {
+      into.writeInt4String(entry);
+    }
+  }
+
+  private static void serializeDataSchema(DataBlock dataBlock, PinotOutputStream into)
+      throws IOException {
+    DataSchema dataSchema = dataBlock.getDataSchema();
+    if (dataSchema == null) {
+      return;
+    }
+
+    // TODO: This is actually allocating, we can improve it in the future if it is problematic
+    byte[] bytes = dataSchema.toBytes();
+    into.write(bytes);
+  }
+
+  private static void serializeMetadata(DataBlock dataBlock, PinotOutputStream into, Header header)
+      throws IOException {
+    header._metadataStart = (int) into.getCurrentOffset();
+    if (!(dataBlock instanceof MetadataBlock)) {
+      into.writeInt(0);
+      return;
+    }
+    List<DataBuffer> statsByStage = dataBlock.getStatsByStage();
+    if (statsByStage == null) {
+      into.writeInt(0);
+      return;
+    }
+    int size = statsByStage.size();
+    into.writeInt(size);
+    if (size > 0) {
+      for (DataBuffer stat : statsByStage) {
+        if (stat == null) {
+          into.writeBoolean(false);
+        } else {
+          into.writeBoolean(true);
+          if (stat.size() > Integer.MAX_VALUE) {
+            throw new IOException("Stat size is too large to serialize");
+          }
+          into.writeInt((int) stat.size());
+          into.write(stat);
+        }
+      }
+    }
+  }
+
+  @Override
+  public DataBlock deserialize(DataBuffer buffer, long offset, DataBlock.Type type,
+      @Nullable LongConsumer finalOffsetConsumer)
+      throws IOException {
+    try (PinotInputStream stream = buffer.openInputStream(offset)) {
+      Header header = Header.deserialize(stream);
+
+      if (finalOffsetConsumer != null) {
+        finalOffsetConsumer.accept(offset + calculateEndOffset(buffer, header));
+      }
+
+      switch (type) {
+        case COLUMNAR:
+          return new ColumnarDataBlock(header._numRows, deserializeDataSchema(stream, header),
+              deserializeDictionary(stream, header),
+              bufferView(buffer, header._fixedSizeDataStart + offset, header._fixedSizeDataLength),
+              bufferView(buffer, header._variableSizeDataStart + offset, header._variableSizeDataLength));
+        case ROW:
+          return new RowDataBlock(header._numRows, deserializeDataSchema(stream, header),
+              deserializeDictionary(stream, header),
+              bufferView(buffer, header._fixedSizeDataStart + offset, header._fixedSizeDataLength),
+              bufferView(buffer, header._variableSizeDataStart + offset, header._variableSizeDataLength));
+        case METADATA: {
+          Map<Integer, String> exceptions = deserializeExceptions(stream, header);
+          if (!exceptions.isEmpty()) {
+            return MetadataBlock.newError(exceptions);
+          } else {
+            List<DataBuffer> metadata = deserializeMetadata(buffer, header);
+            return new MetadataBlock(metadata);
+          }
+        }
+        default:
+          throw new UnsupportedOperationException("Unsupported data table version: " + getVersion() + " with type: "
+              + type);
+      }
+    }
+  }
+
+  private static List<DataBuffer> deserializeMetadata(DataBuffer buffer, Header header) {
+    long currentOffset = header._metadataStart;
+    int statsSize = buffer.getInt(currentOffset);
+    currentOffset += Integer.BYTES;
+
+    List<DataBuffer> stats = new ArrayList<>(statsSize);
+
+    for (int i = 0; i < statsSize; i++) {
+      boolean isPresent = buffer.getByte(currentOffset) != 0;
+      currentOffset += Byte.BYTES;
+      if (isPresent) {
+        int length = buffer.getInt(currentOffset);
+        currentOffset += Integer.BYTES;
+        stats.add(buffer.view(currentOffset, currentOffset + length));
+        currentOffset += length;
+      } else {
+        stats.add(null);
+      }
+    }
+    return stats;
+  }
+
+  private long calculateEndOffset(DataBuffer buffer, Header header) {
+    long currentOffset = header._metadataStart;
+    int statsSize = buffer.getInt(currentOffset);
+    currentOffset += Integer.BYTES;
+
+    for (int i = 0; i < statsSize; i++) {
+      boolean isPresent = buffer.getByte(currentOffset) != 0;
+      currentOffset += Byte.BYTES;
+      if (isPresent) {
+        int length = buffer.getInt(currentOffset);
+        currentOffset += Integer.BYTES;
+        currentOffset += length;
+      }
+    }
+    return currentOffset;
+  }
+
+  @VisibleForTesting
+  static Map<Integer, String> deserializeExceptions(PinotInputStream stream, Header header)
+      throws IOException {
+    if (header._exceptionsLength == 0) {
+      return new HashMap<>();
+    }
+    stream.seek(header.getExceptionsStart());
+    int numExceptions = stream.readInt();
+    Map<Integer, String> exceptions = new HashMap<>(HashUtil.getHashMapCapacity(numExceptions));
+    for (int i = 0; i < numExceptions; i++) {
+      int errCode = stream.readInt();
+      String errMessage = stream.readInt4UTF();
+      exceptions.put(errCode, errMessage);
+    }
+    return exceptions;
+  }
+
+  private DataBuffer bufferView(DataBuffer buffer, long offset, int length) {
+    if (length == 0) {
+      return PinotByteBuffer.empty();
+    }
+    return buffer.view(offset, offset + length, ByteOrder.BIG_ENDIAN);
+  }
+
+  private static String[] deserializeDictionary(PinotInputStream stream, Header header)
+      throws IOException {
+    if (header._dictionaryLength == 0) {
+      return new String[0];
+    }
+    stream.seek(header._dictionaryStart);
+
+    int dictionarySize = stream.readInt();
+    String[] stringDictionary = new String[dictionarySize];
+    for (int i = 0; i < dictionarySize; i++) {
+      stringDictionary[i] = stream.readInt4UTF();
+    }
+    return stringDictionary;
+  }
+
+  private static DataSchema deserializeDataSchema(PinotInputStream stream, Header header)
+      throws IOException {
+    if (header._dataSchemaLength == 0) {
+      return null;
+    }
+    stream.seek(header._dataSchemaStart);
+    return DataSchema.fromBytes(stream);
+  }
+
+  @Override
+  public Version getVersion() {
+    return Version.V1_V2;
+  }
+
+  static class Header {
+    static final int BYTES = 13 * Integer.BYTES;
+
+    // first int is used for versioning
+    int _firstInt;
+    int _numRows;
+    int _numColumns;
+    int _exceptionsLength;
+    int _dictionaryStart;
+    int _dictionaryLength;
+    int _dataSchemaStart;
+    int _dataSchemaLength;
+    int _fixedSizeDataStart;
+    int _fixedSizeDataLength;
+    int _variableSizeDataStart;
+    int _variableSizeDataLength;
+    int _metadataStart;
+
+    public Header(int firstInt, int numRows, int numColumns) {
+      _firstInt = firstInt;
+      _numRows = numRows;
+      _numColumns = numColumns;
+    }
+
+    public Header(int firstInt, int numRows, int numColumns, int exceptionsLength, int dictionaryStart,
+        int dictionaryLength, int dataSchemaStart, int dataSchemaLength, int fixedSizeDataStart,
+        int fixedSizeDataLength, int variableSizeDataStart, int variableSizeDataLength, int metadataStart) {
+      _firstInt = firstInt;
+      _numRows = numRows;
+      _numColumns = numColumns;
+      _exceptionsLength = exceptionsLength;
+      _dictionaryStart = dictionaryStart;
+      _dictionaryLength = dictionaryLength;
+      _dataSchemaStart = dataSchemaStart;
+      _dataSchemaLength = dataSchemaLength;
+      _fixedSizeDataStart = fixedSizeDataStart;
+      _fixedSizeDataLength = fixedSizeDataLength;
+      _variableSizeDataStart = variableSizeDataStart;
+      _variableSizeDataLength = variableSizeDataLength;
+      _metadataStart = metadataStart;
+    }
+
+    public void serialize(ByteBuffer into) {
+      Preconditions.checkState(into.remaining() >= BYTES, "Buffer does not have enough space to "
+          + "serialize the header");
+      into.putInt(_firstInt);
+      into.putInt(_numRows);
+      into.putInt(_numColumns);
+
+      // offsets are stored first and we need to add the header offset
+      into.putInt(BYTES); // exceptions start

Review Comment:
   Good catch. I thought the same. But that is what the older code was doing, and we need to keep backward compatibility. We could also remove all (or almost all) of the starts and calculate them at deserialize time by adding the lengths. In general we should think about better encodings for future versions, but given that these blocks are usually pretty large, I don't think these integers will be very expensive compared to other issues we have (like always using dictionaries for Strings).

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org
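
A note on the alternative floated in the review comment: dropping most of the stored start offsets and recomputing them at deserialize time reduces to a running sum over the section lengths, because the sections are written back to back behind the fixed-size header. Below is a minimal sketch, assuming the field layout of the Header class quoted in the diff; computeStarts is a hypothetical helper for illustration, not part of the PR:

    // Hypothetical helper (not in the PR): derive section starts from section lengths,
    // following the layout written by serialize():
    // header | exceptions | dictionary | data schema | fixed-size data | var-size data | metadata
    static void computeStarts(Header header) {
      int exceptionsStart = Header.BYTES;  // exceptions begin right after the 13-int header
      header._dictionaryStart = exceptionsStart + header._exceptionsLength;
      header._dataSchemaStart = header._dictionaryStart + header._dictionaryLength;
      header._fixedSizeDataStart = header._dataSchemaStart + header._dataSchemaLength;
      header._variableSizeDataStart = header._fixedSizeDataStart + header._fixedSizeDataLength;
      header._metadataStart = header._variableSizeDataStart + header._variableSizeDataLength;
    }

With an encoding like that, only the lengths would need to go on the wire, but it would be a format change, which is what the backward-compatibility point above rules out for V1_V2.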