Repository: camel Updated Branches: refs/heads/master 2bab577e6 -> a00c190f9
CAMEL-9869 Create Apache Flink Component Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/62631da8 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/62631da8 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/62631da8 Branch: refs/heads/master Commit: 62631da832fa5019fdfba4df67cb3e21803ce4a5 Parents: 2bab577 Author: Subhobrata Dey <sbc...@gmail.com> Authored: Thu Apr 14 22:07:48 2016 -0400 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Apr 25 08:16:20 2016 +0200 ---------------------------------------------------------------------- components/camel-flink/pom.xml | 87 ++++++++ .../flink/ConvertingDataSetCallback.java | 49 +++++ .../camel/component/flink/DataSetCallback.java | 30 +++ .../component/flink/DataSetFlinkProducer.java | 81 ++++++++ .../camel/component/flink/EndpointType.java | 23 +++ .../camel/component/flink/FlinkComponent.java | 65 ++++++ .../camel/component/flink/FlinkConstants.java | 25 +++ .../camel/component/flink/FlinkEndpoint.java | 138 +++++++++++++ .../apache/camel/component/flink/Flinks.java | 31 +++ .../component/flink/VoidDataSetCallback.java | 31 +++ .../annotations/AnnotatedDataSetCallback.java | 80 ++++++++ .../flink/annotations/DataSetCallback.java | 27 +++ .../src/main/resources/META-INF/LICENSE.txt | 203 +++++++++++++++++++ .../src/main/resources/META-INF/NOTICE.txt | 11 + .../services/org/apache/camel/component/flink | 18 ++ .../component/flink/FlinkProducerTest.java | 179 ++++++++++++++++ .../camel-flink/src/test/resources/testds.txt | 19 ++ components/pom.xml | 1 + 18 files changed, 1098 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/pom.xml ---------------------------------------------------------------------- diff --git a/components/camel-flink/pom.xml b/components/camel-flink/pom.xml new file mode 100644 index 0000000..24407b6 --- /dev/null +++ b/components/camel-flink/pom.xml @@ -0,0 +1,87 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <artifactId>components</artifactId> + <groupId>org.apache.camel</groupId> + <version>2.18-SNAPSHOT</version> + </parent> + + <artifactId>camel-flink</artifactId> + <packaging>jar</packaging> + <name>Camel :: Apache Flink</name> + <description>Camel Apache Flink support</description> + + <properties> + <camel.osgi.export.pkg>org.apache.camel.component.flink.*</camel.osgi.export.pkg> + <camel.osgi.export.service>org.apache.camel.spi.ComponentResolver;component=flink</camel.osgi.export.service> + </properties> + + <dependencies> + <!--camel--> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-core</artifactId> + </dependency> + + <!--flink--> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-java</artifactId> + <version>0.10.2</version> + </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-clients</artifactId> + <version>0.10.2</version> + </dependency> + + <!--testing--> + <dependency> + <groupId>com.google.truth</groupId> + <artifactId>truth</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + <scope>test</scope> + </dependency> + + <!--optional--> + <dependency> + <groupId>org.scala-lang</groupId> + <artifactId>scala-library</artifactId> + <version>2.10.6</version> + </dependency> + </dependencies> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java new file mode 100644 index 0000000..a4fa4fa --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +import org.apache.camel.CamelContext; +import org.apache.flink.api.java.DataSet; + +import static java.lang.String.format; + +public abstract class ConvertingDataSetCallback<T> implements DataSetCallback<T> { + + private final CamelContext camelContext; + + private final Class[] payloadTypes; + + public ConvertingDataSetCallback(CamelContext camelContext, Class... payloadTypes) { + this.camelContext = camelContext; + this.payloadTypes = payloadTypes; + } + + @Override + public T onDataSet(DataSet ds, Object... payloads) { + if (payloads.length != payloadTypes.length) { + String message = format("Received %d payloads, but expected %d.", payloads.length, payloadTypes.length); + throw new IllegalArgumentException(message); + } + for (int i=0; i < payloads.length;i++) { + payloads[i] = camelContext.getTypeConverter().convertTo(payloadTypes[i], payloads[i]); + } + return doOnDataSet(ds, payloads); + } + + public abstract T doOnDataSet(DataSet ds, Object... payloads); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java new file mode 100644 index 0000000..82d05ce --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +import org.apache.flink.api.java.DataSet; + +/** + * Generic block of code with parameters which can be executed against DataSet and return results. + * + * @param <T> results type + */ +public interface DataSetCallback<T> { + + T onDataSet(DataSet ds, Object... payloads); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java new file mode 100644 index 0000000..855da2c --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultProducer; +import org.apache.flink.api.java.DataSet; + +import java.util.List; + +public class DataSetFlinkProducer extends DefaultProducer { + + public DataSetFlinkProducer(FlinkEndpoint endpoint) { + super(endpoint); + } + + @Override + public void process(Exchange exchange) throws Exception { + DataSet ds = resolveDataSet(exchange); + DataSetCallback dataSetCallback = resolveDataSetCallback(exchange); + Object body = exchange.getIn().getBody(); + Object result = body instanceof List ? dataSetCallback.onDataSet(ds, ((List) body).toArray(new Object[0])) : dataSetCallback.onDataSet(ds, body); + collectResults(exchange, result); + } + + @Override + public FlinkEndpoint getEndpoint() { + return (FlinkEndpoint) super.getEndpoint(); + } + + protected void collectResults(Exchange exchange, Object result) throws Exception { + if (result instanceof DataSet) { + DataSet dsResults = (DataSet) result; + if (getEndpoint().isCollect()) { + exchange.getIn().setBody(dsResults.collect()); + } + else { + exchange.getIn().setBody(result); + exchange.getIn().setHeader(FlinkConstants.FLINK_DATASET_HEADER, result); + } + } + else { + exchange.getIn().setBody(result); + } + } + + protected DataSet resolveDataSet(Exchange exchange) { + if (exchange.getIn().getHeader(FlinkConstants.FLINK_DATASET_HEADER) != null) { + return (DataSet) exchange.getIn().getHeader(FlinkConstants.FLINK_DATASET_HEADER); + } else if (getEndpoint().getDataSet() != null) { + return getEndpoint().getDataSet(); + } else { + throw new IllegalStateException("No DataSet defined"); + } + } + + protected DataSetCallback resolveDataSetCallback(Exchange exchange) { + if (exchange.getIn().getHeader(FlinkConstants.FLINK_DATASET_CALLBACK_HEADER) != null) { + return (DataSetCallback) exchange.getIn().getHeader(FlinkConstants.FLINK_DATASET_CALLBACK_HEADER); + } else if (getEndpoint().getDataSetCallback() != null) { + return getEndpoint().getDataSetCallback(); + } else { + throw new IllegalStateException("Cannot resolve DataSet callback."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/EndpointType.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/EndpointType.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/EndpointType.java new file mode 100644 index 0000000..0d3cde0 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/EndpointType.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +public enum EndpointType { + + dataset, datastream +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java new file mode 100644 index 0000000..6173044 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +import org.apache.camel.Endpoint; +import org.apache.camel.impl.UriEndpointComponent; +import org.apache.flink.api.java.DataSet; + +import java.util.Map; + +/** + * The flink component can be used to send DataSet or DataStream jobs to Apache Flink cluster. + */ +public class FlinkComponent extends UriEndpointComponent { + + private DataSet ds; + private DataSetCallback dataSetCallback; + + public FlinkComponent() { + super(FlinkEndpoint.class); + } + + @Override + protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { + EndpointType type = getCamelContext().getTypeConverter().mandatoryConvertTo(EndpointType.class, remaining); + return new FlinkEndpoint(uri, this, type); + } + + public DataSet getDataSet() { + return ds; + } + + /** + * DataSet to compute against. + */ + public void setDataSet(DataSet ds) { + this.ds = ds; + } + + public DataSetCallback getDataSetCallback() { + return dataSetCallback; + } + + /** + * Function performing action against a DataSet. + */ + public void setDataSetCallback(DataSetCallback dataSetCallback) { + this.dataSetCallback = dataSetCallback; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java new file mode 100644 index 0000000..e34f844 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java @@ -0,0 +1,25 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +public class FlinkConstants { + + public static final String FLINK_DATASET_HEADER = "CAMEL_FLINK_DATASET"; + + public static final String FLINK_DATASET_CALLBACK_HEADER = "CAMEL_FLINK_RDD_CALLBACK"; +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java new file mode 100644 index 0000000..ffed807 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +import org.apache.camel.Consumer; +import org.apache.camel.Processor; +import org.apache.camel.Producer; +import org.apache.camel.impl.DefaultEndpoint; +import org.apache.camel.spi.Metadata; +import org.apache.camel.spi.UriEndpoint; +import org.apache.camel.spi.UriParam; +import org.apache.camel.spi.UriPath; +import org.apache.flink.api.java.DataSet; +import org.slf4j.Logger; + +import static org.slf4j.LoggerFactory.getLogger; + +/** + * The flink component can be used to send DataSet jobs to Apache Flink cluster. + */ +@UriEndpoint(scheme = "META-INF/services/org/apache/camel/component/flink", title = "Apache Flink", syntax = "flink:endpointType", + producerOnly = true, label = "flink engine, hadoop") +public class FlinkEndpoint extends DefaultEndpoint { + + private static final Logger LOG = getLogger(FlinkEndpoint.class); + + @UriPath @Metadata(required = "true") + private EndpointType endpointType; + + // DataSet to compute against. + @UriParam + private DataSet dataSet; + + @UriParam + private DataSetCallback dataSetCallback; + + @UriParam(defaultValue = "true") + private boolean collect = true; + + public FlinkEndpoint(String endpointUri, FlinkComponent component, EndpointType endpointType) { + super(endpointUri, component); + this.endpointType = endpointType; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + + if (dataSet == null) { + dataSet = getComponent().getDataSet(); + } + + if (dataSetCallback == null) { + dataSetCallback = getComponent().getDataSetCallback(); + } + } + + @Override + public Producer createProducer() throws Exception { + LOG.trace("Creating {} Flink Producer.", endpointType); + if (endpointType == EndpointType.dataset) { + LOG.trace("About to create Dataset Producer."); + return new DataSetFlinkProducer(this); + } + else + return null; + } + + @Override + public Consumer createConsumer(Processor processor) throws Exception { + throw new UnsupportedOperationException("Flink Component supports producer endpoints only."); + } + + @Override + public boolean isSingleton() { + return true; + } + + @Override + public FlinkComponent getComponent() { + return (FlinkComponent) super.getComponent(); + } + + /** + * Type of the endpoint (dataset, datastream). + */ + public void setEndpointType(EndpointType endpointType) { + this.endpointType = endpointType; + } + + public DataSet getDataSet() { + return dataSet; + } + + /** + * DataSet to compute against. + */ + public void setDataSet(DataSet ds) { + this.dataSet = ds; + } + + public DataSetCallback getDataSetCallback() { + return dataSetCallback; + } + + /** + * Function performing action against a DataSet. + */ + public void setDataSetCallback(DataSetCallback dataSetCallback) { + this.dataSetCallback = dataSetCallback; + } + + public boolean isCollect() { + return collect; + } + + /** + * Indicates if results should be collected or counted. + */ + public void setCollect(boolean collect) { + this.collect = collect; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java new file mode 100644 index 0000000..3fbfb79 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +import org.apache.flink.api.java.ExecutionEnvironment; + +public final class Flinks { + + private Flinks() { + + } + + public static ExecutionEnvironment createExecutionEnvironment() { + return ExecutionEnvironment.getExecutionEnvironment(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java new file mode 100644 index 0000000..2e50c58 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java @@ -0,0 +1,31 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +import org.apache.flink.api.java.DataSet; + +public abstract class VoidDataSetCallback implements DataSetCallback<Void> { + + public abstract void doOnDataSet(DataSet ds, Object... payloads); + + @Override + public Void onDataSet(DataSet ds, Object... payloads) { + doOnDataSet(ds, payloads); + return null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java new file mode 100644 index 0000000..b199e2c --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink.annotations; + +import org.apache.camel.CamelContext; +import org.apache.flink.api.java.DataSet; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation; + +/** + * Provides facade for working with annotated DataSet callbacks i.e. POJO classes with an appropriate annotations on + * selected methods. + */ +public class AnnotatedDataSetCallback implements org.apache.camel.component.flink.DataSetCallback { + + private final Object objectWithCallback; + + private final List<Method> dataSetCallbacks; + + private final CamelContext camelContext; + + public AnnotatedDataSetCallback(Object objectWithCallback, CamelContext camelContext) { + this.objectWithCallback = objectWithCallback; + this.camelContext = camelContext; + this.dataSetCallbacks = findMethodsWithAnnotation(objectWithCallback.getClass(), DataSetCallback.class); + if (dataSetCallbacks.size() == 0) { + throw new UnsupportedOperationException("Can't find methods annotated with @DataSetCallback"); + } + } + + public AnnotatedDataSetCallback(Object objectWithCallback) { + this(objectWithCallback, null); + } + + @Override + public Object onDataSet(DataSet ds, Object... payloads) { + try { + List<Object> arguments = new ArrayList<>(payloads.length + 1); + arguments.add(ds); + arguments.addAll(Arrays.asList(payloads)); + if (arguments.get(1) == null) { + arguments.remove(1); + } + + Method callbackMethod = dataSetCallbacks.get(0); + callbackMethod.setAccessible(true); + + if (camelContext != null) { + for (int i = 1;i < arguments.size();i++) { + arguments.set(i, camelContext.getTypeConverter().convertTo(callbackMethod.getParameterTypes()[i], arguments.get(i))); + } + } + + return callbackMethod.invoke(objectWithCallback, arguments.toArray(new Object[arguments.size()])); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java new file mode 100644 index 0000000..2133468 --- /dev/null +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink.annotations; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.METHOD, ElementType.PARAMETER}) +@Inherited +public @interface DataSetCallback { + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/resources/META-INF/LICENSE.txt ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/resources/META-INF/LICENSE.txt b/components/camel-flink/src/main/resources/META-INF/LICENSE.txt new file mode 100644 index 0000000..6b0b127 --- /dev/null +++ b/components/camel-flink/src/main/resources/META-INF/LICENSE.txt @@ -0,0 +1,203 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/resources/META-INF/NOTICE.txt ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/resources/META-INF/NOTICE.txt b/components/camel-flink/src/main/resources/META-INF/NOTICE.txt new file mode 100644 index 0000000..2e215bf --- /dev/null +++ b/components/camel-flink/src/main/resources/META-INF/NOTICE.txt @@ -0,0 +1,11 @@ + ========================================================================= + == NOTICE file corresponding to the section 4 d of == + == the Apache License, Version 2.0, == + == in this case for the Apache Camel distribution. == + ========================================================================= + + This product includes software developed by + The Apache Software Foundation (http://www.apache.org/). + + Please read the different LICENSE files present in the licenses directory of + this distribution. http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/main/resources/META-INF/services/org/apache/camel/component/flink ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/resources/META-INF/services/org/apache/camel/component/flink b/components/camel-flink/src/main/resources/META-INF/services/org/apache/camel/component/flink new file mode 100644 index 0000000..3c36c74 --- /dev/null +++ b/components/camel-flink/src/main/resources/META-INF/services/org/apache/camel/component/flink @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +class=org.apache.camel.component.flink.FlinkComponent http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java new file mode 100644 index 0000000..d875bbb --- /dev/null +++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.flink; + +import com.google.common.truth.Truth; +import org.apache.camel.component.flink.annotations.AnnotatedDataSetCallback; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + +public class FlinkProducerTest extends CamelTestSupport { + + static ExecutionEnvironment executionEnvironment = Flinks.createExecutionEnvironment(); + + String flinkUri = "flink:dataSet?dataSet=#myDataSet"; + + int numberOfLinesInTestFile = 19; + + @Override + protected JndiRegistry createRegistry() throws Exception { + JndiRegistry registry = super.createRegistry(); + + registry.bind("myDataSet", executionEnvironment.readTextFile("src/test/resources/testds.txt")); + + registry.bind("countLinesContaining", new DataSetCallback() { + @Override + public Object onDataSet(DataSet ds, Object... payloads) { + try { + return ds.count(); + } catch (Exception e) { + return null; + } + } + }); + return registry; + } + + @Test + public void shouldExecuteDataSetCallback() { + Long linesCount = template.requestBodyAndHeader(flinkUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { + @Override + public Object onDataSet(DataSet ds, Object... payloads) { + try { + return ds.count(); + } catch (Exception e) { + return null; + } + } + }, Long.class); + + Truth.assertThat(linesCount).isEqualTo(numberOfLinesInTestFile); + } + + @Test + public void shouldExecuteDataSetCallbackWithSinglePayload() { + Long linesCount = template.requestBodyAndHeader(flinkUri, 10, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { + @Override + public Object onDataSet(DataSet ds, Object... payloads) { + try { + return ds.count() * (int) payloads[0]; + } catch (Exception e) { + return null; + } + } + }, Long.class); + + Truth.assertThat(linesCount).isEqualTo(numberOfLinesInTestFile * 10); + } + + @Test + public void shouldExecuteDataSetCallbackWithPayloads() { + Long linesCount = template.requestBodyAndHeader(flinkUri, Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new DataSetCallback() { + @Override + public Object onDataSet(DataSet ds, Object... payloads) { + try { + return ds.count() * (int) payloads[0] * (int) payloads[1]; + } catch (Exception e) { + return null; + } + } + }, Long.class); + + Truth.assertThat(linesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); + } + + @Test + public void shouldUseTransformationFromRegistry() { + Long linesCount = template.requestBody(flinkUri + "&dataSetCallback=#countLinesContaining", null, Long.class); + Truth.assertThat(linesCount).isGreaterThan(0L); + } + + @Test + public void shouldExecuteVoidCallback() throws IOException { + final File output = File.createTempFile("camel", "flink"); + output.delete(); + + template.sendBodyAndHeader(flinkUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, new VoidDataSetCallback() { + @Override + public void doOnDataSet(DataSet ds, Object... payloads) { + ds.writeAsText(output.getAbsolutePath()); + } + }); + + Truth.assertThat(output.length()).isAtLeast(0L); + } + + @Test + public void shouldExecuteAnnotatedCallback() { + DataSetCallback dataSetCallback = new AnnotatedDataSetCallback(new Object(){ + @org.apache.camel.component.flink.annotations.DataSetCallback + Long countLines(DataSet<String> textFile) { + try { + return textFile.count(); + } catch (Exception e) { + return null; + } + } + }); + + long pomLinesCount = template.requestBodyAndHeader(flinkUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, Long.class); + + Truth.assertThat(pomLinesCount).isEqualTo(19); + } + + @Test + public void shouldExecuteAnnotatedVoidCallback() throws IOException { + final File output = File.createTempFile("camel", "flink"); + output.delete(); + + DataSetCallback dataSetCallback = new AnnotatedDataSetCallback(new Object() { + @org.apache.camel.component.flink.annotations.DataSetCallback + void countLines(DataSet<String> textFile) { + textFile.writeAsText(output.getAbsolutePath()); + } + }); + + template.sendBodyAndHeader(flinkUri, null, FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback); + + Truth.assertThat(output.length()).isAtLeast(0L); + } + + @Test + public void shouldExecuteAnnotatedCallbackWithParameters() { + DataSetCallback dataSetCallback = new AnnotatedDataSetCallback(new Object(){ + @org.apache.camel.component.flink.annotations.DataSetCallback + Long countLines(DataSet<String> textFile, int first, int second) { + try { + return textFile.count() * first * second; + } catch (Exception e) { + return null; + } + } + }); + + long pomLinesCount = template.requestBodyAndHeader(flinkUri, Arrays.<Integer>asList(10, 10), FlinkConstants.FLINK_DATASET_CALLBACK_HEADER, dataSetCallback, Long.class); + Truth.assertThat(pomLinesCount).isEqualTo(numberOfLinesInTestFile * 10 * 10); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/camel-flink/src/test/resources/testds.txt ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/test/resources/testds.txt b/components/camel-flink/src/test/resources/testds.txt new file mode 100644 index 0000000..5bb464c --- /dev/null +++ b/components/camel-flink/src/test/resources/testds.txt @@ -0,0 +1,19 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +foo bar +baz qux \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/62631da8/components/pom.xml ---------------------------------------------------------------------- diff --git a/components/pom.xml b/components/pom.xml index ec1f230..a46a45a 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -105,6 +105,7 @@ <module>camel-exec</module> <module>camel-facebook</module> <module>camel-flatpack</module> + <module>camel-flink</module> <module>camel-fop</module> <module>camel-freemarker</module> <module>camel-ftp</module>