Repository: camel
Updated Branches:
  refs/heads/master c4bdb4463 -> a68c79d68


Added support for input conversions for @RddCallback.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/a68c79d6
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/a68c79d6
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/a68c79d6

Branch: refs/heads/master
Commit: a68c79d68a5f256006da0287ed32403b808ed984
Parents: c4bdb44
Author: Henryk Konsek <hekon...@gmail.com>
Authored: Fri Dec 11 18:15:55 2015 +0100
Committer: Henryk Konsek <hekon...@gmail.com>
Committed: Fri Dec 11 18:15:55 2015 +0100

----------------------------------------------------------------------
 .../spark/annotations/AnnotatedRddCallback.java |  9 +++++++
 .../annotations/AnnotatedRddCallbackProxy.java  | 17 +++++++++++-
 .../component/spark/SparkProducerTest.java      | 28 ++++++++++++++------
 3 files changed, 45 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/a68c79d6/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
index dfd8e62..ee41697 100644
--- 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
@@ -16,8 +16,13 @@
  */
 package org.apache.camel.component.spark.annotations;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.component.spark.RddCallback;
 
+/**
+ * Provides facade for working with annotated RDD callbacks i.e. POJO classes 
with an appropriate annotations on
+ * selected methods.
+ */
 public final class AnnotatedRddCallback {
 
     private AnnotatedRddCallback() {
@@ -27,4 +32,8 @@ public final class AnnotatedRddCallback {
         return new AnnotatedRddCallbackProxy(objectWithCallback);
     }
 
+    public static RddCallback annotatedRddCallback(Object objectWithCallback, 
CamelContext camelContext) {
+        return new AnnotatedRddCallbackProxy(objectWithCallback, camelContext);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/a68c79d6/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java
index ff5337f..5d2e9c9 100644
--- 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import static java.util.Arrays.asList;
 
+import org.apache.camel.CamelContext;
 import org.apache.camel.component.spark.RddCallback;
 import org.apache.spark.api.java.AbstractJavaRDDLike;
 
@@ -34,14 +35,21 @@ class AnnotatedRddCallbackProxy implements RddCallback {
 
     private final List<Method> rddCallbacks;
 
-    public AnnotatedRddCallbackProxy(Object objectWithCallback) {
+    private final CamelContext camelContext;
+
+    public AnnotatedRddCallbackProxy(Object objectWithCallback, CamelContext 
camelContext) {
         this.objectWithCallback = objectWithCallback;
+        this.camelContext = camelContext;
         this.rddCallbacks = 
findMethodsWithAnnotation(objectWithCallback.getClass(), 
org.apache.camel.component.spark.annotations.RddCallback.class);
         if (rddCallbacks.size() == 0) {
             throw new UnsupportedOperationException("Can't find methods 
annotated with @RddCallback.");
         }
     }
 
+    public AnnotatedRddCallbackProxy(Object objectWithCallback) {
+        this(objectWithCallback, null);
+    }
+
     @Override
     public Object onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
         try {
@@ -54,6 +62,13 @@ class AnnotatedRddCallbackProxy implements RddCallback {
 
             Method callbackMethod = rddCallbacks.get(0);
             callbackMethod.setAccessible(true);
+
+            if (camelContext != null) {
+                for (int i = 1; i < arguments.size(); i++) {
+                    arguments.set(i, 
camelContext.getTypeConverter().convertTo(callbackMethod.getParameterTypes()[i],
 arguments.get(i)));
+                }
+            }
+
             return callbackMethod.invoke(objectWithCallback, 
arguments.toArray(new Object[arguments.size()]));
         } catch (IllegalAccessException | InvocationTargetException e) {
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/camel/blob/a68c79d6/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
 
b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
index ae34cc8..44fc203 100644
--- 
a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
+++ 
b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
@@ -104,24 +104,24 @@ public class SparkProducerTest extends CamelTestSupport {
 
     @Test
     public void shouldExecuteRddCallbackWithSinglePayload() {
-        long pomLinesCount = template.requestBodyAndHeader(sparkUri, 10, 
SPARK_RDD_CALLBACK_HEADER, new org.apache.camel.component.spark.RddCallback() {
+        long linesCount = template.requestBodyAndHeader(sparkUri, 10, 
SPARK_RDD_CALLBACK_HEADER, new org.apache.camel.component.spark.RddCallback() {
             @Override
             public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
                 return rdd.count() * (int) payloads[0];
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 
10);
+        Truth.assertThat(linesCount).isEqualTo(numberOfLinesInTestFile * 10);
     }
 
     @Test
     public void shouldExecuteRddCallbackWithPayloads() {
-        long pomLinesCount = template.requestBodyAndHeader(sparkUri, 
asList(10, 10), SPARK_RDD_CALLBACK_HEADER, new 
org.apache.camel.component.spark.RddCallback() {
+        long linesCount = template.requestBodyAndHeader(sparkUri, asList(10, 
10), SPARK_RDD_CALLBACK_HEADER, new 
org.apache.camel.component.spark.RddCallback() {
             @Override
             public Long onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
                 return rdd.count() * (int) payloads[0] * (int) payloads[1];
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 
* 10);
+        Truth.assertThat(linesCount).isEqualTo(numberOfLinesInTestFile * 10 * 
10);
     }
 
     @Test
@@ -132,14 +132,14 @@ public class SparkProducerTest extends CamelTestSupport {
                 return rdd.count() * (int) payloads[0] * (int) payloads[1];
             }
         };
-        long pomLinesCount = template.requestBodyAndHeader(sparkUri, 
asList("10", "10"), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(1900);
+        long linesCount = template.requestBodyAndHeader(sparkUri, asList("10", 
"10"), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
+        Truth.assertThat(linesCount).isEqualTo(1900);
     }
 
     @Test
     public void shouldUseTransformationFromRegistry() {
-        long pomLinesCount = template.requestBody(sparkUri + 
"&rddCallback=#countLinesTransformation", null, Long.class);
-        Truth.assertThat(pomLinesCount).isGreaterThan(0L);
+        long linesCount = template.requestBody(sparkUri + 
"&rddCallback=#countLinesTransformation", null, Long.class);
+        Truth.assertThat(linesCount).isGreaterThan(0L);
     }
 
     @Test
@@ -204,6 +204,18 @@ public class SparkProducerTest extends CamelTestSupport {
         Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 
* 10);
     }
 
+    @Test
+    public void shouldExecuteAnnotatedCallbackWithConversions() {
+        org.apache.camel.component.spark.RddCallback rddCallback = 
annotatedRddCallback(new Object() {
+            @RddCallback
+            long countLines(JavaRDD<String> textFile, int first, int second) {
+                return textFile.count() * first * second;
+            }
+        }, context);
+        long pomLinesCount = template.requestBodyAndHeader(sparkUri, 
asList(10, "10"), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
+        Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 
* 10);
+    }
+
     // Hive tests
 
     @Test

Reply via email to