This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch aggregate-attempt
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 1ed3a7904cce156640eaad861d47e1d4e63d93f3
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Mon Jun 22 17:10:02 2020 +0200

    Added a first spike of aggregate support
---
 .../kafkaconnector/CamelSinkConnectorConfig.java   | 12 +++++-
 .../apache/camel/kafkaconnector/CamelSinkTask.java |  3 +-
 .../camel/kafkaconnector/CamelSourceTask.java      |  2 +-
 .../kafkaconnector/utils/CamelMainSupport.java     | 13 +++++--
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 35 +++++++++++++++++
 .../camel/kafkaconnector/DataFormatTest.java       |  6 +--
 .../kafkaconnector/utils/SampleAggregator.java     | 44 ++++++++++++++++++++++
 7 files changed, 106 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
index 666661b..a16f7de 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java
@@ -47,11 +47,21 @@ public class CamelSinkConnectorConfig extends 
AbstractConfig {
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF = 
"camel.sink.contentLogLevel";
     public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level 
for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT + 
"). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF.";
 
+    public static final String CAMEL_SINK_AGGREGATE_DEFAULT = null;
+    public static final String CAMEL_SINK_AGGREGATE_CONF = 
"camel.beans.aggregate";
+    public static final String CAMEL_SINK_AGGREGATE_DOC = "A reference to an 
aggregate bean, in the form of #class:";    
+
+    public static final Integer CAMEL_SINK_AGGREGATE_SIZE_DEFAULT = 10;
+    public static final String CAMEL_SINK_AGGREGATE_SIZE_CONF = 
"camel.beans.aggregation.size";
+    public static final String CAMEL_SINK_AGGREGATE_SIZE_DOC = "The size of 
the aggregation, to be used in combination with camel.beans.aggregate";       
+
     private static final ConfigDef CONFIG_DEF = new ConfigDef()
         .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, 
Importance.HIGH, CAMEL_SINK_URL_DOC)
         .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, 
CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC)
         .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, 
CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC)
-        .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, 
CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, 
CAMEL_SINK_CONTENT_LOG_LEVEL_DOC);
+        .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, 
CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, 
CAMEL_SINK_CONTENT_LOG_LEVEL_DOC)
+        .define(CAMEL_SINK_AGGREGATE_CONF, Type.STRING, 
CAMEL_SINK_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_DOC)
+        .define(CAMEL_SINK_AGGREGATE_SIZE_CONF, Type.INT, 
CAMEL_SINK_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, 
CAMEL_SINK_AGGREGATE_SIZE_DOC);
 
     public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> 
parsedConfig) {
         super(config, parsedConfig);
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 422d3ed..41ad9a2 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -70,6 +70,7 @@ public class CamelSinkTask extends SinkTask {
 
             String remoteUrl = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF);
             final String marshaller = 
config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF);
+            final int size = 
config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF);
 
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
@@ -80,7 +81,7 @@ public class CamelSinkTask extends SinkTask {
                                                 
CAMEL_SINK_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, 
marshaller, null, camelContext);
+            cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, 
marshaller, null, size, camelContext);
 
             producer = cms.createProducerTemplate();
 
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java 
b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index c8333ed..9779874 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -91,7 +91,7 @@ public class CamelSourceTask extends SourceTask {
                         CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, 
CAMEL_SOURCE_PATH_PROPERTIES_PREFIX);
             }
 
-            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, 
unmarshaller, camelContext);
+            cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, 
unmarshaller, 10, camelContext);
 
             Endpoint endpoint = cms.getEndpoint(localUrl);
             consumer = endpoint.createPollingConsumer();
diff --git 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
index 66f46fb..b695b1f 100644
--- 
a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
+++ 
b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java
@@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
+import org.apache.camel.AggregationStrategy;
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelContextAware;
 import org.apache.camel.ConsumerTemplate;
