Repository: camel
Updated Branches:
  refs/heads/master 86e22d436 -> c24985175


Added Hive producer.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c2498517
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c2498517
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c2498517

Branch: refs/heads/master
Commit: c24985175d8f0423ceead2df6ed83a8d60de84b2
Parents: 86e22d4
Author: Henryk Konsek <hekon...@gmail.com>
Authored: Mon Dec 7 09:25:09 2015 +0100
Committer: Henryk Konsek <hekon...@gmail.com>
Committed: Mon Dec 7 09:25:09 2015 +0100

----------------------------------------------------------------------
 .../camel/component/spark/EndpointType.java     |  2 +-
 .../component/spark/HiveSparkProducer.java      | 55 ++++++++++++++++++++
 .../camel/component/spark/SparkEndpoint.java    |  8 ++-
 .../component/spark/SparkProducerTest.java      | 21 ++++----
 4 files changed, 72 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c2498517/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
index b471e39..139c082 100644
--- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
@@ -18,6 +18,6 @@ package org.apache.camel.component.spark;
 
 public enum EndpointType {
 
-    rdd, dataframe
+    rdd, dataframe, hive
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/c2498517/components/camel-spark/src/main/java/org/apache/camel/component/spark/HiveSparkProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/HiveSparkProducer.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/HiveSparkProducer.java
new file mode 100644
index 0000000..eb64637
--- /dev/null
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/HiveSparkProducer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import java.util.Set;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+
+public class HiveSparkProducer extends DefaultProducer {
+
+    public HiveSparkProducer(SparkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        HiveContext hiveContext = resolveHiveContext();
+        String sql = exchange.getIn().getBody(String.class);
+        DataFrame resultFrame = hiveContext.sql(sql);
+        exchange.getIn().setBody(getEndpoint().isCollect() ? resultFrame.collectAsList() : resultFrame.count());
+    }
+
+    @Override
+    public SparkEndpoint getEndpoint() {
+        return (SparkEndpoint) super.getEndpoint();
+    }
+
+    // Helpers
+
+    protected HiveContext resolveHiveContext() {
+        Set<HiveContext> hiveContexts = getEndpoint().getComponent().getCamelContext().getRegistry().findByType(HiveContext.class);
+        if(hiveContexts.size() == 1) {
+            return hiveContexts.iterator().next();
+        }
+        return null;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/c2498517/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
index 8a5247b..cb4e1a9 100644
--- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
@@ -53,7 +53,13 @@ public class SparkEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return endpointType == EndpointType.rdd ? new RddSparkProducer(this) : new DataFrameSparkProducer(this);
+        if (endpointType == EndpointType.rdd) {
+            return new RddSparkProducer(this);
+        } else if (endpointType == EndpointType.dataframe) {
+            return new DataFrameSparkProducer(this);
+        } else {
+            return new HiveSparkProducer(this);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/camel/blob/c2498517/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
index 4b2ac18..81a28e6 100644
--- a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
+++ b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
@@ -49,6 +49,8 @@ public class SparkProducerTest extends CamelTestSupport {
 
     String sparkDataFrameUri = "spark:dataframe?dataFrame=#jsonCars";
 
+    String sparkHiveUri = "spark:hive";
+
     // Routes fixtures
 
     @Override
@@ -57,6 +59,7 @@ public class SparkProducerTest extends CamelTestSupport {
 
         registry.bind("pomRdd", sparkContext.textFile("testrdd.txt"));
 
+        registry.bind("hiveContext", hiveContext);
         DataFrame jsonCars = hiveContext.read().json("src/test/resources/cars.json");
         jsonCars.registerTempTable("cars");
         registry.bind("jsonCars", jsonCars);
@@ -80,7 +83,7 @@ public class SparkProducerTest extends CamelTestSupport {
                 return rdd.count();
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(17);
+        Truth.assertThat(pomLinesCount).isEqualTo(19);
     }
 
     @Test
@@ -91,7 +94,7 @@ public class SparkProducerTest extends CamelTestSupport {
                 return rdd.count() * (int) payloads[0];
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(170);
+        Truth.assertThat(pomLinesCount).isEqualTo(190);
     }
 
     @Test
@@ -102,7 +105,7 @@ public class SparkProducerTest extends CamelTestSupport {
                 return rdd.count() * (int) payloads[0] * (int) payloads[1];
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(1700);
+        Truth.assertThat(pomLinesCount).isEqualTo(1900);
     }
 
     @Test
@@ -114,7 +117,7 @@ public class SparkProducerTest extends CamelTestSupport {
             }
         };
         long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList("10", "10"), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(1700);
+        Truth.assertThat(pomLinesCount).isEqualTo(1900);
     }
 
     @Test
@@ -150,7 +153,7 @@ public class SparkProducerTest extends CamelTestSupport {
             }
         });
         long pomLinesCount = template.requestBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(17);
+        Truth.assertThat(pomLinesCount).isEqualTo(19);
     }
 
     @Test
@@ -177,13 +180,7 @@ public class SparkProducerTest extends CamelTestSupport {
 
     @Test
     public void shouldExecuteHiveQuery() {
-        DataFrameCallback callback = new DataFrameCallback<Long>() {
-            @Override
-            public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
-                return hiveContext.sql("SELECT * FROM cars").count();
-            }
-        };
-        long tablesCount = template.requestBodyAndHeader(sparkDataFrameUri, null, SPARK_DATAFRAME_CALLBACK_HEADER, callback, Long.class);
+        long tablesCount = template.requestBody(sparkHiveUri + "?collect=false", "SELECT * FROM cars", Long.class);
         Truth.assertThat(tablesCount).isEqualTo(2);
     }
 

Reply via email to