Repository: camel
Updated Branches:
  refs/heads/master 86e22d436 -> c24985175
Added Hive producer.

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/c2498517
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/c2498517
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/c2498517

Branch: refs/heads/master
Commit: c24985175d8f0423ceead2df6ed83a8d60de84b2
Parents: 86e22d4
Author: Henryk Konsek <hekon...@gmail.com>
Authored: Mon Dec 7 09:25:09 2015 +0100
Committer: Henryk Konsek <hekon...@gmail.com>
Committed: Mon Dec 7 09:25:09 2015 +0100

----------------------------------------------------------------------
 .../camel/component/spark/EndpointType.java     |  2 +-
 .../component/spark/HiveSparkProducer.java      | 55 ++++++++++++++++++++
 .../camel/component/spark/SparkEndpoint.java    |  8 ++-
 .../component/spark/SparkProducerTest.java      | 21 ++++----
 4 files changed, 72 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/c2498517/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
index b471e39..139c082 100644
--- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/EndpointType.java
@@ -18,6 +18,6 @@
 package org.apache.camel.component.spark;
 
 public enum EndpointType {
 
-    rdd, dataframe
+    rdd, dataframe, hive
 }
\ No newline at end of file
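For orientation (not part of the commit itself): the endpoint type is the scheme-specific part of the Camel URI, so the new enum constant makes URIs such as spark:hive resolvable. A minimal route sketch, assuming a Camel context with the camel-spark component on the classpath (the route name is illustrative):

    import org.apache.camel.builder.RouteBuilder;

    public class HiveQueryRoute extends RouteBuilder {
        @Override
        public void configure() {
            // The message body (a SQL string) is handed to the new Hive producer.
            from("direct:hive-query").to("spark:hive");
        }
    }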
http://git-wip-us.apache.org/repos/asf/camel/blob/c2498517/components/camel-spark/src/main/java/org/apache/camel/component/spark/HiveSparkProducer.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/HiveSparkProducer.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/HiveSparkProducer.java
new file mode 100644
index 0000000..eb64637
--- /dev/null
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/HiveSparkProducer.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.spark;
+
+import java.util.Set;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultProducer;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.hive.HiveContext;
+
+public class HiveSparkProducer extends DefaultProducer {
+
+    public HiveSparkProducer(SparkEndpoint endpoint) {
+        super(endpoint);
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        HiveContext hiveContext = resolveHiveContext();
+        String sql = exchange.getIn().getBody(String.class);
+        DataFrame resultFrame = hiveContext.sql(sql);
+        exchange.getIn().setBody(getEndpoint().isCollect() ? resultFrame.collectAsList() : resultFrame.count());
+    }
+
+    @Override
+    public SparkEndpoint getEndpoint() {
+        return (SparkEndpoint) super.getEndpoint();
+    }
+
+    // Helpers
+
+    protected HiveContext resolveHiveContext() {
+        Set<HiveContext> hiveContexts = getEndpoint().getComponent().getCamelContext().getRegistry().findByType(HiveContext.class);
+        if (hiveContexts.size() == 1) {
+            return hiveContexts.iterator().next();
+        }
+        return null;
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/c2498517/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
index 8a5247b..cb4e1a9 100644
--- a/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
+++ b/components/camel-spark/src/main/java/org/apache/camel/component/spark/SparkEndpoint.java
@@ -53,7 +53,13 @@ public class SparkEndpoint extends DefaultEndpoint {
 
     @Override
     public Producer createProducer() throws Exception {
-        return endpointType == EndpointType.rdd ? new RddSparkProducer(this) : new DataFrameSparkProducer(this);
+        if (endpointType == EndpointType.rdd) {
+            return new RddSparkProducer(this);
+        } else if (endpointType == EndpointType.dataframe) {
+            return new DataFrameSparkProducer(this);
+        } else {
+            return new HiveSparkProducer(this);
+        }
     }
 
     @Override
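Note that resolveHiveContext() returns null unless exactly one HiveContext is bound in the registry, so callers of the hive endpoint need to register one (the test fixture below does exactly that). A minimal binding sketch, assuming Spark 1.x and a local master; the bean name and SparkConf settings are illustrative:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.hive.HiveContext;

    // Create a HiveContext on top of a local Spark context...
    JavaSparkContext sparkContext = new JavaSparkContext(
            new SparkConf().setMaster("local[*]").setAppName("camel-spark"));
    HiveContext hiveContext = new HiveContext(sparkContext.sc());
    // ...and bind it so Registry.findByType(HiveContext.class) can locate it.
    registry.bind("hiveContext", hiveContext);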
http://git-wip-us.apache.org/repos/asf/camel/blob/c2498517/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
index 4b2ac18..81a28e6 100644
--- a/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
+++ b/components/camel-spark/src/test/java/org/apache/camel/component/spark/SparkProducerTest.java
@@ -49,6 +49,8 @@ public class SparkProducerTest extends CamelTestSupport {
 
     String sparkDataFrameUri = "spark:dataframe?dataFrame=#jsonCars";
 
+    String sparkHiveUri = "spark:hive";
+
     // Routes fixtures
 
     @Override
@@ -57,6 +59,7 @@
 
         registry.bind("pomRdd", sparkContext.textFile("testrdd.txt"));
 
+        registry.bind("hiveContext", hiveContext);
         DataFrame jsonCars = hiveContext.read().json("src/test/resources/cars.json");
         jsonCars.registerTempTable("cars");
         registry.bind("jsonCars", jsonCars);
@@ -80,7 +83,7 @@
                 return rdd.count();
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(17);
+        Truth.assertThat(pomLinesCount).isEqualTo(19);
     }
 
     @Test
@@ -91,7 +94,7 @@
                 return rdd.count() * (int) payloads[0];
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(170);
+        Truth.assertThat(pomLinesCount).isEqualTo(190);
     }
 
     @Test
@@ -102,7 +105,7 @@
                 return rdd.count() * (int) payloads[0] * (int) payloads[1];
             }
         }, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(1700);
+        Truth.assertThat(pomLinesCount).isEqualTo(1900);
     }
 
     @Test
@@ -114,7 +117,7 @@
             }
         };
         long pomLinesCount = template.requestBodyAndHeader(sparkUri, asList("10", "10"), SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(1700);
+        Truth.assertThat(pomLinesCount).isEqualTo(1900);
     }
 
     @Test
@@ -150,7 +153,7 @@
             }
         });
         long pomLinesCount = template.requestBodyAndHeader(sparkUri, null, SPARK_RDD_CALLBACK_HEADER, rddCallback, Long.class);
-        Truth.assertThat(pomLinesCount).isEqualTo(17);
+        Truth.assertThat(pomLinesCount).isEqualTo(19);
    }
 
     @Test
@@ -177,13 +180,7 @@
 
     @Test
     public void shouldExecuteHiveQuery() {
-        DataFrameCallback callback = new DataFrameCallback<Long>() {
-            @Override
-            public Long onDataFrame(DataFrame dataFrame, Object... payloads) {
-                return hiveContext.sql("SELECT * FROM cars").count();
-            }
-        };
-        long tablesCount = template.requestBodyAndHeader(sparkDataFrameUri, null, SPARK_DATAFRAME_CALLBACK_HEADER, callback, Long.class);
+        long tablesCount = template.requestBody(sparkHiveUri + "?collect=false", "SELECT * FROM cars", Long.class);
         Truth.assertThat(tablesCount).isEqualTo(2);
     }
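As the reworked shouldExecuteHiveQuery() test shows, clients no longer need a DataFrame callback; a plain ProducerTemplate call is enough. A usage sketch, assuming a HiveContext is bound in the registry and a cars temp table is registered as in the fixture above:

    // Collect the full result set (collect=true is the default)...
    List<Row> cars = template.requestBody("spark:hive", "SELECT * FROM cars", List.class);
    // ...or request only the row count by disabling collection.
    long carCount = template.requestBody("spark:hive?collect=false", "SELECT * FROM cars", Long.class);

Here Row is org.apache.spark.sql.Row, matching the DataFrame.collectAsList() return type used by the producer.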