This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 55b8e8608f Adding encoding type into the BrokerGrpcServer (#15395)
55b8e8608f is described below

commit 55b8e8608fd5ecac9d85c2c956d372fa872e62ed
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Sun Mar 30 16:51:48 2025 -0700

    Adding encoding type into the BrokerGrpcServer (#15395)
---
 .../apache/pinot/broker/grpc/BrokerGrpcServer.java |  24 ++--
 .../apache/pinot/client/grpc/GrpcConnection.java   |  13 +-
 .../apache/pinot/client/grpc/GrpcResultSet.java    |  16 +--
 .../org/apache/pinot/client/grpc/GrpcUtils.java    |  28 ++--
 .../pinot/client/grpc/PinotGrpcResultSet.java      |  22 +--
 .../response/encoder/JsonResponseEncoder.java      | 160 +++++++++++++++++++++
 .../common/response/encoder/ResponseEncoder.java   |  46 ++++++
 .../response/encoder/ResponseEncoderFactory.java   |  33 +++++
 .../pinot/integration/tests/ClusterTest.java       |   9 +-
 .../tests/OfflineClusterIntegrationTest.java       |   2 +-
 .../apache/pinot/spi/utils/CommonConstants.java    |   2 +
 11 files changed, 288 insertions(+), 67 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
index 845058c1e8..cc8ccb9b5f 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
@@ -34,10 +34,7 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
 import io.grpc.stub.StreamObserver;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
 import java.util.Map;
 import java.util.stream.Collectors;
 import nl.altindag.ssl.SSLFactory;
@@ -54,6 +51,8 @@ import org.apache.pinot.common.proto.PinotQueryBrokerGrpc;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.response.encoder.ResponseEncoder;
+import org.apache.pinot.common.response.encoder.ResponseEncoderFactory;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.common.utils.tls.RenewableTlsUtils;
 import org.apache.pinot.common.utils.tls.TlsUtils;
@@ -261,23 +260,17 @@ public class BrokerGrpcServer extends PinotQueryBrokerGrpc.PinotQueryBrokerImplB
     String compressionAlgorithm = metadataMap.getOrDefault(CommonConstants.Broker.Grpc.COMPRESSION,
         CommonConstants.Broker.Grpc.DEFAULT_COMPRESSION);
     Compressor compressor = CompressionFactory.getCompressor(compressionAlgorithm);
+
+    String encodingAlgorithm = metadataMap.getOrDefault(CommonConstants.Broker.Grpc.ENCODING,
+        CommonConstants.Broker.Grpc.DEFAULT_ENCODING);
+    ResponseEncoder encoder = ResponseEncoderFactory.getResponseEncoder(encodingAlgorithm);
     // Multiple response blocks are compressed data rows
     for (int i = 0; i < resultTable.getRows().size(); i += blockRowSize) {
       try {
-        int rowSize = 0;
+        int rowSize = Math.min(blockRowSize, resultTable.getRows().size() - i);
         // Serialize the rows to a byte array
-        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
-
-        for (int j = i; j < Math.min(i + blockRowSize, resultTable.getRows().size()); j++) {
-          String rowString = JsonUtils.objectToJsonNode(resultTable.getRows().get(j)).toString();
-          byte[] bytesToWrite = rowString.getBytes(StandardCharsets.UTF_8);
-          byteArrayOutputStream.write(ByteBuffer.allocate(4).putInt(bytesToWrite.length).array());
-          byteArrayOutputStream.write(bytesToWrite);
-          rowSize += 1;
-        }
-
+        byte[] serializedData = encoder.encodeResultTable(resultTable, i, rowSize);
         // Compress the byte array using the compressor
-        byte[] serializedData = byteArrayOutputStream.toByteArray();
         byte[] compressedResultTable = compressor.compress(serializedData);
         int originalSize = serializedData.length;
         int compressedSize = compressedResultTable.length;
@@ -288,6 +281,7 @@ public class BrokerGrpcServer extends PinotQueryBrokerGrpc.PinotQueryBrokerImplB
                 .putMetadata("compressedSize", String.valueOf(compressedSize))
                 .putMetadata("rowSize", String.valueOf(rowSize))
                 .putMetadata("compression", compressionAlgorithm)
+                .putMetadata("encoding", encodingAlgorithm)
                 .build();
         responseObserver.onNext(dataBlock);
       } catch (Exception e) {
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java
index 068d021c4c..e30b49979e 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java
@@ -38,6 +38,7 @@ import org.apache.pinot.client.ResultSetGroup;
 import org.apache.pinot.client.SimpleBrokerSelector;
 import org.apache.pinot.common.config.GrpcConfig;
 import org.apache.pinot.common.proto.Broker;
+import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.grpc.BrokerGrpcQueryClient;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -48,7 +49,7 @@ import org.slf4j.LoggerFactory;
 /**
  * A grpc connection to Pinot, normally created through calls to the {@link org.apache.pinot.client.ConnectionFactory}.
  */
-public class GrpcConnection {
+public class GrpcConnection implements AutoCloseable {
   public static final String FAIL_ON_EXCEPTIONS = "failOnExceptions";
   private static final Logger LOGGER = LoggerFactory.getLogger(GrpcConnection.class);
 
@@ -188,13 +189,18 @@ public class GrpcConnection {
     }
     // Process schema
     JsonNode schemaJsonNode = null;
+    DataSchema dataSchema = null;
     if (response.hasNext()) {
-      schemaJsonNode = GrpcUtils.extractSchemaJson(response.next());
+      dataSchema = GrpcUtils.extractSchema(response.next());
+      schemaJsonNode = JsonUtils.objectToJsonNode(dataSchema);
     }
     // Process rows
     ArrayNode rows = JsonUtils.newArrayNode();
     while (response.hasNext()) {
-      rows.addAll(GrpcUtils.extractRowsJson(response.next()));
+      List<Object[]> resultTableRows = GrpcUtils.extractResultTable(response.next(), dataSchema).getRows();
+      for (Object[] row : resultTableRows) {
+        rows.add(JsonUtils.objectToJsonNode(row));
+      }
     }
     if (schemaJsonNode != null && rows != null) {
       ObjectNode resultTable = JsonUtils.newObjectNode();
@@ -245,6 +251,7 @@ public class GrpcConnection {
    *
    * @throws PinotClientException when connection is already closed
    */
+  @Override
   public void close()
       throws PinotClientException {
     _grpcQueryClient.shutdown();
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java
index 271e1e734d..2282b247e2 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java
@@ -18,14 +18,13 @@
  */
 package org.apache.pinot.client.grpc;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.pinot.client.AbstractResultSet;
 import org.apache.pinot.client.TextTable;
 import org.apache.pinot.common.proto.Broker;
+import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 
 
@@ -35,7 +34,7 @@ import org.apache.pinot.common.utils.DataSchema;
 public class GrpcResultSet extends AbstractResultSet {
   private final List<String> _columnNamesArray;
   private final List<String> _columnDataTypesArray;
-  private final ArrayNode _currentBatchRows;
+  private final ResultTable _currentBatchRows;
 
   public GrpcResultSet(DataSchema schema, Broker.BrokerResponse brokerResponse) {
     _columnNamesArray = new ArrayList<>(schema.size());
@@ -45,7 +44,7 @@ public class GrpcResultSet extends AbstractResultSet {
       _columnDataTypesArray.add(schema.getColumnDataType(i).toString());
     }
     try {
-      _currentBatchRows = GrpcUtils.extractRowsJson(brokerResponse);
+      _currentBatchRows = GrpcUtils.extractResultTable(brokerResponse, schema);
     } catch (IOException e) {
       throw new RuntimeException(e);
     }
@@ -53,7 +52,7 @@ public class GrpcResultSet extends AbstractResultSet {
 
   @Override
   public int getRowCount() {
-    return _currentBatchRows.size();
+    return _currentBatchRows.getRows().size();
   }
 
   @Override
@@ -73,12 +72,7 @@ public class GrpcResultSet extends AbstractResultSet {
 
   @Override
   public String getString(int rowIndex, int columnIndex) {
-    JsonNode jsonValue = _currentBatchRows.get(rowIndex).get(columnIndex);
-    if (jsonValue.isTextual()) {
-      return jsonValue.textValue();
-    } else {
-      return jsonValue.toString();
-    }
+    return _currentBatchRows.getRows().get(rowIndex)[columnIndex].toString();
   }
 
   public List<String> getAllColumns() {
diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java
index b604d0109a..3235983b97 100644
--- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java
+++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java
@@ -19,16 +19,17 @@
 package org.apache.pinot.client.grpc;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.Map;
 import org.apache.pinot.client.ExecutionStats;
 import org.apache.pinot.common.compression.CompressionFactory;
 import org.apache.pinot.common.compression.Compressor;
 import org.apache.pinot.common.proto.Broker;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.response.encoder.ResponseEncoder;
+import org.apache.pinot.common.response.encoder.ResponseEncoderFactory;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.JsonUtils;
@@ -59,16 +60,18 @@ public class GrpcUtils {
 
   public static JsonNode extractSchemaJson(Broker.BrokerResponse brokerResponse)
       throws IOException {
-    DataSchema schema = DataSchema.fromBytes(brokerResponse.getPayload().asReadOnlyByteBuffer());
-    return JsonUtils.objectToJsonNode(schema);
+    return JsonUtils.objectToJsonNode(extractSchema(brokerResponse));
   }
 
-  public static ArrayNode extractRowsJson(Broker.BrokerResponse brokerResponse)
+  public static ResultTable extractResultTable(Broker.BrokerResponse brokerResponse, DataSchema schema)
       throws IOException {
     Map<String, String> metadataMap = brokerResponse.getMetadataMap();
     String compressionAlgorithm = metadataMap.getOrDefault(CommonConstants.Broker.Grpc.COMPRESSION,
         CommonConstants.Broker.Grpc.DEFAULT_COMPRESSION);
     Compressor compressor = CompressionFactory.getCompressor(compressionAlgorithm);
+    String encodingType = metadataMap.getOrDefault(CommonConstants.Broker.Grpc.ENCODING,
+        CommonConstants.Broker.Grpc.DEFAULT_ENCODING);
+    ResponseEncoder responseEncoder = ResponseEncoderFactory.getResponseEncoder(encodingType);
 
     byte[] respBytes = brokerResponse.getPayload().toByteArray();
     int rowSize = Integer.parseInt(brokerResponse.getMetadataOrThrow("rowSize"));
@@ -78,20 +81,7 @@ public class GrpcUtils {
     } catch (Exception e) {
       throw new RuntimeException(e);
     }
-    ArrayNode jsonRows = JsonUtils.newArrayNode();
-    int bytesRead = 0;
-    ByteBuffer byteBuffer = ByteBuffer.wrap(uncompressedPayload);
-    for (int i = 0; i < rowSize; i++) {
-      int nextRowSize = byteBuffer.getInt(bytesRead);
-      bytesRead += 4;
-      byte[] rowBytes = new byte[nextRowSize];
-      byteBuffer.position(bytesRead);
-      byteBuffer.get(rowBytes);
-      bytesRead += nextRowSize;
-      String rowString = new String(rowBytes);
-      jsonRows.add(JsonUtils.stringToJsonNode(rowString));
-    }
-    return jsonRows;
+    return responseEncoder.decodeResultTable(uncompressedPayload, rowSize, schema);
   }
 
   public static ExecutionStats extractExecutionStats(JsonNode executionStatsJson) {
diff --git a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java
index 555ce9feb3..a47b3fcfc4 100644
--- a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java
+++ b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java
@@ -18,7 +18,6 @@
  */
 package org.apache.pinot.client.grpc;
 
-import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -43,6 +42,7 @@ import org.apache.pinot.client.PinotResultMetadata;
 import org.apache.pinot.client.base.AbstractBaseResultSet;
 import org.apache.pinot.client.utils.DateTimeUtils;
 import org.apache.pinot.common.proto.Broker;
+import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,8 +56,8 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet {
   private final int _totalColumns;
   private final Map<String, Integer> _columns = new HashMap<>();
   private final Map<Integer, String> _columnDataTypes = new HashMap<>();
-
-  private ArrayNode _currentRowBatch;
+  private final DataSchema _dataSchema;
+  private ResultTable _currentRowBatch;
   private int _currentBatchSize;
 
   private int _currentBatchIndex = -1;
@@ -71,11 +71,11 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet {
     _brokerResponseIterator = brokerResponseIterator;
     _closed = false;
     ObjectNode metadata = GrpcUtils.extractMetadataJson(_brokerResponseIterator.next());
-    DataSchema dataSchema = GrpcUtils.extractSchema(_brokerResponseIterator.next());
-    _totalColumns = dataSchema.size();
+    _dataSchema = GrpcUtils.extractSchema(_brokerResponseIterator.next());
+    _totalColumns = _dataSchema.size();
     for (int i = 0; i < _totalColumns; i++) {
-      _columns.put(dataSchema.getColumnName(i), i + 1);
-      _columnDataTypes.put(i + 1, dataSchema.getColumnDataType(i).name());
+      _columns.put(_dataSchema.getColumnName(i), i + 1);
+      _columnDataTypes.put(i + 1, _dataSchema.getColumnDataType(i).name());
     }
   }
 
@@ -83,6 +83,7 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet {
     _brokerResponseIterator = null;
     _currentBatchSize = 0;
     _totalColumns = 0;
+    _dataSchema = null;
   }
 
   public static PinotGrpcResultSet empty() {
@@ -271,11 +272,10 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet {
   public String getString(int columnIndex)
       throws SQLException {
     validateColumn(columnIndex);
-    String val = _currentRowBatch.get(_currentBatchIndex).get(columnIndex - 1).asText();
+    String val = _currentRowBatch.getRows().get(_currentBatchIndex)[columnIndex - 1].toString();
     if (checkIsNull(val)) {
       return null;
     }
-
     return val;
   }
 
@@ -425,9 +425,9 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet {
     if (_currentBatchIndex == _currentBatchSize - 1) {
       if (_brokerResponseIterator.hasNext()) {
         try {
-          _currentRowBatch = GrpcUtils.extractRowsJson(_brokerResponseIterator.next());
+          _currentRowBatch = GrpcUtils.extractResultTable(_brokerResponseIterator.next(), _dataSchema);
           _currentBatchIndex = 0;
-          _currentBatchSize = _currentRowBatch.size();
+          _currentBatchSize = _currentRowBatch.getRows().size();
           _currentRow++;
           return true;
         } catch (IOException e) {
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/JsonResponseEncoder.java b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/JsonResponseEncoder.java
new file mode 100644
index 0000000000..e0f96ea3b7
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/JsonResponseEncoder.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.encoder;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+public class JsonResponseEncoder implements ResponseEncoder {
+
+  @Override
+  public byte[] encodeResultTable(ResultTable resultTable, int startRow, int length)
+      throws IOException {
+    // Serialize the rows to a byte array
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    for (int j = startRow; j < Math.min(startRow + length, resultTable.getRows().size()); j++) {
+      String rowString = JsonUtils.objectToJsonNode(resultTable.getRows().get(j)).toString();
+      byte[] bytesToWrite = rowString.getBytes(StandardCharsets.UTF_8);
+      byteArrayOutputStream.write(ByteBuffer.allocate(4).putInt(bytesToWrite.length).array());
+      byteArrayOutputStream.write(bytesToWrite);
+    }
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  @Override
+  public ResultTable decodeResultTable(byte[] bytes, int rowSize, DataSchema schema)
+      throws IOException {
+    List<Object[]> rows = new ArrayList<>();
+    int bytesRead = 0;
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    for (int i = 0; i < rowSize; i++) {
+      int nextRowSize = byteBuffer.getInt(bytesRead);
+      bytesRead += 4;
+      byte[] rowBytes = new byte[nextRowSize];
+      byteBuffer.position(bytesRead);
+      byteBuffer.get(rowBytes);
+      bytesRead += nextRowSize;
+      String rowString = new String(rowBytes);
+      JsonNode jsonRow = JsonUtils.stringToJsonNode(rowString);
+      Object[] row = new Object[jsonRow.size()];
+      for (int columnIdx = 0; columnIdx < jsonRow.size(); columnIdx++) {
+        DataSchema.ColumnDataType columnDataType = schema.getColumnDataType(columnIdx);
+        JsonNode jsonValue = jsonRow.get(columnIdx);
+        if (columnDataType.isArray()) {
+          row[columnIdx] = extractArray(jsonValue);
+        } else if (columnDataType == DataSchema.ColumnDataType.MAP) {
+          row[columnIdx] = extractMap(jsonValue);
+        } else {
+          row[columnIdx] = extractValue(jsonValue);
+        }
+      }
+      rows.add(row);
+    }
+    return new ResultTable(schema, rows);
+  }
+
+  private Object[] extractArray(JsonNode jsonValue) {
+    Object[] array = new Object[jsonValue.size()];
+    for (int k = 0; k < jsonValue.size(); k++) {
+      if (jsonValue.get(k).isNull()) {
+        array[k] = null;
+      } else if (jsonValue.get(k).isBoolean()) {
+        array[k] = jsonValue.get(k).asBoolean();
+      } else if (jsonValue.get(k).isInt()) {
+        array[k] = jsonValue.get(k).asInt();
+      } else if (jsonValue.get(k).isLong()) {
+        array[k] = jsonValue.get(k).asLong();
+      } else if (jsonValue.get(k).isFloat()) {
+        array[k] = jsonValue.get(k).floatValue();
+      } else if (jsonValue.get(k).isDouble()) {
+        array[k] = jsonValue.get(k).asDouble();
+      } else if (jsonValue.get(k).isTextual()) {
+        array[k] = jsonValue.get(k).textValue();
+      } else if (jsonValue.isArray()) {
+        array[k] = extractArray(jsonValue.get(k));
+      } else if (jsonValue.isObject()) {
+        array[k] = extractMap(jsonValue.get(k));
+      } else {
+        array[k] = jsonValue.get(k).toString();
+      }
+    }
+    return array;
+  }
+
+  private Object extractValue(JsonNode jsonValue) {
+    if (jsonValue.isNull()) {
+      return null;
+    }
+    if (jsonValue.isBoolean()) {
+      return jsonValue.asBoolean();
+    }
+    if (jsonValue.isShort()) {
+      return jsonValue.shortValue();
+    }
+    if (jsonValue.isBigInteger()) {
+      return jsonValue.bigIntegerValue();
+    }
+    if (jsonValue.isBigDecimal()) {
+      return jsonValue.decimalValue();
+    }
+    if (jsonValue.isInt()) {
+      return jsonValue.asInt();
+    }
+    if (jsonValue.isLong()) {
+      return jsonValue.asLong();
+    }
+    if (jsonValue.isFloat()) {
+      return jsonValue.floatValue();
+    }
+    if (jsonValue.isDouble()) {
+      return jsonValue.asDouble();
+    }
+    if (jsonValue.isTextual()) {
+      return jsonValue.textValue();
+    }
+    if (jsonValue.isArray()) {
+      return extractArray(jsonValue);
+    }
+    if (jsonValue.isObject()) {
+      return extractMap(jsonValue);
+    }
+    return jsonValue.toString();
+  }
+
+  private Map<String, Object> extractMap(JsonNode jsonValue) {
+    Map<String, Object> map = new HashMap<>();
+    jsonValue.fields().forEachRemaining(entry -> {
+      String key = entry.getKey();
+      Object value = extractValue(entry.getValue());
+      map.put(key, value);
+    });
+    return map;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoder.java b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoder.java
new file mode 100644
index 0000000000..f02e9062c5
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoder.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.encoder;
+
+import java.io.IOException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+public interface ResponseEncoder {
+
+  /**
+   * Encode the result table into a byte array.
+   * @param resultTable Result table to encode
+   * @return Encoded byte array
+   */
+  byte[] encodeResultTable(ResultTable resultTable, int startRow, int length)
+      throws IOException;
+
+  /**
+   * Decode the result table from a byte array.
+   *
+   * @param bytes  Encoded byte array
+   * @param rowSize Number of rows in the result table
+   * @param schema Schema of the result table
+   * @return Decoded result table
+   */
+  ResultTable decodeResultTable(byte[] bytes, int rowSize, DataSchema schema)
+      throws IOException;
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoderFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoderFactory.java
new file mode 100644
index 0000000000..ad2abbeb8d
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoderFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.encoder;
+
+public class ResponseEncoderFactory {
+  private ResponseEncoderFactory() {
+  }
+
+  public static ResponseEncoder getResponseEncoder(String format) {
+    switch (format.toUpperCase()) {
+      case "JSON":
+        return new JsonResponseEncoder();
+      default:
+        throw new IllegalArgumentException("Unsupported format: " + format);
+    }
+  }
+}
diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 8a8b7c6a5e..f70900fd5c 100644
--- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -528,14 +528,9 @@ public abstract class ClusterTest extends ControllerTest {
 
   public JsonNode queryGrpcEndpoint(String query, Map<String, String> metadataMap)
       throws IOException {
-    GrpcConnection grpcConnection = null;
-    try {
-      grpcConnection = ConnectionFactory.fromHostListGrpc(new Properties(), List.of(getBrokerGrpcEndpoint()));
+    try (GrpcConnection grpcConnection = ConnectionFactory.fromHostListGrpc(new Properties(),
+        List.of(getBrokerGrpcEndpoint()))) {
       return grpcConnection.getJsonResponse(query, metadataMap);
-    } finally {
-      if (grpcConnection != null) {
-        grpcConnection.close();
-      }
     }
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index eb575799e3..7332b9cce4 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2772,7 +2772,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       assertEquals(row.get(0).asInt(), tmpTableRow.get(0).asInt());
       assertEquals(row.get(1).asLong(), tmpTableRow.get(1).asLong());
       assertTrue(row.get(2).isNull());
-      assertTrue(row.get(2).isNull());
+      assertTrue(row.get(3).isNull());
     }
     for (int i = 2; i < 363; i++) {
       JsonNode row = rows.get(i);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1151a78077..7eb62120f7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -739,6 +739,8 @@ public class CommonConstants {
       public static final int DEFAULT_BLOCK_ROW_SIZE = 10_000;
       public static final String COMPRESSION = "compression";
       public static final String DEFAULT_COMPRESSION = "ZSTD";
+      public static final String ENCODING = "encoding";
+      public static final String DEFAULT_ENCODING = "JSON";
     }
 
     public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.broker.storage.factory";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to