This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch aggr-timeout in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e048e1ed811b0e09721c1512f4f4a5ce334df6a4 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Thu Jun 25 18:17:01 2020 +0200 Added Aggregation timeout --- .../kafkaconnector/CamelSinkConnectorConfig.java | 9 ++++-- .../apache/camel/kafkaconnector/CamelSinkTask.java | 3 +- .../camel/kafkaconnector/CamelSourceTask.java | 2 +- .../kafkaconnector/utils/CamelMainSupport.java | 8 ++--- .../camel/kafkaconnector/CamelSinkTaskTest.java | 36 ++++++++++++++++++++++ .../camel/kafkaconnector/DataFormatTest.java | 6 ++-- 6 files changed, 53 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index a16f7de..b6a654e 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -53,7 +53,11 @@ public class CamelSinkConnectorConfig extends AbstractConfig { public static final Integer CAMEL_SINK_AGGREGATE_SIZE_DEFAULT = 10; public static final String CAMEL_SINK_AGGREGATE_SIZE_CONF = "camel.beans.aggregation.size"; - public static final String CAMEL_SINK_AGGREGATE_SIZE_DOC = "The size of the aggregation, to be used in combination with camel.beans.aggregate"; + public static final String CAMEL_SINK_AGGREGATE_SIZE_DOC = "The size of the aggregation, to be used in combination with camel.beans.aggregate"; + + public static final Long CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT = 500L; + public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_CONF = "camel.beans.aggregation.timeout"; + public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_DOC = "The timeout of the aggregation, to be used in combination with camel.beans.aggregate"; private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC) @@ -61,7 +65,8 @@ public class CamelSinkConnectorConfig extends AbstractConfig { .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC) .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC) .define(CAMEL_SINK_AGGREGATE_CONF, Type.STRING, CAMEL_SINK_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_DOC) - .define(CAMEL_SINK_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_SINK_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_SIZE_DOC); + .define(CAMEL_SINK_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_SINK_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_SIZE_DOC) + .define(CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, Type.LONG, CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_TIMEOUT_DOC); public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 41ad9a2..7e4c2f0 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -71,6 +71,7 @@ public class CamelSinkTask extends SinkTask { String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF); final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF); final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF); + final long timeout = config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF); CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { @@ -81,7 +82,7 @@ public class CamelSinkTask extends SinkTask { CAMEL_SINK_PATH_PROPERTIES_PREFIX); } - cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, marshaller, null, size, camelContext); + cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, marshaller, null, size, timeout, camelContext); producer = cms.createProducerTemplate(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 9779874..395f700 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -91,7 +91,7 @@ public class CamelSourceTask extends SourceTask { CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX); } - cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, unmarshaller, 10, camelContext); + cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, unmarshaller, 10, 500, camelContext); Endpoint endpoint = cms.getEndpoint(localUrl); consumer = endpoint.createPollingConsumer(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java index ee0b6b6..3db1ad5 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java @@ -54,11 +54,11 @@ public class CamelMainSupport { private final ExecutorService exService = Executors.newSingleThreadExecutor(); private final CountDownLatch startFinishedSignal = new CountDownLatch(1); - public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal, int aggregationSize) throws Exception { - this(props, fromUrl, toUrl, marshal, unmarshal, aggregationSize, new DefaultCamelContext()); + public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal, int aggregationSize, long aggregationTimeout) throws Exception { + this(props, fromUrl, toUrl, marshal, unmarshal, aggregationSize, aggregationTimeout, new DefaultCamelContext()); } - public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal, int aggregationSize, CamelContext camelContext) throws Exception { + public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal, int aggregationSize, long aggregationTimeout, CamelContext camelContext) throws Exception { camel = camelContext; camelMain = new Main() { @Override @@ -109,7 +109,7 @@ public class CamelMainSupport { } if (camel.getRegistry().lookupByName("aggregate") != null) { AggregationStrategy s = (AggregationStrategy) camel.getRegistry().lookupByName("aggregate"); - rd.aggregate(s).constant(true).completionSize(aggregationSize).to(toUrl); + rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(toUrl); } else { rd.toD(toUrl); } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 8cb21a1..504e0f8 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -589,5 +589,41 @@ public class CamelSinkTaskTest { sinkTask.stop(); } + + @Test + public void testAggregationBodyAndTimeout() throws InterruptedException { + Map<String, String> props = new HashMap<>(); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5"); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, "100"); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + List<SinkRecord> records = new ArrayList<SinkRecord>(); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42); + SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42); + SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel3", 42); + SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel4", 42); + SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel5", 42); + records.add(record); + records.add(record1); + records.add(record2); + records.add(record3); + records.add(record4); + records.add(record5); + sinkTask.put(records); + + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel camel1 camel2 camel3 camel4", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + sinkTask.stop(); + } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java index fb76939..eeb5823 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java @@ -74,7 +74,7 @@ public class DataFormatTest { assertThrows(UnsupportedOperationException.class, () -> new CamelMainSupport(props, "direct://start", - "log://test", "syslog", "syslog", 10)); + "log://test", "syslog", "syslog", 10, 500)); } @Test @@ -85,7 +85,7 @@ public class DataFormatTest { props.put("camel.source.marshal", "hl7"); DefaultCamelContext dcc = new DefaultCamelContext(); - CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", null, "hl7", 10, dcc); + CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", null, "hl7", 10, 500, dcc); HL7DataFormat hl7df = new HL7DataFormat(); hl7df.setValidate(false); @@ -106,7 +106,7 @@ public class DataFormatTest { props.put("camel.dataformat.hl7.validate", "false"); DefaultCamelContext dcc = new DefaultCamelContext(); - CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", null, "hl7", 10, dcc); + CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", null, "hl7", 10, 500, dcc); cms.start(); HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);