[ https://issues.apache.org/jira/browse/TINKERPOP-1288?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15274237#comment-15274237 ]
ASF GitHub Bot commented on TINKERPOP-1288:
-------------------------------------------
Github user okram commented on a diff in the pull request:
https://github.com/apache/incubator-tinkerpop/pull/305#discussion_r62347949
--- Diff: spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java ---
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.optimization.interceptor;
+
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.NumberHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.Scope;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.CountGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.FoldStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupCountStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.MaxGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.MeanGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.MinGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.SumGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.spark.process.computer.SparkMemory;
+import org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.SparkVertexProgramInterceptor;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
+import org.apache.tinkerpop.gremlin.util.function.ArrayListSupplier;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.BinaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SparkStarBarrierInterceptor implements SparkVertexProgramInterceptor<TraversalVertexProgram> {
+
+    public SparkStarBarrierInterceptor() {
+
+    }
+
+    @Override
+    public JavaPairRDD<Object, VertexWritable> apply(final TraversalVertexProgram vertexProgram, final JavaPairRDD<Object, VertexWritable> inputRDD, final SparkMemory memory) {
+        vertexProgram.setup(memory);
+        final Traversal.Admin<Vertex, Object> traversal = (Traversal.Admin) vertexProgram.getTraversal().getPure().clone();
+        final Object[] graphStepIds = ((GraphStep) traversal.getStartStep()).getIds();    // any V(1,2,3)-style ids to filter on
+        final ReducingBarrierStep endStep = (ReducingBarrierStep) traversal.getEndStep();  // needed for the final traverser generation
+        traversal.removeStep(0);                                     // remove GraphStep
+        traversal.removeStep(traversal.getSteps().size() - 1);       // remove ReducingBarrierStep
+        traversal.applyStrategies();                                 // compile
+        boolean identityTraversal = traversal.getSteps().isEmpty();  // if the traversal is empty, just return the vertex (fast)
+        ///////////////////////////////
+        ((MemoryTraversalSideEffects) traversal.getSideEffects()).setMemory(memory, true); // any intermediate sideEffect steps are backed by SparkMemory
+        memory.setInExecute(true);
+        final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values()
+                .filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)
--- End diff --
By "moving this class forward," I mean when we handle `g.E()` ... as you
can see, the logic will start to get gross fast with a 4-condition branch. I
thought it best to get it all clean and clear so it's understandable what's
going on.
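The pattern the interceptor follows (drop the leading GraphStep, run the remaining steps on each vertex's star graph, then fold the results with the trailing ReducingBarrierStep) can be illustrated without Spark or TinkerPop. This is a hypothetical, stdlib-only Java analogue, not the interceptor's code: `StarBarrierSketch`, `StarVertex`, `idMatches`, and `evaluate` are illustrative names, and `idMatches` only approximates `ElementHelper.idExists` by treating an empty id list as "match every vertex".

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical, Spark-free analogue of a "star barrier" evaluation:
// g.V(ids).<star-local steps>.<reducing barrier>
public final class StarBarrierSketch {

    // Illustrative stand-in for a star-graph vertex: its id plus its outgoing neighbor ids.
    public static final class StarVertex {
        private final Object id;
        private final List<?> outNeighbors;

        public StarVertex(Object id, List<?> outNeighbors) {
            this.id = id;
            this.outNeighbors = outNeighbors;
        }

        public Object id() { return id; }
        public List<?> outNeighbors() { return outNeighbors; }
    }

    // Approximates ElementHelper.idExists: an empty id array matches every vertex.
    public static boolean idMatches(Object id, Object... ids) {
        return ids.length == 0 || Arrays.asList(ids).contains(id);
    }

    // 1) filter on V(ids), 2) run the local steps on each vertex independently,
    // 3) combine the partial results with the barrier's reducer.
    public static <T> T evaluate(List<StarVertex> vertices, Object[] ids,
                                 Function<StarVertex, T> localSteps,
                                 T seed, BinaryOperator<T> reducer) {
        return vertices.stream()
                .filter(v -> idMatches(v.id(), ids))
                .map(localSteps)
                .reduce(seed, reducer);
    }

    public static void main(String[] args) {
        List<StarVertex> graph = List.of(
                new StarVertex(1, List.of(2, 3, 4)),
                new StarVertex(2, List.of()),
                new StarVertex(3, List.of(2)),
                new StarVertex(4, List.of(3, 5)));
        Function<StarVertex, Long> outCount = v -> (long) v.outNeighbors().size();
        // Analogous to g.V().out().count()
        System.out.println(evaluate(graph, new Object[0], outCount, 0L, Long::sum));      // 6
        // Analogous to g.V(1, 4).out().count()
        System.out.println(evaluate(graph, new Object[]{1, 4}, outCount, 0L, Long::sum)); // 5
    }
}
```

Assuming the steps between the two barriers only need each vertex's own star graph, no message passing is required: every per-vertex result can be computed independently and combined with an associative reducer (here `Long::sum`). That is presumably what lets the interceptor replace the vertex-program iterations with a single filter/map/reduce pass over the input RDD.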
> Support gremlin.spark.skipPartitioning configuration.
> -----------------------------------------------------
>
> Key: TINKERPOP-1288
> URL: https://issues.apache.org/jira/browse/TINKERPOP-1288
> Project: TinkerPop
> Issue Type: Improvement
> Components: hadoop, process
> Affects Versions: 3.2.0-incubating
> Reporter: Marko A. Rodriguez
> Assignee: Marko A. Rodriguez
> Fix For: 3.2.1
>
>
> If a {{VertexProgram}} does not use message passing, then it's best NOT to
> partition after load, as doing so is pointless.
> In particular, for {{TraversalVertexProgram}}, if the submitted traversal
> does not contain a {{VertexStep}}, then partitioning can be avoided. This can
> be reasoned about via a {{SparkPartitionStrategy}}, but for now, simply adding
> the configuration and having it do its job is sufficient.
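The decision described in the issue can be sketched in a few lines. This is a hypothetical, stdlib-only illustration: only the key `gremlin.spark.skipPartitioning` comes from the issue title; the class, its methods, the string-valued configuration map, and the name-based `VertexStep` check are assumptions made for the example, not SparkGraphComputer's actual code.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the TINKERPOP-1288 decision. Only the configuration key is taken
// from the issue; the class, method names, and string-based step check are illustrative.
public final class PartitionDecisionSketch {

    public static final String SKIP_PARTITIONING = "gremlin.spark.skipPartitioning";

    // What the issue asks for now: an explicit flag decides whether to partition after load.
    // A missing key defaults to "false", i.e. partition as before.
    public static boolean shouldPartition(Map<String, String> configuration) {
        return !Boolean.parseBoolean(configuration.getOrDefault(SKIP_PARTITIONING, "false"));
    }

    // What a strategy might infer later: per the issue, a traversal without a VertexStep
    // never moves traversers to adjacent vertices, so there are no messages for
    // partitioning to help with.
    public static boolean canSkipPartitioning(List<String> stepNames) {
        return !stepNames.contains("VertexStep");
    }

    public static void main(String[] args) {
        System.out.println(shouldPartition(Map.of()));                                     // true
        System.out.println(shouldPartition(Map.of(SKIP_PARTITIONING, "true")));           // false
        System.out.println(canSkipPartitioning(List.of("GraphStep", "CountGlobalStep"))); // true
    }
}
```

The flag is the explicit, user-driven switch the issue asks for; `canSkipPartitioning` stands in for the kind of automatic reasoning a future SparkPartitionStrategy might perform.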
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)