@@ -53,11 +54,11 @@ public class CamelMainSupport {
     private final ExecutorService exService = 
Executors.newSingleThreadExecutor();
     private final CountDownLatch startFinishedSignal = new CountDownLatch(1);
 
-    public CamelMainSupport(Map<String, String> props, String fromUrl, String 
toUrl, String marshal, String unmarshal) throws Exception {
-        this(props, fromUrl, toUrl, marshal, unmarshal, new 
DefaultCamelContext());
+    public CamelMainSupport(Map<String, String> props, String fromUrl, String 
toUrl, String marshal, String unmarshal, int aggregationSize) throws Exception {
+        this(props, fromUrl, toUrl, marshal, unmarshal, aggregationSize, new 
DefaultCamelContext());
     }
 
-    public CamelMainSupport(Map<String, String> props, String fromUrl, String 
toUrl, String marshal, String unmarshal, CamelContext camelContext) throws 
Exception {
+    public CamelMainSupport(Map<String, String> props, String fromUrl, String 
toUrl, String marshal, String unmarshal, int aggregationSize, CamelContext 
camelContext) throws Exception {
         camel = camelContext;
         camelMain = new Main() {
             @Override
@@ -88,6 +89,7 @@ public class CamelMainSupport {
         LOG.info("Setting initial properties in Camel context: [{}]", 
camelProperties);
         
this.camel.getPropertiesComponent().setInitialProperties(camelProperties);
 
+        camelMain.init();
         //creating the actual route
         this.camel.addRoutes(new RouteBuilder() {
             public void configure() {
@@ -105,7 +107,12 @@ public class CamelMainSupport {
                 } else {
                     LOG.info("Creating Camel route from({}).to({})", fromUrl, 
toUrl);
                 }
+                if (camel.getRegistry().lookupByName("aggregate") != null) {
+                       AggregationStrategy s = (AggregationStrategy) 
camel.getRegistry().lookupByName("aggregate");
+                       
rd.aggregate(s).constant(true).completionSize(aggregationSize).to(toUrl);
+                } else {
                 rd.to(toUrl);
+                }
             }
         });
     }
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index 91d4ae5..8cb21a1 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -554,5 +554,40 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
+    
+    @Test
+    public void testAggregationBody() {
+        Map<String, String> props = new HashMap<>();
+        props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, 
"#class:org.apache.camel.kafkaconnector.utils.SampleAggregator");
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, 
"5");
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel", 42);
+        SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel1", 42);
+        SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel2", 42);
+        SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel3", 42);
+        SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel4", 42);
+        SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, 
"camel5", 42);
+        records.add(record);
+        records.add(record1);
+        records.add(record2);
+        records.add(record3);
+        records.add(record4);
+        records.add(record5);
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+        assertEquals("camel camel1 camel2 camel3 camel4", 
exchange.getMessage().getBody());
+        assertEquals("test", 
exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
+        assertEquals(LoggingLevel.OFF.toString(), 
sinkTask.getCamelSinkConnectorConfig(props)
+            
.getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+        sinkTask.stop();
+    }
 
 }
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java 
b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
index 911f93d..9f18e28 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java
@@ -73,7 +73,7 @@ public class DataFormatTest {
 
 
         assertThrows(UnsupportedOperationException.class, () -> new 
CamelMainSupport(props, "direct://start",
-                "log://test", "syslog", "syslog"));
+                "log://test", "syslog", "syslog", 10));
     }
 
     @Test
@@ -84,7 +84,7 @@ public class DataFormatTest {
         props.put("camel.source.marshal", "hl7");
 
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", 
"log://test", null, "hl7", dcc);
+        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", 
"log://test", null, "hl7", 10, dcc);
 
         HL7DataFormat hl7df = new HL7DataFormat();
         hl7df.setValidate(false);
@@ -105,7 +105,7 @@ public class DataFormatTest {
         props.put("camel.dataformat.hl7.validate", "false");
 
         DefaultCamelContext dcc = new DefaultCamelContext();
-        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", 
"log://test", null, "hl7", dcc);
+        CamelMainSupport cms = new CamelMainSupport(props, "direct://start", 
"log://test", null, "hl7", 10, dcc);
 
         cms.start();
         HL7DataFormat hl7dfLoaded = 
dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class);
diff --git 
a/core/src/test/java/org/apache/camel/kafkaconnector/utils/SampleAggregator.java
 
b/core/src/test/java/org/apache/camel/kafkaconnector/utils/SampleAggregator.java
new file mode 100644
index 0000000..4b89148
--- /dev/null
+++ 
b/core/src/test/java/org/apache/camel/kafkaconnector/utils/SampleAggregator.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.kafkaconnector.utils;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+
+public class SampleAggregator implements AggregationStrategy {
+
+    @Override
+    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // lets append the old body to the new body
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
+        String body = oldExchange.getIn().getBody(String.class);
+        if (body != null) {
+            Message newIn = newExchange.getIn();
+            String newBody = newIn.getBody(String.class);
+            if (newBody != null) {
+                body += " " + newBody;
+            }
+
+            newIn.setBody(body);
+        }
+        return newExchange;
+    }
+}

Reply via email to