This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch aggregate-attempt in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 1ed3a7904cce156640eaad861d47e1d4e63d93f3 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Mon Jun 22 17:10:02 2020 +0200 Added a first spike of aggregate support --- .../kafkaconnector/CamelSinkConnectorConfig.java | 12 +++++- .../apache/camel/kafkaconnector/CamelSinkTask.java | 3 +- .../camel/kafkaconnector/CamelSourceTask.java | 2 +- .../kafkaconnector/utils/CamelMainSupport.java | 13 +++++-- .../camel/kafkaconnector/CamelSinkTaskTest.java | 35 +++++++++++++++++ .../camel/kafkaconnector/DataFormatTest.java | 6 +-- .../kafkaconnector/utils/SampleAggregator.java | 44 ++++++++++++++++++++++ 7 files changed, 106 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java index 666661b..a16f7de 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkConnectorConfig.java @@ -47,11 +47,21 @@ public class CamelSinkConnectorConfig extends AbstractConfig { public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_CONF = "camel.sink.contentLogLevel"; public static final String CAMEL_SINK_CONTENT_LOG_LEVEL_DOC = "Log level for the record's content (default: " + CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT + "). Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF."; + public static final String CAMEL_SINK_AGGREGATE_DEFAULT = null; + public static final String CAMEL_SINK_AGGREGATE_CONF = "camel.beans.aggregate"; + public static final String CAMEL_SINK_AGGREGATE_DOC = "A reference to an aggregate bean, in the form of #class:"; + + public static final Integer CAMEL_SINK_AGGREGATE_SIZE_DEFAULT = 10; + public static final String CAMEL_SINK_AGGREGATE_SIZE_CONF = "camel.beans.aggregation.size"; + public static final String CAMEL_SINK_AGGREGATE_SIZE_DOC = "The size of the aggregation, to be used in combination with camel.beans.aggregate"; + private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(CAMEL_SINK_URL_CONF, Type.STRING, CAMEL_SINK_URL_DEFAULT, Importance.HIGH, CAMEL_SINK_URL_DOC) .define(CAMEL_SINK_MARSHAL_CONF, Type.STRING, CAMEL_SINK_MARSHAL_DEFAULT, Importance.HIGH, CAMEL_SINK_MARSHAL_DOC) .define(CAMEL_SINK_COMPONENT_CONF, Type.STRING, CAMEL_SINK_COMPONENT_DEFAULT, Importance.HIGH, CAMEL_SINK_COMPONENT_DOC) - .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC); + .define(CAMEL_SINK_CONTENT_LOG_LEVEL_CONF, Type.STRING, CAMEL_SINK_CONTENT_LOG_LEVEL_DEFAULT, Importance.HIGH, CAMEL_SINK_CONTENT_LOG_LEVEL_DOC) + .define(CAMEL_SINK_AGGREGATE_CONF, Type.STRING, CAMEL_SINK_AGGREGATE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_DOC) + .define(CAMEL_SINK_AGGREGATE_SIZE_CONF, Type.INT, CAMEL_SINK_AGGREGATE_SIZE_DEFAULT, Importance.MEDIUM, CAMEL_SINK_AGGREGATE_SIZE_DOC); public CamelSinkConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) { super(config, parsedConfig); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java index 422d3ed..41ad9a2 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java @@ -70,6 +70,7 @@ public class CamelSinkTask extends SinkTask { String remoteUrl = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF); final String marshaller = config.getString(CamelSinkConnectorConfig.CAMEL_SINK_MARSHAL_CONF); + final int size = config.getInt(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF); CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { @@ -80,7 +81,7 @@ public class CamelSinkTask extends SinkTask { CAMEL_SINK_PATH_PROPERTIES_PREFIX); } - cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, marshaller, null, camelContext); + cms = new CamelMainSupport(actualProps, LOCAL_URL, remoteUrl, marshaller, null, size, camelContext); producer = cms.createProducerTemplate(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index c8333ed..9779874 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -91,7 +91,7 @@ public class CamelSourceTask extends SourceTask { CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX); } - cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, unmarshaller, camelContext); + cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, null, unmarshaller, 10, camelContext); Endpoint endpoint = cms.getEndpoint(localUrl); consumer = endpoint.createPollingConsumer(); diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java index 66f46fb..b695b1f 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/utils/CamelMainSupport.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import org.apache.camel.AggregationStrategy; import org.apache.camel.CamelContext; import org.apache.camel.CamelContextAware; import org.apache.camel.ConsumerTemplate; @@ -53,11 +54,11 @@ public class CamelMainSupport { private final ExecutorService exService = Executors.newSingleThreadExecutor(); private final CountDownLatch startFinishedSignal = new CountDownLatch(1); - public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal) throws Exception { - this(props, fromUrl, toUrl, marshal, unmarshal, new DefaultCamelContext()); + public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal, int aggregationSize) throws Exception { + this(props, fromUrl, toUrl, marshal, unmarshal, aggregationSize, new DefaultCamelContext()); } - public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal, CamelContext camelContext) throws Exception { + public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal, int aggregationSize, CamelContext camelContext) throws Exception { camel = camelContext; camelMain = new Main() { @Override @@ -88,6 +89,7 @@ public class CamelMainSupport { LOG.info("Setting initial properties in Camel context: [{}]", camelProperties); this.camel.getPropertiesComponent().setInitialProperties(camelProperties); + camelMain.init(); //creating the actual route this.camel.addRoutes(new RouteBuilder() { public void configure() { @@ -105,7 +107,12 @@ public class CamelMainSupport { } else { LOG.info("Creating Camel route from({}).to({})", fromUrl, toUrl); } + if (camel.getRegistry().lookupByName("aggregate") != null) { + AggregationStrategy s = (AggregationStrategy) camel.getRegistry().lookupByName("aggregate"); + rd.aggregate(s).constant(true).completionSize(aggregationSize).to(toUrl); + } else { rd.to(toUrl); + } } }); } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java index 91d4ae5..8cb21a1 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java @@ -554,5 +554,40 @@ public class CamelSinkTaskTest { sinkTask.stop(); } + + @Test + public void testAggregationBody() { + Map<String, String> props = new HashMap<>(); + props.put(CamelSinkConnectorConfig.TOPIC_CONF, TOPIC_NAME); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_CONF, "#class:org.apache.camel.kafkaconnector.utils.SampleAggregator"); + props.put(CamelSinkConnectorConfig.CAMEL_SINK_AGGREGATE_SIZE_CONF, "5"); + CamelSinkTask sinkTask = new CamelSinkTask(); + sinkTask.start(props); + + List<SinkRecord> records = new ArrayList<SinkRecord>(); + SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42); + SinkRecord record1 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel1", 42); + SinkRecord record2 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel2", 42); + SinkRecord record3 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel3", 42); + SinkRecord record4 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel4", 42); + SinkRecord record5 = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel5", 42); + records.add(record); + records.add(record1); + records.add(record2); + records.add(record3); + records.add(record4); + records.add(record5); + sinkTask.put(records); + + ConsumerTemplate consumer = sinkTask.getCms().createConsumerTemplate(); + Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT); + assertEquals("camel camel1 camel2 camel3 camel4", exchange.getMessage().getBody()); + assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)); + assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props) + .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF)); + + sinkTask.stop(); + } } diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java index 911f93d..9f18e28 100644 --- a/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java +++ b/core/src/test/java/org/apache/camel/kafkaconnector/DataFormatTest.java @@ -73,7 +73,7 @@ public class DataFormatTest { assertThrows(UnsupportedOperationException.class, () -> new CamelMainSupport(props, "direct://start", - "log://test", "syslog", "syslog")); + "log://test", "syslog", "syslog", 10)); } @Test @@ -84,7 +84,7 @@ public class DataFormatTest { props.put("camel.source.marshal", "hl7"); DefaultCamelContext dcc = new DefaultCamelContext(); - CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", null, "hl7", dcc); + CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", null, "hl7", 10, dcc); HL7DataFormat hl7df = new HL7DataFormat(); hl7df.setValidate(false); @@ -105,7 +105,7 @@ public class DataFormatTest { props.put("camel.dataformat.hl7.validate", "false"); DefaultCamelContext dcc = new DefaultCamelContext(); - CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", null, "hl7", dcc); + CamelMainSupport cms = new CamelMainSupport(props, "direct://start", "log://test", null, "hl7", 10, dcc); cms.start(); HL7DataFormat hl7dfLoaded = dcc.getRegistry().lookupByNameAndType("hl7", HL7DataFormat.class); diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/utils/SampleAggregator.java b/core/src/test/java/org/apache/camel/kafkaconnector/utils/SampleAggregator.java new file mode 100644 index 0000000..4b89148 --- /dev/null +++ b/core/src/test/java/org/apache/camel/kafkaconnector/utils/SampleAggregator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.utils; + +import org.apache.camel.AggregationStrategy; +import org.apache.camel.Exchange; +import org.apache.camel.Message; + +public class SampleAggregator implements AggregationStrategy { + + @Override + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + // lets append the old body to the new body + if (oldExchange == null) { + return newExchange; + } + + String body = oldExchange.getIn().getBody(String.class); + if (body != null) { + Message newIn = newExchange.getIn(); + String newBody = newIn.getBody(String.class); + if (newBody != null) { + body += " " + newBody; + } + + newIn.setBody(body); + } + return newExchange; + } +}