This is an automated email from the ASF dual-hosted git repository. ankitsultana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new 31933064bf [timeseries] Part-1: Misc. Refactor to Enable Broker Reduce (#14582) 31933064bf is described below commit 31933064bf05cd9123bd8398de515a2f661311a8 Author: Ankit Sultana <ankitsult...@uber.com> AuthorDate: Wed Dec 4 13:11:01 2024 -0600 [timeseries] Part-1: Misc. Refactor to Enable Broker Reduce (#14582) --- .../apache/pinot/common/datablock/DataBlockUtils.java | 17 +++++++++++++++++ .../apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java | 2 +- .../pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java | 11 ++++++++--- .../pinot/tsdb/m3ql/plan/TransformNullPlanNode.java | 15 ++++++++++----- .../pinot/tsdb/m3ql/time/TimeBucketComputer.java | 2 +- .../apache/pinot/query/mailbox/GrpcSendingMailbox.java | 16 ++-------------- .../timeseries/PhysicalTimeSeriesPlanVisitor.java | 6 +++--- .../timeseries/TimeSeriesPhysicalTableScan.java | 6 ++++++ .../pinot/tsdb/planner/TimeSeriesQueryEnvironment.java | 2 +- .../pinot/tsdb/planner/physical/TableScanVisitor.java | 2 +- .../pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java | 16 +++++++++------- .../pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java | 10 ++++++++-- .../pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java | 18 +++++++++--------- 13 files changed, 76 insertions(+), 47 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java index 1ae3f4ad6d..ddd9debbc7 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/datablock/DataBlockUtils.java @@ -20,6 +20,8 @@ package org.apache.pinot.common.datablock; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import com.google.protobuf.ByteString; +import com.google.protobuf.UnsafeByteOperations; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; @@ -169,6 +171,21 @@ public final class DataBlockUtils { return result; } + public static ByteString toByteString(DataBlock dataBlock) + throws IOException { + List<ByteBuffer> bytes = dataBlock.serialize(); + ByteString byteString; + if (bytes.isEmpty()) { + byteString = ByteString.EMPTY; + } else { + byteString = UnsafeByteOperations.unsafeWrap(bytes.get(0)); + for (int i = 1; i < bytes.size(); i++) { + byteString = byteString.concat(UnsafeByteOperations.unsafeWrap(bytes.get(i))); + } + } + return byteString; + } + /** * Reads a data block from the given byte buffer. * @param buffer the buffer to read from. The data will be read at the buffer's current position. This position will diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java index aa31692a33..53844048a7 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/M3TimeSeriesPlanner.java @@ -108,7 +108,7 @@ public class M3TimeSeriesPlanner implements TimeSeriesLogicalPlanner { rootNode = currentNode; } if (lastNode != null) { - lastNode.addChildNode(currentNode); + lastNode.addInputNode(currentNode); } lastNode = currentNode; } diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java index 26359269cd..8cf62b830b 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/KeepLastValuePlanNode.java @@ -29,8 +29,13 @@ import org.apache.pinot.tsdb.spi.plan.BaseTimeSeriesPlanNode; public class KeepLastValuePlanNode extends BaseTimeSeriesPlanNode { @JsonCreator public KeepLastValuePlanNode(@JsonProperty("id") String id, - @JsonProperty("children") List<BaseTimeSeriesPlanNode> children) { - super(id, children); + @JsonProperty("inputs") List<BaseTimeSeriesPlanNode> inputs) { + super(id, inputs); + } + + @Override + public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) { + return new KeepLastValuePlanNode(_id, newInputs); } @Override @@ -45,7 +50,7 @@ public class KeepLastValuePlanNode extends BaseTimeSeriesPlanNode { @Override public BaseTimeSeriesOperator run() { - BaseTimeSeriesOperator childOperator = _children.get(0).run(); + BaseTimeSeriesOperator childOperator = _inputs.get(0).run(); return new KeepLastValueOperator(List.of(childOperator)); } } diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java index c98e4f421e..c749fac906 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/plan/TransformNullPlanNode.java @@ -34,8 +34,8 @@ public class TransformNullPlanNode extends BaseTimeSeriesPlanNode { @JsonCreator public TransformNullPlanNode(@JsonProperty("id") String id, @JsonProperty("defaultValue") Double defaultValue, - @JsonProperty("children") List<BaseTimeSeriesPlanNode> children) { - super(id, children); + @JsonProperty("inputs") List<BaseTimeSeriesPlanNode> inputs) { + super(id, inputs); _defaultValue = defaultValue; } @@ -43,6 +43,11 @@ public class TransformNullPlanNode extends BaseTimeSeriesPlanNode { return _defaultValue; } + @Override + public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) { + return new TransformNullPlanNode(_id, _defaultValue, newInputs); + } + @Override public String getKlass() { return TransformNullPlanNode.class.getName(); @@ -55,9 +60,9 @@ public class TransformNullPlanNode extends BaseTimeSeriesPlanNode { @Override public BaseTimeSeriesOperator run() { - Preconditions.checkState(_children.size() == 1, - "TransformNullPlanNode should have only 1 child, got: %s", _children.size()); - BaseTimeSeriesOperator childOperator = _children.get(0).run(); + Preconditions.checkState(_inputs.size() == 1, + "TransformNullPlanNode should have only 1 child, got: %s", _inputs.size()); + BaseTimeSeriesOperator childOperator = _inputs.get(0).run(); return new TransformNullOperator(_defaultValue, ImmutableList.of(childOperator)); } } diff --git a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java index a7353ee6c8..1cc0e6650b 100644 --- a/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java +++ b/pinot-plugins/pinot-timeseries-lang/pinot-timeseries-m3ql/src/main/java/org/apache/pinot/tsdb/m3ql/time/TimeBucketComputer.java @@ -48,7 +48,7 @@ public class TimeBucketComputer { return constraints; } QueryTimeBoundaryConstraints constraints = new QueryTimeBoundaryConstraints(); - for (BaseTimeSeriesPlanNode childNode : planNode.getChildren()) { + for (BaseTimeSeriesPlanNode childNode : planNode.getInputs()) { QueryTimeBoundaryConstraints childConstraints = process(childNode, request); constraints = QueryTimeBoundaryConstraints.merge(constraints, childConstraints); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java index dec3de0de7..b21d3a7f4a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java @@ -19,13 +19,11 @@ package org.apache.pinot.query.mailbox; import com.google.protobuf.ByteString; -import com.google.protobuf.UnsafeByteOperations; import io.grpc.stub.StreamObserver; import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.datablock.DataBlock; +import org.apache.pinot.common.datablock.DataBlockUtils; import org.apache.pinot.common.datatable.StatMap; import org.apache.pinot.common.proto.Mailbox.MailboxContent; import org.apache.pinot.common.proto.PinotMailboxGrpc; @@ -135,17 +133,7 @@ public class GrpcSendingMailbox implements SendingMailbox { long start = System.currentTimeMillis(); try { DataBlock dataBlock = block.getDataBlock(); - List<ByteBuffer> bytes = dataBlock.serialize(); - - ByteString byteString; - if (bytes.isEmpty()) { - byteString = ByteString.EMPTY; - } else { - byteString = UnsafeByteOperations.unsafeWrap(bytes.get(0)); - for (int i = 1; i < bytes.size(); i++) { - byteString = byteString.concat(UnsafeByteOperations.unsafeWrap(bytes.get(i))); - } - } + ByteString byteString = DataBlockUtils.toByteString(dataBlock); int sizeInBytes = byteString.size(); if (LOGGER.isDebugEnabled()) { LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java index d300232625..9d6ab83a4b 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/PhysicalTimeSeriesPlanVisitor.java @@ -66,15 +66,15 @@ public class PhysicalTimeSeriesPlanVisitor { } public void initLeafPlanNode(BaseTimeSeriesPlanNode planNode, TimeSeriesExecutionContext context) { - for (int index = 0; index < planNode.getChildren().size(); index++) { - BaseTimeSeriesPlanNode childNode = planNode.getChildren().get(index); + for (int index = 0; index < planNode.getInputs().size(); index++) { + BaseTimeSeriesPlanNode childNode = planNode.getInputs().get(index); if (childNode instanceof LeafTimeSeriesPlanNode) { LeafTimeSeriesPlanNode leafNode = (LeafTimeSeriesPlanNode) childNode; List<String> segments = context.getPlanIdToSegmentsMap().get(leafNode.getId()); ServerQueryRequest serverQueryRequest = compileLeafServerQueryRequest(leafNode, segments, context); TimeSeriesPhysicalTableScan physicalTableScan = new TimeSeriesPhysicalTableScan(childNode.getId(), serverQueryRequest, _queryExecutor, _executorService); - planNode.getChildren().set(index, physicalTableScan); + planNode.getInputs().set(index, physicalTableScan); } else { initLeafPlanNode(childNode, context); } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java index 272e556498..c823842c24 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/timeseries/TimeSeriesPhysicalTableScan.java @@ -19,6 +19,7 @@ package org.apache.pinot.query.runtime.timeseries; import java.util.Collections; +import java.util.List; import java.util.concurrent.ExecutorService; import org.apache.pinot.core.query.executor.QueryExecutor; import org.apache.pinot.core.query.request.ServerQueryRequest; @@ -54,6 +55,11 @@ public class TimeSeriesPhysicalTableScan extends BaseTimeSeriesPlanNode { return _executorService; } + @Override + public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) { + throw new UnsupportedOperationException("withInputs not supported for TimeSeriesPhysicalTableScan"); + } + public String getKlass() { return TimeSeriesPhysicalTableScan.class.getName(); } diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java index 493efb2361..d061b21074 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/TimeSeriesQueryEnvironment.java @@ -126,7 +126,7 @@ public class TimeSeriesQueryEnvironment { tableNameConsumer.accept(scanNode.getTableName()); return; } - for (BaseTimeSeriesPlanNode childNode : planNode.getChildren()) { + for (BaseTimeSeriesPlanNode childNode : planNode.getInputs()) { findTableNames(childNode, tableNameConsumer); } } diff --git a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java index 99728e0f6d..d9f80b54ac 100644 --- a/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java +++ b/pinot-timeseries/pinot-timeseries-planner/src/main/java/org/apache/pinot/tsdb/planner/physical/TableScanVisitor.java @@ -61,7 +61,7 @@ public class TableScanVisitor { List<String> segments = entry.getValue().getLeft(); context.getPlanIdToSegmentMap().put(sfpNode.getId(), segments); } - for (BaseTimeSeriesPlanNode childNode : planNode.getChildren()) { + for (BaseTimeSeriesPlanNode childNode : planNode.getInputs()) { assignSegmentsToPlan(childNode, timeBuckets, context); } } diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java index dd7a951752..1c3847d2bf 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/BaseTimeSeriesPlanNode.java @@ -28,25 +28,27 @@ import org.apache.pinot.tsdb.spi.operator.BaseTimeSeriesOperator; */ public abstract class BaseTimeSeriesPlanNode { protected final String _id; - protected final List<BaseTimeSeriesPlanNode> _children; + protected final List<BaseTimeSeriesPlanNode> _inputs; - public BaseTimeSeriesPlanNode(String id, List<BaseTimeSeriesPlanNode> children) { + public BaseTimeSeriesPlanNode(String id, List<BaseTimeSeriesPlanNode> inputs) { _id = id; - _children = children; + _inputs = inputs; } public String getId() { return _id; } - public List<BaseTimeSeriesPlanNode> getChildren() { - return _children; + public List<BaseTimeSeriesPlanNode> getInputs() { + return _inputs; } - public void addChildNode(BaseTimeSeriesPlanNode planNode) { - _children.add(planNode); + public void addInputNode(BaseTimeSeriesPlanNode planNode) { + _inputs.add(planNode); } + public abstract BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs); + public abstract String getKlass(); public abstract String getExplainName(); diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java index 773d675242..1986f4713d 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/LeafTimeSeriesPlanNode.java @@ -47,13 +47,13 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode { @JsonCreator public LeafTimeSeriesPlanNode( - @JsonProperty("id") String id, @JsonProperty("children") List<BaseTimeSeriesPlanNode> children, + @JsonProperty("id") String id, @JsonProperty("inputs") List<BaseTimeSeriesPlanNode> inputs, @JsonProperty("tableName") String tableName, @JsonProperty("timeColumn") String timeColumn, @JsonProperty("timeUnit") TimeUnit timeUnit, @JsonProperty("offsetSeconds") Long offsetSeconds, @JsonProperty("filterExpression") String filterExpression, @JsonProperty("valueExpression") String valueExpression, @JsonProperty("aggInfo") AggInfo aggInfo, @JsonProperty("groupByExpressions") List<String> groupByExpressions) { - super(id, children); + super(id, inputs); _tableName = tableName; _timeColumn = timeColumn; _timeUnit = timeUnit; @@ -64,6 +64,12 @@ public class LeafTimeSeriesPlanNode extends BaseTimeSeriesPlanNode { _groupByExpressions = groupByExpressions; } + @Override + public BaseTimeSeriesPlanNode withInputs(List<BaseTimeSeriesPlanNode> newInputs) { + return new LeafTimeSeriesPlanNode(_id, newInputs, _tableName, _timeColumn, _timeUnit, _offsetSeconds, + _filterExpression, _valueExpression, _aggInfo, _groupByExpressions); + } + @Override public String getKlass() { return LeafTimeSeriesPlanNode.class.getName(); diff --git a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java index 1e7775ff5a..62f3166db7 100644 --- a/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java +++ b/pinot-timeseries/pinot-timeseries-spi/src/main/java/org/apache/pinot/tsdb/spi/plan/serde/TimeSeriesPlanSerde.java @@ -67,15 +67,15 @@ public class TimeSeriesPlanSerde { public static BaseTimeSeriesPlanNode create(JsonNode jsonNode) throws JsonProcessingException, ClassNotFoundException { - JsonNode children = null; + JsonNode inputs = null; if (jsonNode instanceof ObjectNode) { - // Remove children field to prevent Jackson from deserializing it. We will handle it manually. + // Remove inputs field to prevent Jackson from deserializing it. We will handle it manually. ObjectNode objectNode = (ObjectNode) jsonNode; - if (objectNode.has("children")) { - children = objectNode.get("children"); - objectNode.remove("children"); + if (objectNode.has("inputs")) { + inputs = objectNode.get("inputs"); + objectNode.remove("inputs"); } - objectNode.putIfAbsent("children", OBJECT_MAPPER.createArrayNode()); + objectNode.putIfAbsent("inputs", OBJECT_MAPPER.createArrayNode()); } BaseTimeSeriesPlanNode planNode = null; try { @@ -83,10 +83,10 @@ public class TimeSeriesPlanSerde { Class<BaseTimeSeriesPlanNode> klass = (Class<BaseTimeSeriesPlanNode>) Class.forName(klassName); planNode = OBJECT_MAPPER.readValue(jsonNode.toString(), klass); } finally { - if (planNode != null && children instanceof ArrayNode) { - ArrayNode childArray = (ArrayNode) children; + if (planNode != null && inputs instanceof ArrayNode) { + ArrayNode childArray = (ArrayNode) inputs; for (JsonNode childJsonNode : childArray) { - planNode.addChildNode(create(childJsonNode)); + planNode.addInputNode(create(childJsonNode)); } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org