Repository: camel
Updated Branches:
  refs/heads/master 61b562738 -> c4bdb4463


Extracted annotated proxy class.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c4bdb446
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c4bdb446
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c4bdb446

Branch: refs/heads/master
Commit: c4bdb4463b9bff2a047b3badf13799d4d2732b09
Parents: 61b5627
Author: Henryk Konsek <hekon...@gmail.com>
Authored: Fri Dec 11 17:06:52 2015 +0100
Committer: Henryk Konsek <hekon...@gmail.com>
Committed: Fri Dec 11 17:06:52 2015 +0100

----------------------------------------------------------------------
 .../spark/annotations/AnnotatedRddCallback.java | 37 +-----------
 .../annotations/AnnotatedRddCallbackProxy.java  | 63 ++++++++++++++++++++
 .../component/spark/SparkProducerTest.java      | 20 ++++++-
 3 files changed, 83 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c4bdb446/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
index 8a988f4..dfd8e62 100644
--- 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallback.java
@@ -16,46 +16,15 @@
  */
 package org.apache.camel.component.spark.annotations;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.List;
-
-import static java.util.Arrays.asList;
-
-import org.apache.spark.api.java.AbstractJavaRDDLike;
-
-import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation;
+import org.apache.camel.component.spark.RddCallback;
 
 public final class AnnotatedRddCallback {
 
     private AnnotatedRddCallback() {
     }
 
-    public static org.apache.camel.component.spark.RddCallback 
annotatedRddCallback(final Object callback) {
-        final List<Method> rddCallbacks = 
findMethodsWithAnnotation(callback.getClass(), RddCallback.class);
-        if (rddCallbacks.size() > 0) {
-            return new org.apache.camel.component.spark.RddCallback() {
-                @Override
-                public Object onRdd(AbstractJavaRDDLike rdd, Object... 
payloads) {
-                    try {
-                        List<Object> arguments = new 
ArrayList<>(payloads.length + 1);
-                        arguments.add(rdd);
-                        arguments.addAll(asList(payloads));
-                        if (arguments.get(1) == null) {
-                            arguments.remove(1);
-                        }
-
-                        Method callbackMethod = rddCallbacks.get(0);
-                        callbackMethod.setAccessible(true);
-                        return callbackMethod.invoke(callback, 
arguments.toArray(new Object[arguments.size()]));
-                    } catch (IllegalAccessException | 
InvocationTargetException e) {
-                        throw new RuntimeException(e);
-                    }
-                }
-            };
-        }
-        throw new UnsupportedOperationException("Can't find methods annotated 
with @Rdd.");
+    public static RddCallback annotatedRddCallback(Object objectWithCallback) {
+        return new AnnotatedRddCallbackProxy(objectWithCallback);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/c4bdb446/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java
 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java
new file mode 100644
index 0000000..ff5337f
--- /dev/null
+++ 
b/components/camel-spark/src/main/java/org/apache/camel/component/spark/annotations/AnnotatedRddCallbackProxy.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark.annotations;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import static java.util.Arrays.asList;
+
+import org.apache.camel.component.spark.RddCallback;
+import org.apache.spark.api.java.AbstractJavaRDDLike;
+
+import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation;
+
+class AnnotatedRddCallbackProxy implements RddCallback {
+
+    private final Object objectWithCallback;
+
+    private final List<Method> rddCallbacks;
+
+    public AnnotatedRddCallbackProxy(Object objectWithCallback) {
+        this.objectWithCallback = objectWithCallback;
+        this.rddCallbacks = 
findMethodsWithAnnotation(objectWithCallback.getClass(), 
org.apache.camel.component.spark.annotations.RddCallback.class);
+        if (rddCallbacks.size() == 0) {
+            throw new UnsupportedOperationException("Can't find methods 
annotated with @RddCallback.");
+        }
+    }
+
+    @Override
+    public Object onRdd(AbstractJavaRDDLike rdd, Object... payloads) {
+        try {
+            List<Object> arguments = new ArrayList<>(payloads.length + 1);
+            arguments.add(rdd);
+            arguments.addAll(asList(payloads));
+            if (arguments.get(1) == null) {
+                arguments.remove(1);
+            }
+
+            Method callbackMethod = rddCallbacks.get(0);
+            callbackMethod.setAccessible(true);
+            return callbackMethod.invoke(objectWithCallback, 
arguments.toArray(new Object[arguments.size()]));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/c4bdb446/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
 
b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
index 4617569..ae34cc8 100644
--- 
a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
+++ 
b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
@@ -56,6 +56,8 @@ public class SparkProducerTest extends CamelTestSupport {
 
     String sparkHiveUri = "spark:hive";
 
+    int numberOfLinesInTestFile = 19;
+
     @BeforeClass
     public static void beforeClass() {
         if (shouldRunHive) {
@@ -97,7 +99,7 @@ public class SparkProducerTest extends CamelTestSupport {
                 return rdd.count();
             }
         }, Long.class);
-        Truth.assertThat(linesCount).isEqualTo(19);
+        Truth.assertThat(linesCount).isEqualTo(numberOfLinesInTestFile);
     }
 
     @Test
@@ -108,7 +110,7 @@ public class SparkProducerTest extends CamelTestSupport {
                 return rdd.count() * (int) payloads[0];
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(190);
+        Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 
10);
     }
 
     @Test
@@ -119,7 +121,7 @@ public class SparkProducerTest extends CamelTestSupport {
                 return rdd.count() * (int) payloads[0] * (int) payloads[1];
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(1900);
+        Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 
* 10);
     }
 
     @Test
@@ -190,6 +192,18 @@ public class SparkProducerTest extends CamelTestSupport {
         Truth.assertThat(output.length()).isGreaterThan(0L);
     }
 
+    @Test
+    public void shouldExecuteAnnotatedCallbackWithParameters() {
+        org.apache.camel.component.spark.RddCallback rddCallback = 
annotatedRddCallback(new Object() {
+            @RddCallback
+            long countLines(JavaRDD<String> textFile, int first, int second) {
+                return textFile.count() * first * second;
+            }
+        });
+        long pomLinesCount = template.requestBodyAndHeader(sparkUri, 
asList(10, 10), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
+        Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 
* 10);
+    }
+
     // Hive tests
 
     @Test

Reply via email to