This is an automated email from the ASF dual-hosted git repository. xiangfu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 55b8e8608f Adding encoding type into the BrokerGrpcServer (#15395) 55b8e8608f is described below commit 55b8e8608fd5ecac9d85c2c956d372fa872e62ed Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Sun Mar 30 16:51:48 2025 -0700 Adding encoding type into the BrokerGrpcServer (#15395) --- .../apache/pinot/broker/grpc/BrokerGrpcServer.java | 24 ++-- .../apache/pinot/client/grpc/GrpcConnection.java | 13 +- .../apache/pinot/client/grpc/GrpcResultSet.java | 16 +-- .../org/apache/pinot/client/grpc/GrpcUtils.java | 28 ++-- .../pinot/client/grpc/PinotGrpcResultSet.java | 22 +-- .../response/encoder/JsonResponseEncoder.java | 160 +++++++++++++++++++++ .../common/response/encoder/ResponseEncoder.java | 46 ++++++ .../response/encoder/ResponseEncoderFactory.java | 33 +++++ .../pinot/integration/tests/ClusterTest.java | 9 +- .../tests/OfflineClusterIntegrationTest.java | 2 +- .../apache/pinot/spi/utils/CommonConstants.java | 2 + 11 files changed, 288 insertions(+), 67 deletions(-) diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java index 845058c1e8..cc8ccb9b5f 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java @@ -34,10 +34,7 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext; import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder; import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider; import io.grpc.stub.StreamObserver; -import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; import java.util.Map; import java.util.stream.Collectors; import nl.altindag.ssl.SSLFactory; @@ -54,6 +51,8 @@ import org.apache.pinot.common.proto.PinotQueryBrokerGrpc; import org.apache.pinot.common.response.BrokerResponse; import org.apache.pinot.common.response.broker.BrokerResponseNative; import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.response.encoder.ResponseEncoder; +import org.apache.pinot.common.response.encoder.ResponseEncoderFactory; import org.apache.pinot.common.utils.request.RequestUtils; import org.apache.pinot.common.utils.tls.RenewableTlsUtils; import org.apache.pinot.common.utils.tls.TlsUtils; @@ -261,23 +260,17 @@ public class BrokerGrpcServer extends PinotQueryBrokerGrpc.PinotQueryBrokerImplB String compressionAlgorithm = metadataMap.getOrDefault(CommonConstants.Broker.Grpc.COMPRESSION, CommonConstants.Broker.Grpc.DEFAULT_COMPRESSION); Compressor compressor = CompressionFactory.getCompressor(compressionAlgorithm); + + String encodingAlgorithm = metadataMap.getOrDefault(CommonConstants.Broker.Grpc.ENCODING, + CommonConstants.Broker.Grpc.DEFAULT_ENCODING); + ResponseEncoder encoder = ResponseEncoderFactory.getResponseEncoder(encodingAlgorithm); // Multiple response blocks are compressed data rows for (int i = 0; i < resultTable.getRows().size(); i += blockRowSize) { try { - int rowSize = 0; + int rowSize = Math.min(blockRowSize, resultTable.getRows().size() - i); // Serialize the rows to a byte array - ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); - - for (int j = i; j < Math.min(i + blockRowSize, resultTable.getRows().size()); j++) { - String rowString = JsonUtils.objectToJsonNode(resultTable.getRows().get(j)).toString(); - byte[] bytesToWrite = rowString.getBytes(StandardCharsets.UTF_8); - byteArrayOutputStream.write(ByteBuffer.allocate(4).putInt(bytesToWrite.length).array()); - byteArrayOutputStream.write(bytesToWrite); - rowSize += 1; - } - + byte[] serializedData = encoder.encodeResultTable(resultTable, i, rowSize); // Compress the byte array using the compressor - byte[] serializedData = byteArrayOutputStream.toByteArray(); byte[] compressedResultTable = compressor.compress(serializedData); int originalSize = serializedData.length; int compressedSize = compressedResultTable.length; @@ -288,6 +281,7 @@ public class BrokerGrpcServer extends PinotQueryBrokerGrpc.PinotQueryBrokerImplB .putMetadata("compressedSize", String.valueOf(compressedSize)) .putMetadata("rowSize", String.valueOf(rowSize)) .putMetadata("compression", compressionAlgorithm) + .putMetadata("encoding", encodingAlgorithm) .build(); responseObserver.onNext(dataBlock); } catch (Exception e) { diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java index 068d021c4c..e30b49979e 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java @@ -38,6 +38,7 @@ import org.apache.pinot.client.ResultSetGroup; import org.apache.pinot.client.SimpleBrokerSelector; import org.apache.pinot.common.config.GrpcConfig; import org.apache.pinot.common.proto.Broker; +import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.common.utils.grpc.BrokerGrpcQueryClient; import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.utils.JsonUtils; @@ -48,7 +49,7 @@ import org.slf4j.LoggerFactory; /** * A grpc connection to Pinot, normally created through calls to the {@link org.apache.pinot.client.ConnectionFactory}. */ -public class GrpcConnection { +public class GrpcConnection implements AutoCloseable { public static final String FAIL_ON_EXCEPTIONS = "failOnExceptions"; private static final Logger LOGGER = LoggerFactory.getLogger(GrpcConnection.class); @@ -188,13 +189,18 @@ public class GrpcConnection { } // Process schema JsonNode schemaJsonNode = null; + DataSchema dataSchema = null; if (response.hasNext()) { - schemaJsonNode = GrpcUtils.extractSchemaJson(response.next()); + dataSchema = GrpcUtils.extractSchema(response.next()); + schemaJsonNode = JsonUtils.objectToJsonNode(dataSchema); } // Process rows ArrayNode rows = JsonUtils.newArrayNode(); while (response.hasNext()) { - rows.addAll(GrpcUtils.extractRowsJson(response.next())); + List<Object[]> resultTableRows = GrpcUtils.extractResultTable(response.next(), dataSchema).getRows(); + for (Object[] row : resultTableRows) { + rows.add(JsonUtils.objectToJsonNode(row)); + } } if (schemaJsonNode != null && rows != null) { ObjectNode resultTable = JsonUtils.newObjectNode(); @@ -245,6 +251,7 @@ public class GrpcConnection { * * @throws PinotClientException when connection is already closed */ + @Override public void close() throws PinotClientException { _grpcQueryClient.shutdown(); diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java index 271e1e734d..2282b247e2 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java @@ -18,14 +18,13 @@ */ package org.apache.pinot.client.grpc; -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; import java.io.IOException; import java.util.ArrayList; import java.util.List; import org.apache.pinot.client.AbstractResultSet; import org.apache.pinot.client.TextTable; import org.apache.pinot.common.proto.Broker; +import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; @@ -35,7 +34,7 @@ import org.apache.pinot.common.utils.DataSchema; public class GrpcResultSet extends AbstractResultSet { private final List<String> _columnNamesArray; private final List<String> _columnDataTypesArray; - private final ArrayNode _currentBatchRows; + private final ResultTable _currentBatchRows; public GrpcResultSet(DataSchema schema, Broker.BrokerResponse brokerResponse) { _columnNamesArray = new ArrayList<>(schema.size()); @@ -45,7 +44,7 @@ public class GrpcResultSet extends AbstractResultSet { _columnDataTypesArray.add(schema.getColumnDataType(i).toString()); } try { - _currentBatchRows = GrpcUtils.extractRowsJson(brokerResponse); + _currentBatchRows = GrpcUtils.extractResultTable(brokerResponse, schema); } catch (IOException e) { throw new RuntimeException(e); } @@ -53,7 +52,7 @@ public class GrpcResultSet extends AbstractResultSet { @Override public int getRowCount() { - return _currentBatchRows.size(); + return _currentBatchRows.getRows().size(); } @Override @@ -73,12 +72,7 @@ public class GrpcResultSet extends AbstractResultSet { @Override public String getString(int rowIndex, int columnIndex) { - JsonNode jsonValue = _currentBatchRows.get(rowIndex).get(columnIndex); - if (jsonValue.isTextual()) { - return jsonValue.textValue(); - } else { - return jsonValue.toString(); - } + return _currentBatchRows.getRows().get(rowIndex)[columnIndex].toString(); } public List<String> getAllColumns() { diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java index b604d0109a..3235983b97 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java @@ -19,16 +19,17 @@ package org.apache.pinot.client.grpc; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Iterator; import java.util.Map; import org.apache.pinot.client.ExecutionStats; import org.apache.pinot.common.compression.CompressionFactory; import org.apache.pinot.common.compression.Compressor; import org.apache.pinot.common.proto.Broker; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.response.encoder.ResponseEncoder; +import org.apache.pinot.common.response.encoder.ResponseEncoderFactory; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.JsonUtils; @@ -59,16 +60,18 @@ public class GrpcUtils { public static JsonNode extractSchemaJson(Broker.BrokerResponse brokerResponse) throws IOException { - DataSchema schema = DataSchema.fromBytes(brokerResponse.getPayload().asReadOnlyByteBuffer()); - return JsonUtils.objectToJsonNode(schema); + return JsonUtils.objectToJsonNode(extractSchema(brokerResponse)); } - public static ArrayNode extractRowsJson(Broker.BrokerResponse brokerResponse) + public static ResultTable extractResultTable(Broker.BrokerResponse brokerResponse, DataSchema schema) throws IOException { Map<String, String> metadataMap = brokerResponse.getMetadataMap(); String compressionAlgorithm = metadataMap.getOrDefault(CommonConstants.Broker.Grpc.COMPRESSION, CommonConstants.Broker.Grpc.DEFAULT_COMPRESSION); Compressor compressor = CompressionFactory.getCompressor(compressionAlgorithm); + String encodingType = metadataMap.getOrDefault(CommonConstants.Broker.Grpc.ENCODING, + CommonConstants.Broker.Grpc.DEFAULT_ENCODING); + ResponseEncoder responseEncoder = ResponseEncoderFactory.getResponseEncoder(encodingType); byte[] respBytes = brokerResponse.getPayload().toByteArray(); int rowSize = Integer.parseInt(brokerResponse.getMetadataOrThrow("rowSize")); @@ -78,20 +81,7 @@ public class GrpcUtils { } catch (Exception e) { throw new RuntimeException(e); } - ArrayNode jsonRows = JsonUtils.newArrayNode(); - int bytesRead = 0; - ByteBuffer byteBuffer = ByteBuffer.wrap(uncompressedPayload); - for (int i = 0; i < rowSize; i++) { - int nextRowSize = byteBuffer.getInt(bytesRead); - bytesRead += 4; - byte[] rowBytes = new byte[nextRowSize]; - byteBuffer.position(bytesRead); - byteBuffer.get(rowBytes); - bytesRead += nextRowSize; - String rowString = new String(rowBytes); - jsonRows.add(JsonUtils.stringToJsonNode(rowString)); - } - return jsonRows; + return responseEncoder.decodeResultTable(uncompressedPayload, rowSize, schema); } public static ExecutionStats extractExecutionStats(JsonNode executionStatsJson) { diff --git a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java index 555ce9feb3..a47b3fcfc4 100644 --- a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java +++ b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.client.grpc; -import com.fasterxml.jackson.databind.node.ArrayNode; import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.ByteArrayInputStream; import java.io.IOException; @@ -43,6 +42,7 @@ import org.apache.pinot.client.PinotResultMetadata; import org.apache.pinot.client.base.AbstractBaseResultSet; import org.apache.pinot.client.utils.DateTimeUtils; import org.apache.pinot.common.proto.Broker; +import org.apache.pinot.common.response.broker.ResultTable; import org.apache.pinot.common.utils.DataSchema; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,8 +56,8 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet { private final int _totalColumns; private final Map<String, Integer> _columns = new HashMap<>(); private final Map<Integer, String> _columnDataTypes = new HashMap<>(); - - private ArrayNode _currentRowBatch; + private final DataSchema _dataSchema; + private ResultTable _currentRowBatch; private int _currentBatchSize; private int _currentBatchIndex = -1; @@ -71,11 +71,11 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet { _brokerResponseIterator = brokerResponseIterator; _closed = false; ObjectNode metadata = GrpcUtils.extractMetadataJson(_brokerResponseIterator.next()); - DataSchema dataSchema = GrpcUtils.extractSchema(_brokerResponseIterator.next()); - _totalColumns = dataSchema.size(); + _dataSchema = GrpcUtils.extractSchema(_brokerResponseIterator.next()); + _totalColumns = _dataSchema.size(); for (int i = 0; i < _totalColumns; i++) { - _columns.put(dataSchema.getColumnName(i), i + 1); - _columnDataTypes.put(i + 1, dataSchema.getColumnDataType(i).name()); + _columns.put(_dataSchema.getColumnName(i), i + 1); + _columnDataTypes.put(i + 1, _dataSchema.getColumnDataType(i).name()); } } @@ -83,6 +83,7 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet { _brokerResponseIterator = null; _currentBatchSize = 0; _totalColumns = 0; + _dataSchema = null; } public static PinotGrpcResultSet empty() { @@ -271,11 +272,10 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet { public String getString(int columnIndex) throws SQLException { validateColumn(columnIndex); - String val = _currentRowBatch.get(_currentBatchIndex).get(columnIndex - 1).asText(); + String val = _currentRowBatch.getRows().get(_currentBatchIndex)[columnIndex - 1].toString(); if (checkIsNull(val)) { return null; } - return val; } @@ -425,9 +425,9 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet { if (_currentBatchIndex == _currentBatchSize - 1) { if (_brokerResponseIterator.hasNext()) { try { - _currentRowBatch = GrpcUtils.extractRowsJson(_brokerResponseIterator.next()); + _currentRowBatch = GrpcUtils.extractResultTable(_brokerResponseIterator.next(), _dataSchema); _currentBatchIndex = 0; - _currentBatchSize = _currentRowBatch.size(); + _currentBatchSize = _currentRowBatch.getRows().size(); _currentRow++; return true; } catch (IOException e) { diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/JsonResponseEncoder.java b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/JsonResponseEncoder.java new file mode 100644 index 0000000000..e0f96ea3b7 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/JsonResponseEncoder.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.response.encoder; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.spi.utils.JsonUtils; + + +public class JsonResponseEncoder implements ResponseEncoder { + + @Override + public byte[] encodeResultTable(ResultTable resultTable, int startRow, int length) + throws IOException { + // Serialize the rows to a byte array + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + for (int j = startRow; j < Math.min(startRow + length, resultTable.getRows().size()); j++) { + String rowString = JsonUtils.objectToJsonNode(resultTable.getRows().get(j)).toString(); + byte[] bytesToWrite = rowString.getBytes(StandardCharsets.UTF_8); + byteArrayOutputStream.write(ByteBuffer.allocate(4).putInt(bytesToWrite.length).array()); + byteArrayOutputStream.write(bytesToWrite); + } + return byteArrayOutputStream.toByteArray(); + } + + @Override + public ResultTable decodeResultTable(byte[] bytes, int rowSize, DataSchema schema) + throws IOException { + List<Object[]> rows = new ArrayList<>(); + int bytesRead = 0; + ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); + for (int i = 0; i < rowSize; i++) { + int nextRowSize = byteBuffer.getInt(bytesRead); + bytesRead += 4; + byte[] rowBytes = new byte[nextRowSize]; + byteBuffer.position(bytesRead); + byteBuffer.get(rowBytes); + bytesRead += nextRowSize; + String rowString = new String(rowBytes); + JsonNode jsonRow = JsonUtils.stringToJsonNode(rowString); + Object[] row = new Object[jsonRow.size()]; + for (int columnIdx = 0; columnIdx < jsonRow.size(); columnIdx++) { + DataSchema.ColumnDataType columnDataType = schema.getColumnDataType(columnIdx); + JsonNode jsonValue = jsonRow.get(columnIdx); + if (columnDataType.isArray()) { + row[columnIdx] = extractArray(jsonValue); + } else if (columnDataType == DataSchema.ColumnDataType.MAP) { + row[columnIdx] = extractMap(jsonValue); + } else { + row[columnIdx] = extractValue(jsonValue); + } + } + rows.add(row); + } + return new ResultTable(schema, rows); + } + + private Object[] extractArray(JsonNode jsonValue) { + Object[] array = new Object[jsonValue.size()]; + for (int k = 0; k < jsonValue.size(); k++) { + if (jsonValue.get(k).isNull()) { + array[k] = null; + } else if (jsonValue.get(k).isBoolean()) { + array[k] = jsonValue.get(k).asBoolean(); + } else if (jsonValue.get(k).isInt()) { + array[k] = jsonValue.get(k).asInt(); + } else if (jsonValue.get(k).isLong()) { + array[k] = jsonValue.get(k).asLong(); + } else if (jsonValue.get(k).isFloat()) { + array[k] = jsonValue.get(k).floatValue(); + } else if (jsonValue.get(k).isDouble()) { + array[k] = jsonValue.get(k).asDouble(); + } else if (jsonValue.get(k).isTextual()) { + array[k] = jsonValue.get(k).textValue(); + } else if (jsonValue.isArray()) { + array[k] = extractArray(jsonValue.get(k)); + } else if (jsonValue.isObject()) { + array[k] = extractMap(jsonValue.get(k)); + } else { + array[k] = jsonValue.get(k).toString(); + } + } + return array; + } + + private Object extractValue(JsonNode jsonValue) { + if (jsonValue.isNull()) { + return null; + } + if (jsonValue.isBoolean()) { + return jsonValue.asBoolean(); + } + if (jsonValue.isShort()) { + return jsonValue.shortValue(); + } + if (jsonValue.isBigInteger()) { + return jsonValue.bigIntegerValue(); + } + if (jsonValue.isBigDecimal()) { + return jsonValue.decimalValue(); + } + if (jsonValue.isInt()) { + return jsonValue.asInt(); + } + if (jsonValue.isLong()) { + return jsonValue.asLong(); + } + if (jsonValue.isFloat()) { + return jsonValue.floatValue(); + } + if (jsonValue.isDouble()) { + return jsonValue.asDouble(); + } + if (jsonValue.isTextual()) { + return jsonValue.textValue(); + } + if (jsonValue.isArray()) { + return extractArray(jsonValue); + } + if (jsonValue.isObject()) { + return extractMap(jsonValue); + } + return jsonValue.toString(); + } + + private Map<String, Object> extractMap(JsonNode jsonValue) { + Map<String, Object> map = new HashMap<>(); + jsonValue.fields().forEachRemaining(entry -> { + String key = entry.getKey(); + Object value = extractValue(entry.getValue()); + map.put(key, value); + }); + return map; + } +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoder.java b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoder.java new file mode 100644 index 0000000000..f02e9062c5 --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoder.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.response.encoder; + +import java.io.IOException; +import org.apache.pinot.common.response.broker.ResultTable; +import org.apache.pinot.common.utils.DataSchema; + + +public interface ResponseEncoder { + + /** + * Encode the result table into a byte array. + * @param resultTable Result table to encode + * @return Encoded byte array + */ + byte[] encodeResultTable(ResultTable resultTable, int startRow, int length) + throws IOException; + + /** + * Decode the result table from a byte array. + * + * @param bytes Encoded byte array + * @param rowSize Number of rows in the result table + * @param schema Schema of the result table + * @return Decoded result table + */ + ResultTable decodeResultTable(byte[] bytes, int rowSize, DataSchema schema) + throws IOException; +} diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoderFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoderFactory.java new file mode 100644 index 0000000000..ad2abbeb8d --- /dev/null +++ b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoderFactory.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.common.response.encoder; + +public class ResponseEncoderFactory { + private ResponseEncoderFactory() { + } + + public static ResponseEncoder getResponseEncoder(String format) { + switch (format.toUpperCase()) { + case "JSON": + return new JsonResponseEncoder(); + default: + throw new IllegalArgumentException("Unsupported format: " + format); + } + } +} diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java index 8a8b7c6a5e..f70900fd5c 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java @@ -528,14 +528,9 @@ public abstract class ClusterTest extends ControllerTest { public JsonNode queryGrpcEndpoint(String query, Map<String, String> metadataMap) throws IOException { - GrpcConnection grpcConnection = null; - try { - grpcConnection = ConnectionFactory.fromHostListGrpc(new Properties(), List.of(getBrokerGrpcEndpoint())); + try (GrpcConnection grpcConnection = ConnectionFactory.fromHostListGrpc(new Properties(), + List.of(getBrokerGrpcEndpoint()))) { return grpcConnection.getJsonResponse(query, metadataMap); - } finally { - if (grpcConnection != null) { - grpcConnection.close(); - } } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java index eb575799e3..7332b9cce4 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java @@ -2772,7 +2772,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet assertEquals(row.get(0).asInt(), tmpTableRow.get(0).asInt()); assertEquals(row.get(1).asLong(), tmpTableRow.get(1).asLong()); assertTrue(row.get(2).isNull()); - assertTrue(row.get(2).isNull()); + assertTrue(row.get(3).isNull()); } for (int i = 2; i < 363; i++) { JsonNode row = rows.get(i); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 1151a78077..7eb62120f7 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -739,6 +739,8 @@ public class CommonConstants { public static final int DEFAULT_BLOCK_ROW_SIZE = 10_000; public static final String COMPRESSION = "compression"; public static final String DEFAULT_COMPRESSION = "ZSTD"; + public static final String ENCODING = "encoding"; + public static final String DEFAULT_ENCODING = "JSON"; } public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY = "pinot.broker.storage.factory"; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org