Repository: camel Updated Branches: refs/heads/master 61b562738 -> c4bdb4463
Extracted annotated proxy class. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c4bdb446 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c4bdb446 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c4bdb446 Branch: refs/heads/master Commit: c4bdb4463b9bff2a047b3badf13799d4d2732b09 Parents: 61b5627 Author: Henryk Konsek <hekon...@gmail.com> Authored: Fri Dec 11 17:06:52 2015 +0100 Committer: Henryk Konsek <hekon...@gmail.com> Committed: Fri Dec 11 17:06:52 2015 +0100 ---------------------------------------------------------------------- .../spark/annotations/AnnotatedRddCallback.java | 37 +----------- .../annotations/AnnotatedRddCallbackProxy.java | 63 ++++++++++++++++++++ .../component/spark/SparkProducerTest.java | 20 ++++++- 3 files changed, 83 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/c4bdb446/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java index 8a988f4..dfd8e62 100644 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java @@ -16,46 +16,15 @@ */ package org.apache.camel.component.spark.annotations; -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.ArrayList; -import java.util.List; - -import static java.util.Arrays.asList; - -import org.apache.spark.api.java.AbstractJavaRDDLike; - -import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation; +import org.apache.camel.component.spark.RddCallback; public final class AnnotatedRddCallback { private AnnotatedRddCallback() { } - public static org.apache.camel.component.spark.RddCallback annotatedRddCallback(final Object callback) { - final List<Method> rddCallbacks = findMethodsWithAnnotation(callback.getClass(), RddCallback.class); - if (rddCallbacks.size() > 0) { - return new org.apache.camel.component.spark.RddCallback() { - @Override - public Object onRdd(AbstractJavaRDDLike rdd, Object... payloads) { - try { - List<Object> arguments = new ArrayList<>(payloads.length + 1); - arguments.add(rdd); - arguments.addAll(asList(payloads)); - if (arguments.get(1) == null) { - arguments.remove(1); - } - - Method callbackMethod = rddCallbacks.get(0); - callbackMethod.setAccessible(true); - return callbackMethod.invoke(callback, arguments.toArray(new Object[arguments.size()])); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException(e); - } - } - }; - } - throw new UnsupportedOperationException("Can't find methods annotated with @Rdd."); + public static RddCallback annotatedRddCallback(Object objectWithCallback) { + return new AnnotatedRddCallbackProxy(objectWithCallback); } } http://git-wip-us.apache.org/repos/asf/camel/blob/c4bdb446/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java new file mode 100644 index 0000000..ff5337f --- /dev/null +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.spark.annotations; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import static java.util.Arrays.asList; + +import org.apache.camel.component.spark.RddCallback; +import org.apache.spark.api.java.AbstractJavaRDDLike; + +import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation; + +class AnnotatedRddCallbackProxy implements RddCallback { + + private final Object objectWithCallback; + + private final List<Method> rddCallbacks; + + public AnnotatedRddCallbackProxy(Object objectWithCallback) { + this.objectWithCallback = objectWithCallback; + this.rddCallbacks = findMethodsWithAnnotation(objectWithCallback.getClass(), org.apache.camel.component.spark.annotations.RddCallback.class); + if (rddCallbacks.size() == 0) { + throw new UnsupportedOperationException("Can't find methods annotated with @RddCallback."); + } + } + + @Override + public Object onRdd(AbstractJavaRDDLike rdd, Object... payloads) { + try { + List<Object> arguments = new ArrayList<>(payloads.length + 1); + arguments.add(rdd); + arguments.addAll(asList(payloads)); + if (arguments.get(1) == null) { + arguments.remove(1); + } + + Method callbackMethod = rddCallbacks.get(0); + callbackMethod.setAccessible(true); + return callbackMethod.invoke(objectWithCallback, arguments.toArray(new Object[arguments.size()])); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/c4bdb446/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java index 4617569..ae34cc8 100644 --- a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java +++ b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java @@ -56,6 +56,8 @@ public class SparkProducerTest extends CamelTestSupport { String sparkHiveUri = "spark:hive"; + int numberOfLinesInTestFile = 19; + @BeforeClass public static void beforeClass() { if (shouldRunHive) { @@ -97,7 +99,7 @@ public class SparkProducerTest extends CamelTestSupport { return rdd.count(); } }, Long.class); - Truth.assertThat(linesCount).isEqualTo(19); + Truth.assertThat(linesCount).isEqualTo(numberOfLinesInTestFile); } @Test @@ -108,7 +110,7 @@ public class SparkProducerTest extends CamelTestSupport { return rdd.count() * (int) payloads[0]; } }, Long.class); - Truth.assertThat(pomLinesCount).isEqualTo(190); + Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10); } @Test @@ -119,7 +121,7 @@ public class SparkProducerTest extends CamelTestSupport { return rdd.count() * (int) payloads[0] * (int) payloads[1]; } }, Long.class); - Truth.assertThat(pomLinesCount).isEqualTo(1900); + Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); } @Test @@ -190,6 +192,18 @@ public class SparkProducerTest extends CamelTestSupport { Truth.assertThat(output.length()).isGreaterThan(0L); } + @Test + public void shouldExecuteAnnotatedCallbackWithParameters() { + org.apache.camel.component.spark.RddCallback rddCallback = annotatedRddCallback(new Object() { + @RddCallback + long countLines(JavaRDD<String> textFile, int first, int second) { + return textFile.count() * first * second; + } + }); + long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList(10, 10), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class); + Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); + } + // Hive tests @Test