Repository: camel Updated Branches: refs/heads/master c4bdb4463 -> a68c79d68
Added support for input conversions for @RddCallback. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a68c79d6 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a68c79d6 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a68c79d6 Branch: refs/heads/master Commit: a68c79d68a5f256006da0287ed32403b808ed984 Parents: c4bdb44 Author: Henryk Konsek <hekon...@gmail.com> Authored: Fri Dec 11 18:15:55 2015 +0100 Committer: Henryk Konsek <hekon...@gmail.com> Committed: Fri Dec 11 18:15:55 2015 +0100 ---------------------------------------------------------------------- .../spark/annotations/AnnotatedRddCallback.java | 9 +++++++ .../annotations/AnnotatedRddCallbackProxy.java | 17 +++++++++++- .../component/spark/SparkProducerTest.java | 28 ++++++++++++++------ 3 files changed, 45 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/a68c79d6/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java index dfd8e62..ee41697 100644 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java @@ -16,8 +16,13 @@ */ package org.apache.camel.component.spark.annotations; +import org.apache.camel.CamelContext; import org.apache.camel.component.spark.RddCallback; +/** + * Provides facade for working with annotated RDD callbacks i.e. POJO classes with an appropriate annotations on + * selected methods. + */ public final class AnnotatedRddCallback { private AnnotatedRddCallback() { @@ -27,4 +32,8 @@ public final class AnnotatedRddCallback { return new AnnotatedRddCallbackProxy(objectWithCallback); } + public static RddCallback annotatedRddCallback(Object objectWithCallback, CamelContext camelContext) { + return new AnnotatedRddCallbackProxy(objectWithCallback, camelContext); + } + } http://git-wip-us.apache.org/repos/asf/camel/blob/a68c79d6/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java index ff5337f..5d2e9c9 100644 --- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java +++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java @@ -23,6 +23,7 @@ import java.util.List; import static java.util.Arrays.asList; +import org.apache.camel.CamelContext; import org.apache.camel.component.spark.RddCallback; import org.apache.spark.api.java.AbstractJavaRDDLike; @@ -34,14 +35,21 @@ class AnnotatedRddCallbackProxy implements RddCallback { private final List<Method> rddCallbacks; - public AnnotatedRddCallbackProxy(Object objectWithCallback) { + private final CamelContext camelContext; + + public AnnotatedRddCallbackProxy(Object objectWithCallback, CamelContext camelContext) { this.objectWithCallback = objectWithCallback; + this.camelContext = camelContext; this.rddCallbacks = findMethodsWithAnnotation(objectWithCallback.getClass(), org.apache.camel.component.spark.annotations.RddCallback.class); if (rddCallbacks.size() == 0) { throw new UnsupportedOperationException("Can't find methods annotated with @RddCallback."); } } + public AnnotatedRddCallbackProxy(Object objectWithCallback) { + this(objectWithCallback, null); + } + @Override public Object onRdd(AbstractJavaRDDLike rdd, Object... payloads) { try { @@ -54,6 +62,13 @@ class AnnotatedRddCallbackProxy implements RddCallback { Method callbackMethod = rddCallbacks.get(0); callbackMethod.setAccessible(true); + + if (camelContext != null) { + for (int i = 1; i < arguments.size(); i++) { + arguments.set(i, camelContext.getTypeConverter().convertTo(callbackMethod.getParameterTypes()[i], arguments.get(i))); + } + } + return callbackMethod.invoke(objectWithCallback, arguments.toArray(new Object[arguments.size()])); } catch (IllegalAccessException | InvocationTargetException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/camel/blob/a68c79d6/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java index ae34cc8..44fc203 100644 --- a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java +++ b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java @@ -104,24 +104,24 @@ public class SparkProducerTest extends CamelTestSupport { @Test public void shouldExecuteRddCallbackWithSinglePayload() { - long pomLinesCount = template.requestBodyAndHeader(sparkUri, 10, SPARK_RDD_CALLBACK_HEADER, new org.apache.camel.component.spark.RddCallback() { + long linesCount = template.requestBodyAndHeader(sparkUri, 10, SPARK_RDD_CALLBACK_HEADER, new org.apache.camel.component.spark.RddCallback() { @Override public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) { return rdd.count() * (int) payloads[0]; } }, Long.class); - Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10); + Truth.assertThat(linesCount).isEqualTo(numberOfLinesInTestFile * 10); } @Test public void shouldExecuteRddCallbackWithPayloads() { - long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList(10, 10), SPARK_RDD_CALLBACK_HEADER, new org.apache.camel.component.spark.RddCallback() { + long linesCount = template.requestBodyAndHeader(sparkUri, asList(10, 10), SPARK_RDD_CALLBACK_HEADER, new org.apache.camel.component.spark.RddCallback() { @Override public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) { return rdd.count() * (int) payloads[0] * (int) payloads[1]; } }, Long.class); - Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); + Truth.assertThat(linesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); } @Test @@ -132,14 +132,14 @@ public class SparkProducerTest extends CamelTestSupport { return rdd.count() * (int) payloads[0] * (int) payloads[1]; } }; - long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList("10", "10"), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class); - Truth.assertThat(pomLinesCount).isEqualTo(1900); + long linesCount = template.requestBodyAndHeader(sparkUri, asList("10", "10"), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class); + Truth.assertThat(linesCount).isEqualTo(1900); } @Test public void shouldUseTransformationFromRegistry() { - long pomLinesCount = template.requestBody(sparkUri + "&rddCallback=#countLinesTransformation", null, Long.class); - Truth.assertThat(pomLinesCount).isGreaterThan(0L); + long linesCount = template.requestBody(sparkUri + "&rddCallback=#countLinesTransformation", null, Long.class); + Truth.assertThat(linesCount).isGreaterThan(0L); } @Test @@ -204,6 +204,18 @@ public class SparkProducerTest extends CamelTestSupport { Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); } + @Test + public void shouldExecuteAnnotatedCallbackWithConversions() { + org.apache.camel.component.spark.RddCallback rddCallback = annotatedRddCallback(new Object() { + @RddCallback + long countLines(JavaRDD<String> textFile, int first, int second) { + return textFile.count() * first * second; + } + }, context); + long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList(10, "10"), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class); + Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); + } + // Hive tests @Test