This is an automated email from the ASF dual-hosted git repository. richardstartin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 44da1a098f DataTable deserialization improvements (#8499) 44da1a098f is described below commit 44da1a098fd47ffb533da6ca1386c64fb86e50a9 Author: Richard Startin <rich...@startree.ai> AuthorDate: Tue Apr 12 10:47:52 2022 +0100 DataTable deserialization improvements (#8499) * avoid allocating MetadataKey[] * avoid excessive allocation in DataTable deserialization * remove unused overload of DataSchema.fromBytes, ensure new overload is tested, add comments about expectations of ByteBuffer arguments --- .../org/apache/pinot/common/utils/DataSchema.java | 27 +++---- .../org/apache/pinot/common/utils/DataTable.java | 11 +-- .../apache/pinot/common/utils/DataSchemaTest.java | 3 +- .../pinot/core/common/datatable/BaseDataTable.java | 17 ++--- .../core/common/datatable/DataTableImplV2.java | 25 ++----- .../core/common/datatable/DataTableImplV3.java | 82 +++++++++------------- .../core/common/datatable/DataTableUtils.java | 16 +++++ 7 files changed, 81 insertions(+), 100 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java index 2f461bdbbe..915a8110ef 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java @@ -22,12 +22,11 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonPropertyOrder; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.io.Serializable; +import java.nio.ByteBuffer; import java.sql.Timestamp; import java.util.Arrays; import java.util.EnumSet; @@ -176,35 +175,29 @@ public class
DataSchema { return byteArrayOutputStream.toByteArray(); } - public static DataSchema fromBytes(byte[] buffer) + /** + * This method uses relative operations on the ByteBuffer and expects the buffer's position to be set correctly. + */ + public static DataSchema fromBytes(ByteBuffer buffer) throws IOException { - ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(buffer); - DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream); - // Read the number of columns. - int numColumns = dataInputStream.readInt(); + int numColumns = buffer.getInt(); String[] columnNames = new String[numColumns]; ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns]; - // Read the column names. - int readLength; for (int i = 0; i < numColumns; i++) { - int length = dataInputStream.readInt(); + int length = buffer.getInt(); byte[] bytes = new byte[length]; - readLength = dataInputStream.read(bytes); - assert readLength == length; + buffer.get(bytes); columnNames[i] = new String(bytes, UTF_8); } - // Read the column types.
for (int i = 0; i < numColumns; i++) { - int length = dataInputStream.readInt(); + int length = buffer.getInt(); byte[] bytes = new byte[length]; - readLength = dataInputStream.read(bytes); - assert readLength == length; + buffer.get(bytes); columnDataTypes[i] = ColumnDataType.valueOf(new String(bytes, UTF_8)); } - return new DataSchema(columnNames, columnDataTypes); } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java index 256d217299..f73ab72c2f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java @@ -19,6 +19,7 @@ package org.apache.pinot.common.utils; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; import javax.annotation.Nullable; @@ -109,6 +110,7 @@ public interface DataTable { SYSTEM_ACTIVITIES_CPU_TIME_NS("systemActivitiesCpuTimeNs", MetadataValueType.LONG), RESPONSE_SER_CPU_TIME_NS("responseSerializationCpuTimeNs", MetadataValueType.LONG); + private static final MetadataKey[] VALUES; private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new HashMap<>(); private final String _name; private final MetadataValueType _valueType; @@ -121,10 +123,7 @@ public interface DataTable { // getByOrdinal returns an enum key for a given ordinal or null if the key does not exist. @Nullable public static MetadataKey getByOrdinal(int ordinal) { - if (ordinal >= MetadataKey.values().length) { - return null; - } - return MetadataKey.values()[ordinal]; + return VALUES[Math.min(ordinal, VALUES.length - 1)]; } // getByName returns an enum key for a given name or null if the key does not exist. 
@@ -143,11 +142,13 @@ public interface DataTable { } static { - for (MetadataKey key : MetadataKey.values()) { + MetadataKey[] values = values(); + for (MetadataKey key : values) { if (NAME_TO_ENUM_KEY_MAP.put(key.getName(), key) != null) { throw new IllegalArgumentException("Duplicate name defined in the MetadataKey definition: " + key.getName()); } } + VALUES = Arrays.copyOf(values, values.length + 1); } } } diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java index 04355b8fc0..ec18998161 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.common.utils; +import java.nio.ByteBuffer; import java.util.Arrays; import org.apache.pinot.spi.data.FieldSpec; import org.testng.Assert; @@ -65,7 +66,7 @@ public class DataSchemaTest { public void testSerDe() throws Exception { DataSchema dataSchema = new DataSchema(COLUMN_NAMES, COLUMN_DATA_TYPES); - DataSchema dataSchemaAfterSerDe = DataSchema.fromBytes(dataSchema.toBytes()); + DataSchema dataSchemaAfterSerDe = DataSchema.fromBytes(ByteBuffer.wrap(dataSchema.toBytes())); Assert.assertEquals(dataSchema, dataSchemaAfterSerDe); Assert.assertEquals(dataSchema.hashCode(), dataSchemaAfterSerDe.hashCode()); } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java index 941ffe382a..24d113bd7d 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java @@ -18,9 +18,7 @@ */ package org.apache.pinot.core.common.datatable; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import 
java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -115,27 +113,24 @@ public abstract class BaseDataTable implements DataTable { /** * Helper method to deserialize dictionary map. */ - protected Map<String, Map<Integer, String>> deserializeDictionaryMap(byte[] bytes) + protected Map<String, Map<Integer, String>> deserializeDictionaryMap(ByteBuffer buffer) throws IOException { - try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); - DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) { - int numDictionaries = dataInputStream.readInt(); + int numDictionaries = buffer.getInt(); Map<String, Map<Integer, String>> dictionaryMap = new HashMap<>(numDictionaries); for (int i = 0; i < numDictionaries; i++) { - String column = DataTableUtils.decodeString(dataInputStream); - int dictionarySize = dataInputStream.readInt(); + String column = DataTableUtils.decodeString(buffer); + int dictionarySize = buffer.getInt(); Map<Integer, String> dictionary = new HashMap<>(dictionarySize); for (int j = 0; j < dictionarySize; j++) { - int key = dataInputStream.readInt(); - String value = DataTableUtils.decodeString(dataInputStream); + int key = buffer.getInt(); + String value = DataTableUtils.decodeString(buffer); dictionary.put(key, value); } dictionaryMap.put(column, dictionary); } return dictionaryMap; - } } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java index 924ff24bd1..1340b45e19 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java @@ -18,9 +18,7 @@ */ package org.apache.pinot.core.common.datatable; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import 
java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -79,26 +77,20 @@ public class DataTableImplV2 extends BaseDataTable { // Read dictionary. if (dictionaryMapLength != 0) { - byte[] dictionaryMapBytes = new byte[dictionaryMapLength]; byteBuffer.position(dictionaryMapStart); - byteBuffer.get(dictionaryMapBytes); - _dictionaryMap = deserializeDictionaryMap(dictionaryMapBytes); + _dictionaryMap = deserializeDictionaryMap(byteBuffer); } else { _dictionaryMap = null; } // Read metadata. - byte[] metadataBytes = new byte[metadataLength]; byteBuffer.position(metadataStart); - byteBuffer.get(metadataBytes); - _metadata = deserializeMetadata(metadataBytes); + _metadata = deserializeMetadata(byteBuffer); // Read data schema. if (dataSchemaLength != 0) { - byte[] schemaBytes = new byte[dataSchemaLength]; byteBuffer.position(dataSchemaStart); - byteBuffer.get(schemaBytes); - _dataSchema = DataSchema.fromBytes(schemaBytes); + _dataSchema = DataSchema.fromBytes(byteBuffer); _columnOffsets = new int[_dataSchema.size()]; _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets); } else { @@ -130,21 +122,18 @@ public class DataTableImplV2 extends BaseDataTable { } } - private Map<String, String> deserializeMetadata(byte[] bytes) + private Map<String, String> deserializeMetadata(ByteBuffer buffer) throws IOException { - try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); - DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) { - int numEntries = dataInputStream.readInt(); + int numEntries = buffer.getInt(); Map<String, String> metadata = new HashMap<>(numEntries); for (int i = 0; i < numEntries; i++) { - String key = DataTableUtils.decodeString(dataInputStream); - String value = DataTableUtils.decodeString(dataInputStream); + String key = DataTableUtils.decodeString(buffer); + String value = DataTableUtils.decodeString(buffer); 
metadata.put(key, value); } return metadata; - } } @Override diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java index 97512357c7..7838c362fe 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java @@ -21,9 +21,7 @@ package org.apache.pinot.core.common.datatable; import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; -import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; -import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; import java.nio.ByteBuffer; @@ -111,30 +109,24 @@ public class DataTableImplV3 extends BaseDataTable { // Read exceptions. if (exceptionsLength != 0) { - byte[] exceptionsBytes = new byte[exceptionsLength]; byteBuffer.position(exceptionsStart); - byteBuffer.get(exceptionsBytes); - _errCodeToExceptionMap = deserializeExceptions(exceptionsBytes); + _errCodeToExceptionMap = deserializeExceptions(byteBuffer); } else { _errCodeToExceptionMap = new HashMap<>(); } // Read dictionary. if (dictionaryMapLength != 0) { - byte[] dictionaryMapBytes = new byte[dictionaryMapLength]; byteBuffer.position(dictionaryMapStart); - byteBuffer.get(dictionaryMapBytes); - _dictionaryMap = deserializeDictionaryMap(dictionaryMapBytes); + _dictionaryMap = deserializeDictionaryMap(byteBuffer); } else { _dictionaryMap = null; } // Read data schema. 
if (dataSchemaLength != 0) { - byte[] schemaBytes = new byte[dataSchemaLength]; byteBuffer.position(dataSchemaStart); - byteBuffer.get(schemaBytes); - _dataSchema = DataSchema.fromBytes(schemaBytes); + _dataSchema = DataSchema.fromBytes(byteBuffer); _columnOffsets = new int[_dataSchema.size()]; _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, _columnOffsets); } else { @@ -168,9 +160,7 @@ public class DataTableImplV3 extends BaseDataTable { // Read metadata. int metadataLength = byteBuffer.getInt(); if (metadataLength != 0) { - byte[] metadataBytes = new byte[metadataLength]; - byteBuffer.get(metadataBytes); - _metadata = deserializeMetadata(metadataBytes); + _metadata = deserializeMetadata(byteBuffer); } } @@ -349,33 +339,32 @@ public class DataTableImplV3 extends BaseDataTable { * DataTable from each server and aggregates the values). * This is to make V3 implementation keep the consumers of Map<String, String> getMetadata() API in the code happy * by internally converting it. + * + * This method uses relative operations on the ByteBuffer and expects the buffer's position to be set correctly. */ - private Map<String, String> deserializeMetadata(byte[] bytes) + private Map<String, String> deserializeMetadata(ByteBuffer buffer) throws IOException { - try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); - DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) { - int numEntries = dataInputStream.readInt(); - Map<String, String> metadata = new HashMap<>(); - for (int i = 0; i < numEntries; i++) { - int keyId = dataInputStream.readInt(); - MetadataKey key = MetadataKey.getByOrdinal(keyId); - // Ignore unknown keys.
- if (key == null) { - continue; - } - if (key.getValueType() == MetadataValueType.INT) { - String value = String.valueOf(DataTableUtils.decodeInt(dataInputStream)); - metadata.put(key.getName(), value); - } else if (key.getValueType() == MetadataValueType.LONG) { - String value = String.valueOf(DataTableUtils.decodeLong(dataInputStream)); - metadata.put(key.getName(), value); - } else { - String value = String.valueOf(DataTableUtils.decodeString(dataInputStream)); - metadata.put(key.getName(), value); - } + int numEntries = buffer.getInt(); + Map<String, String> metadata = new HashMap<>(); + for (int i = 0; i < numEntries; i++) { + int keyId = buffer.getInt(); + MetadataKey key = MetadataKey.getByOrdinal(keyId); + // Ignore unknown keys. + if (key == null) { + continue; + } + if (key.getValueType() == MetadataValueType.INT) { + String value = "" + buffer.getInt(); + metadata.put(key.getName(), value); + } else if (key.getValueType() == MetadataValueType.LONG) { + String value = "" + buffer.getLong(); + metadata.put(key.getName(), value); + } else { + String value = DataTableUtils.decodeString(buffer); + metadata.put(key.getName(), value); } - return metadata; } + return metadata; } private byte[] serializeExceptions() @@ -397,18 +386,15 @@ public class DataTableImplV3 extends BaseDataTable { return byteArrayOutputStream.toByteArray(); } - private Map<Integer, String> deserializeExceptions(byte[] bytes) + private Map<Integer, String> deserializeExceptions(ByteBuffer buffer) throws IOException { - try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes); - DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream)) { - int numExceptions = dataInputStream.readInt(); - Map<Integer, String> exceptions = new HashMap<>(numExceptions); - for (int i = 0; i < numExceptions; i++) { - int errCode = dataInputStream.readInt(); - String errMessage = DataTableUtils.decodeString(dataInputStream); - exceptions.put(errCode, errMessage); - } - 
return exceptions; + int numExceptions = buffer.getInt(); + Map<Integer, String> exceptions = new HashMap<>(numExceptions); + for (int i = 0; i < numExceptions; i++) { + int errCode = buffer.getInt(); + String errMessage = DataTableUtils.decodeString(buffer); + exceptions.put(errCode, errMessage); } + return exceptions; } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java index 632e6187ef..606a8d8fb4 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java @@ -22,6 +22,7 @@ import com.google.common.primitives.Ints; import com.google.common.primitives.Longs; import java.io.DataInputStream; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Arrays; import java.util.Collections; import java.util.List; @@ -257,6 +258,21 @@ public class DataTableUtils { } } + /** + * Helper method to decode string. + */ + public static String decodeString(ByteBuffer buffer) + throws IOException { + int length = buffer.getInt(); + if (length == 0) { + return ""; + } else { + byte[] bytes = new byte[length]; + buffer.get(bytes); + return new String(bytes, UTF_8); + } + } + /** * Helper method to decode int. */ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org