Fixed CS (checkstyle violations). This closes #946
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4ca2a64a Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4ca2a64a Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4ca2a64a Branch: refs/heads/master Commit: 4ca2a64a943a56e90c0ad3483d6766157f6647ed Parents: 62631da Author: Claus Ibsen <davscl...@apache.org> Authored: Mon Apr 25 08:27:53 2016 +0200 Committer: Claus Ibsen <davscl...@apache.org> Committed: Mon Apr 25 08:27:53 2016 +0200 ---------------------------------------------------------------------- .../flink/ConvertingDataSetCallback.java | 7 ++-- .../camel/component/flink/DataSetCallback.java | 1 - .../component/flink/DataSetFlinkProducer.java | 21 +++++++---- .../camel/component/flink/EndpointType.java | 1 - .../camel/component/flink/FlinkComponent.java | 5 +-- .../camel/component/flink/FlinkConstants.java | 11 ++++-- .../camel/component/flink/FlinkEndpoint.java | 22 +++-------- .../apache/camel/component/flink/Flinks.java | 2 - .../component/flink/VoidDataSetCallback.java | 1 - .../annotations/AnnotatedDataSetCallback.java | 9 ++--- .../flink/annotations/DataSetCallback.java | 7 +++- .../component/flink/FlinkProducerTest.java | 13 +++---- .../src/test/resources/log4j.properties | 39 ++++++++++++++++++++ 13 files changed, 84 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java index a4fa4fa..b462a56 100644 --- 
a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java @@ -14,14 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.flink; +import static java.lang.String.format; + import org.apache.camel.CamelContext; import org.apache.flink.api.java.DataSet; -import static java.lang.String.format; - public abstract class ConvertingDataSetCallback<T> implements DataSetCallback<T> { private final CamelContext camelContext; @@ -39,7 +38,7 @@ public abstract class ConvertingDataSetCallback<T> implements DataSetCallback<T> String message = format("Received %d payloads, but expected %d.", payloads.length, payloadTypes.length); throw new IllegalArgumentException(message); } - for (int i=0; i < payloads.length;i++) { + for (int i = 0; i < payloads.length; i++) { payloads[i] = camelContext.getTypeConverter().convertTo(payloadTypes[i], payloads[i]); } return doOnDataSet(ds, payloads); http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java index 82d05ce..196cf1b 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.camel.component.flink; import org.apache.flink.api.java.DataSet; http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java index 855da2c..054180d 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.flink; +import java.util.List; + import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; import org.apache.flink.api.java.DataSet; -import java.util.List; - public class DataSetFlinkProducer extends DefaultProducer { public DataSetFlinkProducer(FlinkEndpoint endpoint) { @@ -34,7 +33,15 @@ public class DataSetFlinkProducer extends DefaultProducer { DataSet ds = resolveDataSet(exchange); DataSetCallback dataSetCallback = resolveDataSetCallback(exchange); Object body = exchange.getIn().getBody(); - Object result = body instanceof List ? 
dataSetCallback.onDataSet(ds, ((List) body).toArray(new Object[0])) : dataSetCallback.onDataSet(ds, body); + + Object result; + if (body instanceof List) { + List list = (List) body; + Object[] array = list.toArray(new Object[list.size()]); + result = dataSetCallback.onDataSet(ds, array); + } else { + result = dataSetCallback.onDataSet(ds, body); + } collectResults(exchange, result); } @@ -48,13 +55,11 @@ public class DataSetFlinkProducer extends DefaultProducer { DataSet dsResults = (DataSet) result; if (getEndpoint().isCollect()) { exchange.getIn().setBody(dsResults.collect()); - } - else { + } else { exchange.getIn().setBody(result); exchange.getIn().setHeader(FlinkConstants.FLINK_DATASET_HEADER, result); } - } - else { + } else { exchange.getIn().setBody(result); } } http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/EndpointType.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/EndpointType.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/EndpointType.java index 0d3cde0..1cd3e92 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/EndpointType.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/EndpointType.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.camel.component.flink; public enum EndpointType { http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java index 6173044..70c6c39 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java @@ -14,15 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.flink; +import java.util.Map; + import org.apache.camel.Endpoint; import org.apache.camel.impl.UriEndpointComponent; import org.apache.flink.api.java.DataSet; -import java.util.Map; - /** * The flink component can be used to send DataSet or DataStream jobs to Apache Flink cluster. */ http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java index e34f844..6f43a16 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkConstants.java @@ -14,12 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.camel.component.flink; -public class FlinkConstants { +public final class FlinkConstants { + + public static final String FLINK_DATASET_HEADER = "CamelFlinkDataSet"; + + public static final String FLINK_DATASET_CALLBACK_HEADER = "CamelFlinkDataSetCallback"; - public static final String FLINK_DATASET_HEADER = "CAMEL_FLINK_DATASET"; + private FlinkConstants() { + } - public static final String FLINK_DATASET_CALLBACK_HEADER = "CAMEL_FLINK_RDD_CALLBACK"; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java index ffed807..396d7df 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.flink; import org.apache.camel.Consumer; @@ -26,29 +25,20 @@ import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; import org.apache.flink.api.java.DataSet; -import org.slf4j.Logger; - -import static org.slf4j.LoggerFactory.getLogger; /** * The flink component can be used to send DataSet jobs to Apache Flink cluster. 
*/ -@UriEndpoint(scheme = "META-INF/services/org/apache/camel/component/flink", title = "Apache Flink", syntax = "flink:endpointType", - producerOnly = true, label = "flink engine, hadoop") +@UriEndpoint(scheme = "flink", title = "Apache Flink", syntax = "flink:endpointType", producerOnly = true, label = "hadoop") public class FlinkEndpoint extends DefaultEndpoint { - private static final Logger LOG = getLogger(FlinkEndpoint.class); - - @UriPath @Metadata(required = "true") + @UriPath + @Metadata(required = "true") private EndpointType endpointType; - - // DataSet to compute against. @UriParam private DataSet dataSet; - @UriParam private DataSetCallback dataSetCallback; - @UriParam(defaultValue = "true") private boolean collect = true; @@ -72,13 +62,11 @@ public class FlinkEndpoint extends DefaultEndpoint { @Override public Producer createProducer() throws Exception { - LOG.trace("Creating {} Flink Producer.", endpointType); if (endpointType == EndpointType.dataset) { - LOG.trace("About to create Dataset Producer."); return new DataSetFlinkProducer(this); + } else { + throw new UnsupportedOperationException("datastream not yet supported"); } - else - return null; } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java index 3fbfb79..927a57d 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/Flinks.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.camel.component.flink; import org.apache.flink.api.java.ExecutionEnvironment; @@ -22,7 +21,6 @@ import org.apache.flink.api.java.ExecutionEnvironment; public final class Flinks { private Flinks() { - } public static ExecutionEnvironment createExecutionEnvironment() { http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java index 2e50c58..e7479c7 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.camel.component.flink; import org.apache.flink.api.java.DataSet; http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java index b199e2c..d6e55f0 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java @@ -14,18 +14,17 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.flink.annotations; -import org.apache.camel.CamelContext; -import org.apache.flink.api.java.DataSet; - import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import org.apache.camel.CamelContext; +import org.apache.flink.api.java.DataSet; + import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation; /** @@ -67,7 +66,7 @@ public class AnnotatedDataSetCallback implements org.apache.camel.component.flin callbackMethod.setAccessible(true); if (camelContext != null) { - for (int i = 1;i < arguments.size();i++) { + for (int i = 1; i < arguments.size(); i++) { arguments.set(i, camelContext.getTypeConverter().convertTo(callbackMethod.getParameterTypes()[i], arguments.get(i))); } } http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java 
---------------------------------------------------------------------- diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java index 2133468..b85a231 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java @@ -14,10 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.flink.annotations; -import java.lang.annotation.*; +import java.lang.annotation.ElementType; +import java.lang.annotation.Inherited; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.METHOD, ElementType.PARAMETER}) http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java index d875bbb..4f3e863 100644 --- a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java +++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java @@ -14,9 +14,12 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ - package org.apache.camel.component.flink; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; + import com.google.common.truth.Truth; import org.apache.camel.component.flink.annotations.AnnotatedDataSetCallback; import org.apache.camel.impl.JndiRegistry; @@ -25,10 +28,6 @@ import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.junit.Test; -import java.io.File; -import java.io.IOException; -import java.util.Arrays; - public class FlinkProducerTest extends CamelTestSupport { static ExecutionEnvironment executionEnvironment = Flinks.createExecutionEnvironment(); @@ -127,7 +126,7 @@ public class FlinkProducerTest extends CamelTestSupport { @Test public void shouldExecuteAnnotatedCallback() { - DataSetCallback dataSetCallback = new AnnotatedDataSetCallback(new Object(){ + DataSetCallback dataSetCallback = new AnnotatedDataSetCallback(new Object() { @org.apache.camel.component.flink.annotations.DataSetCallback Long countLines(DataSet<String> textFile) { try { @@ -162,7 +161,7 @@ public class FlinkProducerTest extends CamelTestSupport { @Test public void shouldExecuteAnnotatedCallbackWithParameters() { - DataSetCallback dataSetCallback = new AnnotatedDataSetCallback(new Object(){ + DataSetCallback dataSetCallback = new AnnotatedDataSetCallback(new Object() { @org.apache.camel.component.flink.annotations.DataSetCallback Long countLines(DataSet<String> textFile, int first, int second) { try { http://git-wip-us.apache.org/repos/asf/camel/blob/4ca2a64a/components/camel-flink/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/components/camel-flink/src/test/resources/log4j.properties b/components/camel-flink/src/test/resources/log4j.properties new file mode 100644 index 0000000..45f294a --- /dev/null +++ b/components/camel-flink/src/test/resources/log4j.properties @@ -0,0 +1,39 @@ +## 
------------------------------------------------------------------------ +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## ------------------------------------------------------------------------ + +# +# The logging properties used for testing +# +log4j.rootLogger=INFO, file + +#log4j.logger.org.apache.camel.component.flink=DEBUG + +# CONSOLE appender not used by default +log4j.appender.out=org.apache.log4j.ConsoleAppender +log4j.appender.out.layout=org.apache.log4j.PatternLayout +log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +# MDC +#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %-10.10X{camel.breadcrumbId} - %-10.10X{camel.exchangeId} - %-10.10X{camel.correlationId} - %-10.10X{camel.routeId} - %m%n + +# File appender +log4j.appender.file=org.apache.log4j.FileAppender +log4j.appender.file.layout=org.apache.log4j.PatternLayout +log4j.appender.file.file=target/camel-test-flink.log +log4j.appender.file.append=true +log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %m%n +# MDC +#log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} - %-10.10X{camel.breadcrumbId} - %-10.10X{camel.exchangeId} - 
%-10.10X{camel.correlationId} - %-10.10X{camel.routeId} - %m%n