itschrispeck commented on code in PR #15371:
URL: https://github.com/apache/pinot/pull/15371#discussion_r2025237779


##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/PinotDataDistribution.java:
##########
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.calcite.rel.RelCollation;
+import org.apache.calcite.rel.RelCollations;
+import org.apache.calcite.rel.RelDistribution;
+import org.apache.calcite.util.mapping.Mappings;
+
+
+/**
+ * Describes how the data will be distributed across a distributed set of 
output streams for a given Plan Node.
+ * They integer based keys, such as those in {@link HashDistributionDesc} are 
based on the output Row Type of the
+ * corresponding plan node.
+ */
+public class PinotDataDistribution {
+  /**
+   * Denotes the type of distribution: broadcast, singleton, etc.
+   */
+  private final RelDistribution.Type _type;
+  /**
+   * In the format: "index@instanceId".
+   * <p>
+   *   <b>TODO:</b> An alternative is to store workers separately. One reason 
workers are needed is because
+   *     Exchange is often required because the workers for RelNode with 
multiple inputs may be different.
+   *     If we store workers separately, then we can just store the number of 
streams here. But that means then that
+   *     we have to handle Exchanges for scenarios where workers are different 
in sender/receiver separately from
+   *     Exchanges added due to the other reason.
+   * </p>
+   */
+  private final List<String> _workers;
+  /**
+   * Precomputed hashCode of workers to allow quick comparisons. In large 
deployments, it is common to have 30-100+
+   * servers, and if the plan is big enough, comparison of workers can become 
a bottleneck.
+   */
+  private final long _workerHash;
+  /**
+   * The set of hash distribution descriptors. This is a set, because a given 
stream can be partitioned by multiple
+   * different sets of keys.
+   */
+  private final Set<HashDistributionDesc> _hashDistributionDesc;
+  /**
+   * Denotes the ordering of data in each output stream of data.
+   */
+  private final RelCollation _collation;
+
+  public PinotDataDistribution(RelDistribution.Type type, List<String> 
workers, long workerHash,
+      @Nullable Set<HashDistributionDesc> desc, @Nullable RelCollation 
collation) {
+    _type = type;
+    _workers = workers;
+    _workerHash = workerHash;
+    _hashDistributionDesc = desc == null ? Collections.emptySet() : desc;
+    _collation = collation == null ? RelCollations.EMPTY : collation;
+    validate();
+  }
+
+  public static PinotDataDistribution singleton(String worker, @Nullable 
RelCollation collation) {
+    List<String> workers = ImmutableList.of(worker);
+    return new PinotDataDistribution(RelDistribution.Type.SINGLETON, workers, 
workers.hashCode(), null,
+        collation);
+  }
+
+  public PinotDataDistribution withCollation(RelCollation collation) {
+    return new PinotDataDistribution(_type, _workers, _workerHash, 
_hashDistributionDesc, collation);
+  }
+
+  public RelDistribution.Type getType() {
+    return _type;
+  }
+
+  public List<String> getWorkers() {
+    return _workers;
+  }
+
+  public long getWorkerHash() {
+    return _workerHash;
+  }
+
+  public Set<HashDistributionDesc> getHashDistributionDesc() {
+    return _hashDistributionDesc;
+  }
+
+  public RelCollation getCollation() {
+    return _collation;
+  }
+
+  /**
+   * Given a distribution constraint, return whether this physical 
distribution meets the constraint or not.
+   * E.g. say the distribution constraint is Broadcast. That means each stream 
in the output of this Plan Node should
+   * contain all the records. This method will return true if that is already 
the case.
+   */
+  public boolean satisfies(@Nullable RelDistribution distributionConstraint) {
+    if (distributionConstraint == null || _workers.size() == 1) {
+      return true;
+    }
+    RelDistribution.Type constraintType = distributionConstraint.getType();
+    switch (constraintType) {
+      case ANY:
+        return true;
+      case BROADCAST_DISTRIBUTED:
+        return _type == RelDistribution.Type.BROADCAST_DISTRIBUTED;
+      case SINGLETON:
+        return _type == RelDistribution.Type.SINGLETON;
+      case RANDOM_DISTRIBUTED:
+        return _type == RelDistribution.Type.RANDOM_DISTRIBUTED;
+      case HASH_DISTRIBUTED:
+        if (_type != RelDistribution.Type.HASH_DISTRIBUTED) {
+          return false;
+        }
+        return satisfiesHashDistributionDesc(distributionConstraint.getKeys()) 
!= null;
+      default:
+        throw new IllegalStateException("Unexpected distribution constraint 
type: " + distributionConstraint.getType());
+    }
+  }
+
+  /**
+   * Returns a Hash Distribution Desc
+   */
+  @Nullable
+  public HashDistributionDesc satisfiesHashDistributionDesc(List<Integer> 
keys) {
+    Preconditions.checkNotNull(_hashDistributionDesc, "null 
hashDistributionDesc in satisfies");
+    // Return any hash distribution descriptor that matches the given 
constraint *exactly*.
+    // TODO: Add support for partial check (i.e. if distributed by [1], then 
we can avoid re-dist for constraint [1, 2].
+    return _hashDistributionDesc.stream().filter(x -> 
x.getKeys().equals(keys)).findFirst().orElse(null);
+  }
+
+  public boolean satisfies(@Nullable RelCollation relCollation) {
+    if (relCollation == null || relCollation == RelCollations.EMPTY || 
relCollation.getKeys().isEmpty()) {
+      return true;
+    }
+    if (_collation == null) {
+      return false;
+    }
+    return _collation.satisfies(relCollation);
+  }
+
+  public PinotDataDistribution apply(@Nullable Mappings.TargetMapping 
targetMapping) {
+    if (targetMapping == null) {
+      return new PinotDataDistribution(RelDistribution.Type.ANY, _workers, 
_workerHash, null, null);
+    }
+    Set<HashDistributionDesc> newHashDesc = new HashSet<>();
+    if (_hashDistributionDesc != null) {

Review Comment:
   Do we need this check? seems it should never be null 



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/ExchangeStrategy.java:
##########
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2;
+
+/**
+ * Defines how data is transferred across an Exchange.
+ */
+public enum ExchangeStrategy {
+  /**
+   * There's a single stream in the receiver, so each stream in the sender 
sends data to the same.
+   */
+  SINGLETON_EXCHANGE,
+  /**
+   * stream-ID X sends data to stream-ID X. This cannot be modeled by 
PARTITIONING_EXCHANGE because the fan-out for
+   * this type of exchange is 1:1.
+   */
+  IDENTITY_EXCHANGE,
+  /**
+   * Each stream will partition the outgoing stream based on a set of keys and 
a hash function.
+   * Fanout for this type of exchange is 1:all.
+   */
+  PARTITIONING_EXCHANGE,
+  /**
+   * 1-to-1 but the exchange is a permutation of stream-ids.
+   */
+  PERMUTATION_EXCHANGE,

Review Comment:
   When is permutation used? what does the distinction between it and identity 
exchange help with? 



##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/v2/TableScanMetadata.java:
##########
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.planner.physical.v2;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.core.routing.TimeBoundaryInfo;
+import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan;
+
+
+/**
+ * Additional metadata for the {@link PhysicalTableScan}.
+ */
+public class TableScanMetadata {
+  private final Set<String> _scannedTables;
+  private final Map<Integer, Map<String, List<String>>> _workedIdToSegmentsMap;

Review Comment:
   what is the value here? `Map<String, List<String>`? workerId => table type 
=> segments?
   
   Could we use a class or add a comment so we don't have to track the 
initialization back to understand the structure?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to