This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 44da1a098f DataTable deserialization improvements (#8499)
44da1a098f is described below

commit 44da1a098fd47ffb533da6ca1386c64fb86e50a9
Author: Richard Startin <rich...@startree.ai>
AuthorDate: Tue Apr 12 10:47:52 2022 +0100

    DataTable deserialization improvements (#8499)
    
    * avoid allocating MetaDataKey[]
    
    * avoid excessive allocation in DataTable deserialization
    
    * remove unused overload of DataSchema.fromBytes, ensure new overload is 
tested, add comments about expectations of ByteBuffer arguments
---
 .../org/apache/pinot/common/utils/DataSchema.java  | 27 +++----
 .../org/apache/pinot/common/utils/DataTable.java   | 11 +--
 .../apache/pinot/common/utils/DataSchemaTest.java  |  3 +-
 .../pinot/core/common/datatable/BaseDataTable.java | 17 ++---
 .../core/common/datatable/DataTableImplV2.java     | 25 ++-----
 .../core/common/datatable/DataTableImplV3.java     | 82 +++++++++-------------
 .../core/common/datatable/DataTableUtils.java      | 16 +++++
 7 files changed, 81 insertions(+), 100 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
index 2f461bdbbe..915a8110ef 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataSchema.java
@@ -22,12 +22,11 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonPropertyOrder;
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.util.Arrays;
 import java.util.EnumSet;
@@ -176,35 +175,29 @@ public class DataSchema {
     return byteArrayOutputStream.toByteArray();
   }
 
-  public static DataSchema fromBytes(byte[] buffer)
+  /**
+   * This method uses relative operations on the ByteBuffer and expects the 
buffer's position to be set correctly.
+   */
+  public static DataSchema fromBytes(ByteBuffer buffer)
       throws IOException {
-    ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(buffer);
-    DataInputStream dataInputStream = new 
DataInputStream(byteArrayInputStream);
-
     // Read the number of columns.
-    int numColumns = dataInputStream.readInt();
+    int numColumns = buffer.getInt();
     String[] columnNames = new String[numColumns];
     ColumnDataType[] columnDataTypes = new ColumnDataType[numColumns];
-
     // Read the column names.
-    int readLength;
     for (int i = 0; i < numColumns; i++) {
-      int length = dataInputStream.readInt();
+      int length = buffer.getInt();
       byte[] bytes = new byte[length];
-      readLength = dataInputStream.read(bytes);
-      assert readLength == length;
+      buffer.get(bytes);
       columnNames[i] = new String(bytes, UTF_8);
     }
-
     // Read the column types.
     for (int i = 0; i < numColumns; i++) {
-      int length = dataInputStream.readInt();
+      int length = buffer.getInt();
       byte[] bytes = new byte[length];
-      readLength = dataInputStream.read(bytes);
-      assert readLength == length;
+      buffer.get(bytes);
       columnDataTypes[i] = ColumnDataType.valueOf(new String(bytes, UTF_8));
     }
-
     return new DataSchema(columnNames, columnDataTypes);
   }
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
index 256d217299..f73ab72c2f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/DataTable.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.common.utils;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import javax.annotation.Nullable;
@@ -109,6 +110,7 @@ public interface DataTable {
     SYSTEM_ACTIVITIES_CPU_TIME_NS("systemActivitiesCpuTimeNs", 
MetadataValueType.LONG),
     RESPONSE_SER_CPU_TIME_NS("responseSerializationCpuTimeNs", 
MetadataValueType.LONG);
 
+    private static final MetadataKey[] VALUES;
     private static final Map<String, MetadataKey> NAME_TO_ENUM_KEY_MAP = new 
HashMap<>();
     private final String _name;
     private final MetadataValueType _valueType;
@@ -121,10 +123,7 @@ public interface DataTable {
     // getByOrdinal returns an enum key for a given ordinal or null if the key 
does not exist.
     @Nullable
     public static MetadataKey getByOrdinal(int ordinal) {
-      if (ordinal >= MetadataKey.values().length) {
-        return null;
-      }
-      return MetadataKey.values()[ordinal];
+      return VALUES[Math.min(ordinal, VALUES.length - 1)];
     }
 
     // getByName returns an enum key for a given name or null if the key does 
not exist.
@@ -143,11 +142,13 @@ public interface DataTable {
     }
 
     static {
-      for (MetadataKey key : MetadataKey.values()) {
+      MetadataKey[] values = values();
+      for (MetadataKey key : values) {
         if (NAME_TO_ENUM_KEY_MAP.put(key.getName(), key) != null) {
           throw new IllegalArgumentException("Duplicate name defined in the 
MetadataKey definition: " + key.getName());
         }
       }
+      VALUES = Arrays.copyOf(values, values.length + 1);
     }
   }
 }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
index 04355b8fc0..ec18998161 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/DataSchemaTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.utils;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.testng.Assert;
@@ -65,7 +66,7 @@ public class DataSchemaTest {
   public void testSerDe()
       throws Exception {
     DataSchema dataSchema = new DataSchema(COLUMN_NAMES, COLUMN_DATA_TYPES);
-    DataSchema dataSchemaAfterSerDe = 
DataSchema.fromBytes(dataSchema.toBytes());
+    DataSchema dataSchemaAfterSerDe = 
DataSchema.fromBytes(ByteBuffer.wrap(dataSchema.toBytes()));
     Assert.assertEquals(dataSchema, dataSchemaAfterSerDe);
     Assert.assertEquals(dataSchema.hashCode(), 
dataSchemaAfterSerDe.hashCode());
   }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
index 941ffe382a..24d113bd7d 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/BaseDataTable.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pinot.core.common.datatable;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -115,27 +113,24 @@ public abstract class BaseDataTable implements DataTable {
   /**
    * Helper method to deserialize dictionary map.
    */
-  protected Map<String, Map<Integer, String>> deserializeDictionaryMap(byte[] 
bytes)
+  protected Map<String, Map<Integer, String>> 
deserializeDictionaryMap(ByteBuffer buffer)
       throws IOException {
-    try (ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(bytes);
-        DataInputStream dataInputStream = new 
DataInputStream(byteArrayInputStream)) {
-      int numDictionaries = dataInputStream.readInt();
+      int numDictionaries = buffer.getInt();
       Map<String, Map<Integer, String>> dictionaryMap = new 
HashMap<>(numDictionaries);
 
       for (int i = 0; i < numDictionaries; i++) {
-        String column = DataTableUtils.decodeString(dataInputStream);
-        int dictionarySize = dataInputStream.readInt();
+        String column = DataTableUtils.decodeString(buffer);
+        int dictionarySize = buffer.getInt();
         Map<Integer, String> dictionary = new HashMap<>(dictionarySize);
         for (int j = 0; j < dictionarySize; j++) {
-          int key = dataInputStream.readInt();
-          String value = DataTableUtils.decodeString(dataInputStream);
+          int key = buffer.getInt();
+          String value = DataTableUtils.decodeString(buffer);
           dictionary.put(key, value);
         }
         dictionaryMap.put(column, dictionary);
       }
 
       return dictionaryMap;
-    }
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
index 924ff24bd1..1340b45e19 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV2.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pinot.core.common.datatable;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -79,26 +77,20 @@ public class DataTableImplV2 extends BaseDataTable {
 
     // Read dictionary.
     if (dictionaryMapLength != 0) {
-      byte[] dictionaryMapBytes = new byte[dictionaryMapLength];
       byteBuffer.position(dictionaryMapStart);
-      byteBuffer.get(dictionaryMapBytes);
-      _dictionaryMap = deserializeDictionaryMap(dictionaryMapBytes);
+      _dictionaryMap = deserializeDictionaryMap(byteBuffer);
     } else {
       _dictionaryMap = null;
     }
 
     // Read metadata.
-    byte[] metadataBytes = new byte[metadataLength];
     byteBuffer.position(metadataStart);
-    byteBuffer.get(metadataBytes);
-    _metadata = deserializeMetadata(metadataBytes);
+    _metadata = deserializeMetadata(byteBuffer);
 
     // Read data schema.
     if (dataSchemaLength != 0) {
-      byte[] schemaBytes = new byte[dataSchemaLength];
       byteBuffer.position(dataSchemaStart);
-      byteBuffer.get(schemaBytes);
-      _dataSchema = DataSchema.fromBytes(schemaBytes);
+      _dataSchema = DataSchema.fromBytes(byteBuffer);
       _columnOffsets = new int[_dataSchema.size()];
       _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, 
_columnOffsets);
     } else {
@@ -130,21 +122,18 @@ public class DataTableImplV2 extends BaseDataTable {
     }
   }
 
-  private Map<String, String> deserializeMetadata(byte[] bytes)
+  private Map<String, String> deserializeMetadata(ByteBuffer buffer)
       throws IOException {
-    try (ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(bytes);
-        DataInputStream dataInputStream = new 
DataInputStream(byteArrayInputStream)) {
-      int numEntries = dataInputStream.readInt();
+      int numEntries = buffer.getInt();
       Map<String, String> metadata = new HashMap<>(numEntries);
 
       for (int i = 0; i < numEntries; i++) {
-        String key = DataTableUtils.decodeString(dataInputStream);
-        String value = DataTableUtils.decodeString(dataInputStream);
+        String key = DataTableUtils.decodeString(buffer);
+        String value = DataTableUtils.decodeString(buffer);
         metadata.put(key, value);
       }
 
       return metadata;
-    }
   }
 
   @Override
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
index 97512357c7..7838c362fe 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableImplV3.java
@@ -21,9 +21,7 @@ package org.apache.pinot.core.common.datatable;
 
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -111,30 +109,24 @@ public class DataTableImplV3 extends BaseDataTable {
 
     // Read exceptions.
     if (exceptionsLength != 0) {
-      byte[] exceptionsBytes = new byte[exceptionsLength];
       byteBuffer.position(exceptionsStart);
-      byteBuffer.get(exceptionsBytes);
-      _errCodeToExceptionMap = deserializeExceptions(exceptionsBytes);
+      _errCodeToExceptionMap = deserializeExceptions(byteBuffer);
     } else {
       _errCodeToExceptionMap = new HashMap<>();
     }
 
     // Read dictionary.
     if (dictionaryMapLength != 0) {
-      byte[] dictionaryMapBytes = new byte[dictionaryMapLength];
       byteBuffer.position(dictionaryMapStart);
-      byteBuffer.get(dictionaryMapBytes);
-      _dictionaryMap = deserializeDictionaryMap(dictionaryMapBytes);
+      _dictionaryMap = deserializeDictionaryMap(byteBuffer);
     } else {
       _dictionaryMap = null;
     }
 
     // Read data schema.
     if (dataSchemaLength != 0) {
-      byte[] schemaBytes = new byte[dataSchemaLength];
       byteBuffer.position(dataSchemaStart);
-      byteBuffer.get(schemaBytes);
-      _dataSchema = DataSchema.fromBytes(schemaBytes);
+      _dataSchema = DataSchema.fromBytes(byteBuffer);
       _columnOffsets = new int[_dataSchema.size()];
       _rowSizeInBytes = DataTableUtils.computeColumnOffsets(_dataSchema, 
_columnOffsets);
     } else {
@@ -168,9 +160,7 @@ public class DataTableImplV3 extends BaseDataTable {
     // Read metadata.
     int metadataLength = byteBuffer.getInt();
     if (metadataLength != 0) {
-      byte[] metadataBytes = new byte[metadataLength];
-      byteBuffer.get(metadataBytes);
-      _metadata = deserializeMetadata(metadataBytes);
+      _metadata = deserializeMetadata(byteBuffer);
     }
   }
 
@@ -349,33 +339,32 @@ public class DataTableImplV3 extends BaseDataTable {
    * DataTable from each server and aggregates the values).
    * This is to make V3 implementation keep the consumers of Map<String, 
String> getMetadata() API in the code happy
    * by internally converting it.
+   *
+   * This method uses relative operations on the ByteBuffer and expects the 
buffer's position to be set correctly.
    */
-  private Map<String, String> deserializeMetadata(byte[] bytes)
+  private Map<String, String> deserializeMetadata(ByteBuffer buffer)
       throws IOException {
-    try (ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(bytes);
-        DataInputStream dataInputStream = new 
DataInputStream(byteArrayInputStream)) {
-      int numEntries = dataInputStream.readInt();
-      Map<String, String> metadata = new HashMap<>();
-      for (int i = 0; i < numEntries; i++) {
-        int keyId = dataInputStream.readInt();
-        MetadataKey key = MetadataKey.getByOrdinal(keyId);
-        // Ignore unknown keys.
-        if (key == null) {
-          continue;
-        }
-        if (key.getValueType() == MetadataValueType.INT) {
-          String value = 
String.valueOf(DataTableUtils.decodeInt(dataInputStream));
-          metadata.put(key.getName(), value);
-        } else if (key.getValueType() == MetadataValueType.LONG) {
-          String value = 
String.valueOf(DataTableUtils.decodeLong(dataInputStream));
-          metadata.put(key.getName(), value);
-        } else {
-          String value = 
String.valueOf(DataTableUtils.decodeString(dataInputStream));
-          metadata.put(key.getName(), value);
-        }
+    int numEntries = buffer.getInt();
+    Map<String, String> metadata = new HashMap<>();
+    for (int i = 0; i < numEntries; i++) {
+      int keyId = buffer.getInt();
+      MetadataKey key = MetadataKey.getByOrdinal(keyId);
+      // Ignore unknown keys.
+      if (key == null) {
+        continue;
+      }
+      if (key.getValueType() == MetadataValueType.INT) {
+        String value = "" + buffer.getInt();
+        metadata.put(key.getName(), value);
+      } else if (key.getValueType() == MetadataValueType.LONG) {
+        String value = "" + buffer.getLong();
+        metadata.put(key.getName(), value);
+      } else {
+        String value = DataTableUtils.decodeString(buffer);
+        metadata.put(key.getName(), value);
       }
-      return metadata;
     }
+    return metadata;
   }
 
   private byte[] serializeExceptions()
@@ -397,18 +386,15 @@ public class DataTableImplV3 extends BaseDataTable {
     return byteArrayOutputStream.toByteArray();
   }
 
-  private Map<Integer, String> deserializeExceptions(byte[] bytes)
+  private Map<Integer, String> deserializeExceptions(ByteBuffer buffer)
       throws IOException {
-    try (ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(bytes);
-        DataInputStream dataInputStream = new 
DataInputStream(byteArrayInputStream)) {
-      int numExceptions = dataInputStream.readInt();
-      Map<Integer, String> exceptions = new HashMap<>(numExceptions);
-      for (int i = 0; i < numExceptions; i++) {
-        int errCode = dataInputStream.readInt();
-        String errMessage = DataTableUtils.decodeString(dataInputStream);
-        exceptions.put(errCode, errMessage);
-      }
-      return exceptions;
+    int numExceptions = buffer.getInt();
+    Map<Integer, String> exceptions = new HashMap<>(numExceptions);
+    for (int i = 0; i < numExceptions; i++) {
+      int errCode = buffer.getInt();
+      String errMessage = DataTableUtils.decodeString(buffer);
+      exceptions.put(errCode, errMessage);
     }
+    return exceptions;
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
index 632e6187ef..606a8d8fb4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/common/datatable/DataTableUtils.java
@@ -22,6 +22,7 @@ import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -257,6 +258,21 @@ public class DataTableUtils {
     }
   }
 
+  /**
+   * Helper method to decode string.
+   */
+  public static String decodeString(ByteBuffer buffer)
+      throws IOException {
+    int length = buffer.getInt();
+    if (length == 0) {
+      return "";
+    } else {
+      byte[] bytes = new byte[length];
+      buffer.get(bytes);
+      return new String(bytes, UTF_8);
+    }
+  }
+
   /**
    * Helper method to decode int.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to