This is an automated email from the ASF dual-hosted git repository. siddteotia pushed a commit to branch multi_stage_query_engine in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/multi_stage_query_engine by this push: new 791248a318 Use proto for query plan serialization (#8479) 791248a318 is described below commit 791248a3188d70bad6d3d69e4580dddbfe206c16 Author: Rong Rong <ro...@apache.org> AuthorDate: Fri Apr 15 14:24:38 2022 -0700 Use proto for query plan serialization (#8479) * serializable format * fix linter * bump version since feature branch based bumped * fix auto-rebase error * fix test async mock intercept issue * use reflection for ser/de * also fix test coverage on all node types * fix java8 * fix JOINNode member type as well as plan.proto comments Co-authored-by: Rong Rong <ro...@startree.ai> --- .../pinot/common/config/provider/TableCache.java | 10 - .../src/main/proto/{worker.proto => plan.proto} | 66 ++++--- pinot-common/src/main/proto/worker.proto | 4 +- pinot-query-planner/pom.xml | 2 +- .../pinot/query/planner/RelToStageConverter.java | 5 +- .../query/planner/nodes/AbstractStageNode.java | 15 +- .../apache/pinot/query/planner/nodes/CalcNode.java | 7 +- .../apache/pinot/query/planner/nodes/JoinNode.java | 21 ++- .../query/planner/nodes/MailboxReceiveNode.java | 8 +- .../pinot/query/planner/nodes/MailboxSendNode.java | 8 +- .../pinot/query/planner/nodes/SerDeUtils.java | 65 +++++++ .../pinot/query/planner/nodes/TableScanNode.java | 8 +- .../ProtoSerializable.java} | 22 ++- .../nodes/serde/ProtoSerializationUtils.java | 209 +++++++++++++++++++++ .../partitioning/FieldSelectionKeySelector.java | 7 + pinot-query-runtime/pom.xml | 2 +- .../runtime/plan/serde/QueryPlanSerDeUtils.java | 6 +- .../runtime/plan/serde/StageNodeSerDeUtils.java | 56 ------ .../pinot/query/service/QueryServerTest.java | 66 ++++++- 19 files changed, 458 insertions(+), 129 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java index 8d3499c653..1f5d286608 100644 --- 
a/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/config/provider/TableCache.java @@ -217,16 +217,6 @@ public class TableCache implements PinotConfigProvider { } } - /** - * Return a map between lower-case table name and their canonicalized form. Key-value pair are only different in - * case-sensitive environment. - * - * @return the table name map. - */ - public Map<String, String> getTableNameMap() { - return _tableNameMap; - } - private void addTableConfigs(List<String> paths) { // Subscribe data changes before reading the data to avoid missing changes for (String path : paths) { diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/plan.proto similarity index 54% copy from pinot-common/src/main/proto/worker.proto copy to pinot-common/src/main/proto/plan.proto index c64798fa63..47018197fc 100644 --- a/pinot-common/src/main/proto/worker.proto +++ b/pinot-common/src/main/proto/plan.proto @@ -39,36 +39,56 @@ syntax = "proto3"; package org.apache.pinot.common.proto; -service PinotQueryWorker { - // Dispatch a QueryRequest to a PinotQueryWorker - rpc Submit(QueryRequest) returns (QueryResponse); +message StageNode { + int32 stageId = 1; + string nodeName = 2; + repeated StageNode inputs = 3; + ObjectField objectField = 4; } -// QueryRequest is the dispatched content for a specific query stage on a specific worker. -message QueryRequest { - map<string, string> metadata = 1; - StagePlan stagePlan = 2; +// MemberVariableField defines the serialized format of the member variables of a class object. +// MemberVariableField can be one of +// 1. literal +// 2. list +// 3. map +// 4. 
complex class object +message MemberVariableField { + oneof member_variable_field { + LiteralField literalField = 1; + ListField listField = 2; + MapField mapField = 3; + ObjectField objectField = 4; + } } -// QueryResponse is the dispatched response from worker, it doesn't contain actual data, only dispatch status. -message QueryResponse { - map<string, string> metadata = 1; - bytes payload = 2; +// ObjectField defines the serialized format of a complex class object. +// it contains: +// 1. its fully-qualified clazz name; +// 2. its MemberVariableField map. +message ObjectField { + string objectClassName = 1; + map<string, MemberVariableField> memberVariables = 2; } -message StagePlan { - int32 stageId = 1; - string instanceId = 2; - bytes serializedStageRoot = 3; - map<int32, StageMetadata> stageMetadata = 4; +// LiteralField defines the serialized format of a literal field. +message LiteralField { + oneof literal_field { + bool boolField = 1; + int32 intField = 2; + int64 longField = 3; + double doubleField = 4; + string stringField = 5; + } } -message StageMetadata { - repeated string instances = 1; - repeated string dataSources = 2; - map<string, SegmentList> instanceToSegmentList = 3; +// ListField defines the serialized format of a list field. +// The content of the list is a MemberVariableField. +message ListField { + repeated MemberVariableField content = 1; } -message SegmentList { - repeated string segments = 1; -} +// MapField defines the serialized format of a map field. +// The key of the map is a string and the value of the map is a MemberVariableField.
+message MapField { + map<string, MemberVariableField> content = 1; +} \ No newline at end of file diff --git a/pinot-common/src/main/proto/worker.proto b/pinot-common/src/main/proto/worker.proto index c64798fa63..87aecc8391 100644 --- a/pinot-common/src/main/proto/worker.proto +++ b/pinot-common/src/main/proto/worker.proto @@ -39,6 +39,8 @@ syntax = "proto3"; package org.apache.pinot.common.proto; +import "plan.proto"; + service PinotQueryWorker { // Dispatch a QueryRequest to a PinotQueryWorker rpc Submit(QueryRequest) returns (QueryResponse); @@ -59,7 +61,7 @@ message QueryResponse { message StagePlan { int32 stageId = 1; string instanceId = 2; - bytes serializedStageRoot = 3; + StageNode stageRoot = 3; map<int32, StageMetadata> stageMetadata = 4; } diff --git a/pinot-query-planner/pom.xml b/pinot-query-planner/pom.xml index 8d1af64a19..05b9461cdc 100644 --- a/pinot-query-planner/pom.xml +++ b/pinot-query-planner/pom.xml @@ -26,7 +26,7 @@ <parent> <artifactId>pinot</artifactId> <groupId>org.apache.pinot</groupId> - <version>0.10.0-SNAPSHOT</version> + <version>0.11.0-SNAPSHOT</version> </parent> <artifactId>pinot-query-planner</artifactId> <name>Pinot Query Planner</name> diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java index e558694aab..572302ef92 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/RelToStageConverter.java @@ -37,7 +37,6 @@ import org.apache.pinot.query.planner.nodes.JoinNode; import org.apache.pinot.query.planner.nodes.StageNode; import org.apache.pinot.query.planner.nodes.TableScanNode; import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; -import org.apache.pinot.query.planner.partitioning.KeySelector; /** @@ -93,8 +92,8 @@ public final class 
RelToStageConverter { RelDataType rightRowType = node.getRight().getRowType(); int leftOperandIndex = ((RexInputRef) joinCondition.getOperands().get(0)).getIndex(); int rightOperandIndex = ((RexInputRef) joinCondition.getOperands().get(1)).getIndex(); - KeySelector<Object[], Object> leftFieldSelectionKeySelector = new FieldSelectionKeySelector(leftOperandIndex); - KeySelector<Object[], Object> rightFieldSelectionKeySelector = + FieldSelectionKeySelector leftFieldSelectionKeySelector = new FieldSelectionKeySelector(leftOperandIndex); + FieldSelectionKeySelector rightFieldSelectionKeySelector = new FieldSelectionKeySelector(rightOperandIndex - leftRowType.getFieldNames().size()); return new JoinNode(currentStageId, joinType, Collections.singletonList(new JoinNode.JoinClause( leftFieldSelectionKeySelector, rightFieldSelectionKeySelector))); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java index b99075429a..ed1fc9ba3e 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/AbstractStageNode.java @@ -20,9 +20,12 @@ package org.apache.pinot.query.planner.nodes; import java.util.ArrayList; import java.util.List; +import org.apache.pinot.common.proto.Plan; +import org.apache.pinot.query.planner.nodes.serde.ProtoSerializable; +import org.apache.pinot.query.planner.nodes.serde.ProtoSerializationUtils; -public abstract class AbstractStageNode implements StageNode { +public abstract class AbstractStageNode implements StageNode, ProtoSerializable { protected final int _stageId; protected final List<StageNode> _inputs; @@ -46,4 +49,14 @@ public abstract class AbstractStageNode implements StageNode { public int getStageId() { return _stageId; } + + @Override + public void setObjectField(Plan.ObjectField 
objectField) { + ProtoSerializationUtils.fromObjectField(this, objectField); + } + + @Override + public Plan.ObjectField getObjectField() { + return ProtoSerializationUtils.toObjectField(this); + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java index b188b8e2f7..0aa8c94ec8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java @@ -19,8 +19,13 @@ package org.apache.pinot.query.planner.nodes; + public class CalcNode extends AbstractStageNode { - private final String _expression; + private String _expression; + + public CalcNode(int stageId) { + super(stageId); + } public CalcNode(int stageId, String expression) { super(stageId); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java index 94af122c74..bf380639d8 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/JoinNode.java @@ -18,15 +18,19 @@ */ package org.apache.pinot.query.planner.nodes; -import java.io.Serializable; import java.util.List; import org.apache.calcite.rel.core.JoinRelType; +import org.apache.pinot.query.planner.partitioning.FieldSelectionKeySelector; import org.apache.pinot.query.planner.partitioning.KeySelector; public class JoinNode extends AbstractStageNode { - private final JoinRelType _joinRelType; - private final List<JoinClause> _criteria; + private JoinRelType _joinRelType; + private List<JoinClause> _criteria; + + public JoinNode(int stageId) { + super(stageId); + } public JoinNode(int stageId, JoinRelType joinRelType, List<JoinClause> criteria ) { @@ -43,11 +47,14 @@ 
public class JoinNode extends AbstractStageNode { return _criteria; } - public static class JoinClause implements Serializable { - private final KeySelector<Object[], Object> _leftJoinKeySelector; - private final KeySelector<Object[], Object> _rightJoinKeySelector; + public static class JoinClause { + private KeySelector<Object[], Object> _leftJoinKeySelector; + private KeySelector<Object[], Object> _rightJoinKeySelector; + + public JoinClause() { + } - public JoinClause(KeySelector<Object[], Object> leftKeySelector, KeySelector<Object[], Object> rightKeySelector) { + public JoinClause(FieldSelectionKeySelector leftKeySelector, FieldSelectionKeySelector rightKeySelector) { _leftJoinKeySelector = leftKeySelector; _rightJoinKeySelector = rightKeySelector; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java index d8269346f3..8f0c619b79 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxReceiveNode.java @@ -22,8 +22,12 @@ import org.apache.calcite.rel.RelDistribution; public class MailboxReceiveNode extends AbstractStageNode { - private final int _senderStageId; - private final RelDistribution.Type _exchangeType; + private int _senderStageId; + private RelDistribution.Type _exchangeType; + + public MailboxReceiveNode(int stageId) { + super(stageId); + } public MailboxReceiveNode(int stageId, int senderStageId, RelDistribution.Type exchangeType) { super(stageId); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java index ea39ad3493..9867a16f61 100644 --- 
a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/MailboxSendNode.java @@ -22,8 +22,12 @@ import org.apache.calcite.rel.RelDistribution; public class MailboxSendNode extends AbstractStageNode { - private final int _receiverStageId; - private final RelDistribution.Type _exchangeType; + private int _receiverStageId; + private RelDistribution.Type _exchangeType; + + public MailboxSendNode(int stageId) { + super(stageId); + } public MailboxSendNode(int stageId, int receiverStageId, RelDistribution.Type exchangeType) { super(stageId); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java new file mode 100644 index 0000000000..ad7184cdb1 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/SerDeUtils.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.query.planner.nodes; + +import org.apache.pinot.common.proto.Plan; + + +public final class SerDeUtils { + private SerDeUtils() { + // do not instantiate. + } + + public static AbstractStageNode deserializeStageNode(Plan.StageNode protoNode) { + AbstractStageNode stageNode = newNodeInstance(protoNode.getNodeName(), protoNode.getStageId()); + stageNode.setObjectField(protoNode.getObjectField()); + for (Plan.StageNode protoChild : protoNode.getInputsList()) { + stageNode.addInput(deserializeStageNode(protoChild)); + } + return stageNode; + } + + public static Plan.StageNode serializeStageNode(AbstractStageNode stageNode) { + Plan.StageNode.Builder builder = Plan.StageNode.newBuilder() + .setStageId(stageNode.getStageId()) + .setNodeName(stageNode.getClass().getSimpleName()) + .setObjectField(stageNode.getObjectField()); + for (StageNode childNode : stageNode.getInputs()) { + builder.addInputs(serializeStageNode((AbstractStageNode) childNode)); + } + return builder.build(); + } + + private static AbstractStageNode newNodeInstance(String nodeName, int stageId) { + switch (nodeName) { + case "TableScanNode": + return new TableScanNode(stageId); + case "JoinNode": + return new JoinNode(stageId); + case "CalcNode": + return new CalcNode(stageId); + case "MailboxSendNode": + return new MailboxSendNode(stageId); + case "MailboxReceiveNode": + return new MailboxReceiveNode(stageId); + default: + throw new IllegalArgumentException("Unknown node name: " + nodeName); + } + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java index 8d78ec6d0f..9375a7e986 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/TableScanNode.java @@ -22,8 +22,12 @@ import java.util.List; public class 
TableScanNode extends AbstractStageNode { - private final String _tableName; - private final List<String> _tableScanColumns; + private String _tableName; + private List<String> _tableScanColumns; + + public TableScanNode(int stageId) { + super(stageId); + } public TableScanNode(int stageId, String tableName, List<String> tableScanColumns) { super(stageId); diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java similarity index 69% copy from pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java copy to pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java index b188b8e2f7..2b99003e87 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/CalcNode.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializable.java @@ -16,18 +16,20 @@ * specific language governing permissions and limitations * under the License. 
*/ -package org.apache.pinot.query.planner.nodes; +/** + * Contract for objects that can be converted to and from a Plan.ObjectField proto message. + * + * setObjectField receives the proto form of the object; getObjectField returns it. + * This interface is hand-written, not generated code. + */ +package org.apache.pinot.query.planner.nodes.serde; + +import org.apache.pinot.common.proto.Plan; -public class CalcNode extends AbstractStageNode { - private final String _expression; +public interface ProtoSerializable { - public CalcNode(int stageId, String expression) { - super(stageId); - _expression = expression; - } + void setObjectField(Plan.ObjectField objFields); - public String getExpression() { - return _expression; - } + Plan.ObjectField getObjectField(); } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java new file mode 100644 index 0000000000..c30295a101 --- /dev/null +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/nodes/serde/ProtoSerializationUtils.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.pinot.query.planner.nodes.serde; + +import com.google.common.base.Preconditions; +import java.lang.reflect.Field; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.pinot.common.proto.Plan; + + +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ProtoSerializationUtils { + private static final String ENUM_VALUE_KEY = "ENUM_VALUE_KEY"; + + private ProtoSerializationUtils() { + // do not instantiate. + } + + public static void fromObjectField(Object object, Plan.ObjectField objectField) { + Map<String, Plan.MemberVariableField> memberVariablesMap = objectField.getMemberVariablesMap(); + try { + for (Map.Entry<String, Plan.MemberVariableField> e : memberVariablesMap.entrySet()) { + Object memberVarObject = constructMemberVariable(e.getValue()); + if (memberVarObject != null) { + Field declaredField = object.getClass().getDeclaredField(e.getKey()); + declaredField.setAccessible(true); + declaredField.set(object, memberVarObject); + } + } + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalStateException("Unable to set Object field for: " + objectField.getObjectClassName(), e); + } + } + + public static Plan.ObjectField toObjectField(Object object) { + Plan.ObjectField.Builder builder = Plan.ObjectField.newBuilder(); + builder.setObjectClassName(object.getClass().getName()); + // special handling for enum + if (object instanceof Enum) { + builder.putMemberVariables(ENUM_VALUE_KEY, serializeMemberVariable(((Enum) object).name())); + } else { + try { + for (Field field : object.getClass().getDeclaredFields()) { + field.setAccessible(true); + Object fieldObject = field.get(object); + builder.putMemberVariables(field.getName(), serializeMemberVariable(fieldObject)); + } + } catch (IllegalAccessException e) { + throw new IllegalStateException("Unable to serialize Object: " + object.getClass(), e); + } + } + return 
builder.build(); + } + + // -------------------------------------------------------------------------- + // Serialize Utils + // -------------------------------------------------------------------------- + + private static Plan.LiteralField boolField(boolean val) { + return Plan.LiteralField.newBuilder().setBoolField(val).build(); + } + + private static Plan.LiteralField intField(int val) { + return Plan.LiteralField.newBuilder().setIntField(val).build(); + } + + private static Plan.LiteralField longField(long val) { + return Plan.LiteralField.newBuilder().setLongField(val).build(); + } + + private static Plan.LiteralField doubleField(double val) { + return Plan.LiteralField.newBuilder().setDoubleField(val).build(); + } + + private static Plan.LiteralField stringField(String val) { + return Plan.LiteralField.newBuilder().setStringField(val).build(); + } + + private static Plan.MemberVariableField serializeMemberVariable(Object fieldObject) { + Plan.MemberVariableField.Builder builder = Plan.MemberVariableField.newBuilder(); + if (fieldObject instanceof Boolean) { + builder.setLiteralField(boolField((Boolean) fieldObject)); + } else if (fieldObject instanceof Integer) { + builder.setLiteralField(intField((Integer) fieldObject)); + } else if (fieldObject instanceof Long) { + builder.setLiteralField(longField((Long) fieldObject)); + } else if (fieldObject instanceof Double) { + builder.setLiteralField(doubleField((Double) fieldObject)); + } else if (fieldObject instanceof String) { + builder.setLiteralField(stringField((String) fieldObject)); + } else if (fieldObject instanceof List) { + builder.setListField(serializeListMemberVariable(fieldObject)); + } else if (fieldObject instanceof Map) { + builder.setMapField(serializeMapMemberVariable(fieldObject)); + } else { + builder.setObjectField(toObjectField(fieldObject)); + } + return builder.build(); + } + + private static Plan.ListField serializeListMemberVariable(Object fieldObject) { + 
Preconditions.checkState(fieldObject instanceof List); + Plan.ListField.Builder builder = Plan.ListField.newBuilder(); + for (Object e : (List) fieldObject) { + builder.addContent(serializeMemberVariable(e)); + } + return builder.build(); + } + + private static Plan.MapField serializeMapMemberVariable(Object fieldObject) { + Preconditions.checkState(fieldObject instanceof Map); + Plan.MapField.Builder builder = Plan.MapField.newBuilder(); + Set<Map.Entry<String, Object>> entrySet = ((Map) fieldObject).entrySet(); + for (Map.Entry<String, Object> e : entrySet) { + builder.putContent(e.getKey(), serializeMemberVariable(e.getValue())); + } + return builder.build(); + } + + // -------------------------------------------------------------------------- + // Deserialize Utils + // -------------------------------------------------------------------------- + + private static Object constructMemberVariable(Plan.MemberVariableField memberVariableField) { + switch (memberVariableField.getMemberVariableFieldCase()) { + case LITERALFIELD: + return constructLiteral(memberVariableField.getLiteralField()); + case LISTFIELD: + return constructList(memberVariableField.getListField()); + case MAPFIELD: + return constructMap(memberVariableField.getMapField()); + case OBJECTFIELD: + return constructObject(memberVariableField.getObjectField()); + case MEMBERVARIABLEFIELD_NOT_SET: + default: + return null; + } + } + + private static Object constructLiteral(Plan.LiteralField literalField) { + switch (literalField.getLiteralFieldCase()) { + case BOOLFIELD: + return literalField.getBoolField(); + case INTFIELD: + return literalField.getIntField(); + case LONGFIELD: + return literalField.getLongField(); + case DOUBLEFIELD: + return literalField.getDoubleField(); + case STRINGFIELD: + return literalField.getStringField(); + case LITERALFIELD_NOT_SET: + default: + return null; + } + } + + private static List constructList(Plan.ListField listField) { + List list = new ArrayList(); + for 
(Plan.MemberVariableField e : listField.getContentList()) { + list.add(constructMemberVariable(e)); + } + return list; + } + + private static Object constructMap(Plan.MapField mapField) { + Map map = new HashMap(); + for (Map.Entry<String, Plan.MemberVariableField> e : mapField.getContentMap().entrySet()) { + map.put(e.getKey(), constructMemberVariable(e.getValue())); + } + return map; + } + + private static Object constructObject(Plan.ObjectField objectField) { + try { + Class<?> clazz = Class.forName(objectField.getObjectClassName()); + if (clazz.isEnum()) { + return Enum.valueOf((Class<Enum>) clazz, + objectField.getMemberVariablesOrDefault(ENUM_VALUE_KEY, null).getLiteralField().getStringField()); + } else { + Object obj = clazz.newInstance(); + fromObjectField(obj, objectField); + return obj; + } + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + throw new IllegalStateException("Unable to create Object of type: " + objectField.getObjectClassName(), e); + } + } +} diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java index 0b846e555c..95991d558b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/planner/partitioning/FieldSelectionKeySelector.java @@ -26,10 +26,17 @@ public class FieldSelectionKeySelector implements KeySelector<Object[], Object> private int _columnIndex; + public FieldSelectionKeySelector() { + } + public FieldSelectionKeySelector(int columnIndex) { _columnIndex = columnIndex; } + public int getColumnIndex() { + return _columnIndex; + } + @Override public Object getKey(Object[] input) { return input[_columnIndex]; diff --git a/pinot-query-runtime/pom.xml b/pinot-query-runtime/pom.xml index 
3607e51e31..03fa270ea4 100644 --- a/pinot-query-runtime/pom.xml +++ b/pinot-query-runtime/pom.xml @@ -26,7 +26,7 @@ <parent> <artifactId>pinot</artifactId> <groupId>org.apache.pinot</groupId> - <version>0.10.0-SNAPSHOT</version> + <version>0.11.0-SNAPSHOT</version> </parent> <artifactId>pinot-query-runtime</artifactId> <name>Pinot Query Runtime</name> diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java index 1b4ecbbdc0..358ebb8465 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/QueryPlanSerDeUtils.java @@ -25,6 +25,8 @@ import org.apache.commons.lang3.StringUtils; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.planner.StageMetadata; +import org.apache.pinot.query.planner.nodes.AbstractStageNode; +import org.apache.pinot.query.planner.nodes.SerDeUtils; import org.apache.pinot.query.routing.WorkerInstance; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; @@ -41,7 +43,7 @@ public class QueryPlanSerDeUtils { public static DistributedStagePlan deserialize(Worker.StagePlan stagePlan) { DistributedStagePlan distributedStagePlan = new DistributedStagePlan(stagePlan.getStageId()); distributedStagePlan.setServerInstance(stringToInstance(stagePlan.getInstanceId())); - distributedStagePlan.setStageRoot(StageNodeSerDeUtils.deserializeStageRoot(stagePlan.getSerializedStageRoot())); + distributedStagePlan.setStageRoot(SerDeUtils.deserializeStageNode(stagePlan.getStageRoot())); Map<Integer, Worker.StageMetadata> metadataMap = stagePlan.getStageMetadataMap(); distributedStagePlan.getMetadataMap().putAll(protoMapToStageMetadataMap(metadataMap)); return distributedStagePlan; @@ 
-51,7 +53,7 @@ public class QueryPlanSerDeUtils { return Worker.StagePlan.newBuilder() .setStageId(distributedStagePlan.getStageId()) .setInstanceId(instanceToString(distributedStagePlan.getServerInstance())) - .setSerializedStageRoot(StageNodeSerDeUtils.serializeStageRoot(distributedStagePlan.getStageRoot())) + .setStageRoot(SerDeUtils.serializeStageNode((AbstractStageNode) distributedStagePlan.getStageRoot())) .putAllStageMetadata(stageMetadataMapToProtoMap(distributedStagePlan.getMetadataMap())).build(); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java deleted file mode 100644 index 80370128cf..0000000000 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/serde/StageNodeSerDeUtils.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.query.runtime.plan.serde; - -import com.google.common.base.Preconditions; -import com.google.protobuf.ByteString; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import org.apache.pinot.query.planner.nodes.StageNode; - - -public class StageNodeSerDeUtils { - private StageNodeSerDeUtils() { - // do not instantiate. - } - - public static StageNode deserializeStageRoot(ByteString serializeStagePlan) { - try (ByteArrayInputStream bs = new ByteArrayInputStream(serializeStagePlan.toByteArray()); - ObjectInputStream is = new ObjectInputStream(bs)) { - Object o = is.readObject(); - Preconditions.checkState(o instanceof StageNode, "invalid worker query request object"); - return (StageNode) o; - } catch (Exception e) { - throw new RuntimeException(e); - } - } - - public static ByteString serializeStageRoot(StageNode stageRoot) { - try (ByteArrayOutputStream bs = new ByteArrayOutputStream(); - ObjectOutputStream os = new ObjectOutputStream(bs)) { - os.writeObject(stageRoot); - return ByteString.copyFrom(bs.toByteArray()); - } catch (IOException e) { - throw new RuntimeException(e); - } - } -} diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java index 539d5d9a53..eeed6d20d9 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/service/QueryServerTest.java @@ -20,29 +20,37 @@ package org.apache.pinot.query.service; import com.google.common.collect.Lists; import io.grpc.ManagedChannelBuilder; +import java.util.Comparator; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutorService; import 
org.apache.pinot.common.proto.PinotQueryWorkerGrpc; import org.apache.pinot.common.proto.Worker; import org.apache.pinot.core.transport.ServerInstance; import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.QueryEnvironmentTestUtils; import org.apache.pinot.query.planner.QueryPlan; +import org.apache.pinot.query.planner.StageMetadata; +import org.apache.pinot.query.planner.nodes.StageNode; import org.apache.pinot.query.routing.WorkerInstance; import org.apache.pinot.query.runtime.QueryRunner; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; +import org.apache.pinot.util.TestUtils; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.mockito.ArgumentMatchers.any; + public class QueryServerTest { private static final int QUERY_SERVER_COUNT = 2; - private Map<Integer, QueryServer> _queryServerMap = new HashMap<>(); - private Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>(); + private final Map<Integer, QueryServer> _queryServerMap = new HashMap<>(); + private final Map<Integer, ServerInstance> _queryServerInstanceMap = new HashMap<>(); + private final Map<Integer, QueryRunner> _queryRunnerMap = new HashMap<>(); private QueryEnvironment _queryEnvironment; @@ -52,9 +60,11 @@ public class QueryServerTest { for (int i = 0; i < QUERY_SERVER_COUNT; i++) { int availablePort = QueryEnvironmentTestUtils.getAvailablePort(); - QueryServer queryServer = new QueryServer(availablePort, Mockito.mock(QueryRunner.class)); + QueryRunner queryRunner = Mockito.mock(QueryRunner.class); + QueryServer queryServer = new QueryServer(availablePort, queryRunner); queryServer.start(); _queryServerMap.put(availablePort, queryServer); + _queryRunnerMap.put(availablePort, queryRunner); // this only test the QueryServer functionality so the server port can be the same as the mailbox port. 
// this is only use for test identifier purpose. _queryServerInstanceMap.put(availablePort, new WorkerInstance("localhost", availablePort, availablePort)); @@ -73,17 +83,59 @@ public class QueryServerTest { } } + @SuppressWarnings("unchecked") @Test public void testWorkerAcceptsWorkerRequestCorrect() throws Exception { QueryPlan queryPlan = _queryEnvironment.planQuery("SELECT * FROM a JOIN b ON a.col1 = b.col2"); - int singleServerStageId = QueryEnvironmentTestUtils.getTestStageByServerCount(queryPlan, 1); + for (int stageId : queryPlan.getStageMetadataMap().keySet()) { + if (stageId > 0) { // we do not test reduce stage. + Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, stageId); + + // submit the request for testing. + submitRequest(queryRequest); + + StageMetadata stageMetadata = queryPlan.getStageMetadataMap().get(stageId); + + // ensure mock query runner received correctly deserialized payload. + // since submitRequest is async, we need to wait for the mockRunner to receive the query payload. 
+ QueryRunner mockRunner = _queryRunnerMap.get(stageMetadata.getServerInstances().get(0).getPort()); + TestUtils.waitForCondition(aVoid -> { + try { + Mockito.verify(mockRunner).processQuery(Mockito.argThat(distributedStagePlan -> { + StageNode stageNode = queryPlan.getQueryStageMap().get(stageId); + return isStageNodesEqual(stageNode, distributedStagePlan.getStageRoot()) && isMetadataMapsEqual( + stageMetadata, distributedStagePlan.getMetadataMap().get(stageId)); + }), any(ExecutorService.class), any(Map.class)); + return true; + } catch (Throwable t) { + return false; + } + }, 1000L, "Error verifying mock QueryRunner intercepted query payload!"); + } + } + } - Worker.QueryRequest queryRequest = getQueryRequest(queryPlan, singleServerStageId); + private static boolean isMetadataMapsEqual(StageMetadata left, StageMetadata right) { + return left.getServerInstances().equals(right.getServerInstances()) + && left.getServerInstanceToSegmentsMap().equals(right.getServerInstanceToSegmentsMap()) + && left.getScannedTables().equals(right.getScannedTables()); + } - // submit the request for testing. - submitRequest(queryRequest); + private static boolean isStageNodesEqual(StageNode left, StageNode right) { + if (left.getStageId() != right.getStageId() || left.getClass() != right.getClass() + || left.getInputs().size() != right.getInputs().size()) { + return false; + } + left.getInputs().sort(Comparator.comparingInt(StageNode::getStageId)); + right.getInputs().sort(Comparator.comparingInt(StageNode::getStageId)); + for (int i = 0; i < left.getInputs().size(); i++) { + if (!isStageNodesEqual(left.getInputs().get(i), right.getInputs().get(i))) { + return false; + } + } + return true; } private void submitRequest(Worker.QueryRequest queryRequest) { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org