This is an automated email from the ASF dual-hosted git repository.

rongr pushed a commit to branch multi_stage_query_engine
in repository https://gitbox.apache.org/repos/asf/pinot.git
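The commit below replaces the DataTable-based transfer blocks with BaseDataBlock subclasses (RowDataBlock, ColumnarDataBlock, MetadataBlock) that share one binary layout whose first int packs a format version together with the block type. The following standalone sketch is not part of the commit; it only illustrates that packing scheme, assuming the 5-bit VERSION_TYPE_SHIFT constant that DataBlockUtils defines in the patch:

    public final class DataBlockVersionTypeSketch {
      // Assumed to mirror DataBlockUtils.VERSION_TYPE_SHIFT introduced in this commit.
      private static final int VERSION_TYPE_SHIFT = 5;

      // Packs a block version and a BaseDataBlock.Type ordinal into one int,
      // the same arithmetic as getDataBlockVersionType() in RowDataBlock/ColumnarDataBlock/MetadataBlock.
      static int pack(int version, int typeOrdinal) {
        return version + (typeOrdinal << VERSION_TYPE_SHIFT);
      }

      // Splits the packed int back apart, as DataBlockUtils.getDataBlock() does with the buffer's first int.
      static int version(int versionType) {
        return versionType & ((1 << VERSION_TYPE_SHIFT) - 1);
      }

      static int typeOrdinal(int versionType) {
        return versionType >> VERSION_TYPE_SHIFT;
      }

      public static void main(String[] args) {
        int packed = pack(1, 2);              // version 1, METADATA(2) -> 65
        System.out.println(version(packed));  // 1
        System.out.println(typeOrdinal(packed)); // 2
      }
    }

DataBlockUtils.getDataBlock() in the patch reads this leading int and dispatches to ROW(0), COLUMNAR(1) or METADATA(2); as the inline comment in QueryRunner notes, a serialized DataTableImplV3 starts with a small version int whose high bits are zero, so it decodes as a ROW block.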
commit 2c07cb04e25c9fc506b144b157756bca9ee612ac Author: Rong Rong <walterddr.walter...@gmail.com> AuthorDate: Tue May 10 14:30:46 2022 -0700 row/columnar compatible block (#8583) adding tests and verify both row/columnar format adding test and validating all types fix and add query dispatcher test Co-authored-by: Rong Rong <ro...@startree.ai> --- .../apache/pinot/query/runtime/QueryRunner.java | 15 +- .../pinot/query/runtime/blocks/BaseDataBlock.java | 697 +++++++++++++++++++++ .../query/runtime/blocks/ColumnarDataBlock.java | 96 +++ .../query/runtime/blocks/DataBlockBuilder.java | 406 ++++++++++++ .../pinot/query/runtime/blocks/DataBlockUtils.java | 174 +++++ .../query/runtime/blocks/DataTableBlockUtils.java | 71 --- .../pinot/query/runtime/blocks/MetadataBlock.java | 66 ++ .../pinot/query/runtime/blocks/RowDataBlock.java | 90 +++ ...{DataTableBlock.java => TransferableBlock.java} | 37 +- .../runtime/executor/WorkerQueryExecutor.java | 16 +- .../query/runtime/operator/HashJoinOperator.java | 51 +- .../runtime/operator/MailboxReceiveOperator.java | 19 +- .../runtime/operator/MailboxSendOperator.java | 48 +- .../pinot/query/service/QueryDispatcher.java | 54 +- .../query/mailbox/GrpcMailboxServiceTest.java | 7 +- .../pinot/query/runtime/QueryRunnerTest.java | 52 +- .../pinot/query/runtime/blocks/DataBlockTest.java | 80 +++ .../query/runtime/blocks/DataBlockTestUtils.java | 181 ++++++ .../pinot/query/service/QueryDispatcherTest.java | 92 +++ 19 files changed, 2034 insertions(+), 218 deletions(-) diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index f33cde43c2..e25c6e03b8 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -18,6 +18,8 @@ */ package org.apache.pinot.query.runtime; +import java.io.IOException; +import java.nio.ByteBuffer; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutorService; @@ -32,6 +34,8 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.StageMetadata; import org.apache.pinot.query.planner.stage.MailboxSendNode; +import org.apache.pinot.query.runtime.blocks.BaseDataBlock; +import org.apache.pinot.query.runtime.blocks.DataBlockUtils; import org.apache.pinot.query.runtime.executor.WorkerQueryExecutor; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; @@ -94,12 +98,19 @@ public class QueryRunner { ServerRequestUtils.constructServerQueryRequest(distributedStagePlan, requestMetadataMap); // send the data table via mailbox in one-off fashion (e.g. 
no block-level split, one data table/partition key) - DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null); + BaseDataBlock dataBlock; + try { + DataTable dataTable = _serverExecutor.processQuery(serverQueryRequest, executorService, null); + // this works because default DataTableImplV3 will have ordinal 0, which maps to ROW(0) + dataBlock = DataBlockUtils.getDataBlock(ByteBuffer.wrap(dataTable.toBytes())); + } catch (IOException e) { + throw new RuntimeException("Unable to convert byte buffer", e); + } MailboxSendNode sendNode = (MailboxSendNode) distributedStagePlan.getStageRoot(); StageMetadata receivingStageMetadata = distributedStagePlan.getMetadataMap().get(sendNode.getReceiverStageId()); MailboxSendOperator mailboxSendOperator = - new MailboxSendOperator(_mailboxService, dataTable, receivingStageMetadata.getServerInstances(), + new MailboxSendOperator(_mailboxService, dataBlock, receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostname, _port, serverQueryRequest.getRequestId(), sendNode.getStageId()); mailboxSendOperator.nextBlock(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java new file mode 100644 index 0000000000..97d007b58b --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/BaseDataBlock.java @@ -0,0 +1,697 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.blocks; + +import com.google.common.primitives.Ints; +import com.google.common.primitives.Longs; +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.common.datatable.DataTableUtils; +import org.apache.pinot.core.query.request.context.ThreadTimer; +import org.apache.pinot.spi.utils.BigDecimalUtils; +import org.apache.pinot.spi.utils.ByteArray; + +import static java.nio.charset.StandardCharsets.UTF_8; + + +/** + * Base data block mostly replicating implementation of {@link org.apache.pinot.core.common.datatable.DataTableImplV3}. 
+ * + * +-----------------------------------------------+ + * | 13 integers of header: | + * | VERSION | + * | NUM_ROWS | + * | NUM_COLUMNS | + * | EXCEPTIONS SECTION START OFFSET | + * | EXCEPTIONS SECTION LENGTH | + * | DICTIONARY_MAP SECTION START OFFSET | + * | DICTIONARY_MAP SECTION LENGTH | + * | DATA_SCHEMA SECTION START OFFSET | + * | DATA_SCHEMA SECTION LENGTH | + * | FIXED_SIZE_DATA SECTION START OFFSET | + * | FIXED_SIZE_DATA SECTION LENGTH | + * | VARIABLE_SIZE_DATA SECTION START OFFSET | + * | VARIABLE_SIZE_DATA SECTION LENGTH | + * +-----------------------------------------------+ + * | EXCEPTIONS SECTION | + * +-----------------------------------------------+ + * | DICTIONARY_MAP SECTION | + * +-----------------------------------------------+ + * | DATA_SCHEMA SECTION | + * +-----------------------------------------------+ + * | FIXED_SIZE_DATA SECTION | + * +-----------------------------------------------+ + * | VARIABLE_SIZE_DATA SECTION | + * +-----------------------------------------------+ + * | METADATA LENGTH | + * | METADATA SECTION | + * +-----------------------------------------------+ + * + * To support both row and columnar data format. the size of the data payload will be exactly the same. the only + * difference is the data layout in FIXED_SIZE_DATA and VARIABLE_SIZE_DATA section, see each impl for details. + */ +@SuppressWarnings("DuplicatedCode") +public abstract class BaseDataBlock implements DataTable { + protected static final int HEADER_SIZE = Integer.BYTES * 13; + // _errCodeToExceptionMap stores exceptions as a map of errorCode->errorMessage + protected Map<Integer, String> _errCodeToExceptionMap; + + protected int _numRows; + protected int _numColumns; + protected DataSchema _dataSchema; + protected Map<String, Map<Integer, String>> _dictionaryMap; + protected byte[] _fixedSizeDataBytes; + protected ByteBuffer _fixedSizeData; + protected byte[] _variableSizeDataBytes; + protected ByteBuffer _variableSizeData; + protected Map<String, String> _metadata; + + /** + * construct a base data block. + * @param numRows num of rows in the block + * @param dataSchema schema of the data in the block + * @param dictionaryMap dictionary encoding map + * @param fixedSizeDataBytes byte[] for fix-sized columns. + * @param variableSizeDataBytes byte[] for variable length columns (arrays). + */ + public BaseDataBlock(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap, + byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) { + _numRows = numRows; + _numColumns = dataSchema.size(); + _dataSchema = dataSchema; + _dictionaryMap = dictionaryMap; + _fixedSizeDataBytes = fixedSizeDataBytes; + _fixedSizeData = ByteBuffer.wrap(fixedSizeDataBytes); + _variableSizeDataBytes = variableSizeDataBytes; + _variableSizeData = ByteBuffer.wrap(variableSizeDataBytes); + _metadata = new HashMap<>(); + _errCodeToExceptionMap = new HashMap<>(); + } + + /** + * Construct empty data table. + */ + public BaseDataBlock() { + _numRows = 0; + _numColumns = 0; + _dataSchema = null; + _dictionaryMap = null; + _fixedSizeDataBytes = null; + _fixedSizeData = null; + _variableSizeDataBytes = null; + _variableSizeData = null; + _metadata = new HashMap<>(); + _errCodeToExceptionMap = new HashMap<>(); + } + + public BaseDataBlock(ByteBuffer byteBuffer) + throws IOException { + // Read header. 
+ _numRows = byteBuffer.getInt(); + _numColumns = byteBuffer.getInt(); + int exceptionsStart = byteBuffer.getInt(); + int exceptionsLength = byteBuffer.getInt(); + int dictionaryMapStart = byteBuffer.getInt(); + int dictionaryMapLength = byteBuffer.getInt(); + int dataSchemaStart = byteBuffer.getInt(); + int dataSchemaLength = byteBuffer.getInt(); + int fixedSizeDataStart = byteBuffer.getInt(); + int fixedSizeDataLength = byteBuffer.getInt(); + int variableSizeDataStart = byteBuffer.getInt(); + int variableSizeDataLength = byteBuffer.getInt(); + + + // Read exceptions. + if (exceptionsLength != 0) { + byteBuffer.position(exceptionsStart); + _errCodeToExceptionMap = deserializeExceptions(byteBuffer); + } else { + _errCodeToExceptionMap = new HashMap<>(); + } + + // Read dictionary. + if (dictionaryMapLength != 0) { + byteBuffer.position(dictionaryMapStart); + _dictionaryMap = deserializeDictionaryMap(byteBuffer); + } else { + _dictionaryMap = null; + } + + // Read data schema. + if (dataSchemaLength != 0) { + byteBuffer.position(dataSchemaStart); + _dataSchema = DataSchema.fromBytes(byteBuffer); + } else { + _dataSchema = null; + } + + // Read fixed size data. + if (fixedSizeDataLength != 0) { + _fixedSizeDataBytes = new byte[fixedSizeDataLength]; + byteBuffer.position(fixedSizeDataStart); + byteBuffer.get(_fixedSizeDataBytes); + _fixedSizeData = ByteBuffer.wrap(_fixedSizeDataBytes); + } else { + _fixedSizeDataBytes = null; + _fixedSizeData = null; + } + + // Read variable size data. + if (variableSizeDataLength != 0) { + _variableSizeDataBytes = new byte[variableSizeDataLength]; + byteBuffer.position(variableSizeDataStart); + byteBuffer.get(_variableSizeDataBytes); + _variableSizeData = ByteBuffer.wrap(_variableSizeDataBytes); + } else { + _variableSizeDataBytes = null; + _variableSizeData = null; + } + + // Read metadata. + int metadataLength = byteBuffer.getInt(); + if (metadataLength != 0) { + _metadata = deserializeMetadata(byteBuffer); + } + } + + /** + * Return the int serialized form of the data block version and type. + * @return + */ + protected abstract int getDataBlockVersionType(); + + /** + * position the {@code _fixedSizeDataBytes} member variable to the corresponding row/column ID. + * @param rowId row ID + * @param colId column ID + */ + protected abstract void positionCursorInFixSizedBuffer(int rowId, int colId); + + /** + * position the {@code _variableSizeDataBytes} member variable to the corresponding row/column ID. and return the + * length of bytes to extract from the variable size buffer. + * + * @param rowId row ID + * @param colId column ID + * @return the length to extract from variable size buffer. + */ + protected abstract int positionCursorInVariableBuffer(int rowId, int colId); + + @Override + public Map<String, String> getMetadata() { + return _metadata; + } + + @Override + public DataSchema getDataSchema() { + return _dataSchema; + } + + @Override + public int getNumberOfRows() { + return _numRows; + } + + // -------------------------------------------------------------------------- + // Fixed sized element access. 
+ // -------------------------------------------------------------------------- + + @Override + public int getInt(int rowId, int colId) { + positionCursorInFixSizedBuffer(rowId, colId); + return _fixedSizeData.getInt(); + } + + @Override + public long getLong(int rowId, int colId) { + positionCursorInFixSizedBuffer(rowId, colId); + return _fixedSizeData.getLong(); + } + + @Override + public float getFloat(int rowId, int colId) { + positionCursorInFixSizedBuffer(rowId, colId); + return _fixedSizeData.getFloat(); + } + + @Override + public double getDouble(int rowId, int colId) { + positionCursorInFixSizedBuffer(rowId, colId); + return _fixedSizeData.getDouble(); + } + + @Override + public BigDecimal getBigDecimal(int rowId, int colId) { + int size = positionCursorInVariableBuffer(rowId, colId); + ByteBuffer byteBuffer = _variableSizeData.slice(); + byteBuffer.limit(size); + return BigDecimalUtils.deserialize(byteBuffer); + } + + @Override + public String getString(int rowId, int colId) { + positionCursorInFixSizedBuffer(rowId, colId); + int dictId = _fixedSizeData.getInt(); + return _dictionaryMap.get(_dataSchema.getColumnName(colId)).get(dictId); + } + + @Override + public ByteArray getBytes(int rowId, int colId) { + int size = positionCursorInVariableBuffer(rowId, colId); + byte[] buffer = new byte[size]; + _variableSizeData.get(buffer); + return new ByteArray(buffer); + } + + // -------------------------------------------------------------------------- + // Variable sized element access. + // -------------------------------------------------------------------------- + + @Override + public <T> T getObject(int rowId, int colId) { + int size = positionCursorInVariableBuffer(rowId, colId); + int objectTypeValue = _variableSizeData.getInt(); + ByteBuffer byteBuffer = _variableSizeData.slice(); + byteBuffer.limit(size); + return ObjectSerDeUtils.deserialize(byteBuffer, objectTypeValue); + } + + @Override + public int[] getIntArray(int rowId, int colId) { + int length = positionCursorInVariableBuffer(rowId, colId); + int[] ints = new int[length]; + for (int i = 0; i < length; i++) { + ints[i] = _variableSizeData.getInt(); + } + return ints; + } + + @Override + public long[] getLongArray(int rowId, int colId) { + int length = positionCursorInVariableBuffer(rowId, colId); + long[] longs = new long[length]; + for (int i = 0; i < length; i++) { + longs[i] = _variableSizeData.getLong(); + } + return longs; + } + + @Override + public float[] getFloatArray(int rowId, int colId) { + int length = positionCursorInVariableBuffer(rowId, colId); + float[] floats = new float[length]; + for (int i = 0; i < length; i++) { + floats[i] = _variableSizeData.getFloat(); + } + return floats; + } + + @Override + public double[] getDoubleArray(int rowId, int colId) { + int length = positionCursorInVariableBuffer(rowId, colId); + double[] doubles = new double[length]; + for (int i = 0; i < length; i++) { + doubles[i] = _variableSizeData.getDouble(); + } + return doubles; + } + + @Override + public String[] getStringArray(int rowId, int colId) { + int length = positionCursorInVariableBuffer(rowId, colId); + String[] strings = new String[length]; + Map<Integer, String> dictionary = _dictionaryMap.get(_dataSchema.getColumnName(colId)); + for (int i = 0; i < length; i++) { + strings[i] = dictionary.get(_variableSizeData.getInt()); + } + return strings; + } + + // -------------------------------------------------------------------------- + // Ser/De and exception handling + // 
-------------------------------------------------------------------------- + + /** + * Helper method to serialize dictionary map. + */ + protected byte[] serializeDictionaryMap() + throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + + dataOutputStream.writeInt(_dictionaryMap.size()); + for (Map.Entry<String, Map<Integer, String>> dictionaryMapEntry : _dictionaryMap.entrySet()) { + String columnName = dictionaryMapEntry.getKey(); + Map<Integer, String> dictionary = dictionaryMapEntry.getValue(); + byte[] bytes = columnName.getBytes(UTF_8); + dataOutputStream.writeInt(bytes.length); + dataOutputStream.write(bytes); + dataOutputStream.writeInt(dictionary.size()); + + for (Map.Entry<Integer, String> dictionaryEntry : dictionary.entrySet()) { + dataOutputStream.writeInt(dictionaryEntry.getKey()); + byte[] valueBytes = dictionaryEntry.getValue().getBytes(UTF_8); + dataOutputStream.writeInt(valueBytes.length); + dataOutputStream.write(valueBytes); + } + } + + return byteArrayOutputStream.toByteArray(); + } + + /** + * Helper method to deserialize dictionary map. + */ + protected Map<String, Map<Integer, String>> deserializeDictionaryMap(ByteBuffer buffer) + throws IOException { + int numDictionaries = buffer.getInt(); + Map<String, Map<Integer, String>> dictionaryMap = new HashMap<>(numDictionaries); + + for (int i = 0; i < numDictionaries; i++) { + String column = DataTableUtils.decodeString(buffer); + int dictionarySize = buffer.getInt(); + Map<Integer, String> dictionary = new HashMap<>(dictionarySize); + for (int j = 0; j < dictionarySize; j++) { + int key = buffer.getInt(); + String value = DataTableUtils.decodeString(buffer); + dictionary.put(key, value); + } + dictionaryMap.put(column, dictionary); + } + + return dictionaryMap; + } + + @Override + public void addException(ProcessingException processingException) { + _errCodeToExceptionMap.put(processingException.getErrorCode(), processingException.getMessage()); + } + + @Override + public void addException(int errCode, String errMsg) { + _errCodeToExceptionMap.put(errCode, errMsg); + } + + @Override + public Map<Integer, String> getExceptions() { + return _errCodeToExceptionMap; + } + + @Override + public byte[] toBytes() + throws IOException { + ThreadTimer threadTimer = new ThreadTimer(); + + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + writeLeadingSections(dataOutputStream); + + // Add table serialization time metadata if thread timer is enabled. + if (ThreadTimer.isThreadCpuTimeMeasurementEnabled()) { + long responseSerializationCpuTimeNs = threadTimer.getThreadTimeNs(); + getMetadata().put(MetadataKey.RESPONSE_SER_CPU_TIME_NS.getName(), String.valueOf(responseSerializationCpuTimeNs)); + } + + // Write metadata: length followed by actual metadata bytes. + // NOTE: We ignore metadata serialization time in "responseSerializationCpuTimeNs" as it's negligible while + // considering it will bring a lot code complexity. 
+ byte[] metadataBytes = serializeMetadata(); + dataOutputStream.writeInt(metadataBytes.length); + dataOutputStream.write(metadataBytes); + + return byteArrayOutputStream.toByteArray(); + } + + private void writeLeadingSections(DataOutputStream dataOutputStream) + throws IOException { + dataOutputStream.writeInt(getDataBlockVersionType()); + dataOutputStream.writeInt(_numRows); + dataOutputStream.writeInt(_numColumns); + int dataOffset = HEADER_SIZE; + + // Write exceptions section offset(START|SIZE). + dataOutputStream.writeInt(dataOffset); + byte[] exceptionsBytes; + exceptionsBytes = serializeExceptions(); + dataOutputStream.writeInt(exceptionsBytes.length); + dataOffset += exceptionsBytes.length; + + // Write dictionary map section offset(START|SIZE). + dataOutputStream.writeInt(dataOffset); + byte[] dictionaryMapBytes = null; + if (_dictionaryMap != null) { + dictionaryMapBytes = serializeDictionaryMap(); + dataOutputStream.writeInt(dictionaryMapBytes.length); + dataOffset += dictionaryMapBytes.length; + } else { + dataOutputStream.writeInt(0); + } + + // Write data schema section offset(START|SIZE). + dataOutputStream.writeInt(dataOffset); + byte[] dataSchemaBytes = null; + if (_dataSchema != null) { + dataSchemaBytes = _dataSchema.toBytes(); + dataOutputStream.writeInt(dataSchemaBytes.length); + dataOffset += dataSchemaBytes.length; + } else { + dataOutputStream.writeInt(0); + } + + // Write fixed size data section offset(START|SIZE). + dataOutputStream.writeInt(dataOffset); + if (_fixedSizeDataBytes != null) { + dataOutputStream.writeInt(_fixedSizeDataBytes.length); + dataOffset += _fixedSizeDataBytes.length; + } else { + dataOutputStream.writeInt(0); + } + + // Write variable size data section offset(START|SIZE). + dataOutputStream.writeInt(dataOffset); + if (_variableSizeDataBytes != null) { + dataOutputStream.writeInt(_variableSizeDataBytes.length); + } else { + dataOutputStream.writeInt(0); + } + + // Write actual data. + // Write exceptions bytes. + dataOutputStream.write(exceptionsBytes); + // Write dictionary map bytes. + if (dictionaryMapBytes != null) { + dataOutputStream.write(dictionaryMapBytes); + } + // Write data schema bytes. + if (dataSchemaBytes != null) { + dataOutputStream.write(dataSchemaBytes); + } + // Write fixed size data bytes. + if (_fixedSizeDataBytes != null) { + dataOutputStream.write(_fixedSizeDataBytes); + } + // Write variable size data bytes. + if (_variableSizeDataBytes != null) { + dataOutputStream.write(_variableSizeDataBytes); + } + } + + /** + * Serialize metadata section to bytes. + * Format of the bytes looks like: + * [numEntries, bytesOfKV2, bytesOfKV2, bytesOfKV3] + * For each KV pair: + * - if the value type is String, encode it as: [enumKeyOrdinal, valueLength, Utf8EncodedValue]. + * - if the value type is int, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfIntValue] + * - if the value type is long, encode it as: [enumKeyOrdinal, bigEndianRepresentationOfLongValue] + * + * Unlike V2, where numeric metadata values (int and long) in V3 are encoded in UTF-8 in the wire format, + * in V3 big endian representation is used. 
+ */ + private byte[] serializeMetadata() + throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + + dataOutputStream.writeInt(_metadata.size()); + + for (Map.Entry<String, String> entry : _metadata.entrySet()) { + MetadataKey key = MetadataKey.getByName(entry.getKey()); + // Ignore unknown keys. + if (key == null) { + continue; + } + String value = entry.getValue(); + dataOutputStream.writeInt(key.ordinal()); + if (key.getValueType() == MetadataValueType.INT) { + dataOutputStream.write(Ints.toByteArray(Integer.parseInt(value))); + } else if (key.getValueType() == MetadataValueType.LONG) { + dataOutputStream.write(Longs.toByteArray(Long.parseLong(value))); + } else { + byte[] valueBytes = value.getBytes(UTF_8); + dataOutputStream.writeInt(valueBytes.length); + dataOutputStream.write(valueBytes); + } + } + + return byteArrayOutputStream.toByteArray(); + } + + /** + * Even though the wire format of V3 uses UTF-8 for string/bytes and big-endian for numeric values, + * the in-memory representation is STRING based for processing the metadata before serialization + * (by the server as it adds the statistics in metadata) and after deserialization (by the broker as it receives + * DataTable from each server and aggregates the values). + * This is to make V3 implementation keep the consumers of Map<String, String> getMetadata() API in the code happy + * by internally converting it. + * + * This method use relative operations on the ByteBuffer and expects the buffer's position to be set correctly. + */ + private Map<String, String> deserializeMetadata(ByteBuffer buffer) + throws IOException { + int numEntries = buffer.getInt(); + Map<String, String> metadata = new HashMap<>(); + for (int i = 0; i < numEntries; i++) { + int keyId = buffer.getInt(); + MetadataKey key = MetadataKey.getByOrdinal(keyId); + // Ignore unknown keys. 
+ if (key == null) { + continue; + } + if (key.getValueType() == MetadataValueType.INT) { + String value = "" + buffer.getInt(); + metadata.put(key.getName(), value); + } else if (key.getValueType() == MetadataValueType.LONG) { + String value = "" + buffer.getLong(); + metadata.put(key.getName(), value); + } else { + String value = DataTableUtils.decodeString(buffer); + metadata.put(key.getName(), value); + } + } + return metadata; + } + + private byte[] serializeExceptions() + throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream); + + dataOutputStream.writeInt(_errCodeToExceptionMap.size()); + + for (Map.Entry<Integer, String> entry : _errCodeToExceptionMap.entrySet()) { + int key = entry.getKey(); + String value = entry.getValue(); + byte[] valueBytes = value.getBytes(UTF_8); + dataOutputStream.writeInt(key); + dataOutputStream.writeInt(valueBytes.length); + dataOutputStream.write(valueBytes); + } + + return byteArrayOutputStream.toByteArray(); + } + + private Map<Integer, String> deserializeExceptions(ByteBuffer buffer) + throws IOException { + int numExceptions = buffer.getInt(); + Map<Integer, String> exceptions = new HashMap<>(numExceptions); + for (int i = 0; i < numExceptions; i++) { + int errCode = buffer.getInt(); + String errMessage = DataTableUtils.decodeString(buffer); + exceptions.put(errCode, errMessage); + } + return exceptions; + } + + @Override + public String toString() { + if (_dataSchema == null) { + return _metadata.toString(); + } + + StringBuilder stringBuilder = new StringBuilder(); + stringBuilder.append(_dataSchema.toString()).append('\n'); + stringBuilder.append("numRows: ").append(_numRows).append('\n'); + + DataSchema.ColumnDataType[] storedColumnDataTypes = _dataSchema.getStoredColumnDataTypes(); + _fixedSizeData.position(0); + for (int rowId = 0; rowId < _numRows; rowId++) { + for (int colId = 0; colId < _numColumns; colId++) { + switch (storedColumnDataTypes[colId]) { + case INT: + stringBuilder.append(_fixedSizeData.getInt()); + break; + case LONG: + stringBuilder.append(_fixedSizeData.getLong()); + break; + case FLOAT: + stringBuilder.append(_fixedSizeData.getFloat()); + break; + case DOUBLE: + stringBuilder.append(_fixedSizeData.getDouble()); + break; + case STRING: + stringBuilder.append(_fixedSizeData.getInt()); + break; + // Object and array. 
+ default: + stringBuilder.append(String.format("(%s:%s)", _fixedSizeData.getInt(), _fixedSizeData.getInt())); + break; + } + stringBuilder.append("\t"); + } + stringBuilder.append("\n"); + } + return stringBuilder.toString(); + } + + public enum Type { + ROW(0), + COLUMNAR(1), + METADATA(2); + + private final int _ordinal; + + Type(int ordinal) { + _ordinal = ordinal; + } + + public static Type fromOrdinal(int ordinal) { + switch (ordinal) { + case 0: + return ROW; + case 1: + return COLUMNAR; + case 2: + return METADATA; + default: + throw new IllegalArgumentException("Invalid ordinal: " + ordinal); + } + } + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java new file mode 100644 index 0000000000..8510043251 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/ColumnarDataBlock.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.blocks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.pinot.common.utils.DataSchema; + + +/** + * Column-wise data table. It stores data in columnar-major format. 
+ */ +public class ColumnarDataBlock extends BaseDataBlock { + private static final int VERSION = 1; + protected int[] _cumulativeColumnOffsetSizeInBytes; + protected int[] _columnSizeInBytes; + + public ColumnarDataBlock() { + super(); + } + + public ColumnarDataBlock(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap, + byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) { + super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes); + computeBlockObjectConstants(); + } + + public ColumnarDataBlock(ByteBuffer byteBuffer) + throws IOException { + super(byteBuffer); + computeBlockObjectConstants(); + } + + protected void computeBlockObjectConstants() { + if (_dataSchema != null) { + _cumulativeColumnOffsetSizeInBytes = new int[_numColumns]; + _columnSizeInBytes = new int[_numColumns]; + DataBlockUtils.computeColumnSizeInBytes(_dataSchema, _columnSizeInBytes); + int cumulativeColumnOffset = 0; + for (int i = 0; i < _numColumns; i++) { + _cumulativeColumnOffsetSizeInBytes[i] = cumulativeColumnOffset; + cumulativeColumnOffset += _columnSizeInBytes[i] * _numRows; + } + } + } + + @Override + protected int getDataBlockVersionType() { + return VERSION + (Type.COLUMNAR.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT); + } + + @Override + protected void positionCursorInFixSizedBuffer(int rowId, int colId) { + int position = _cumulativeColumnOffsetSizeInBytes[colId] + _columnSizeInBytes[colId] * rowId; + _fixedSizeData.position(position); + } + + @Override + protected int positionCursorInVariableBuffer(int rowId, int colId) { + positionCursorInFixSizedBuffer(rowId, colId); + _variableSizeData.position(_fixedSizeData.getInt()); + return _fixedSizeData.getInt(); + } + + @Override + public ColumnarDataBlock toMetadataOnlyDataTable() { + ColumnarDataBlock metadataOnlyDataTable = new ColumnarDataBlock(); + metadataOnlyDataTable._metadata.putAll(_metadata); + metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap); + return metadataOnlyDataTable; + } + + @Override + public ColumnarDataBlock toDataOnlyDataTable() { + return new ColumnarDataBlock(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes); + } + + // TODO: add whole-column access methods. +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java new file mode 100644 index 0000000000..0c456c1d85 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockBuilder.java @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.runtime.blocks; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.math.BigDecimal; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.core.common.ObjectSerDeUtils; +import org.apache.pinot.core.common.datatable.DataTableUtils; +import org.apache.pinot.spi.utils.ArrayCopyUtils; +import org.apache.pinot.spi.utils.BigDecimalUtils; +import org.apache.pinot.spi.utils.ByteArray; + + +public class DataBlockBuilder { + private final DataSchema _dataSchema; + private final BaseDataBlock.Type _blockType; + private final DataSchema.ColumnDataType[] _columnDataType; + + private int[] _columnOffsets; + private int _rowSizeInBytes; + private int[] _cumulativeColumnOffsetSizeInBytes; + private int[] _columnSizeInBytes; + + private int _numRows; + private int _numColumns; + + private final Map<String, Map<String, Integer>> _dictionaryMap = new HashMap<>(); + private final Map<String, Map<Integer, String>> _reverseDictionaryMap = new HashMap<>(); + private final ByteArrayOutputStream _fixedSizeDataByteArrayOutputStream = new ByteArrayOutputStream(); + private final ByteArrayOutputStream _variableSizeDataByteArrayOutputStream = new ByteArrayOutputStream(); + private final DataOutputStream _variableSizeDataOutputStream = + new DataOutputStream(_variableSizeDataByteArrayOutputStream); + + + private ByteBuffer _currentRowDataByteBuffer; + + private DataBlockBuilder(DataSchema dataSchema, BaseDataBlock.Type blockType) { + _dataSchema = dataSchema; + _columnDataType = dataSchema.getStoredColumnDataTypes(); + _blockType = blockType; + _numColumns = dataSchema.size(); + if (_blockType == BaseDataBlock.Type.COLUMNAR) { + _cumulativeColumnOffsetSizeInBytes = new int[_numColumns]; + _columnSizeInBytes = new int[_numColumns]; + DataBlockUtils.computeColumnSizeInBytes(_dataSchema, _columnSizeInBytes); + int cumulativeColumnOffset = 0; + for (int i = 0; i < _numColumns; i++) { + _cumulativeColumnOffsetSizeInBytes[i] = cumulativeColumnOffset; + cumulativeColumnOffset += _columnSizeInBytes[i] * _numRows; + } + } else if (_blockType == BaseDataBlock.Type.ROW) { + _columnOffsets = new int[_numColumns]; + _rowSizeInBytes = DataTableUtils.computeColumnOffsets(dataSchema, _columnOffsets); + } + } + + public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSchema) + throws IOException { + DataBlockBuilder rowBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.ROW); + rowBuilder._numRows = rows.size(); + for (Object[] row : rows) { + ByteBuffer byteBuffer = ByteBuffer.allocate(rowBuilder._rowSizeInBytes); + for (int i = 0; i < rowBuilder._numColumns; i++) { + Object value = row[i]; + switch (rowBuilder._columnDataType[i]) { + // Single-value column + case INT: + byteBuffer.putInt(((Number) value).intValue()); + break; + case LONG: + byteBuffer.putLong(((Number) value).longValue()); + break; + case FLOAT: + byteBuffer.putFloat(((Number) value).floatValue()); + break; + case DOUBLE: + byteBuffer.putDouble(((Number) value).doubleValue()); + break; + case BIG_DECIMAL: + setColumn(rowBuilder, byteBuffer, (BigDecimal) value); + break; + case STRING: + setColumn(rowBuilder, byteBuffer, i, (String) value); + break; + case BYTES: + setColumn(rowBuilder, byteBuffer, (ByteArray) value); + break; + case OBJECT: + setColumn(rowBuilder, byteBuffer, value); + break; + // Multi-value 
column + case BOOLEAN_ARRAY: + case INT_ARRAY: + setColumn(rowBuilder, byteBuffer, (int[]) value); + break; + case TIMESTAMP_ARRAY: + case LONG_ARRAY: + // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY + if (value instanceof int[]) { + int[] ints = (int[]) value; + int length = ints.length; + long[] longs = new long[length]; + ArrayCopyUtils.copy(ints, longs, length); + setColumn(rowBuilder, byteBuffer, longs); + } else { + setColumn(rowBuilder, byteBuffer, (long[]) value); + } + break; + case FLOAT_ARRAY: + setColumn(rowBuilder, byteBuffer, (float[]) value); + break; + case DOUBLE_ARRAY: + // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and DOUBLE_ARRAY + if (value instanceof int[]) { + int[] ints = (int[]) value; + int length = ints.length; + double[] doubles = new double[length]; + ArrayCopyUtils.copy(ints, doubles, length); + setColumn(rowBuilder, byteBuffer, doubles); + } else if (value instanceof long[]) { + long[] longs = (long[]) value; + int length = longs.length; + double[] doubles = new double[length]; + ArrayCopyUtils.copy(longs, doubles, length); + setColumn(rowBuilder, byteBuffer, doubles); + } else if (value instanceof float[]) { + float[] floats = (float[]) value; + int length = floats.length; + double[] doubles = new double[length]; + ArrayCopyUtils.copy(floats, doubles, length); + setColumn(rowBuilder, byteBuffer, doubles); + } else { + setColumn(rowBuilder, byteBuffer, (double[]) value); + } + break; + case BYTES_ARRAY: + case STRING_ARRAY: + setColumn(rowBuilder, byteBuffer, i, (String[]) value); + break; + default: + throw new IllegalStateException(String.format( + "Unsupported data type: %s for column: %s", rowBuilder._columnDataType[i], + rowBuilder._dataSchema.getColumnName(i))); + } + } + rowBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position()); + } + return buildRowBlock(rowBuilder); + } + + public static ColumnarDataBlock buildFromColumns(List<Object[]> columns, DataSchema dataSchema) + throws IOException { + DataBlockBuilder columnarBuilder = new DataBlockBuilder(dataSchema, BaseDataBlock.Type.COLUMNAR); + for (int i = 0; i < columns.size(); i++) { + Object[] column = columns.get(i); + columnarBuilder._numRows = column.length; + ByteBuffer byteBuffer = ByteBuffer.allocate(columnarBuilder._numRows * columnarBuilder._columnSizeInBytes[i]); + switch (columnarBuilder._columnDataType[i]) { + // Single-value column + case INT: + for (Object value : column) { + byteBuffer.putInt(((Number) value).intValue()); + } + break; + case LONG: + for (Object value : column) { + byteBuffer.putLong(((Number) value).longValue()); + } + break; + case FLOAT: + for (Object value : column) { + byteBuffer.putFloat(((Number) value).floatValue()); + } + break; + case DOUBLE: + for (Object value : column) { + byteBuffer.putDouble(((Number) value).doubleValue()); + } + break; + case BIG_DECIMAL: + for (Object value : column) { + setColumn(columnarBuilder, byteBuffer, (BigDecimal) value); + } + break; + case STRING: + for (Object value : column) { + setColumn(columnarBuilder, byteBuffer, i, (String) value); + } + break; + case BYTES: + for (Object value : column) { + setColumn(columnarBuilder, byteBuffer, (ByteArray) value); + } + break; + case OBJECT: + for (Object value : column) { + setColumn(columnarBuilder, byteBuffer, value); + } + break; + // Multi-value column + case BOOLEAN_ARRAY: + case INT_ARRAY: + for (Object value : column) { + setColumn(columnarBuilder, byteBuffer, (int[]) value); + } + break; + case TIMESTAMP_ARRAY: + 
case LONG_ARRAY: + for (Object value : column) { + if (value instanceof int[]) { + // LONG_ARRAY type covers INT_ARRAY and LONG_ARRAY + int[] ints = (int[]) value; + int length = ints.length; + long[] longs = new long[length]; + ArrayCopyUtils.copy(ints, longs, length); + setColumn(columnarBuilder, byteBuffer, longs); + } else { + setColumn(columnarBuilder, byteBuffer, (long[]) value); + } + } + break; + case FLOAT_ARRAY: + for (Object value : column) { + setColumn(columnarBuilder, byteBuffer, (float[]) value); + } + break; + case DOUBLE_ARRAY: + for (Object value : column) { + // DOUBLE_ARRAY type covers INT_ARRAY, LONG_ARRAY, FLOAT_ARRAY and DOUBLE_ARRAY + if (value instanceof int[]) { + int[] ints = (int[]) value; + int length = ints.length; + double[] doubles = new double[length]; + ArrayCopyUtils.copy(ints, doubles, length); + setColumn(columnarBuilder, byteBuffer, doubles); + } else if (value instanceof long[]) { + long[] longs = (long[]) value; + int length = longs.length; + double[] doubles = new double[length]; + ArrayCopyUtils.copy(longs, doubles, length); + setColumn(columnarBuilder, byteBuffer, doubles); + } else if (value instanceof float[]) { + float[] floats = (float[]) value; + int length = floats.length; + double[] doubles = new double[length]; + ArrayCopyUtils.copy(floats, doubles, length); + setColumn(columnarBuilder, byteBuffer, doubles); + } else { + setColumn(columnarBuilder, byteBuffer, (double[]) value); + } + } + break; + case BYTES_ARRAY: + case STRING_ARRAY: + for (Object value : column) { + setColumn(columnarBuilder, byteBuffer, i, (String[]) value); + } + break; + default: + throw new IllegalStateException(String.format( + "Unsupported data type: %s for column: %s", columnarBuilder._columnDataType[i], + columnarBuilder._dataSchema.getColumnName(i))); + } + columnarBuilder._fixedSizeDataByteArrayOutputStream.write(byteBuffer.array(), 0, byteBuffer.position()); + } + return buildColumnarBlock(columnarBuilder); + } + + private static RowDataBlock buildRowBlock(DataBlockBuilder builder) { + return new RowDataBlock(builder._numRows, builder._dataSchema, builder._reverseDictionaryMap, + builder._fixedSizeDataByteArrayOutputStream.toByteArray(), + builder._variableSizeDataByteArrayOutputStream.toByteArray()); + } + + private static ColumnarDataBlock buildColumnarBlock(DataBlockBuilder builder) { + return new ColumnarDataBlock(builder._numRows, builder._dataSchema, builder._reverseDictionaryMap, + builder._fixedSizeDataByteArrayOutputStream.toByteArray(), + builder._variableSizeDataByteArrayOutputStream.toByteArray()); + } + + private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, BigDecimal value) + throws IOException { + byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size()); + byte[] bytes = BigDecimalUtils.serialize(value); + byteBuffer.putInt(bytes.length); + builder._variableSizeDataByteArrayOutputStream.write(bytes); + } + + private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int colId, String value) { + String columnName = builder._dataSchema.getColumnName(colId); + Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName); + if (dictionary == null) { + dictionary = new HashMap<>(); + builder._dictionaryMap.put(columnName, dictionary); + builder._reverseDictionaryMap.put(columnName, new HashMap<>()); + } + Integer dictId = dictionary.get(value); + if (dictId == null) { + dictId = dictionary.size(); + dictionary.put(value, dictId); + 
builder._reverseDictionaryMap.get(columnName).put(dictId, value); + } + byteBuffer.putInt(dictId); + } + + private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, ByteArray value) + throws IOException { + byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size()); + byte[] bytes = value.getBytes(); + byteBuffer.putInt(bytes.length); + builder._variableSizeDataByteArrayOutputStream.write(bytes); + } + + private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, Object value) + throws IOException { + byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size()); + int objectTypeValue = ObjectSerDeUtils.ObjectType.getObjectType(value).getValue(); + byte[] bytes = ObjectSerDeUtils.serialize(value, objectTypeValue); + byteBuffer.putInt(bytes.length); + builder._variableSizeDataOutputStream.writeInt(objectTypeValue); + builder._variableSizeDataByteArrayOutputStream.write(bytes); + } + + private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int[] values) + throws IOException { + byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size()); + byteBuffer.putInt(values.length); + for (int value : values) { + builder._variableSizeDataOutputStream.writeInt(value); + } + } + + private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, long[] values) + throws IOException { + byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size()); + byteBuffer.putInt(values.length); + for (long value : values) { + builder._variableSizeDataOutputStream.writeLong(value); + } + } + + private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, float[] values) + throws IOException { + byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size()); + byteBuffer.putInt(values.length); + for (float value : values) { + builder._variableSizeDataOutputStream.writeFloat(value); + } + } + + private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, double[] values) + throws IOException { + byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size()); + byteBuffer.putInt(values.length); + for (double value : values) { + builder._variableSizeDataOutputStream.writeDouble(value); + } + } + + private static void setColumn(DataBlockBuilder builder, ByteBuffer byteBuffer, int colId, String[] values) + throws IOException { + byteBuffer.putInt(builder._variableSizeDataByteArrayOutputStream.size()); + byteBuffer.putInt(values.length); + + String columnName = builder._dataSchema.getColumnName(colId); + Map<String, Integer> dictionary = builder._dictionaryMap.get(columnName); + if (dictionary == null) { + dictionary = new HashMap<>(); + builder._dictionaryMap.put(columnName, dictionary); + builder._reverseDictionaryMap.put(columnName, new HashMap<>()); + } + + for (String value : values) { + Integer dictId = dictionary.get(value); + if (dictId == null) { + dictId = dictionary.size(); + dictionary.put(value, dictId); + builder._reverseDictionaryMap.get(columnName).put(dictId, value); + } + builder._variableSizeDataOutputStream.writeInt(dictId); + } + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java new file mode 100644 index 0000000000..690be1353a --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataBlockUtils.java @@ -0,0 +1,174 @@ +/** + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.blocks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataTable; + + +public final class DataBlockUtils { + protected static final int VERSION_TYPE_SHIFT = 5; + private DataBlockUtils() { + // do not instantiate. + } + + private static final DataSchema EMPTY_SCHEMA = new DataSchema(new String[0], new DataSchema.ColumnDataType[0]); + private static final MetadataBlock EOS_DATA_BLOCK = new MetadataBlock(EMPTY_SCHEMA); + static { + EOS_DATA_BLOCK._metadata.put(DataTable.MetadataKey.TABLE.getName(), "END_OF_STREAM"); + } + private static final TransferableBlock EOS_TRANSFERABLE_BLOCK = new TransferableBlock(EOS_DATA_BLOCK); + + public static TransferableBlock getEndOfStreamTransferableBlock() { + return EOS_TRANSFERABLE_BLOCK; + } + + public static MetadataBlock getEndOfStreamDataBlock() { + return EOS_DATA_BLOCK; + } + + public static MetadataBlock getErrorDataBlock(Exception e) { + MetadataBlock errorBlock = new MetadataBlock(EMPTY_SCHEMA); + errorBlock._metadata.put(DataTable.MetadataKey.TABLE.getName(), "ERROR"); + if (e instanceof ProcessingException) { + errorBlock.addException(((ProcessingException) e).getErrorCode(), e.getMessage()); + } else { + errorBlock.addException(QueryException.UNKNOWN_ERROR_CODE, e.getMessage()); + } + return errorBlock; + } + + public static TransferableBlock getErrorTransferableBlock(Exception e) { + return new TransferableBlock(getErrorDataBlock(e)); + } + + public static MetadataBlock getEmptyDataBlock(DataSchema dataSchema) { + return dataSchema == null ? 
EOS_DATA_BLOCK : new MetadataBlock(dataSchema); + } + + public static TransferableBlock getEmptyTransferableBlock(DataSchema dataSchema) { + return new TransferableBlock(getEmptyDataBlock(dataSchema)); + } + + public static boolean isEndOfStream(TransferableBlock transferableBlock) { + return transferableBlock.getType().equals(BaseDataBlock.Type.METADATA) + && "END_OF_STREAM".equals(transferableBlock.getDataBlock().getMetadata() + .get(DataTable.MetadataKey.TABLE.getName())); + } + + public static BaseDataBlock getDataBlock(ByteBuffer byteBuffer) + throws IOException { + int versionType = byteBuffer.getInt(); + int version = versionType & ((1 << VERSION_TYPE_SHIFT) - 1); + BaseDataBlock.Type type = BaseDataBlock.Type.fromOrdinal(versionType >> VERSION_TYPE_SHIFT); + switch (type) { + case COLUMNAR: + return new ColumnarDataBlock(byteBuffer); + case ROW: + return new RowDataBlock(byteBuffer); + case METADATA: + return new MetadataBlock(byteBuffer); + default: + throw new UnsupportedOperationException("Unsupported data table version: " + version + " with type: " + type); + } + } + + /** + * Given a {@link DataSchema}, compute each column's offset and fill them into the passed in array, then return the + * row size in bytes. + * + * @param dataSchema data schema. + * @param columnOffsets array of column offsets. + * @return row size in bytes. + */ + public static int computeColumnOffsets(DataSchema dataSchema, int[] columnOffsets) { + int numColumns = columnOffsets.length; + assert numColumns == dataSchema.size(); + + DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes(); + int rowSizeInBytes = 0; + for (int i = 0; i < numColumns; i++) { + columnOffsets[i] = rowSizeInBytes; + switch (storedColumnDataTypes[i]) { + case INT: + rowSizeInBytes += 4; + break; + case LONG: + rowSizeInBytes += 8; + break; + case FLOAT: + rowSizeInBytes += 4; + break; + case DOUBLE: + rowSizeInBytes += 8; + break; + case STRING: + rowSizeInBytes += 4; + break; + // Object and array. (POSITION|LENGTH) + default: + rowSizeInBytes += 8; + break; + } + } + + return rowSizeInBytes; + } + + /** + * Given a {@link DataSchema}, compute each column's size and fill them into the passed in array. + * + * @param dataSchema data schema. + * @param columnSizes array of column size. + * @return row size in bytes. + */ + public static void computeColumnSizeInBytes(DataSchema dataSchema, int[] columnSizes) { + int numColumns = columnSizes.length; + assert numColumns == dataSchema.size(); + + DataSchema.ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes(); + for (int i = 0; i < numColumns; i++) { + switch (storedColumnDataTypes[i]) { + case INT: + columnSizes[i] = 4; + break; + case LONG: + columnSizes[i] = 8; + break; + case FLOAT: + columnSizes[i] = 4; + break; + case DOUBLE: + columnSizes[i] = 8; + break; + case STRING: + columnSizes[i] = 4; + break; + // Object and array. (POSITION|LENGTH) + default: + columnSizes[i] = 8; + break; + } + } + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java deleted file mode 100644 index c429e2e014..0000000000 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlockUtils.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.query.runtime.blocks; - -import org.apache.pinot.common.exception.QueryException; -import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.common.utils.DataTable; -import org.apache.pinot.core.common.datatable.DataTableBuilder; - - -public final class DataTableBlockUtils { - private DataTableBlockUtils() { - // do not instantiate. - } - - // used to indicate a datatable block status - private static final DataSchema EMPTY_SCHEMA = new DataSchema(new String[0], new DataSchema.ColumnDataType[0]); - private static final DataTable EMPTY_DATATABLE = new DataTableBuilder(EMPTY_SCHEMA).build(); - private static final DataTableBlock END_OF_STREAM_DATATABLE_BLOCK = new DataTableBlock(EMPTY_DATATABLE); - - public static DataTableBlock getEndOfStreamDataTableBlock() { - return END_OF_STREAM_DATATABLE_BLOCK; - } - - public static DataTable getEndOfStreamDataTable() { - return EMPTY_DATATABLE; - } - - public static DataTable getErrorDataTable(Exception e) { - DataTable errorDataTable = new DataTableBuilder(EMPTY_SCHEMA).build(); - errorDataTable.addException(QueryException.UNKNOWN_ERROR_CODE, e.getMessage()); - return errorDataTable; - } - - public static DataTableBlock getErrorDatatableBlock(Exception e) { - return new DataTableBlock(getErrorDataTable(e)); - } - - public static DataTable getEmptyDataTable(DataSchema dataSchema) { - if (dataSchema != null) { - return new DataTableBuilder(dataSchema).build(); - } else { - return EMPTY_DATATABLE; - } - } - - public static DataTableBlock getEmptyDataTableBlock(DataSchema dataSchema) { - return new DataTableBlock(getEmptyDataTable(dataSchema)); - } - - public static boolean isEndOfStream(DataTableBlock dataTableBlock) { - DataSchema dataSchema = dataTableBlock.getDataTable().getDataSchema(); - return dataSchema.getColumnNames().length == 0 && dataSchema.getColumnDataTypes().length == 0; - } -} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java new file mode 100644 index 0000000000..a3ffc7a126 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/MetadataBlock.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.blocks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collections; +import org.apache.pinot.common.utils.DataSchema; + + +/** + * Metadata-only data block. It carries no row payload and is used to signal end-of-stream and errors via the metadata and exception maps (see {@link DataBlockUtils}). + */ +public class MetadataBlock extends BaseDataBlock { + private static final int VERSION = 1; + + public MetadataBlock(DataSchema dataSchema) { + super(0, dataSchema, Collections.emptyMap(), new byte[]{0}, new byte[]{0}); + } + + public MetadataBlock(ByteBuffer byteBuffer) + throws IOException { + super(byteBuffer); + } + + @Override + protected int getDataBlockVersionType() { + return VERSION + (Type.METADATA.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT); + } + + @Override + protected void positionCursorInFixSizedBuffer(int rowId, int colId) { + throw new UnsupportedOperationException(); + } + + @Override + protected int positionCursorInVariableBuffer(int rowId, int colId) { + throw new UnsupportedOperationException(); + } + + @Override + public MetadataBlock toMetadataOnlyDataTable() { + return this; + } + + @Override + public MetadataBlock toDataOnlyDataTable() { + return new MetadataBlock(_dataSchema); + } +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java new file mode 100644 index 0000000000..f8520c54f6 --- /dev/null +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/RowDataBlock.java @@ -0,0 +1,90 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.blocks; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Map; +import org.apache.pinot.common.utils.DataSchema; + + +/** + * Wrapper for row-wise data table. It stores data in row-major format. 
+ */ +public class RowDataBlock extends BaseDataBlock { + private static final int VERSION = 1; + protected int[] _columnOffsets; + protected int _rowSizeInBytes; + + public RowDataBlock() { + super(); + } + + public RowDataBlock(int numRows, DataSchema dataSchema, Map<String, Map<Integer, String>> dictionaryMap, + byte[] fixedSizeDataBytes, byte[] variableSizeDataBytes) { + super(numRows, dataSchema, dictionaryMap, fixedSizeDataBytes, variableSizeDataBytes); + computeBlockObjectConstants(); + } + + public RowDataBlock(ByteBuffer byteBuffer) + throws IOException { + super(byteBuffer); + computeBlockObjectConstants(); + } + + protected void computeBlockObjectConstants() { + if (_dataSchema != null) { + _columnOffsets = new int[_numColumns]; + _rowSizeInBytes = DataBlockUtils.computeColumnOffsets(_dataSchema, _columnOffsets); + } + } + + @Override + protected int getDataBlockVersionType() { + return VERSION + (Type.ROW.ordinal() << DataBlockUtils.VERSION_TYPE_SHIFT); + } + + @Override + protected void positionCursorInFixSizedBuffer(int rowId, int colId) { + int position = rowId * _rowSizeInBytes + _columnOffsets[colId]; + _fixedSizeData.position(position); + } + + @Override + protected int positionCursorInVariableBuffer(int rowId, int colId) { + positionCursorInFixSizedBuffer(rowId, colId); + _variableSizeData.position(_fixedSizeData.getInt()); + return _fixedSizeData.getInt(); + } + + @Override + public RowDataBlock toMetadataOnlyDataTable() { + RowDataBlock metadataOnlyDataTable = new RowDataBlock(); + metadataOnlyDataTable._metadata.putAll(_metadata); + metadataOnlyDataTable._errCodeToExceptionMap.putAll(_errCodeToExceptionMap); + return metadataOnlyDataTable; + } + + @Override + public RowDataBlock toDataOnlyDataTable() { + return new RowDataBlock(_numRows, _dataSchema, _dictionaryMap, _fixedSizeDataBytes, _variableSizeDataBytes); + } + + // TODO: add whole-row access methods. +} diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java similarity index 70% rename from pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java rename to pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java index a8cc63c624..9284011f06 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/DataTableBlock.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/blocks/TransferableBlock.java @@ -19,31 +19,39 @@ package org.apache.pinot.query.runtime.blocks; import java.io.IOException; -import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.Block; import org.apache.pinot.core.common.BlockDocIdSet; import org.apache.pinot.core.common.BlockDocIdValueSet; import org.apache.pinot.core.common.BlockMetadata; import org.apache.pinot.core.common.BlockValSet; -import org.apache.pinot.core.operator.blocks.InstanceResponseBlock; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** - * A {@code DataTableBlock} is a row-based data block backed by a {@link DataTable}. + * A {@code TransferableBlock} is a wrapper around {@link BaseDataBlock} for transferring data using + * {@link org.apache.pinot.common.proto.Mailbox}. 
*/ -public class DataTableBlock implements Block { - private static final Logger LOGGER = LoggerFactory.getLogger(InstanceResponseBlock.class); +public class TransferableBlock implements Block { - private DataTable _dataTable; + private BaseDataBlock _dataBlock; + private BaseDataBlock.Type _type; - public DataTableBlock(DataTable dataTable) { - _dataTable = dataTable; + public TransferableBlock(BaseDataBlock dataBlock) { + _dataBlock = dataBlock; + _type = dataBlock instanceof ColumnarDataBlock ? BaseDataBlock.Type.COLUMNAR + : dataBlock instanceof RowDataBlock ? BaseDataBlock.Type.ROW : BaseDataBlock.Type.METADATA; } - public DataTable getDataTable() { - return _dataTable; + public BaseDataBlock getDataBlock() { + return _dataBlock; + } + + public BaseDataBlock.Type getType() { + return _type; + } + + public byte[] toBytes() + throws IOException { + return _dataBlock.toBytes(); } @Override @@ -65,9 +73,4 @@ public class DataTableBlock implements Block { public BlockMetadata getMetadata() { throw new UnsupportedOperationException(); } - - public byte[] toBytes() - throws IOException { - return _dataTable.toBytes(); - } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java index 9a66bf86c1..d8ec43e040 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/executor/WorkerQueryExecutor.java @@ -36,8 +36,8 @@ import org.apache.pinot.query.planner.stage.MailboxReceiveNode; import org.apache.pinot.query.planner.stage.MailboxSendNode; import org.apache.pinot.query.planner.stage.ProjectNode; import org.apache.pinot.query.planner.stage.StageNode; -import org.apache.pinot.query.runtime.blocks.DataTableBlock; -import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; +import org.apache.pinot.query.runtime.blocks.DataBlockUtils; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.operator.HashJoinOperator; import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.operator.MailboxSendOperator; @@ -83,12 +83,12 @@ public class WorkerQueryExecutor { ExecutorService executorService) { long requestId = Long.parseLong(requestMetadataMap.get("REQUEST_ID")); StageNode stageRoot = queryRequest.getStageRoot(); - BaseOperator<DataTableBlock> rootOperator = getOperator(requestId, stageRoot, queryRequest.getMetadataMap()); + BaseOperator<TransferableBlock> rootOperator = getOperator(requestId, stageRoot, queryRequest.getMetadataMap()); executorService.submit(new TraceRunnable() { @Override public void runJob() { ThreadTimer executionThreadTimer = new ThreadTimer(); - while (!DataTableBlockUtils.isEndOfStream(rootOperator.nextBlock())) { + while (!DataBlockUtils.isEndOfStream(rootOperator.nextBlock())) { LOGGER.debug("Result Block acquired"); } LOGGER.info("Execution time:" + executionThreadTimer.getThreadTimeNs()); @@ -97,7 +97,7 @@ public class WorkerQueryExecutor { } // TODO: split this PhysicalPlanner into a separate module - private BaseOperator<DataTableBlock> getOperator(long requestId, StageNode stageNode, + private BaseOperator<TransferableBlock> getOperator(long requestId, StageNode stageNode, Map<Integer, StageMetadata> metadataMap) { // TODO: optimize this into a framework. 
(physical planner) if (stageNode instanceof MailboxReceiveNode) { @@ -107,15 +107,15 @@ public class WorkerQueryExecutor { requestId, receiveNode.getSenderStageId()); } else if (stageNode instanceof MailboxSendNode) { MailboxSendNode sendNode = (MailboxSendNode) stageNode; - BaseOperator<DataTableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap); + BaseOperator<TransferableBlock> nextOperator = getOperator(requestId, sendNode.getInputs().get(0), metadataMap); StageMetadata receivingStageMetadata = metadataMap.get(sendNode.getReceiverStageId()); return new MailboxSendOperator(_mailboxService, nextOperator, receivingStageMetadata.getServerInstances(), sendNode.getExchangeType(), sendNode.getPartitionKeySelector(), _hostName, _port, requestId, sendNode.getStageId()); } else if (stageNode instanceof JoinNode) { JoinNode joinNode = (JoinNode) stageNode; - BaseOperator<DataTableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap); - BaseOperator<DataTableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap); + BaseOperator<TransferableBlock> leftOperator = getOperator(requestId, joinNode.getInputs().get(0), metadataMap); + BaseOperator<TransferableBlock> rightOperator = getOperator(requestId, joinNode.getInputs().get(1), metadataMap); return new HashJoinOperator(leftOperator, rightOperator, joinNode.getCriteria()); } else if (stageNode instanceof FilterNode) { throw new UnsupportedOperationException("Unsupported!"); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index 238c1d807b..ff99dae5ed 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -24,14 +24,15 @@ import java.util.HashMap; import java.util.List; import javax.annotation.Nullable; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.Operator; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.stage.JoinNode; -import org.apache.pinot.query.runtime.blocks.DataTableBlock; -import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; +import org.apache.pinot.query.runtime.blocks.BaseDataBlock; +import org.apache.pinot.query.runtime.blocks.DataBlockBuilder; +import org.apache.pinot.query.runtime.blocks.DataBlockUtils; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; /** @@ -42,12 +43,12 @@ import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; * * <p>For each of the data block received from the left table, it will generate a joint data block. 
*/ -public class HashJoinOperator extends BaseOperator<DataTableBlock> { +public class HashJoinOperator extends BaseOperator<TransferableBlock> { private static final String EXPLAIN_NAME = "BROADCAST_JOIN"; private final HashMap<Object, List<Object[]>> _broadcastHashTable; - private final BaseOperator<DataTableBlock> _leftTableOperator; - private final BaseOperator<DataTableBlock> _rightTableOperator; + private final BaseOperator<TransferableBlock> _leftTableOperator; + private final BaseOperator<TransferableBlock> _rightTableOperator; private DataSchema _leftTableSchema; private DataSchema _rightTableSchema; @@ -56,8 +57,8 @@ public class HashJoinOperator extends BaseOperator<DataTableBlock> { private KeySelector<Object[], Object> _leftKeySelector; private KeySelector<Object[], Object> _rightKeySelector; - public HashJoinOperator(BaseOperator<DataTableBlock> leftTableOperator, - BaseOperator<DataTableBlock> rightTableOperator, List<JoinNode.JoinClause> criteria) { + public HashJoinOperator(BaseOperator<TransferableBlock> leftTableOperator, + BaseOperator<TransferableBlock> rightTableOperator, List<JoinNode.JoinClause> criteria) { // TODO: this assumes right table is broadcast. _leftKeySelector = criteria.get(0).getLeftJoinKeySelector(); _rightKeySelector = criteria.get(0).getRightJoinKeySelector(); @@ -80,25 +81,25 @@ public class HashJoinOperator extends BaseOperator<DataTableBlock> { } @Override - protected DataTableBlock getNextBlock() { + protected TransferableBlock getNextBlock() { buildBroadcastHashTable(); try { - return new DataTableBlock(buildJoinedDataTable(_leftTableOperator.nextBlock())); + return new TransferableBlock(buildJoinedDataBlock(_leftTableOperator.nextBlock())); } catch (Exception e) { - return DataTableBlockUtils.getErrorDatatableBlock(e); + return DataBlockUtils.getErrorTransferableBlock(e); } } private void buildBroadcastHashTable() { if (!_isHashTableBuilt) { - DataTableBlock rightBlock = _rightTableOperator.nextBlock(); - while (!DataTableBlockUtils.isEndOfStream(rightBlock)) { - DataTable dataTable = rightBlock.getDataTable(); - _rightTableSchema = dataTable.getDataSchema(); - int numRows = dataTable.getNumberOfRows(); + TransferableBlock rightBlock = _rightTableOperator.nextBlock(); + while (!DataBlockUtils.isEndOfStream(rightBlock)) { + BaseDataBlock dataBlock = rightBlock.getDataBlock(); + _rightTableSchema = dataBlock.getDataSchema(); + int numRows = dataBlock.getNumberOfRows(); // put all the rows into corresponding hash collections keyed by the key selector function. 
for (int rowId = 0; rowId < numRows; rowId++) { - Object[] objects = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId); + Object[] objects = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId); List<Object[]> hashCollection = _broadcastHashTable.computeIfAbsent(_rightKeySelector.getKey(objects), k -> new ArrayList<>()); hashCollection.add(objects); @@ -109,25 +110,25 @@ public class HashJoinOperator extends BaseOperator<DataTableBlock> { } } - private DataTable buildJoinedDataTable(DataTableBlock block) + private BaseDataBlock buildJoinedDataBlock(TransferableBlock block) throws Exception { - if (DataTableBlockUtils.isEndOfStream(block)) { - return DataTableBlockUtils.getEndOfStreamDataTable(); + if (DataBlockUtils.isEndOfStream(block)) { + return DataBlockUtils.getEndOfStreamDataBlock(); } List<Object[]> rows = new ArrayList<>(); - DataTable dataTable = block.getDataTable(); - _leftTableSchema = dataTable.getDataSchema(); + BaseDataBlock dataBlock = block.getDataBlock(); + _leftTableSchema = dataBlock.getDataSchema(); _resultRowSize = _leftTableSchema.size() + _rightTableSchema.size(); - int numRows = dataTable.getNumberOfRows(); + int numRows = dataBlock.getNumberOfRows(); for (int rowId = 0; rowId < numRows; rowId++) { - Object[] leftRow = SelectionOperatorUtils.extractRowFromDataTable(dataTable, rowId); + Object[] leftRow = SelectionOperatorUtils.extractRowFromDataTable(dataBlock, rowId); List<Object[]> hashCollection = _broadcastHashTable.getOrDefault(_leftKeySelector.getKey(leftRow), Collections.emptyList()); for (Object[] rightRow : hashCollection) { rows.add(joinRow(leftRow, rightRow)); } } - return SelectionOperatorUtils.getDataTableFromRows(rows, computeSchema()); + return DataBlockBuilder.buildFromRows(rows, computeSchema()); } private Object[] joinRow(Object[] leftRow, Object[] rightRow) { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index 7b396b13cc..b84d8f40f0 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -24,16 +24,15 @@ import javax.annotation.Nullable; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.proto.Mailbox; import org.apache.pinot.common.utils.DataSchema; -import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.common.Operator; -import org.apache.pinot.core.common.datatable.DataTableFactory; import org.apache.pinot.core.operator.BaseOperator; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.ReceivingMailbox; import org.apache.pinot.query.mailbox.StringMailboxIdentifier; -import org.apache.pinot.query.runtime.blocks.DataTableBlock; -import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; +import org.apache.pinot.query.runtime.blocks.BaseDataBlock; +import org.apache.pinot.query.runtime.blocks.DataBlockUtils; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,7 +41,7 @@ import org.slf4j.LoggerFactory; * This {@code MailboxReceiveOperator} receives data from a {@link ReceivingMailbox} and serve it out from the * {@link BaseOperator#getNextBlock()} API. 
*/ -public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> { +public class MailboxReceiveOperator extends BaseOperator<TransferableBlock> { private static final Logger LOGGER = LoggerFactory.getLogger(MailboxReceiveOperator.class); private static final String EXPLAIN_NAME = "MAILBOX_RECEIVE"; private static final long DEFAULT_TIMEOUT_NANO = 10_000_000_000L; @@ -80,7 +79,7 @@ public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> { } @Override - protected DataTableBlock getNextBlock() { + protected TransferableBlock getNextBlock() { // TODO: do a round robin check against all MailboxContentStreamObservers and find which one that has data. boolean hasOpenedMailbox = true; DataSchema dataSchema = null; @@ -99,10 +98,10 @@ public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> { if (mailboxContent != null) { ByteBuffer byteBuffer = mailboxContent.getPayload().asReadOnlyByteBuffer(); if (byteBuffer.hasRemaining()) { - DataTable dataTable = DataTableFactory.getDataTable(byteBuffer); - if (dataTable.getNumberOfRows() > 0) { + BaseDataBlock dataBlock = DataBlockUtils.getDataBlock(byteBuffer); + if (dataBlock.getNumberOfRows() > 0) { // here we only return data table block when it is not empty. - return new DataTableBlock(dataTable); + return new TransferableBlock(dataBlock); } } } @@ -117,7 +116,7 @@ public class MailboxReceiveOperator extends BaseOperator<DataTableBlock> { } // TODO: we need to at least return one data table with schema if there's no error. // we need to condition this on whether there's already things being returned or not. - return DataTableBlockUtils.getEndOfStreamDataTableBlock(); + return DataBlockUtils.getEndOfStreamTransferableBlock(); } public RelDistribution.Type getExchangeType() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index 51f24c97d4..8629663b60 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -37,16 +37,18 @@ import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.mailbox.SendingMailbox; import org.apache.pinot.query.mailbox.StringMailboxIdentifier; import org.apache.pinot.query.planner.partitioning.KeySelector; -import org.apache.pinot.query.runtime.blocks.DataTableBlock; -import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; +import org.apache.pinot.query.runtime.blocks.BaseDataBlock; +import org.apache.pinot.query.runtime.blocks.DataBlockBuilder; +import org.apache.pinot.query.runtime.blocks.DataBlockUtils; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This {@code MailboxSendOperator} is created to send {@link DataTableBlock}s to the receiving end. + * This {@code MailboxSendOperator} is created to send {@link TransferableBlock}s to the receiving end. 
*/ -public class MailboxSendOperator extends BaseOperator<DataTableBlock> { +public class MailboxSendOperator extends BaseOperator<TransferableBlock> { private static final Logger LOGGER = LoggerFactory.getLogger(MailboxSendOperator.class); private static final String EXPLAIN_NAME = "MAILBOX_SEND"; private static final Set<RelDistribution.Type> SUPPORTED_EXCHANGE_TYPE = @@ -61,11 +63,11 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { private final long _jobId; private final int _stageId; private final MailboxService<Mailbox.MailboxContent> _mailboxService; - private BaseOperator<DataTableBlock> _dataTableBlockBaseOperator; - private DataTable _dataTable; + private BaseOperator<TransferableBlock> _dataTableBlockBaseOperator; + private BaseDataBlock _dataTable; public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, - BaseOperator<DataTableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances, + BaseOperator<TransferableBlock> dataTableBlockBaseOperator, List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType, KeySelector<Object[], Object> keySelector, String hostName, int port, long jobId, int stageId) { _mailboxService = mailboxService; @@ -86,7 +88,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { * we create a {@link org.apache.pinot.core.query.executor.ServerQueryExecutorV1Impl} that can handle the * creation of MailboxSendOperator we should not use this API. */ - public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, DataTable dataTable, + public MailboxSendOperator(MailboxService<Mailbox.MailboxContent> mailboxService, BaseDataBlock dataTable, List<ServerInstance> receivingStageInstances, RelDistribution.Type exchangeType, KeySelector<Object[], Object> keySelector, String hostName, int port, long jobId, int stageId) { _mailboxService = mailboxService; @@ -113,14 +115,14 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { } @Override - protected DataTableBlock getNextBlock() { - DataTable dataTable; - DataTableBlock dataTableBlock = null; + protected TransferableBlock getNextBlock() { + BaseDataBlock dataTable; + TransferableBlock transferableBlock = null; boolean isEndOfStream; if (_dataTableBlockBaseOperator != null) { - dataTableBlock = _dataTableBlockBaseOperator.nextBlock(); - dataTable = dataTableBlock.getDataTable(); - isEndOfStream = DataTableBlockUtils.isEndOfStream(dataTableBlock); + transferableBlock = _dataTableBlockBaseOperator.nextBlock(); + dataTable = transferableBlock.getDataBlock(); + isEndOfStream = DataBlockUtils.isEndOfStream(transferableBlock); } else { dataTable = _dataTable; isEndOfStream = true; @@ -137,7 +139,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { // we no longer need to send data to the rest of the receiving instances, but we still need to transfer // the dataTable over indicating that we are a potential sender. thus next time a random server is selected // it might still be useful. - dataTable = DataTableBlockUtils.getEmptyDataTable(dataTable.getDataSchema()); + dataTable = DataBlockUtils.getEmptyDataBlock(dataTable.getDataSchema()); } break; case BROADCAST_DISTRIBUTED: @@ -147,7 +149,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { break; case HASH_DISTRIBUTED: // TODO: ensure that server instance list is sorted using same function in sender. 
- List<DataTable> dataTableList = constructPartitionedDataBlock(dataTable, _keySelector, + List<BaseDataBlock> dataTableList = constructPartitionedDataBlock(dataTable, _keySelector, _receivingStageInstances.size()); for (int i = 0; i < _receivingStageInstances.size(); i++) { sendDataTableBlock(_receivingStageInstances.get(i), dataTableList.get(i), isEndOfStream); @@ -162,10 +164,10 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { } catch (Exception e) { LOGGER.error("Exception occur while sending data via mailbox", e); } - return dataTableBlock; + return transferableBlock; } - private static List<DataTable> constructPartitionedDataBlock(DataTable dataTable, + private static List<BaseDataBlock> constructPartitionedDataBlock(DataTable dataTable, KeySelector<Object[], Object> keySelector, int partitionSize) throws Exception { List<List<Object[]>> temporaryRows = new ArrayList<>(partitionSize); @@ -178,10 +180,10 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { // TODO: support other partitioning algorithm temporaryRows.get(hashToIndex(key, partitionSize)).add(row); } - List<DataTable> dataTableList = new ArrayList<>(partitionSize); + List<BaseDataBlock> dataTableList = new ArrayList<>(partitionSize); for (int i = 0; i < partitionSize; i++) { List<Object[]> objects = temporaryRows.get(i); - dataTableList.add(SelectionOperatorUtils.getDataTableFromRows(objects, dataTable.getDataSchema())); + dataTableList.add(DataBlockBuilder.buildFromRows(objects, dataTable.getDataSchema())); } return dataTableList; } @@ -190,7 +192,7 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { return (key.hashCode()) % partitionSize; } - private void sendDataTableBlock(ServerInstance serverInstance, DataTable dataTable, boolean isEndOfStream) + private void sendDataTableBlock(ServerInstance serverInstance, BaseDataBlock dataTable, boolean isEndOfStream) throws IOException { String mailboxId = toMailboxId(serverInstance); SendingMailbox<Mailbox.MailboxContent> sendingMailbox = _mailboxService.getSendingMailbox(mailboxId); @@ -201,10 +203,10 @@ public class MailboxSendOperator extends BaseOperator<DataTableBlock> { } } - private Mailbox.MailboxContent toMailboxContent(String mailboxId, DataTable dataTable, boolean isEndOfStream) + private Mailbox.MailboxContent toMailboxContent(String mailboxId, BaseDataBlock dataTable, boolean isEndOfStream) throws IOException { Mailbox.MailboxContent.Builder builder = Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId) - .setPayload(ByteString.copyFrom(new DataTableBlock(dataTable).toBytes())); + .setPayload(ByteString.copyFrom(new TransferableBlock(dataTable).toBytes())); if (isEndOfStream) { builder.putMetadata("finished", "true"); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java index 7ea69b8d21..fa9f04808c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/QueryDispatcher.java @@ -33,8 +33,9 @@ import org.apache.pinot.query.mailbox.MailboxService; import org.apache.pinot.query.planner.QueryPlan; import org.apache.pinot.query.planner.StageMetadata; import org.apache.pinot.query.planner.stage.MailboxReceiveNode; -import org.apache.pinot.query.runtime.blocks.DataTableBlock; -import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; +import 
org.apache.pinot.query.runtime.blocks.BaseDataBlock; +import org.apache.pinot.query.runtime.blocks.DataBlockUtils; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; @@ -58,24 +59,13 @@ public class QueryDispatcher { throws Exception { // submit all the distributed stages. int reduceStageId = submit(requestId, queryPlan); - - // run reduce stage. + // run reduce stage and return result. MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId); MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService, queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), requestId, reduceNode.getSenderStageId(), mailboxService.getHostname(), mailboxService.getMailboxPort()); - - List<DataTable> queryResults = new ArrayList<>(); - long timeoutWatermark = System.nanoTime() + timeoutNano; - while (System.nanoTime() < timeoutWatermark) { - DataTableBlock dataTableBlock = mailboxReceiveOperator.nextBlock(); - queryResults.add(dataTableBlock.getDataTable()); - if (DataTableBlockUtils.isEndOfStream(dataTableBlock)) { - break; - } - } - return queryResults; + return reduceMailboxReceive(mailboxReceiveOperator); } public int submit(long requestId, QueryPlan queryPlan) @@ -109,12 +99,9 @@ public class QueryDispatcher { return reduceStageId; } - protected MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService, - List<ServerInstance> sendingInstances, long jobId, int stageId, String hostname, int port) { - MailboxReceiveOperator mailboxReceiveOperator = - new MailboxReceiveOperator(mailboxService, RelDistribution.Type.ANY, sendingInstances, hostname, port, jobId, - stageId); - return mailboxReceiveOperator; + private DispatchClient getOrCreateDispatchClient(String host, int port) { + String key = String.format("%s_%d", host, port); + return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(host, port)); } public static DistributedStagePlan constructDistributedStagePlan(QueryPlan queryPlan, int stageId, @@ -123,9 +110,28 @@ public class QueryDispatcher { queryPlan.getStageMetadataMap()); } - private DispatchClient getOrCreateDispatchClient(String host, int port) { - String key = String.format("%s_%d", host, port); - return _dispatchClientMap.computeIfAbsent(key, k -> new DispatchClient(host, port)); + public static List<DataTable> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator) { + List<DataTable> resultDataBlocks = new ArrayList<>(); + TransferableBlock transferableBlock; + while (true) { + transferableBlock = mailboxReceiveOperator.nextBlock(); + if (DataBlockUtils.isEndOfStream(transferableBlock)) { + break; + } + if (transferableBlock.getDataBlock() != null) { + BaseDataBlock dataTable = transferableBlock.getDataBlock(); + resultDataBlocks.add(dataTable); + } + } + return resultDataBlocks; + } + + public static MailboxReceiveOperator createReduceStageOperator(MailboxService<Mailbox.MailboxContent> mailboxService, + List<ServerInstance> sendingInstances, long jobId, int stageId, String hostname, int port) { + MailboxReceiveOperator mailboxReceiveOperator = + new MailboxReceiveOperator(mailboxService, RelDistribution.Type.ANY, sendingInstances, hostname, port, jobId, + stageId); + return mailboxReceiveOperator; } 
public static class DispatchClient { diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java index f1863bc339..e254778db1 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcMailboxServiceTest.java @@ -24,8 +24,8 @@ import com.google.protobuf.ByteString; import java.io.IOException; import java.util.Map; import org.apache.pinot.common.proto.Mailbox; -import org.apache.pinot.core.common.datatable.DataTableBuilder; -import org.apache.pinot.query.runtime.blocks.DataTableBlock; +import org.apache.pinot.query.runtime.blocks.DataBlockUtils; +import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.util.TestUtils; import org.testng.Assert; import org.testng.annotations.Test; @@ -67,6 +67,7 @@ public class GrpcMailboxServiceTest extends GrpcMailboxServiceTestBase { throws IOException { return Mailbox.MailboxContent.newBuilder().setMailboxId(mailboxId) .putAllMetadata(ImmutableMap.of("key", "value", "finished", "true")) - .setPayload(ByteString.copyFrom(new DataTableBlock(DataTableBuilder.getEmptyDataTable()).toBytes())).build(); + .setPayload(ByteString.copyFrom(new TransferableBlock(DataBlockUtils.getEndOfStreamDataBlock()).toBytes())) + .build(); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java index 2316195bce..d133584797 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/QueryRunnerTest.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Random; -import org.apache.calcite.rel.RelDistribution; import org.apache.commons.io.FileUtils; import org.apache.pinot.common.utils.DataTable; import org.apache.pinot.core.transport.ServerInstance; @@ -38,8 +37,6 @@ import org.apache.pinot.query.mailbox.GrpcMailboxService; import org.apache.pinot.query.planner.QueryPlan; import org.apache.pinot.query.planner.stage.MailboxReceiveNode; import org.apache.pinot.query.routing.WorkerInstance; -import org.apache.pinot.query.runtime.blocks.DataTableBlock; -import org.apache.pinot.query.runtime.blocks.DataTableBlockUtils; import org.apache.pinot.query.runtime.operator.MailboxReceiveOperator; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; import org.apache.pinot.query.service.QueryConfig; @@ -84,8 +81,8 @@ public class QueryRunnerTest { _mailboxService = new GrpcMailboxService(_reducerHostname, _reducerGrpcPort); _mailboxService.start(); - _queryEnvironment = - QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, server1.getPort(), server2.getPort()); + _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(_reducerGrpcPort, server1.getPort(), + server2.getPort()); server1.start(); server2.start(); // this doesn't test the QueryServer functionality so the server port can be the same as the mailbox port. 
@@ -111,9 +108,10 @@ public class QueryRunnerTest { for (int stageId : queryPlan.getStageMetadataMap().keySet()) { if (queryPlan.getQueryStageMap().get(stageId) instanceof MailboxReceiveNode) { MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(stageId); - mailboxReceiveOperator = createReduceStageOperator( + mailboxReceiveOperator = QueryDispatcher.createReduceStageOperator(_mailboxService, queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), - Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), _reducerGrpcPort); + Long.parseLong(requestMetadataMap.get("REQUEST_ID")), reduceNode.getSenderStageId(), "localhost", + _reducerGrpcPort); } else { for (ServerInstance serverInstance : queryPlan.getStageMetadataMap().get(stageId).getServerInstances()) { DistributedStagePlan distributedStagePlan = @@ -124,10 +122,21 @@ public class QueryRunnerTest { } Preconditions.checkNotNull(mailboxReceiveOperator); - List<Object[]> resultRows = reduceMailboxReceive(mailboxReceiveOperator); + List<Object[]> resultRows = toRows(QueryDispatcher.reduceMailboxReceive(mailboxReceiveOperator)); Assert.assertEquals(resultRows.size(), expectedRowCount); } + private static List<Object[]> toRows(List<DataTable> dataTables) { + List<Object[]> resultRows = new ArrayList<>(); + for (DataTable dataTable : dataTables) { + int numRows = dataTable.getNumberOfRows(); + for (int rowId = 0; rowId < numRows; rowId++) { + resultRows.add(extractRowFromDataTable(dataTable, rowId)); + } + } + return resultRows; + } + @DataProvider(name = "testDataWithSqlToFinalRowCount") private Object[][] provideTestSqlAndRowCount() { return new Object[][] { @@ -152,31 +161,4 @@ public class QueryRunnerTest { + " WHERE a.col3 >= 0 AND a.col2 = 'foo' AND b.col3 >= 0", 3}, }; } - - protected static List<Object[]> reduceMailboxReceive(MailboxReceiveOperator mailboxReceiveOperator) { - List<Object[]> resultRows = new ArrayList<>(); - DataTableBlock dataTableBlock; - while (true) { - dataTableBlock = mailboxReceiveOperator.nextBlock(); - if (DataTableBlockUtils.isEndOfStream(dataTableBlock)) { - break; - } - if (dataTableBlock.getDataTable() != null) { - DataTable dataTable = dataTableBlock.getDataTable(); - int numRows = dataTable.getNumberOfRows(); - for (int rowId = 0; rowId < numRows; rowId++) { - resultRows.add(extractRowFromDataTable(dataTable, rowId)); - } - } - } - return resultRows; - } - - protected MailboxReceiveOperator createReduceStageOperator(List<ServerInstance> sendingInstances, long jobId, - int stageId, int port) { - MailboxReceiveOperator mailboxReceiveOperator = - new MailboxReceiveOperator(_mailboxService, RelDistribution.Type.ANY, sendingInstances, "localhost", port, - jobId, stageId); - return mailboxReceiveOperator; - } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java new file mode 100644 index 0000000000..bf33160b92 --- /dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.blocks; + +import java.io.IOException; +import java.util.List; +import org.apache.pinot.common.exception.QueryException; +import org.apache.pinot.common.response.ProcessingException; +import org.apache.pinot.common.utils.DataSchema; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class DataBlockTest { + private static final int TEST_ROW_COUNT = 2; + + @Test + public void testException() + throws IOException { + Exception originalException = new UnsupportedOperationException("Expected test exception."); + ProcessingException processingException = + QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, originalException); + String expected = processingException.getMessage(); + + BaseDataBlock dataBlock = DataBlockUtils.getErrorDataBlock(originalException); + dataBlock.addException(processingException); + Assert.assertEquals(dataBlock.getDataSchema().getColumnNames().length, 0); + Assert.assertEquals(dataBlock.getDataSchema().getColumnDataTypes().length, 0); + Assert.assertEquals(dataBlock.getNumberOfRows(), 0); + + // Assert processing exception and original exception both matches. + String actual = dataBlock.getExceptions().get(QueryException.QUERY_EXECUTION_ERROR.getErrorCode()); + Assert.assertEquals(actual, expected); + Assert.assertEquals(dataBlock.getExceptions().get(QueryException.UNKNOWN_ERROR_CODE), + originalException.getMessage()); + } + + @Test + public void testAllDataTypes() + throws IOException { + DataSchema.ColumnDataType[] columnDataTypes = DataSchema.ColumnDataType.values(); + int numColumns = columnDataTypes.length; + String[] columnNames = new String[numColumns]; + for (int i = 0; i < numColumns; i++) { + columnNames[i] = columnDataTypes[i].name(); + } + + DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes); + List<Object[]> rows = DataBlockTestUtils.getRandomRows(dataSchema, TEST_ROW_COUNT); + List<Object[]> columnars = DataBlockTestUtils.convertColumnar(dataSchema, rows); + RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema); + ColumnarDataBlock columnarBlock = DataBlockBuilder.buildFromColumns(columnars, dataSchema); + + for (int colId = 0; colId < dataSchema.getColumnNames().length; colId++) { + DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(colId); + for (int rowId = 0; rowId < TEST_ROW_COUNT; rowId++) { + Object rowVal = DataBlockTestUtils.getElement(rowBlock, rowId, colId, columnDataType); + Object colVal = DataBlockTestUtils.getElement(columnarBlock, rowId, colId, columnDataType); + Assert.assertEquals(rowVal, colVal, "Error comparing Row/Column Block at (" + rowId + "," + colId + ")" + + " of Type: " + columnDataType + "! 
rowValue: [" + rowVal + "], columnarValue: [" + colVal + "]"); + } + } + } +} diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java new file mode 100644 index 0000000000..caa9b55ccf --- /dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/blocks/DataBlockTestUtils.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.runtime.blocks; + +import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; +import java.util.Random; +import org.apache.commons.lang.RandomStringUtils; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.spi.utils.ByteArray; + + +public class DataBlockTestUtils { + private static final Random RANDOM = new Random(); + private static final int ARRAY_SIZE = 5; + + private DataBlockTestUtils() { + // do not instantiate. + } + + public static Object[] getRandomRow(DataSchema dataSchema) { + final int numColumns = dataSchema.getColumnNames().length; + DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes(); + Object[] row = new Object[numColumns]; + for (int colId = 0; colId < numColumns; colId++) { + switch (columnDataTypes[colId].getStoredType()) { + case INT: + row[colId] = RANDOM.nextInt(); + break; + case LONG: + row[colId] = RANDOM.nextLong(); + break; + case FLOAT: + row[colId] = RANDOM.nextFloat(); + break; + case DOUBLE: + row[colId] = RANDOM.nextDouble(); + break; + case BIG_DECIMAL: + row[colId] = BigDecimal.valueOf(RANDOM.nextDouble()); + break; + case STRING: + row[colId] = RandomStringUtils.random(RANDOM.nextInt(20)); + break; + case BYTES: + row[colId] = new ByteArray(RandomStringUtils.random(RANDOM.nextInt(20)).getBytes()); + break; + // Just test Double here, all object types will be covered in ObjectCustomSerDeTest. 
+ case OBJECT: + row[colId] = RANDOM.nextDouble(); + break; + case BOOLEAN_ARRAY: + case INT_ARRAY: + int length = RANDOM.nextInt(ARRAY_SIZE); + int[] intArray = new int[length]; + for (int i = 0; i < length; i++) { + intArray[i] = RANDOM.nextInt(); + } + row[colId] = intArray; + break; + case TIMESTAMP_ARRAY: + case LONG_ARRAY: + length = RANDOM.nextInt(ARRAY_SIZE); + long[] longArray = new long[length]; + for (int i = 0; i < length; i++) { + longArray[i] = RANDOM.nextLong(); + } + row[colId] = longArray; + break; + case FLOAT_ARRAY: + length = RANDOM.nextInt(ARRAY_SIZE); + float[] floatArray = new float[length]; + for (int i = 0; i < length; i++) { + floatArray[i] = RANDOM.nextFloat(); + } + row[colId] = floatArray; + break; + case DOUBLE_ARRAY: + length = RANDOM.nextInt(ARRAY_SIZE); + double[] doubleArray = new double[length]; + for (int i = 0; i < length; i++) { + doubleArray[i] = RANDOM.nextDouble(); + } + row[colId] = doubleArray; + break; + case BYTES_ARRAY: + case STRING_ARRAY: + length = RANDOM.nextInt(ARRAY_SIZE); + String[] stringArray = new String[length]; + for (int i = 0; i < length; i++) { + stringArray[i] = RandomStringUtils.random(RANDOM.nextInt(20)); + } + row[colId] = stringArray; + break; + default: + throw new UnsupportedOperationException("Can't fill random data for column type: " + columnDataTypes[colId]); + } + } + return row; + } + + public static Object getElement(BaseDataBlock dataBlock, int rowId, int colId, + DataSchema.ColumnDataType columnDataType) { + switch (columnDataType.getStoredType()) { + case INT: + return dataBlock.getInt(rowId, colId); + case LONG: + return dataBlock.getLong(rowId, colId); + case FLOAT: + return dataBlock.getFloat(rowId, colId); + case DOUBLE: + return dataBlock.getDouble(rowId, colId); + case BIG_DECIMAL: + return dataBlock.getBigDecimal(rowId, colId); + case STRING: + return dataBlock.getString(rowId, colId); + case BYTES: + return dataBlock.getBytes(rowId, colId); + case OBJECT: + return dataBlock.getObject(rowId, colId); + case BOOLEAN_ARRAY: + case INT_ARRAY: + return dataBlock.getIntArray(rowId, colId); + case TIMESTAMP_ARRAY: + case LONG_ARRAY: + return dataBlock.getLongArray(rowId, colId); + case FLOAT_ARRAY: + return dataBlock.getFloatArray(rowId, colId); + case DOUBLE_ARRAY: + return dataBlock.getDoubleArray(rowId, colId); + case BYTES_ARRAY: + case STRING_ARRAY: + return dataBlock.getStringArray(rowId, colId); + default: + throw new UnsupportedOperationException("Can't retrieve data for column type: " + columnDataType); + } + } + + public static List<Object[]> getRandomRows(DataSchema dataSchema, int numRows) { + List<Object[]> rows = new ArrayList<>(numRows); + for (int i = 0; i < numRows; i++) { + rows.add(getRandomRow(dataSchema)); + } + return rows; + } + + public static List<Object[]> getRandomColumnar(DataSchema dataSchema, int numRows) { + List<Object[]> rows = getRandomRows(dataSchema, numRows); + return convertColumnar(dataSchema, rows); + } + + public static List<Object[]> convertColumnar(DataSchema dataSchema, List<Object[]> rows) { + final int numRows = rows.size(); + final int numColumns = dataSchema.getColumnNames().length; + List<Object[]> columnars = new ArrayList<>(numColumns); + for (int colId = 0; colId < numColumns; colId++) { + columnars.add(new Object[numRows]); + for (int rowId = 0; rowId < numRows; rowId++) { + columnars.get(colId)[rowId] = rows.get(rowId)[colId]; + } + } + return columnars; + } +} diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java new file mode 100644 index 0000000000..9057d1a7ec --- /dev/null +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryDispatcherTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.query.service; + +import com.google.common.collect.Lists; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.pinot.query.QueryEnvironment; +import org.apache.pinot.query.QueryEnvironmentTestUtils; +import org.apache.pinot.query.planner.PlannerUtils; +import org.apache.pinot.query.planner.QueryPlan; +import org.apache.pinot.query.runtime.QueryRunner; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + + +public class QueryDispatcherTest { + private static final Random RANDOM_REQUEST_ID_GEN = new Random(); + private static final int QUERY_SERVER_COUNT = 2; + private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>(); + private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>(); + + private QueryEnvironment _queryEnvironment; + + @BeforeClass + public void setUp() + throws Exception { + + for (int i = 0; i < QUERY_SERVER_COUNT; i++) { + int availablePort = QueryEnvironmentTestUtils.getAvailablePort(); + QueryRunner queryRunner = Mockito.mock(QueryRunner.class); + QueryServer queryServer = new QueryServer(availablePort, queryRunner); + queryServer.start(); + _queryServerMap.put(availablePort, queryServer); + _queryRunnerMap.put(availablePort, queryRunner); + } + + List<Integer> portList = Lists.newArrayList(_queryServerMap.keySet()); + + // reducer port doesn't matter, we are testing the worker instance not GRPC. 
+ _queryEnvironment = QueryEnvironmentTestUtils.getQueryEnvironment(1, portList.get(0), portList.get(1)); + } + + @AfterClass + public void tearDown() { + for (QueryServer worker : _queryServerMap.values()) { + worker.shutdown(); + } + } + + @Test(dataProvider = "testDataWithSqlToCompiledAsWorkerRequest") + public void testQueryDispatcherCanSendCorrectPayload(String sql) + throws Exception { + QueryPlan queryPlan = _queryEnvironment.planQuery(sql); + QueryDispatcher dispatcher = new QueryDispatcher(); + int reducerStageId = dispatcher.submit(RANDOM_REQUEST_ID_GEN.nextLong(), queryPlan); + Assert.assertTrue(PlannerUtils.isRootStage(reducerStageId)); + } + + @DataProvider(name = "testDataWithSqlToCompiledAsWorkerRequest") + private Object[][] provideTestSqlToCompiledToWorkerRequest() { + return new Object[][] { + new Object[]{"SELECT * FROM b"}, + new Object[]{"SELECT * FROM a"}, + new Object[]{"SELECT * FROM a JOIN b ON a.col3 = b.col3"}, + new Object[]{"SELECT a.col1, a.ts, c.col2, c.col3 FROM a JOIN c ON a.col1 = c.col2 " + + " WHERE (a.col3 >= 0 OR a.col2 = 'foo') AND c.col3 >= 0"}, + }; + } +}
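
To make the new block exchange concrete, here is a minimal, self-contained sketch of the round trip these classes perform: build a RowDataBlock from rows, serialize it through a TransferableBlock (as MailboxSendOperator does for the mailbox payload), decode the bytes back with DataBlockUtils (as MailboxReceiveOperator does), and read cells through the DataTable-style accessors. The class name, the two-column schema, and the sample rows are illustrative assumptions; the calls themselves use only the APIs introduced in this commit.

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.runtime.blocks.BaseDataBlock;
import org.apache.pinot.query.runtime.blocks.DataBlockBuilder;
import org.apache.pinot.query.runtime.blocks.DataBlockUtils;
import org.apache.pinot.query.runtime.blocks.RowDataBlock;
import org.apache.pinot.query.runtime.blocks.TransferableBlock;

public class DataBlockRoundTripExample {
  public static void main(String[] args)
      throws Exception {
    // Illustrative schema and rows (hypothetical column names).
    DataSchema dataSchema = new DataSchema(new String[]{"col1", "col2"},
        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING});
    List<Object[]> rows = Arrays.asList(new Object[]{1, "foo"}, new Object[]{2, "bar"});

    // Sender side: build a row-major block and serialize it for the mailbox payload.
    RowDataBlock rowBlock = DataBlockBuilder.buildFromRows(rows, dataSchema);
    byte[] payload = new TransferableBlock(rowBlock).toBytes();

    // Receiver side: decode the bytes back into a data block and read the cells.
    BaseDataBlock decoded = DataBlockUtils.getDataBlock(ByteBuffer.wrap(payload));
    for (int rowId = 0; rowId < decoded.getNumberOfRows(); rowId++) {
      System.out.println(decoded.getInt(rowId, 0) + " | " + decoded.getString(rowId, 1));
    }

    // A dedicated empty block signals end-of-stream to the receiving operator.
    TransferableBlock eos = DataBlockUtils.getEndOfStreamTransferableBlock();
    System.out.println("end of stream: " + DataBlockUtils.isEndOfStream(eos));
  }
}

The decode step can reconstruct the right concrete block (row, columnar, or metadata) because getDataBlockVersionType() packs the block type above the version bits via DataBlockUtils.VERSION_TYPE_SHIFT.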