[CAMEL-9869] Create Apache Flink Component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/702db0c9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/702db0c9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/702db0c9 Branch: refs/heads/master Commit: 702db0c9832b5549ecc188d99f91c946775e052f Parents: 01ec93d Author: Subhobrata Dey <sbc...@gmail.com> Authored: Tue Apr 26 16:30:52 2016 -0400 Committer: Subhobrata Dey <sbc...@gmail.com> Committed: Wed Apr 27 11:31:28 2016 -0400 ---------------------------------------------------------------------- components/camel-flink/pom.xml | 8 +- .../component/flink/DataStreamCallback.java | 29 ++++++++ .../flink/DataStreamFlinkProducer.java | 78 ++++++++++++++++++++ .../camel/component/flink/FlinkComponent.java | 25 +++++++ .../camel/component/flink/FlinkConstants.java | 5 +- .../camel/component/flink/FlinkEndpoint.java | 34 ++++++++- .../apache/camel/component/flink/Flinks.java | 6 ++ .../component/flink/VoidDataStreamCallback.java | 30 ++++++++ .../component/flink/FlinkProducerTest.java | 38 +++++++--- 9 files changed, 241 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-flink/pom.xml b/components/camel-flink/pom.xml index f176dd4..4b7b2f3 100644 --- a/components/camel-flink/pom.xml +++ b/components/camel-flink/pom.xml @@ -37,7 +37,6 @@ </properties> <dependencies> - <!--camel--> <dependency> <groupId>org.apache.camel</groupId> @@ -55,6 +54,13 @@ <artifactId>flink-clients</artifactId> <version>${flink-version}</version> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-streaming-java</artifactId> + <version>${flink-version}</version> + </dependency> + + <!--flink--> <!-- scala --> <dependency> http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamCallback.java new file mode 100644 index 0000000..3038695 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamCallback.java @@ -0,0 +1,29 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.flink; + +import org.apache.flink.streaming.api.datastream.DataStream; + +/** + * Generic block of code with parameters which can be executed against DataStream and return results. + * + * @param <T> results type + */ +public interface DataStreamCallback<T> { + + T onDataStream(DataStream ds, Object... payloads) throws Exception; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java new file mode 100644 index 0000000..bc1aa16 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.flink; + +import java.util.List; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; +import org.apache.flink.streaming.api.datastream.DataStream; + +public class DataStreamFlinkProducer extends DefaultProducer { + + public DataStreamFlinkProducer(FlinkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + DataStream ds = resolveDataStream(exchange); + DataStreamCallback dataStreamCallback = resolveDataStreamCallback(exchange); + Object body = exchange.getIn().getBody(); + Object result = body instanceof List ? dataStreamCallback.onDataStream(ds, ((List) body).toArray(new Object[0])) : dataStreamCallback.onDataStream(ds, body); + collectResults(exchange, result); + } + + @Override + public FlinkEndpoint getEndpoint() { + return (FlinkEndpoint) super.getEndpoint(); + } + + protected void collectResults(Exchange exchange, Object result) { + if (result instanceof DataStream) { + DataStream dsResults = (DataStream) result; + + if (getEndpoint().isCollect()) { + throw new IllegalArgumentException("collect mode not supported for Flink DataStreams."); + } else { + exchange.getIn().setBody(result); + exchange.getIn().setHeader(FlinkConstants.FLINK_DATASTREAM_HEADER, result); + } + } else { + exchange.getIn().setBody(result); + } + } + + protected DataStream resolveDataStream(Exchange exchange) { + if (exchange.getIn().getHeader(FlinkConstants.FLINK_DATASTREAM_HEADER) != null) { + return (DataStream) exchange.getIn().getHeader(FlinkConstants.FLINK_DATASTREAM_HEADER); + } else if (getEndpoint().getDataStream() != null) { + return getEndpoint().getDataStream(); + } else { + throw new IllegalArgumentException("No DataStream defined"); + } + } + + protected DataStreamCallback resolveDataStreamCallback(Exchange exchange) { + if (exchange.getIn().getHeader(FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER) != null) { + return (DataStreamCallback) exchange.getIn().getHeader(FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER); + } else if (getEndpoint().getDataStreamCallback() != null) { + return getEndpoint().getDataStreamCallback(); + } else { + throw new IllegalArgumentException("Cannot resolve DataStream callback."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java index 70c6c39..811ab5c 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java @@ -21,6 +21,7 @@ import java.util.Map; import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; import org.apache.flink.api.java.DataSet; +import org.apache.flink.streaming.api.datastream.DataStream; /** * The flink component can be used to send DataSet or DataStream jobs to Apache Flink cluster. @@ -29,6 +30,8 @@ public class FlinkComponent extends UriEndpointComponent { private DataSet ds; private DataSetCallback dataSetCallback; + private DataStream dataStream; + private DataStreamCallback dataStreamCallback; public FlinkComponent() { super(FlinkEndpoint.class); @@ -44,6 +47,10 @@ public class FlinkComponent extends UriEndpointComponent { return ds; } + public DataStream getDataStream() { + return dataStream; + } + /** * DataSet to compute against. */ @@ -51,14 +58,32 @@ public class FlinkComponent extends UriEndpointComponent { this.ds = ds; } + /** + * DataStream to compute against. + */ + public void setDataStream(DataStream dataStream) { + this.dataStream = dataStream; + } + public DataSetCallback getDataSetCallback() { return dataSetCallback; } + public DataStreamCallback getDataStreamCallback() { + return dataStreamCallback; + } + /** * Function performing action against a DataSet. */ public void setDataSetCallback(DataSetCallback dataSetCallback) { this.dataSetCallback = dataSetCallback; } + + /** + * Function performing action against a DataStream. + */ + public void setDataStreamCallback(DataStreamCallback dataStreamCallback) { + this.dataStreamCallback = dataStreamCallback; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java index 6f43a16..1c72e99 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java @@ -22,7 +22,10 @@ public final class FlinkConstants { public static final String FLINK_DATASET_CALLBACK_HEADER = "CamelFlinkDataSetCallback"; + public static final String FLINK_DATASTREAM_HEADER = "CamelFlinkDataStream"; + + public static final String FLINK_DATASTREAM_CALLBACK_HEADER = "CamelFlinkDataStreamCallback"; + private FlinkConstants() { } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java index 396d7df..d5bd872 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java @@ -25,6 +25,7 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.flink.api.java.DataSet; +import org.apache.flink.streaming.api.datastream.DataStream; /** * The flink component can be used to send DataSet jobs to Apache Flink cluster. @@ -39,6 +40,13 @@ public class FlinkEndpoint extends DefaultEndpoint { private DataSet dataSet; @UriParam private DataSetCallback dataSetCallback; + + @UriParam + private DataStream dataStream; + + @UriParam + private DataStreamCallback dataStreamCallback; + @UriParam(defaultValue = "true") private boolean collect = true; @@ -64,8 +72,10 @@ public class FlinkEndpoint extends DefaultEndpoint { public Producer createProducer() throws Exception { if (endpointType == EndpointType.dataset) { return new DataSetFlinkProducer(this); + } else if (endpointType == EndpointType.datastream) { + return new DataStreamFlinkProducer(this); } else { - throw new UnsupportedOperationException("datastream not yet supported"); + return null; } } @@ -95,6 +105,10 @@ public class FlinkEndpoint extends DefaultEndpoint { return dataSet; } + public DataStream getDataStream() { + return dataStream; + } + /** * DataSet to compute against. */ @@ -102,10 +116,21 @@ public class FlinkEndpoint extends DefaultEndpoint { this.dataSet = ds; } + /** + * DataStream to compute against. + */ + public void setDataStream(DataStream ds) { + this.dataStream = ds; + } + public DataSetCallback getDataSetCallback() { return dataSetCallback; } + public DataStreamCallback getDataStreamCallback() { + return dataStreamCallback; + } + /** * Function performing action against a DataSet. */ @@ -113,6 +138,13 @@ public class FlinkEndpoint extends DefaultEndpoint { this.dataSetCallback = dataSetCallback; } + /** + * Function performing action against a DataStream. + */ + public void setDataStreamCallback(DataStreamCallback dataStreamCallback) { + this.dataStreamCallback = dataStreamCallback; + } + public boolean isCollect() { return collect; } http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java index 927a57d..e85c9af 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java @@ -17,13 +17,19 @@ package org.apache.camel.component.flink; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; public final class Flinks { private Flinks() { + } public static ExecutionEnvironment createExecutionEnvironment() { return ExecutionEnvironment.getExecutionEnvironment(); } + + public static StreamExecutionEnvironment createStreamExecutionEnvironment() { + return StreamExecutionEnvironment.getExecutionEnvironment(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataStreamCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataStreamCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataStreamCallback.java new file mode 100644 index 0000000..493c5d8 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataStreamCallback.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.flink; + +import org.apache.flink.streaming.api.datastream.DataStream; + +public abstract class VoidDataStreamCallback implements DataStreamCallback<Void> { + + public abstract void doOnDataStream(DataStream ds, Object... payloads) throws Exception; + + @Override + public Void onDataStream(DataStream ds, Object... payloads) throws Exception { + doOnDataStream(ds, payloads); + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/702db0c9/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java index 4f3e863..c96796f 100644 --- a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java +++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java @@ -26,13 +26,17 @@ import org.apache.camel.impl.JndiRegistry; import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.Test; public class FlinkProducerTest extends CamelTestSupport { static ExecutionEnvironment executionEnvironment = Flinks.createExecutionEnvironment(); + static StreamExecutionEnvironment streamExecutionEnvironment = Flinks.createStreamExecutionEnvironment(); - String flinkUri = "flink:dataSet?dataSet=#myDataSet"; + String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet"; + String flinkDataStreamUri = "flink:dataStream?dataStream=#myDataStream"; int numberOfLinesInTestFile = 19; @@ -41,6 +45,7 @@ public class FlinkProducerTest extends CamelTestSupport { JndiRegistry registry = super.createRegistry(); registry.bind("myDataSet", executionEnvironment.readTextFile("src/test/resources/testds.txt")); + registry.bind("myDataStream", streamExecutionEnvironment.readTextFile("src/test/resources/testds.txt")); registry.bind("countLinesContaining", new DataSetCallback() { @Override @@ -57,7 +62,7 @@ public class FlinkProducerTest extends CamelTestSupport { @Test public void shouldExecuteDataSetCallback() { - Long linesCount = template.requestBodyAndHeader(flinkUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { + Long linesCount = template.requestBodyAndHeader(flinkDataSetUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { @Override public Object onDataSet(DataSet ds, Object... payloads) { try { @@ -73,7 +78,7 @@ public class FlinkProducerTest extends CamelTestSupport { @Test public void shouldExecuteDataSetCallbackWithSinglePayload() { - Long linesCount = template.requestBodyAndHeader(flinkUri, 10, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { + Long linesCount = template.requestBodyAndHeader(flinkDataSetUri, 10, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { @Override public Object onDataSet(DataSet ds, Object... payloads) { try { @@ -89,7 +94,7 @@ public class FlinkProducerTest extends CamelTestSupport { @Test public void shouldExecuteDataSetCallbackWithPayloads() { - Long linesCount = template.requestBodyAndHeader(flinkUri, Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { + Long linesCount = template.requestBodyAndHeader(flinkDataSetUri, Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { @Override public Object onDataSet(DataSet ds, Object... payloads) { try { @@ -105,7 +110,7 @@ public class FlinkProducerTest extends CamelTestSupport { @Test public void shouldUseTransformationFromRegistry() { - Long linesCount = template.requestBody(flinkUri + "&dataSetCallback=#countLinesContaining", null, Long.class); + Long linesCount = template.requestBody(flinkDataSetUri + "&dataSetCallback=#countLinesContaining", null, Long.class); Truth.assertThat(linesCount).isGreaterThan(0L); } @@ -114,7 +119,7 @@ public class FlinkProducerTest extends CamelTestSupport { final File output = File.createTempFile("camel", "flink"); output.delete(); - template.sendBodyAndHeader(flinkUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new VoidDataSetCallback() { + template.sendBodyAndHeader(flinkDataSetUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new VoidDataSetCallback() { @Override public void doOnDataSet(DataSet ds, Object... payloads) { ds.writeAsText(output.getAbsolutePath()); @@ -137,7 +142,7 @@ public class FlinkProducerTest extends CamelTestSupport { } }); - long pomLinesCount = template.requestBodyAndHeader(flinkUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, Long.class); + long pomLinesCount = template.requestBodyAndHeader(flinkDataSetUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, Long.class); Truth.assertThat(pomLinesCount).isEqualTo(19); } @@ -154,7 +159,7 @@ public class FlinkProducerTest extends CamelTestSupport { } }); - template.sendBodyAndHeader(flinkUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback); + template.sendBodyAndHeader(flinkDataSetUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback); Truth.assertThat(output.length()).isAtLeast(0L); } @@ -172,7 +177,22 @@ public class FlinkProducerTest extends CamelTestSupport { } }); - long pomLinesCount = template.requestBodyAndHeader(flinkUri, Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, Long.class); + long pomLinesCount = template.requestBodyAndHeader(flinkDataSetUri, Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, Long.class); Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); } + + @Test + public void shouldExecuteVoidDataStreamCallback() throws IOException { + final File output = File.createTempFile("camel", "flink"); + output.delete(); + + template.sendBodyAndHeader(flinkDataStreamUri, null, FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + ds.writeAsText(output.getAbsolutePath()); + } + }); + + Truth.assertThat(output.length()).isAtLeast(0L); + } } \ No newline at end of file