This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit ba241aed8ab9fbcca0abb2d5ae0e8e76b8f9da69 Author: Andrea Cosentino <[email protected]> AuthorDate: Wed Jul 24 09:11:27 2019 +0200 Camel-Flink: Use BindToRegistry annotation where possible --- .../camel/component/flink/FlinkProducerTest.java | 26 +++++++++++++--------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java index 36b3dae..c74444e 100644 --- a/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java +++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/FlinkProducerTest.java @@ -21,12 +21,16 @@ import java.io.IOException; import java.util.Arrays; import com.google.common.truth.Truth; + +import org.apache.camel.BindToRegistry; import org.apache.camel.component.flink.annotations.AnnotatedDataSetCallback; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.operators.DataSource; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.Test; @@ -34,20 +38,21 @@ public class FlinkProducerTest extends CamelTestSupport { static ExecutionEnvironment executionEnvironment = Flinks.createExecutionEnvironment(); static StreamExecutionEnvironment streamExecutionEnvironment = Flinks.createStreamExecutionEnvironment(); + + @BindToRegistry("myDataSet") + private DataSource<String> ds = executionEnvironment.readTextFile("src/test/resources/testds.txt"); + + @BindToRegistry("myDataStream") + private DataStreamSource<String> dss = streamExecutionEnvironment.readTextFile("src/test/resources/testds.txt"); String flinkDataSetUri = "flink:dataSet?dataSet=#myDataSet"; String flinkDataStreamUri = "flink:dataStream?dataStream=#myDataStream"; int numberOfLinesInTestFile = 19; - - @Override - protected JndiRegistry createRegistry() throws Exception { - JndiRegistry registry = super.createRegistry(); - - registry.bind("myDataSet", executionEnvironment.readTextFile("src/test/resources/testds.txt")); - registry.bind("myDataStream", streamExecutionEnvironment.readTextFile("src/test/resources/testds.txt")); - - registry.bind("countLinesContaining", new DataSetCallback() { + + @BindToRegistry("countLinesContaining") + public DataSetCallback addDataSetCallback() { + return new DataSetCallback() { @Override public Object onDataSet(DataSet ds, Object... payloads) { try { @@ -56,8 +61,7 @@ public class FlinkProducerTest extends CamelTestSupport { return null; } } - }); - return registry; + }; } @Test
