This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch flink-improv in repository https://gitbox.apache.org/repos/asf/camel.git
commit 6ceead25ff5a9813b6234254a95c630d5853759f Author: Andrea Cosentino <[email protected]> AuthorDate: Thu Oct 16 11:04:24 2025 +0200 CAMEL-22567 - Camel-Flink: Deprecate DataSet API in favor of DataStream API Signed-off-by: Andrea Cosentino <[email protected]> --- .../component/flink/FlinkEndpointConfigurer.java | 45 +++ .../component/flink/FlinkEndpointUriFactory.java | 10 +- .../org/apache/camel/component/flink/flink.json | 12 +- .../camel-flink/src/main/docs/flink-component.adoc | 210 +++++++++++++ .../component/flink/ConvertingDataSetCallback.java | 9 + .../camel/component/flink/DataSetCallback.java | 6 +- .../component/flink/DataSetFlinkProducer.java | 7 + .../component/flink/DataStreamFlinkProducer.java | 97 ++++++ .../camel/component/flink/FlinkComponent.java | 6 + .../camel/component/flink/FlinkEndpoint.java | 114 +++++++ .../camel/component/flink/VoidDataSetCallback.java | 8 + .../annotations/AnnotatedDataSetCallback.java | 5 + .../flink/annotations/DataSetCallback.java | 8 + .../flink/DataStreamBatchProcessingIT.java | 292 ++++++++++++++++++ .../component/flink/DataStreamConfigurationIT.java | 329 +++++++++++++++++++++ .../component/flink/DataStreamEdgeCasesIT.java | 277 +++++++++++++++++ .../component/flink/DataStreamProducerTest.java | 117 ++++++++ .../ROOT/pages/camel-4x-upgrade-guide-4_16.adoc | 149 +++++++++- 18 files changed, 1696 insertions(+), 5 deletions(-) diff --git a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java index 79b1a2aa6f47..07206669e81b 100644 --- a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java +++ b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointConfigurer.java @@ -23,6 +23,12 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement public boolean configure(CamelContext camelContext, Object obj, String name, Object value, boolean ignoreCase) { FlinkEndpoint target = (FlinkEndpoint) obj; switch (ignoreCase ? 
name.toLowerCase() : name) { + case "checkpointinterval": + case "checkpointInterval": target.setCheckpointInterval(property(camelContext, java.lang.Long.class, value)); return true; + case "checkpointtimeout": + case "checkpointTimeout": target.setCheckpointTimeout(property(camelContext, java.lang.Long.class, value)); return true; + case "checkpointingmode": + case "checkpointingMode": target.setCheckpointingMode(property(camelContext, java.lang.String.class, value)); return true; case "collect": target.setCollect(property(camelContext, boolean.class, value)); return true; case "dataset": case "dataSet": target.setDataSet(property(camelContext, org.apache.flink.api.java.DataSet.class, value)); return true; @@ -32,8 +38,17 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement case "dataStream": target.setDataStream(property(camelContext, org.apache.flink.streaming.api.datastream.DataStream.class, value)); return true; case "datastreamcallback": case "dataStreamCallback": target.setDataStreamCallback(property(camelContext, org.apache.camel.component.flink.DataStreamCallback.class, value)); return true; + case "executionmode": + case "executionMode": target.setExecutionMode(property(camelContext, java.lang.String.class, value)); return true; + case "jobname": + case "jobName": target.setJobName(property(camelContext, java.lang.String.class, value)); return true; case "lazystartproducer": case "lazyStartProducer": target.setLazyStartProducer(property(camelContext, boolean.class, value)); return true; + case "maxparallelism": + case "maxParallelism": target.setMaxParallelism(property(camelContext, java.lang.Integer.class, value)); return true; + case "minpausebetweencheckpoints": + case "minPauseBetweenCheckpoints": target.setMinPauseBetweenCheckpoints(property(camelContext, java.lang.Long.class, value)); return true; + case "parallelism": target.setParallelism(property(camelContext, java.lang.Integer.class, value)); return true; default: return false; } } @@ -41,6 +56,12 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement @Override public Class<?> getOptionType(String name, boolean ignoreCase) { switch (ignoreCase ? 
name.toLowerCase() : name) { + case "checkpointinterval": + case "checkpointInterval": return java.lang.Long.class; + case "checkpointtimeout": + case "checkpointTimeout": return java.lang.Long.class; + case "checkpointingmode": + case "checkpointingMode": return java.lang.String.class; case "collect": return boolean.class; case "dataset": case "dataSet": return org.apache.flink.api.java.DataSet.class; @@ -50,8 +71,17 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement case "dataStream": return org.apache.flink.streaming.api.datastream.DataStream.class; case "datastreamcallback": case "dataStreamCallback": return org.apache.camel.component.flink.DataStreamCallback.class; + case "executionmode": + case "executionMode": return java.lang.String.class; + case "jobname": + case "jobName": return java.lang.String.class; case "lazystartproducer": case "lazyStartProducer": return boolean.class; + case "maxparallelism": + case "maxParallelism": return java.lang.Integer.class; + case "minpausebetweencheckpoints": + case "minPauseBetweenCheckpoints": return java.lang.Long.class; + case "parallelism": return java.lang.Integer.class; default: return null; } } @@ -60,6 +90,12 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement public Object getOptionValue(Object obj, String name, boolean ignoreCase) { FlinkEndpoint target = (FlinkEndpoint) obj; switch (ignoreCase ? name.toLowerCase() : name) { + case "checkpointinterval": + case "checkpointInterval": return target.getCheckpointInterval(); + case "checkpointtimeout": + case "checkpointTimeout": return target.getCheckpointTimeout(); + case "checkpointingmode": + case "checkpointingMode": return target.getCheckpointingMode(); case "collect": return target.isCollect(); case "dataset": case "dataSet": return target.getDataSet(); @@ -69,8 +105,17 @@ public class FlinkEndpointConfigurer extends PropertyConfigurerSupport implement case "dataStream": return target.getDataStream(); case "datastreamcallback": case "dataStreamCallback": return target.getDataStreamCallback(); + case "executionmode": + case "executionMode": return target.getExecutionMode(); + case "jobname": + case "jobName": return target.getJobName(); case "lazystartproducer": case "lazyStartProducer": return target.isLazyStartProducer(); + case "maxparallelism": + case "maxParallelism": return target.getMaxParallelism(); + case "minpausebetweencheckpoints": + case "minPauseBetweenCheckpoints": return target.getMinPauseBetweenCheckpoints(); + case "parallelism": return target.getParallelism(); default: return null; } } diff --git a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java index 1807f4f165a0..b3d97dff643e 100644 --- a/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java +++ b/components/camel-flink/src/generated/java/org/apache/camel/component/flink/FlinkEndpointUriFactory.java @@ -23,14 +23,22 @@ public class FlinkEndpointUriFactory extends org.apache.camel.support.component. 
private static final Set<String> SECRET_PROPERTY_NAMES; private static final Map<String, String> MULTI_VALUE_PREFIXES; static { - Set<String> props = new HashSet<>(7); + Set<String> props = new HashSet<>(15); + props.add("checkpointInterval"); + props.add("checkpointTimeout"); + props.add("checkpointingMode"); props.add("collect"); props.add("dataSet"); props.add("dataSetCallback"); props.add("dataStream"); props.add("dataStreamCallback"); props.add("endpointType"); + props.add("executionMode"); + props.add("jobName"); props.add("lazyStartProducer"); + props.add("maxParallelism"); + props.add("minPauseBetweenCheckpoints"); + props.add("parallelism"); PROPERTY_NAMES = Collections.unmodifiableSet(props); SECRET_PROPERTY_NAMES = Collections.emptySet(); MULTI_VALUE_PREFIXES = Collections.emptyMap(); diff --git a/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json b/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json index 85f0a4f96399..a19a18dabbb5 100644 --- a/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json +++ b/components/camel-flink/src/generated/resources/META-INF/org/apache/camel/component/flink/flink.json @@ -24,7 +24,7 @@ "remote": true }, "componentProperties": { - "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data Set Callback", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.flink.DataSetCallback", "deprecated": false, "autowired": false, "secret": false, "description": "Function performing action against a DataSet." }, + "dataSetCallback": { "index": 0, "kind": "property", "displayName": "Data Set Callback", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.flink.DataSetCallback", "deprecated": true, "autowired": false, "secret": false, "description": "Function performing action against a DataSet." }, "dataStream": { "index": 1, "kind": "property", "displayName": "Data Stream", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.flink.streaming.api.datastream.DataStream", "deprecated": false, "autowired": false, "secret": false, "description": "DataStream to compute against." }, "dataStreamCallback": { "index": 2, "kind": "property", "displayName": "Data Stream Callback", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.flink.DataStreamCallback", "deprecated": false, "autowired": false, "secret": false, "description": "Function performing action against a DataStream." }, "lazyStartProducer": { "index": 3, "kind": "property", "displayName": "Lazy Start Producer", "group": "producer", "label": "producer", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail [...] 
@@ -43,6 +43,14 @@ "dataSetCallback": { "index": 3, "kind": "parameter", "displayName": "Data Set Callback", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.flink.DataSetCallback", "deprecated": false, "autowired": false, "secret": false, "description": "Function performing action against a DataSet." }, "dataStream": { "index": 4, "kind": "parameter", "displayName": "Data Stream", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.flink.streaming.api.datastream.DataStream", "deprecated": false, "autowired": false, "secret": false, "description": "DataStream to compute against." }, "dataStreamCallback": { "index": 5, "kind": "parameter", "displayName": "Data Stream Callback", "group": "producer", "label": "", "required": false, "type": "object", "javaType": "org.apache.camel.component.flink.DataStreamCallback", "deprecated": false, "autowired": false, "secret": false, "description": "Function performing action against a DataStream." }, - "lazyStartProducer": { "index": 6, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produc [...] + "checkpointingMode": { "index": 6, "kind": "parameter", "displayName": "Checkpointing Mode", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "EXACTLY_ONCE", "AT_LEAST_ONCE" ], "deprecated": false, "autowired": false, "secret": false, "description": "Checkpointing mode: EXACTLY_ONCE (default) or AT_LEAST_ONCE. EXACTLY_ONCE provides stronger guarantees but may have higher overhead." }, + "checkpointInterval": { "index": 7, "kind": "parameter", "displayName": "Checkpoint Interval", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "description": "Interval in milliseconds between checkpoints. Enables checkpointing when set. Recommended for streaming jobs to ensure fault tolerance." }, + "checkpointTimeout": { "index": 8, "kind": "parameter", "displayName": "Checkpoint Timeout", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "description": "Timeout in milliseconds for checkpoints. Checkpoints that take longer will be aborted." }, + "executionMode": { "index": 9, "kind": "parameter", "displayName": "Execution Mode", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "enum", "javaType": "java.lang.String", "enum": [ "STREAMING", "BATCH", "AUTOMATIC" ], "deprecated": false, "autowired": false, "secret": false, "description": "Execution mode for the Flink job. Options: STREAMING (default), BATCH, AUTOMATIC. BATCH mode is recommended for bounded streams (batch processing)." 
}, + "jobName": { "index": 10, "kind": "parameter", "displayName": "Job Name", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "string", "javaType": "java.lang.String", "deprecated": false, "autowired": false, "secret": false, "description": "Name for the Flink job. Useful for identification in the Flink UI and logs." }, + "lazyStartProducer": { "index": 11, "kind": "parameter", "displayName": "Lazy Start Producer", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "boolean", "javaType": "boolean", "deprecated": false, "autowired": false, "secret": false, "defaultValue": false, "description": "Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a produ [...] + "maxParallelism": { "index": 12, "kind": "parameter", "displayName": "Max Parallelism", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Maximum parallelism for the Flink job. Defines the upper bound for dynamic scaling and the number of key groups for stateful operators." }, + "minPauseBetweenCheckpoints": { "index": 13, "kind": "parameter", "displayName": "Min Pause Between Checkpoints", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Long", "deprecated": false, "autowired": false, "secret": false, "description": "Minimum pause in milliseconds between consecutive checkpoints. Helps prevent checkpoint storms under heavy load." }, + "parallelism": { "index": 14, "kind": "parameter", "displayName": "Parallelism", "group": "producer (advanced)", "label": "producer,advanced", "required": false, "type": "integer", "javaType": "java.lang.Integer", "deprecated": false, "autowired": false, "secret": false, "description": "Parallelism for the Flink job. If not set, uses the default parallelism of the execution environment." } } } diff --git a/components/camel-flink/src/main/docs/flink-component.adoc b/components/camel-flink/src/main/docs/flink-component.adoc index d4172f7e4133..8e86f995c957 100644 --- a/components/camel-flink/src/main/docs/flink-component.adoc +++ b/components/camel-flink/src/main/docs/flink-component.adoc @@ -44,6 +44,10 @@ flink:dataset?dataset=#myDataSet&dataSetCallback=#dataSetCallback flink:datastream?datastream=#myDataStream&dataStreamCallback=#dataStreamCallback ------------------------------------------------- +IMPORTANT: The DataSet API has been deprecated by Apache Flink since version 1.12. +It is recommended to migrate to the DataStream API with bounded streams for batch processing. +See the Migration Guide section below for more details. 
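As a quick orientation before the detailed examples below, the endpoint-level change is a one-line switch of the endpoint type. A minimal sketch (the `myDataSet`, `myDataStream` and callback bean names are illustrative placeholders, not part of this commit):

[source,java]
-----------------------------------
// Before: deprecated DataSet endpoint type
from("direct:process")
    .to("flink:dataset?dataSet=#myDataSet&dataSetCallback=#myDataSetCallback");

// After: DataStream endpoint type; BATCH execution mode covers the bounded (batch) case
from("direct:process")
    .to("flink:datastream?dataStream=#myDataStream&dataStreamCallback=#myDataStreamCallback&executionMode=BATCH");
-----------------------------------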
+ // component-configure options: START @@ -116,5 +120,211 @@ try { } ----------------------------------- +=== Modern DataStream Batch Processing Example + +The recommended approach using the DataStream API in batch mode: + +[source,java] +----------------------------------- +@Bean +public StreamExecutionEnvironment streamExecutionEnvironment() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // Configure for batch processing + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + return env; +} + +@Bean +public DataStreamSource<String> myDataStream(StreamExecutionEnvironment env) { + return env.readTextFile("src/test/resources/testds.txt"); +} + +@Bean +public DataStreamCallback wordCountCallback() { + return new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream dataStream, Object... payloads) throws Exception { + dataStream + .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> { + for (String word : line.split("\\s+")) { + out.collect(Tuple2.of(word, 1)); + } + }) + .returns(Types.TUPLE(Types.STRING, Types.INT)) + .keyBy(tuple -> tuple.f0) + .sum(1) + .print(); + } + }; +} + +// In your route +from("direct:wordCount") + .to("flink:datastream?dataStream=#myDataStream&dataStreamCallback=#wordCountCallback"); +----------------------------------- + +=== Real-time Streaming Example + +For true streaming use cases with unbounded data: + +[source,java] +----------------------------------- +@Bean +public StreamExecutionEnvironment streamingEnvironment() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + // Configure for streaming (default mode) + env.setRuntimeMode(RuntimeExecutionMode.STREAMING); + // Enable checkpointing for fault tolerance + env.enableCheckpointing(10000); // checkpoint every 10 seconds + return env; +} + +@Bean +public DataStreamCallback streamingProcessCallback() { + return new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream dataStream, Object... 
payloads) throws Exception { + dataStream + .map(event -> processEvent(event)) + .keyBy(event -> event.getKey()) + .window(TumblingEventTimeWindows.of(Time.minutes(5))) + .aggregate(new MyAggregateFunction()) + .addSink(new MyCustomSink()); + + // Execute the streaming job + dataStream.getExecutionEnvironment().execute("Streaming Job"); + } + }; +} +----------------------------------- + +=== Advanced Configuration Examples + +==== Batch Processing with Configuration + +Configure a DataStream endpoint for batch processing with custom settings: + +[source,java] +----------------------------------- +from("direct:batchProcess") + .to("flink:datastream?dataStream=#myDataStream" + + "&dataStreamCallback=#myCallback" + + "&executionMode=BATCH" + + "&parallelism=4" + + "&jobName=MyBatchJob"); +----------------------------------- + +==== Streaming with Checkpointing + +Configure a streaming job with checkpointing for fault tolerance: + +[source,java] +----------------------------------- +from("direct:streamProcess") + .to("flink:datastream?dataStream=#myDataStream" + + "&dataStreamCallback=#streamCallback" + + "&executionMode=STREAMING" + + "&checkpointInterval=60000" // Checkpoint every 60 seconds + + "&checkpointingMode=EXACTLY_ONCE" // Exactly-once semantics + + "&checkpointTimeout=120000" // 2 minute timeout + + "&minPauseBetweenCheckpoints=30000" // 30 second minimum pause + + "&parallelism=8" + + "&maxParallelism=128" + + "&jobName=StreamingPipeline"); +----------------------------------- + +==== Configuration Options Reference + +[cols="1,1,1,3", options="header"] +|=== +|Parameter |Type |Default |Description + +|executionMode +|String +|STREAMING +|Runtime execution mode: STREAMING, BATCH, or AUTOMATIC. BATCH is recommended for bounded streams. + +|checkpointInterval +|Long +|null +|Checkpoint interval in milliseconds. Setting this enables checkpointing. + +|checkpointingMode +|String +|EXACTLY_ONCE +|Checkpointing mode: EXACTLY_ONCE or AT_LEAST_ONCE. + +|checkpointTimeout +|Long +|10 minutes +|Maximum time in milliseconds that a checkpoint may take. + +|minPauseBetweenCheckpoints +|Long +|0 +|Minimum time in milliseconds between consecutive checkpoints. + +|parallelism +|Integer +|Default parallelism +|Parallelism for the Flink job operations. + +|maxParallelism +|Integer +|128 +|Maximum parallelism, which defines the upper bound for dynamic scaling. + +|jobName +|String +|null +|Name for the Flink job, useful for identification in Flink UI. + +|collect +|Boolean +|true +|Whether to collect results (not applicable for unbounded streams).
+|=== + + +=== Common Patterns + +==== Counting Elements + +.Before (DataSet) +[source,java] +----------------------------------- +long count = dataSet.count(); +----------------------------------- + +.After (DataStream) +[source,java] +----------------------------------- +// DataStream has no global reduce; key all elements to one constant key first +dataStream + .map(e -> 1L) + .returns(Types.LONG) + .keyBy(v -> 0) + .reduce(Long::sum) + .print(); +----------------------------------- + +==== Collecting Results + +.Before (DataSet) +[source,java] +----------------------------------- +List<String> results = dataSet.collect(); +----------------------------------- + +.After (DataStream) +[source,java] +----------------------------------- +// In batch mode, use executeAndCollect() or a sink +List<String> results = dataStream.executeAndCollect(1000); +----------------------------------- + +=== Additional Resources + +* https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/dataset/overview/[Apache Flink DataSet API Migration Guide] +* https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/overview/[DataStream API Documentation] +* https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/execution_mode/[Batch Execution Mode] include::spring-boot:partial$starter.adoc[] diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java index f91bd9a366c4..9ba05cb3b0cc 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/ConvertingDataSetCallback.java @@ -21,6 +21,15 @@ import org.apache.flink.api.java.DataSet; import static java.lang.String.format; +/** + * DataSet callback with automatic type conversion for payloads. + * + * @param <T> results type + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. + * See the Flink migration guide for details on migrating from DataSet to DataStream API. This class + * will be maintained for backward compatibility but may be removed in future versions. + */ +@Deprecated(since = "4.16.0") public abstract class ConvertingDataSetCallback<T> implements DataSetCallback<T> { private final CamelContext camelContext; diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java index 3979bfe9208b..93a253250c1d 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetCallback.java @@ -21,8 +21,12 @@ import org.apache.flink.api.java.DataSet; /** * Generic block of code with parameters which can be executed against DataSet and return results. * - * @param <T> results type + * @param <T> results type + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. + * See the Flink migration guide for details on migrating from DataSet to DataStream API. This class + * will be maintained for backward compatibility but may be removed in future versions. */ +@Deprecated(since = "4.16.0") public interface DataSetCallback<T> { T onDataSet(DataSet ds, Object...
payloads); diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java index 75cb4798d890..dc038b76a8dc 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataSetFlinkProducer.java @@ -22,6 +22,13 @@ import org.apache.camel.Exchange; import org.apache.camel.support.DefaultProducer; import org.apache.flink.api.java.DataSet; +/** + * Producer for executing Flink DataSet operations. + * + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See + * the Flink migration guide for details on migrating from DataSet to DataStream API. + */ +@Deprecated(since = "4.16.0") public class DataSetFlinkProducer extends DefaultProducer { public DataSetFlinkProducer(FlinkEndpoint endpoint) { diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java index 53d8bf5bac7d..25a359694713 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/DataStreamFlinkProducer.java @@ -20,10 +20,23 @@ import java.util.List; import org.apache.camel.Exchange; import org.apache.camel.support.DefaultProducer; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +/** + * Producer for executing Flink DataStream operations with support for modern Flink features including execution mode + * configuration, checkpointing, and parallelism settings. + */ public class DataStreamFlinkProducer extends DefaultProducer { + private static final Logger LOG = LoggerFactory.getLogger(DataStreamFlinkProducer.class); + + private volatile boolean environmentConfigured = false; + public DataStreamFlinkProducer(FlinkEndpoint endpoint) { super(endpoint); } @@ -31,6 +44,17 @@ public class DataStreamFlinkProducer extends DefaultProducer { @Override public void process(Exchange exchange) throws Exception { DataStream ds = resolveDataStream(exchange); + + // Configure environment on first use when DataStream is available + if (!environmentConfigured && ds != null) { + synchronized (this) { + if (!environmentConfigured) { + configureStreamExecutionEnvironment(ds); + environmentConfigured = true; + } + } + } + DataStreamCallback dataStreamCallback = resolveDataStreamCallback(exchange); Object body = exchange.getIn().getBody(); Object result = body instanceof List @@ -76,4 +100,77 @@ public class DataStreamFlinkProducer extends DefaultProducer { throw new IllegalArgumentException("Cannot resolve DataStream callback."); } } + + /** + * Configures the StreamExecutionEnvironment with the settings from the endpoint. This includes execution mode, + * checkpointing, parallelism, and other advanced options. 
+ * + * @param dataStream the DataStream to configure the environment for + */ + protected void configureStreamExecutionEnvironment(DataStream dataStream) { + if (dataStream == null) { + LOG.debug("No DataStream provided, skipping environment configuration"); + return; + } + + StreamExecutionEnvironment env = dataStream.getExecutionEnvironment(); + + // Configure execution mode (BATCH, STREAMING, AUTOMATIC) + if (getEndpoint().getExecutionMode() != null) { + try { + RuntimeExecutionMode mode = RuntimeExecutionMode.valueOf(getEndpoint().getExecutionMode()); + env.setRuntimeMode(mode); + LOG.info("Set Flink runtime execution mode to: {}", mode); + } catch (IllegalArgumentException e) { + LOG.warn("Invalid execution mode '{}'. Valid values are: STREAMING, BATCH, AUTOMATIC", + getEndpoint().getExecutionMode()); + } + } + + // Configure parallelism + if (getEndpoint().getParallelism() != null) { + env.setParallelism(getEndpoint().getParallelism()); + LOG.info("Set Flink parallelism to: {}", getEndpoint().getParallelism()); + } + + // Configure max parallelism + if (getEndpoint().getMaxParallelism() != null) { + env.setMaxParallelism(getEndpoint().getMaxParallelism()); + LOG.info("Set Flink max parallelism to: {}", getEndpoint().getMaxParallelism()); + } + + // Configure checkpointing + if (getEndpoint().getCheckpointInterval() != null && getEndpoint().getCheckpointInterval() > 0) { + env.enableCheckpointing(getEndpoint().getCheckpointInterval()); + LOG.info("Enabled checkpointing with interval: {} ms", getEndpoint().getCheckpointInterval()); + + // Configure checkpointing mode + if (getEndpoint().getCheckpointingMode() != null) { + try { + CheckpointingMode mode = CheckpointingMode.valueOf(getEndpoint().getCheckpointingMode()); + env.getCheckpointConfig().setCheckpointingMode(mode); + LOG.info("Set checkpointing mode to: {}", mode); + } catch (IllegalArgumentException e) { + LOG.warn("Invalid checkpointing mode '{}'. Valid values are: EXACTLY_ONCE, AT_LEAST_ONCE", + getEndpoint().getCheckpointingMode()); + } + } + + // Configure checkpoint timeout + if (getEndpoint().getCheckpointTimeout() != null) { + env.getCheckpointConfig().setCheckpointTimeout(getEndpoint().getCheckpointTimeout()); + LOG.info("Set checkpoint timeout to: {} ms", getEndpoint().getCheckpointTimeout()); + } + + // Configure min pause between checkpoints + if (getEndpoint().getMinPauseBetweenCheckpoints() != null) { + env.getCheckpointConfig() + .setMinPauseBetweenCheckpoints(getEndpoint().getMinPauseBetweenCheckpoints()); + LOG.info("Set min pause between checkpoints to: {} ms", + getEndpoint().getMinPauseBetweenCheckpoints()); + } + } + + LOG.debug("StreamExecutionEnvironment configuration completed"); + } } diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java index f0f950153ad3..7c1ee50a48a2 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkComponent.java @@ -58,7 +58,10 @@ public class FlinkComponent extends DefaultComponent { /** * DataSet to compute against. + * + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. 
*/ + @Deprecated(since = "4.16.0") public void setDataSet(DataSet ds) { this.ds = ds; } @@ -80,7 +83,10 @@ public class FlinkComponent extends DefaultComponent { /** * Function performing action against a DataSet. + * + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. */ + @Deprecated(since = "4.16.0") public void setDataSetCallback(DataSetCallback dataSetCallback) { this.dataSetCallback = dataSetCallback; } diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java index 5d2eea15ae8a..8bffd74cf250 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/FlinkEndpoint.java @@ -48,6 +48,22 @@ public class FlinkEndpoint extends DefaultEndpoint { private DataStreamCallback dataStreamCallback; @UriParam(defaultValue = "true") private boolean collect = true; + @UriParam(label = "producer,advanced", enums = "STREAMING,BATCH,AUTOMATIC") + private String executionMode; + @UriParam(label = "producer,advanced") + private Long checkpointInterval; + @UriParam(label = "producer,advanced", enums = "EXACTLY_ONCE,AT_LEAST_ONCE") + private String checkpointingMode; + @UriParam(label = "producer,advanced") + private Integer parallelism; + @UriParam(label = "producer,advanced") + private Integer maxParallelism; + @UriParam(label = "producer,advanced") + private String jobName; + @UriParam(label = "producer,advanced") + private Long checkpointTimeout; + @UriParam(label = "producer,advanced") + private Long minPauseBetweenCheckpoints; public FlinkEndpoint(String endpointUri, FlinkComponent component, EndpointType endpointType) { super(endpointUri, component); @@ -105,7 +121,10 @@ public class FlinkEndpoint extends DefaultEndpoint { /** * DataSet to compute against. + * + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. */ + @Deprecated(since = "4.16.0") public void setDataSet(DataSet ds) { this.dataSet = ds; } @@ -127,7 +146,10 @@ public class FlinkEndpoint extends DefaultEndpoint { /** * Function performing action against a DataSet. + * + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. */ + @Deprecated(since = "4.16.0") public void setDataSetCallback(DataSetCallback dataSetCallback) { this.dataSetCallback = dataSetCallback; } @@ -149,4 +171,96 @@ public class FlinkEndpoint extends DefaultEndpoint { public void setCollect(boolean collect) { this.collect = collect; } + + public String getExecutionMode() { + return executionMode; + } + + /** + * Execution mode for the Flink job. Options: STREAMING (default), BATCH, AUTOMATIC. BATCH mode is recommended for + * bounded streams (batch processing). + */ + public void setExecutionMode(String executionMode) { + this.executionMode = executionMode; + } + + public Long getCheckpointInterval() { + return checkpointInterval; + } + + /** + * Interval in milliseconds between checkpoints. Enables checkpointing when set. Recommended for streaming jobs to + * ensure fault tolerance. 
+ */ + public void setCheckpointInterval(Long checkpointInterval) { + this.checkpointInterval = checkpointInterval; + } + + public String getCheckpointingMode() { + return checkpointingMode; + } + + /** + * Checkpointing mode: EXACTLY_ONCE (default) or AT_LEAST_ONCE. EXACTLY_ONCE provides stronger guarantees but may + * have higher overhead. + */ + public void setCheckpointingMode(String checkpointingMode) { + this.checkpointingMode = checkpointingMode; + } + + public Integer getParallelism() { + return parallelism; + } + + /** + * Parallelism for the Flink job. If not set, uses the default parallelism of the execution environment. + */ + public void setParallelism(Integer parallelism) { + this.parallelism = parallelism; + } + + public Integer getMaxParallelism() { + return maxParallelism; + } + + /** + * Maximum parallelism for the Flink job. Defines the upper bound for dynamic scaling and the number of key groups + * for stateful operators. + */ + public void setMaxParallelism(Integer maxParallelism) { + this.maxParallelism = maxParallelism; + } + + public String getJobName() { + return jobName; + } + + /** + * Name for the Flink job. Useful for identification in the Flink UI and logs. + */ + public void setJobName(String jobName) { + this.jobName = jobName; + } + + public Long getCheckpointTimeout() { + return checkpointTimeout; + } + + /** + * Timeout in milliseconds for checkpoints. Checkpoints that take longer will be aborted. + */ + public void setCheckpointTimeout(Long checkpointTimeout) { + this.checkpointTimeout = checkpointTimeout; + } + + public Long getMinPauseBetweenCheckpoints() { + return minPauseBetweenCheckpoints; + } + + /** + * Minimum pause in milliseconds between consecutive checkpoints. Helps prevent checkpoint storms under heavy load. + */ + public void setMinPauseBetweenCheckpoints(Long minPauseBetweenCheckpoints) { + this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints; + } } diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java index 250ea1b3d95e..0230da4a92ae 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/VoidDataSetCallback.java @@ -18,6 +18,14 @@ package org.apache.camel.component.flink; import org.apache.flink.api.java.DataSet; +/** + * Void implementation of DataSetCallback for operations that don't return results. + * + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See + * the Flink migration guide for details on migrating from DataSet to DataStream API. This class will be + * maintained for backward compatibility but may be removed in future versions. + */ +@Deprecated(since = "4.16.0") public abstract class VoidDataSetCallback implements DataSetCallback<Void> { public abstract void doOnDataSet(DataSet ds, Object... 
payloads); diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java index bcd1cc1e6dc0..885427ff5a78 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/AnnotatedDataSetCallback.java @@ -32,7 +32,12 @@ import static org.apache.camel.util.ObjectHelper.findMethodsWithAnnotation; /** * Provides facade for working with annotated DataSet callbacks i.e. POJO classes with an appropriate annotations on * selected methods. + * + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See + * the Flink migration guide for details on migrating from DataSet to DataStream API. This class will be + * maintained for backward compatibility but may be removed in future versions. */ +@Deprecated(since = "4.16.0") public class AnnotatedDataSetCallback implements org.apache.camel.component.flink.DataSetCallback { private final Object objectWithCallback; diff --git a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java index 2f02ce20e6c6..ad177103fe16 100644 --- a/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java +++ b/components/camel-flink/src/main/java/org/apache/camel/component/flink/annotations/DataSetCallback.java @@ -22,6 +22,14 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +/** + * Annotation for marking methods as DataSet callbacks. + * + * @deprecated The DataSet API is deprecated since Flink 1.12. Use the DataStream API with bounded streams instead. See + * the Flink migration guide for details on migrating from DataSet to DataStream API. This annotation will + * be maintained for backward compatibility but may be removed in future versions. + */ +@Deprecated(since = "4.16.0") @Retention(RetentionPolicy.RUNTIME) @Target({ ElementType.METHOD, ElementType.PARAMETER }) @Inherited diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamBatchProcessingIT.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamBatchProcessingIT.java new file mode 100644 index 000000000000..28c40445dec2 --- /dev/null +++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamBatchProcessingIT.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.flink; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Integration tests for DataStream batch processing with various configurations. Tests end-to-end scenarios with actual + * data transformations. + */ +public class DataStreamBatchProcessingIT extends CamelTestSupport { + + // Create separate environments for isolation + StreamExecutionEnvironment batchEnv = Flinks.createStreamExecutionEnvironment(); + StreamExecutionEnvironment transformEnv = Flinks.createStreamExecutionEnvironment(); + + @BindToRegistry("numberStream") + private DataStreamSource<Integer> numberStream = batchEnv.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + + @BindToRegistry("textStream") + private DataStreamSource<String> textStream + = transformEnv.fromElements("apache", "camel", "flink", "integration", "test"); + + @BindToRegistry("multiplyCallback") + public DataStreamCallback multiplyCallback() { + return new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + int multiplier = (Integer) payloads[0]; + ds.map((MapFunction<Integer, Integer>) value -> value * multiplier) + .print(); + } + }; + } + + @Test + public void testBatchProcessingWithTransformation() { + // Verify that the callback executes without error and the transformation is set up + template.sendBodyAndHeader( + "direct:batchTransform", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // Verify environment is configured with batch mode and parallelism=2 + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + Assertions.assertThat(env.getParallelism()).isEqualTo(2); + + // Set up transformation (won't execute in test context) + ds.map((MapFunction<Integer, Integer>) value -> value * 2).print(); + } + }); + } + + @Test + public void testBatchProcessingWithPayload() { + List<Integer> results = new ArrayList<>(); + + template.sendBodyAndHeader( + "direct:withPayload", + 3, // multiplier + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + Assertions.assertThat(payloads).hasSize(1); + int multiplier = (Integer) payloads[0]; + Assertions.assertThat(multiplier).isEqualTo(3); + + ds.map((MapFunction<Integer, Integer>) value -> value * multiplier) + .print(); + } + }); + } + + @Test + public void testBatchProcessingWithFilter() { + // Verify filter operation can be set up + template.sendBodyAndHeader( + "direct:batchFilter", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... 
payloads) throws Exception { + // Verify environment configuration + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + Assertions.assertThat(env.getParallelism()).isEqualTo(1); + + // Set up filter (won't execute in test context) + ds.filter(value -> ((Integer) value) % 2 == 0).print(); + } + }); + } + + @Test + public void testStringProcessingWithBatchMode() { + // Verify string transformation can be set up + template.sendBodyAndHeader( + "direct:stringTransform", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // Verify environment configuration + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + Assertions.assertThat(env.getParallelism()).isEqualTo(3); + + // Set up transformation + ds.map((MapFunction<String, String>) String::toUpperCase).print(); + } + }); + } + + @Test + public void testHighParallelismProcessing() { + // Verify high parallelism configuration + template.sendBodyAndHeader( + "direct:highParallelism", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + + // Verify high parallelism was set + Assertions.assertThat(env.getParallelism()).isEqualTo(16); + Assertions.assertThat(env.getMaxParallelism()).isEqualTo(256); + + // Set up transformation + ds.map((MapFunction<Integer, Integer>) value -> value * value).print(); + } + }); + } + + @Test + public void testCallbackFromRegistry() { + // Track that the callback was actually invoked + final boolean[] callbackInvoked = { false }; + + // Send body with multiplier and verify callback executes + template.sendBodyAndHeader( + "direct:registryCallback", + 5, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // Verify the callback is invoked + callbackInvoked[0] = true; + + // Verify environment configuration + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + Assertions.assertThat(env.getParallelism()).isEqualTo(4); + + // Verify payload was passed correctly + Assertions.assertThat(payloads).hasSize(1); + Assertions.assertThat(payloads[0]).isEqualTo(5); + + // Set up the transformation (using the registry callback pattern) + ds.map((MapFunction<Integer, Integer>) value -> value * (Integer) payloads[0]).print(); + } + }); + + // Verify callback was executed + Assertions.assertThat(callbackInvoked[0]).isTrue(); + } + + @Test + public void testMultipleOperations() { + // Verify chained operations can be set up + template.sendBodyAndHeader( + "direct:multipleOps", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... 
payloads) throws Exception { + // Verify environment configuration + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + Assertions.assertThat(env.getParallelism()).isEqualTo(4); + + // Set up chained operations + ds.filter(value -> ((Integer) value) > 3) + .map((MapFunction<Integer, Integer>) value -> value * 10) + .map((MapFunction<Integer, Integer>) value -> value + 5) + .print(); + } + }); + } + + @Test + public void testConfigurationPersistsAcrossInvocations() { + // First invocation + template.sendBodyAndHeader( + "direct:batchTransform", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + Assertions.assertThat(env.getParallelism()).isEqualTo(2); + } + }); + + // Second invocation - should have same configuration + template.sendBodyAndHeader( + "direct:batchTransform", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + Assertions.assertThat(env.getParallelism()).isEqualTo(2); + } + }); + } + + @Override + protected RouteBuilder createRouteBuilder() { + return new RouteBuilder() { + @Override + public void configure() { + from("direct:batchTransform") + .to("flink:datastream?dataStream=#numberStream" + + "&executionMode=BATCH" + + "&parallelism=2"); + + from("direct:withPayload") + .to("flink:datastream?dataStream=#numberStream" + + "&executionMode=BATCH"); + + from("direct:batchFilter") + .to("flink:datastream?dataStream=#numberStream" + + "&executionMode=BATCH" + + "&parallelism=1"); + + from("direct:stringTransform") + .to("flink:datastream?dataStream=#textStream" + + "&executionMode=BATCH" + + "&parallelism=3"); + + from("direct:highParallelism") + .to("flink:datastream?dataStream=#numberStream" + + "&executionMode=BATCH" + + "&parallelism=16" + + "&maxParallelism=256"); + + from("direct:registryCallback") + .to("flink:datastream?dataStream=#numberStream" + + "&dataStreamCallback=#multiplyCallback" + + "&executionMode=BATCH" + + "&parallelism=4"); + + from("direct:multipleOps") + .to("flink:datastream?dataStream=#numberStream" + + "&executionMode=BATCH" + + "&parallelism=4"); + } + }; + } +} diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamConfigurationIT.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamConfigurationIT.java new file mode 100644 index 000000000000..cb25c080f640 --- /dev/null +++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamConfigurationIT.java @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.flink; + +import java.util.concurrent.atomic.AtomicReference; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.CheckpointConfig; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Integration tests for DataStream producer configuration options including execution mode, checkpointing, and + * parallelism settings. + */ +public class DataStreamConfigurationIT extends CamelTestSupport { + + @BindToRegistry("batchDataStream") + public DataStreamSource<String> createBatchDataStream() { + return Flinks.createStreamExecutionEnvironment().fromElements("test1", "test2", "test3"); + } + + @BindToRegistry("streamingDataStream") + public DataStreamSource<String> createStreamingDataStream() { + return Flinks.createStreamExecutionEnvironment().fromElements("stream1", "stream2"); + } + + @BindToRegistry("checkpointDataStream") + public DataStreamSource<String> createCheckpointDataStream() { + return Flinks.createStreamExecutionEnvironment().fromElements("data1", "data2"); + } + + @BindToRegistry("parallelismDataStream") + public DataStreamSource<String> createParallelismDataStream() { + return Flinks.createStreamExecutionEnvironment().fromElements("parallel1", "parallel2"); + } + + @BindToRegistry("fullConfigDataStream") + public DataStreamSource<String> createFullConfigDataStream() { + return Flinks.createStreamExecutionEnvironment().fromElements("config1", "config2"); + } + + @BindToRegistry("captureEnvCallback") + public DataStreamCallback captureEnvCallback() { + return new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // Just capture the environment for testing + ds.print(); + } + }; + } + + @Test + public void testBatchExecutionModeConfiguration() { + AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>(); + + template.sendBodyAndHeader( + "direct:batchMode", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + envRef.set(env); + } + }); + + StreamExecutionEnvironment env = envRef.get(); + Assertions.assertThat(env).isNotNull(); + + // Verify BATCH mode was set + RuntimeExecutionMode mode = env.getConfiguration() + .get(org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE); + Assertions.assertThat(mode).isEqualTo(RuntimeExecutionMode.BATCH); + } + + @Test + public void testStreamingExecutionModeConfiguration() { + AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>(); + + template.sendBodyAndHeader( + "direct:streamingMode", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... 
payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + envRef.set(env); + } + }); + + StreamExecutionEnvironment env = envRef.get(); + Assertions.assertThat(env).isNotNull(); + + // Verify STREAMING mode was set + RuntimeExecutionMode mode = env.getConfiguration() + .get(org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE); + Assertions.assertThat(mode).isEqualTo(RuntimeExecutionMode.STREAMING); + } + + @Test + public void testCheckpointingConfiguration() { + AtomicReference<CheckpointConfig> checkpointConfigRef = new AtomicReference<>(); + + template.sendBodyAndHeader( + "direct:checkpointing", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + checkpointConfigRef.set(env.getCheckpointConfig()); + } + }); + + CheckpointConfig config = checkpointConfigRef.get(); + Assertions.assertThat(config).isNotNull(); + + // Verify checkpoint interval + Assertions.assertThat(config.getCheckpointInterval()).isEqualTo(5000L); + + // Verify checkpointing mode + Assertions.assertThat(config.getCheckpointingMode()).isEqualTo(CheckpointingMode.EXACTLY_ONCE); + + // Verify checkpoint timeout + Assertions.assertThat(config.getCheckpointTimeout()).isEqualTo(30000L); + + // Verify min pause between checkpoints + Assertions.assertThat(config.getMinPauseBetweenCheckpoints()).isEqualTo(2000L); + } + + @Test + public void testParallelismConfiguration() { + AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>(); + + template.sendBodyAndHeader( + "direct:parallelism", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + envRef.set(env); + } + }); + + StreamExecutionEnvironment env = envRef.get(); + Assertions.assertThat(env).isNotNull(); + + // Verify parallelism settings + Assertions.assertThat(env.getParallelism()).isEqualTo(4); + Assertions.assertThat(env.getMaxParallelism()).isEqualTo(64); + } + + @Test + public void testFullConfiguration() { + AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>(); + AtomicReference<CheckpointConfig> checkpointConfigRef = new AtomicReference<>(); + + template.sendBodyAndHeader( + "direct:fullConfig", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... 
payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + envRef.set(env); + checkpointConfigRef.set(env.getCheckpointConfig()); + } + }); + + StreamExecutionEnvironment env = envRef.get(); + CheckpointConfig checkpointConfig = checkpointConfigRef.get(); + + Assertions.assertThat(env).isNotNull(); + Assertions.assertThat(checkpointConfig).isNotNull(); + + // Verify execution mode + RuntimeExecutionMode mode = env.getConfiguration() + .get(org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE); + Assertions.assertThat(mode).isEqualTo(RuntimeExecutionMode.STREAMING); + + // Verify parallelism + Assertions.assertThat(env.getParallelism()).isEqualTo(8); + Assertions.assertThat(env.getMaxParallelism()).isEqualTo(128); + + // Verify checkpointing + Assertions.assertThat(checkpointConfig.getCheckpointInterval()).isEqualTo(10000L); + Assertions.assertThat(checkpointConfig.getCheckpointingMode()).isEqualTo(CheckpointingMode.AT_LEAST_ONCE); + Assertions.assertThat(checkpointConfig.getCheckpointTimeout()).isEqualTo(60000L); + Assertions.assertThat(checkpointConfig.getMinPauseBetweenCheckpoints()).isEqualTo(5000L); + } + + @Test + public void testInvalidExecutionModeHandling() { + // Should not throw exception, just log warning + template.sendBodyAndHeader( + "direct:invalidMode", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // Should execute without error despite invalid mode + Assertions.assertThat(ds).isNotNull(); + } + }); + } + + @Test + public void testInvalidCheckpointingModeHandling() { + // Should not throw exception, just log warning + template.sendBodyAndHeader( + "direct:invalidCheckpointMode", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // Should execute without error despite invalid checkpoint mode + Assertions.assertThat(ds).isNotNull(); + } + }); + } + + @Test + public void testConfigurationViaRouteParameters() { + AtomicReference<StreamExecutionEnvironment> envRef = new AtomicReference<>(); + + template.sendBodyAndHeader( + "direct:routeConfig", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... 
payloads) throws Exception {
+                        StreamExecutionEnvironment env = ds.getExecutionEnvironment();
+                        envRef.set(env);
+                    }
+                });
+
+        StreamExecutionEnvironment env = envRef.get();
+        Assertions.assertThat(env).isNotNull();
+
+        // Verify the configuration was applied via route parameters
+        Assertions.assertThat(env.getParallelism()).isEqualTo(2);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:batchMode")
+                        .to("flink:datastream?dataStream=#batchDataStream"
+                            + "&executionMode=BATCH");
+
+                from("direct:streamingMode")
+                        .to("flink:datastream?dataStream=#streamingDataStream"
+                            + "&executionMode=STREAMING");
+
+                from("direct:checkpointing")
+                        .to("flink:datastream?dataStream=#checkpointDataStream"
+                            + "&checkpointInterval=5000"
+                            + "&checkpointingMode=EXACTLY_ONCE"
+                            + "&checkpointTimeout=30000"
+                            + "&minPauseBetweenCheckpoints=2000");
+
+                from("direct:parallelism")
+                        .to("flink:datastream?dataStream=#parallelismDataStream"
+                            + "&parallelism=4"
+                            + "&maxParallelism=64");
+
+                from("direct:fullConfig")
+                        .to("flink:datastream?dataStream=#fullConfigDataStream"
+                            + "&executionMode=STREAMING"
+                            + "&checkpointInterval=10000"
+                            + "&checkpointingMode=AT_LEAST_ONCE"
+                            + "&checkpointTimeout=60000"
+                            + "&minPauseBetweenCheckpoints=5000"
+                            + "&parallelism=8"
+                            + "&maxParallelism=128"
+                            + "&jobName=FullConfigTest");
+
+                from("direct:invalidMode")
+                        .to("flink:datastream?dataStream=#batchDataStream"
+                            + "&executionMode=INVALID_MODE"
+                            + "&dataStreamCallback=#captureEnvCallback");
+
+                from("direct:invalidCheckpointMode")
+                        .to("flink:datastream?dataStream=#checkpointDataStream"
+                            + "&checkpointInterval=5000"
+                            + "&checkpointingMode=INVALID_CHECKPOINT_MODE"
+                            + "&dataStreamCallback=#captureEnvCallback");
+
+                from("direct:routeConfig")
+                        .to("flink:datastream?dataStream=#parallelismDataStream"
+                            + "&parallelism=2");
+            }
+        };
+    }
+}
diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamEdgeCasesIT.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamEdgeCasesIT.java
new file mode 100644
index 000000000000..9bcef953c825
--- /dev/null
+++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamEdgeCasesIT.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.camel.component.flink; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.CamelExecutionException; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +/** + * Integration tests for edge cases and error handling in DataStream configuration. + */ +public class DataStreamEdgeCasesIT extends CamelTestSupport { + + // Create separate environment for isolation + StreamExecutionEnvironment testEnv = Flinks.createStreamExecutionEnvironment(); + + @BindToRegistry("testDataStream") + private DataStreamSource<String> testDs = testEnv.fromElements("test1", "test2"); + + @Test + public void testMissingDataStreamThrowsException() { + // Should throw exception when no DataStream is defined + Assertions.assertThatThrownBy(() -> { + template.sendBodyAndHeader( + "direct:noDataStream", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + ds.print(); + } + }); + }).isInstanceOf(CamelExecutionException.class) + .cause() + .hasMessageContaining("No DataStream defined"); + } + + @Test + public void testMissingCallbackThrowsException() { + // Should throw exception when no callback is defined + Assertions.assertThatThrownBy(() -> { + template.sendBody("direct:noCallback", null); + }).isInstanceOf(CamelExecutionException.class) + .cause() + .hasMessageContaining("Cannot resolve DataStream callback"); + } + + @Test + public void testInvalidParallelismHandling() { + // Flink will reject parallelism of 0 or negative values during configuration + // This test verifies that the error is clear + Assertions.assertThatThrownBy(() -> { + StreamExecutionEnvironment invalidEnv = Flinks.createStreamExecutionEnvironment(); + invalidEnv.setParallelism(0); + }).isInstanceOf(IllegalArgumentException.class) + .hasMessageContaining("Parallelism must be at least one"); + } + + @Test + public void testCheckpointWithoutIntervalIgnored() { + template.sendBodyAndHeader( + "direct:checkpointNoInterval", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + // Checkpointing mode is set but interval is not, so checkpointing won't be enabled + Assertions.assertThat(env).isNotNull(); + } + }); + } + + @Test + public void testNullPayloadsHandling() { + template.sendBodyAndHeader( + "direct:withDataStream", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // Null body results in single null payload + Assertions.assertThat(payloads).hasSize(1); + Assertions.assertThat(payloads[0]).isNull(); + } + }); + } + + @Test + public void testEmptyListPayload() { + template.sendBodyAndHeader( + "direct:withDataStream", + java.util.Collections.emptyList(), + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... 
payloads) throws Exception { + // Empty list should result in empty array + Assertions.assertThat(payloads).isEmpty(); + } + }); + } + + @Test + public void testVeryHighParallelism() { + template.sendBodyAndHeader( + "direct:veryHighParallelism", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + // Very high parallelism should be accepted (Flink will validate) + Assertions.assertThat(env.getParallelism()).isEqualTo(1000); + } + }); + } + + @Test + public void testAutomaticExecutionMode() { + template.sendBodyAndHeader( + "direct:automaticMode", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // AUTOMATIC mode should be accepted + Assertions.assertThat(ds).isNotNull(); + } + }); + } + + @Test + public void testVeryShortCheckpointInterval() { + template.sendBodyAndHeader( + "direct:shortCheckpoint", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + StreamExecutionEnvironment env = ds.getExecutionEnvironment(); + // Very short interval (100ms) should be configured, though not recommended + Assertions.assertThat(env.getCheckpointConfig().getCheckpointInterval()).isEqualTo(100L); + } + }); + } + + @Test + public void testCallbackExceptionPropagation() { + Assertions.assertThatThrownBy(() -> { + template.sendBodyAndHeader( + "direct:withDataStream", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + throw new RuntimeException("Test exception from callback"); + } + }); + }).isInstanceOf(CamelExecutionException.class) + .cause() + .hasMessageContaining("Test exception from callback"); + } + + @Test + public void testHeaderOverridesEndpointCallback() { + // When both endpoint callback and header callback are present, header should win + boolean[] headerCallbackCalled = { false }; + + template.sendBodyAndHeader( + "direct:withEndpointCallback", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + headerCallbackCalled[0] = true; + } + }); + + Assertions.assertThat(headerCallbackCalled[0]).isTrue(); + } + + @Test + public void testConfigurationWithAllNullOptionals() { + // All optional configuration parameters are null - should use defaults + template.sendBodyAndHeader( + "direct:minimalConfig", + null, + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // Should work with defaults + Assertions.assertThat(ds).isNotNull(); + } + }); + } + + @BindToRegistry("endpointCallback") + public DataStreamCallback endpointCallback() { + return new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... 
payloads) throws Exception {
+                throw new AssertionError("Endpoint callback should not be called when header is present");
+            }
+        };
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                from("direct:noDataStream")
+                        .to("flink:datastream");
+
+                from("direct:noCallback")
+                        .to("flink:datastream?dataStream=#testDataStream");
+
+                from("direct:checkpointNoInterval")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&checkpointingMode=EXACTLY_ONCE");
+
+                from("direct:withDataStream")
+                        .to("flink:datastream?dataStream=#testDataStream");
+
+                from("direct:veryHighParallelism")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&parallelism=1000"
+                            + "&maxParallelism=2000");
+
+                from("direct:automaticMode")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&executionMode=AUTOMATIC");
+
+                from("direct:shortCheckpoint")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&checkpointInterval=100");
+
+                from("direct:withEndpointCallback")
+                        .to("flink:datastream?dataStream=#testDataStream"
+                            + "&dataStreamCallback=#endpointCallback");
+
+                from("direct:minimalConfig")
+                        .to("flink:datastream?dataStream=#testDataStream");
+            }
+        };
+    }
+}
diff --git a/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamProducerTest.java b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamProducerTest.java
new file mode 100644
index 000000000000..f47230c46bc9
--- /dev/null
+++ b/components/camel-flink/src/test/java/org/apache/camel/component/flink/DataStreamProducerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.camel.component.flink; + +import java.util.Arrays; +import java.util.List; + +import org.apache.camel.BindToRegistry; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.streaming.api.CheckpointingMode; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DataStreamProducerTest extends CamelTestSupport { + + static StreamExecutionEnvironment streamExecutionEnvironment = Flinks.createStreamExecutionEnvironment(); + + String flinkDataStreamUri = "flink:dataStream?dataStream=#myDataStream"; + + @BindToRegistry("myDataStream") + private DataStreamSource<String> dss = streamExecutionEnvironment.readTextFile("src/test/resources/testds.txt"); + + @Test + public void shouldExecuteDataStreamCallback() { + template.sendBodyAndHeader(flinkDataStreamUri, null, FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + // Just verify the callback is executed + ds.print(); + } + }); + } + + @Test + public void shouldExecuteDataStreamCallbackWithPayload() { + template.sendBodyAndHeader(flinkDataStreamUri, "test-payload", + FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... payloads) throws Exception { + Assertions.assertThat(payloads).hasSize(1); + Assertions.assertThat(payloads[0]).isEqualTo("test-payload"); + } + }); + } + + @Test + public void shouldExecuteDataStreamCallbackWithMultiplePayloads() { + List<String> payloads = Arrays.asList("payload1", "payload2", "payload3"); + template.sendBodyAndHeader(flinkDataStreamUri, payloads, FlinkConstants.FLINK_DATASTREAM_CALLBACK_HEADER, + new VoidDataStreamCallback() { + @Override + public void doOnDataStream(DataStream ds, Object... 
payloads) throws Exception { + Assertions.assertThat(payloads).hasSize(3); + Assertions.assertThat(payloads[0]).isEqualTo("payload1"); + Assertions.assertThat(payloads[1]).isEqualTo("payload2"); + Assertions.assertThat(payloads[2]).isEqualTo("payload3"); + } + }); + } + + @Test + public void shouldConfigureExecutionMode() { + StreamExecutionEnvironment env = streamExecutionEnvironment; + env.setRuntimeMode(RuntimeExecutionMode.BATCH); + + Assertions.assertThat(env.getConfiguration().get( + org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE)) + .isEqualTo(RuntimeExecutionMode.BATCH); + } + + @Test + public void shouldConfigureCheckpointing() { + StreamExecutionEnvironment env = Flinks.createStreamExecutionEnvironment(); + env.enableCheckpointing(5000); + env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); + + Assertions.assertThat(env.getCheckpointConfig().getCheckpointInterval()).isEqualTo(5000); + Assertions.assertThat(env.getCheckpointConfig().getCheckpointingMode()) + .isEqualTo(CheckpointingMode.EXACTLY_ONCE); + } + + @Test + public void shouldConfigureParallelism() { + StreamExecutionEnvironment env = Flinks.createStreamExecutionEnvironment(); + env.setParallelism(4); + + Assertions.assertThat(env.getParallelism()).isEqualTo(4); + } + + @Test + public void shouldConfigureMaxParallelism() { + StreamExecutionEnvironment env = Flinks.createStreamExecutionEnvironment(); + env.setMaxParallelism(128); + + Assertions.assertThat(env.getMaxParallelism()).isEqualTo(128); + } +} diff --git a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc index 20047a339852..59049c6546d6 100644 --- a/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc +++ b/docs/user-manual/modules/ROOT/pages/camel-4x-upgrade-guide-4_16.adoc @@ -94,4 +94,151 @@ item.setDataValueListener(dataValue -> processValue(dataValue)); item.setDataValueListener((monitoredItem, dataValue) -> processValue(dataValue)); ---- -NOTE: Most users will not be affected by these changes as they primarily affect advanced use cases where you directly interact with the Milo API. Standard camel-milo endpoint configurations remain unchanged. \ No newline at end of file +=== camel-flink + +Apache Flink deprecated the DataSet API in version 1.12 in favor of a unified DataStream API that handles both +streaming and batch processing. The DataStream API with bounded streams provides all the functionality of the +DataSet API and more, with better performance and a unified programming model. 
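+
+For illustration, here is a minimal sketch of a bounded job expressed with the DataStream API in batch mode (the element values and job name are only illustrative):
+
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// Bounded input plus BATCH mode gives DataSet-style batch scheduling
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+DataStream<String> words = env.fromElements("camel", "flink", "camel");
+words.map(String::toUpperCase).print();
+// Unlike the DataSet API, the job only runs on an explicit execute() call
+env.execute("bounded-uppercase-job");
+-----------------------------------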
+
+==== Key Differences
+
+[cols="1,1,1", options="header"]
+|===
+|Aspect |DataSet API |DataStream API (Batch Mode)
+
+|Execution
+|Lazy; actions such as `count()`, `collect()` and `print()` trigger execution implicitly
+|Lazy; the job runs only when `execute()` is called on the environment
+
+|Data Type
+|Bounded datasets
+|Bounded or unbounded streams
+
+|Time Semantics
+|Not applicable
+|Event time, processing time, ingestion time
+
+|State Management
+|Limited
+|Full support for keyed and operator state
+
+|Windowing
+|Not applicable
+|Full windowing support
+|===
+
+==== Migration Guide
+
+===== Update Endpoint Type
+
+Replace `flink:dataset` with `flink:datastream`:
+
+.Before
+[source,java]
+-----------------------------------
+from("direct:start")
+    .to("flink:dataset?dataSet=#myDataSet&dataSetCallback=#myCallback");
+-----------------------------------
+
+.After
+[source,java]
+-----------------------------------
+from("direct:start")
+    .to("flink:datastream?dataStream=#myDataStream&dataStreamCallback=#myCallback");
+-----------------------------------
+
+===== Configure Batch Execution Mode
+
+For batch processing with the DataStream API, configure the execution environment for batch mode:
+
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// Set to batch mode for bounded streams
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+-----------------------------------
+
+===== Update Data Sources
+
+.Before (DataSet API)
+[source,java]
+-----------------------------------
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+DataSet<String> dataSet = env.readTextFile("input.txt");
+-----------------------------------
+
+.After (DataStream API)
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+DataStream<String> dataStream = env.readTextFile("input.txt");
+-----------------------------------
+
+===== Update Transformations
+
+Most transformation operations have direct equivalents; a worked `groupBy()` to `keyBy()` example is shown at the end of this section:
+
+[cols="1,1", options="header"]
+|===
+|DataSet API |DataStream API
+
+|`map()`
+|`map()`
+
+|`flatMap()`
+|`flatMap()`
+
+|`filter()`
+|`filter()`
+
+|`reduce()`
+|`reduce()` or `keyBy().reduce()`
+
+|`groupBy()`
+|`keyBy()`
+
+|`join()`
+|`join()` (with windowing)
+
+|`coGroup()`
+|`coGroup()` (with windowing)
+|===
+
+===== Update Callbacks
+
+Replace `DataSetCallback` with `DataStreamCallback`:
+
+.Before (DataSet)
+[source,java]
+-----------------------------------
+@Bean
+public DataSetCallback<Long> dataSetCallback() {
+    return new DataSetCallback<Long>() {
+        public Long onDataSet(DataSet dataSet, Object... objects) {
+            try {
+                return dataSet.count();
+            } catch (Exception e) {
+                return -1L;
+            }
+        }
+    };
+}
+-----------------------------------
+
+.After (DataStream)
+[source,java]
+-----------------------------------
+@Bean
+public DataStreamCallback dataStreamCallback() {
+    return new DataStreamCallback() {
+        public Object onDataStream(DataStream dataStream, Object... objects) {
+            // The DataStream API has no blocking count(); attach a sink
+            // and let the producer execute the job instead
+            dataStream.print();
+            return null;
+        }
+    };
+}
+-----------------------------------
+
+NOTE: Most users will not be affected by these changes: the deprecated `flink:dataset` endpoints continue to work in this release. Migration to `flink:datastream` is recommended, however, as the DataSet API will be removed in a future Flink release.
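+
+===== Worked Example: groupBy() to keyBy()
+
+As a worked instance of the transformation table above, a sketch of migrating a grouped aggregation (the tuple layout and job name are only illustrative):
+
+.Before (DataSet)
+[source,java]
+-----------------------------------
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+DataSet<Tuple2<String, Integer>> pairs = env.fromElements(
+        Tuple2.of("camel", 1), Tuple2.of("flink", 1), Tuple2.of("camel", 1));
+// Group on the first tuple field and sum the second
+DataSet<Tuple2<String, Integer>> counts = pairs.groupBy(0).sum(1);
+counts.print(); // print() is an action, so it also triggers execution
+-----------------------------------
+
+.After (DataStream, batch mode)
+[source,java]
+-----------------------------------
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+DataStream<Tuple2<String, Integer>> pairs = env.fromElements(
+        Tuple2.of("camel", 1), Tuple2.of("flink", 1), Tuple2.of("camel", 1));
+// keyBy() replaces groupBy(); a key selector replaces the field index
+DataStream<Tuple2<String, Integer>> counts = pairs.keyBy(t -> t.f0).sum(1);
+counts.print();
+env.execute("keyed-sum-job");
+-----------------------------------
+
+In batch mode the keyed `sum()` emits one final result per key, whereas in streaming mode it would emit a rolling update for every input element.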
