This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch aggr-timeout
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit e048e1ed811b0e09721c1512f4f4a5ce334df6a4
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Thu Jun 25 18:17:01 2020 +0200

    Added Aggregation timeout
---
 .../kafkaconnector/CamelSinkConnectorConfig.java   |  9 ++++--
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  3 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  2 +-
 .../kafkaconnector/utils/CamelMainSupport.java     |  8 ++---
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 36 ++++++++++++++++++++++
 .../camel/kafkaconnector/DataFormatTest.java       |  6 ++--
 6 files changed, 53 insertions(+), 11 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index a16f7de..b6a654e 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -53,7 +53,11 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
 
     public static final Integer CAMEL_SINK_AGGREGATE_SIZE_DEFAULT = 10;
     public static final String CAMEL_SINK_AGGREGATE_SIZE_CONF = 
"camel.beans.aggregation.size";
-    public static final String CAMEL_SINK_AGGREGATE_SIZE_DOC = "The size of 
the aggregation, to be used in combination with camel.beans.aggregate";       
+    public static final String CAMEL_SINK_AGGREGATE_SIZE_DOC = "The size of 
the aggregation, to be used in combination with camel.beans.aggregate";
+    
+    public static final Long CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT = 500L;
+    public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_CONF = 
"camel.beans.aggregation.timeout";
+    public static final String CAMEL_SINK_AGGREGATE_TIMEOUT_DOC = "The timeout 
of the aggregation, to be used in combination with camel.beans.aggregate";    
 
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, 
Importance.HIGH, CAMEL_SINK_URL_DOC)
@@ -61,7 +65,8 @@ public class CamelSinkConnectorConfig extends AbstractConfig {
         .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, 
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
         .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, 
CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, 
CAMEL_SINK_CONTENT_LOG_LEVEL_DOC)
         .define(CAMEL_SINK_AGGREGATE_CONF, Type.STRING, 
CAMEL_SINK_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_DOC)
-        .define(CAMEL_SINK_AGGREGATE_SIZE_CONF, Type.INT, 
CAMEL_SINK_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, 
CAMEL_SINK_AGGREGATE_SIZE_DOC);
+        .define(CAMEL_SINK_AGGREGATE_SIZE_CONF, Type.INT, 
CAMEL_SINK_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, 
CAMEL_SINK_AGGREGATE_SIZE_DOC)
+        .define(CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, Type.LONG, 
CAMEL_SINK_AGGREGATE_TIMEOUT_DEFAULT, Importance.MEDIUM, 
CAMEL_SINK_AGGREGATE_TIMEOUT_DOC);
 
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 41ad9a2..7e4c2f0 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -71,6 +71,7 @@ public class CamelSinkTask extends SinkTask {
             String remoteUrl = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
             final String marshaller = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
             final int size = 
config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF);
+            final long timeout = 
config.getLong(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF);
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -81,7 +82,7 @@ public class CamelSinkTask extends SinkTask {
                                                 
CAMEL_SINK_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, 
marshaller, null, size, camelContext);
+            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, 
marshaller, null, size, timeout, camelContext);
 
             producer = cms.createProducerTemplate();
 
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index 9779874..395f700 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -91,7 +91,7 @@ public class CamelSourceTask extends SourceTask {
                         CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, 
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, 
unmarshaller, 10, camelContext);
+            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, 
unmarshaller, 10, 500, camelContext);
 
             Endpoint endpoint = cms.getEndpoint(localUrl);
             consumer = endpoint.createPollingConsumer();
diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index ee0b6b6..3db1ad5 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -54,11 +54,11 @@ public class CamelMainSupport {
     private final ExecutorService exService = 
Executors.newSingleThreadExecutor();
     private final CountDownLatch startFinishedSignal = new CountDownLatch(1);
 
-    public CamelMainSupport(Map<String, String> props, String fromUrl, String 
toUrl, String marshal, String unmarshal, int aggregationSize) throws Exception {
-        this(props, fromUrl, toUrl, marshal, unmarshal, aggregationSize, new 
DefaultCamelContext());
+    public CamelMainSupport(Map<String, String> props, String fromUrl, String 
toUrl, String marshal, String unmarshal, int aggregationSize, long 
aggregationTimeout) throws Exception {
+        this(props, fromUrl, toUrl, marshal, unmarshal, aggregationSize, 
aggregationTimeout, new DefaultCamelContext());
     }
 
-    public CamelMainSupport(Map<String, String> props, String fromUrl, String 
toUrl, String marshal, String unmarshal, int aggregationSize, CamelContext 
camelContext) throws Exception {
+    public CamelMainSupport(Map<String, String> props, String fromUrl, String 
toUrl, String marshal, String unmarshal, int aggregationSize, long 
aggregationTimeout, CamelContext camelContext) throws Exception {
         camel = camelContext;
         camelMain = new Main() {
             @Override
@@ -109,7 +109,7 @@ public class CamelMainSupport {
                 }
                 if (camel.getRegistry().lookupByName("aggregate") != null) {
                     AggregationStrategy s = (AggregationStrategy) 
camel.getRegistry().lookupByName("aggregate");
-                    
rd.aggregate(s).constant(true).completionSize(aggregationSize).to(toUrl);
+                    
rd.aggregate(s).constant(true).completionSize(aggregationSize).completionTimeout(aggregationTimeout).toD(toUrl);
                 } else {
                     rd.toD(toUrl);
                 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 8cb21a1..504e0f8 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -589,5 +589,41 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
+    
+    @Test
+    public void testAggregationBodyAndTimeout() throws InterruptedException {
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, 
"#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, 
"5");
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_TIMEOUT_CONF, 
"100");
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel1", 42);
+        SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel2", 42);
+        SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel3", 42);
+        SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel4", 42);
+        SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel5", 42);
+        records.add(record);
+        records.add(record1);
+        records.add(record2);
+        records.add(record3);
+        records.add(record4);
+        records.add(record5);
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel camel1 camel2 camel3 camel4", 
exchange.getMessage().getBody());
+        assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), 
sinkTask.getCamelSinkConnectorConfig(props)
+            
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+        sinkTask.stop();
+    }
 
 }
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index fb76939..eeb5823 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -74,7 +74,7 @@ public class DataFormatTest {
 
 
         assertThrows(UnsupportedOperationException.class, () -> new 
CamelMainSupport(props, "direct://start",
-                "log://test", "syslog", "syslog", 10));
+                "log://test", "syslog", "syslog", 10, 500));
     }
 
     @Test
@@ -85,7 +85,7 @@ public class DataFormatTest {
         props.put("camel.source.marshal", "hl7");
 
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", 
"log://test", null, "hl7", 10, dcc);
+        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", 
"log://test", null, "hl7", 10, 500, dcc);
 
         HL7DataFormat hl7df = new HL7DataFormat();
         hl7df.setValidate(false);
@@ -106,7 +106,7 @@ public class DataFormatTest {
         props.put("camel.dataformat.hl7.validate", "false");
 
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", 
"log://test", null, "hl7", 10, dcc);
+        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", 
"log://test", null, "hl7", 10, 500, dcc);
 
         cms.start();
         HL7DataFormat hl7dfLoaded = 
dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);

Reply via email to