This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch multiple-topics in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit e6de1f11af649c9d47789c486d2c23b753fa5445 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Wed Sep 2 12:35:26 2020 +0200 Added support for multiple topics on the source connector side --- .../camel/kafkaconnector/CamelSourceTask.java | 92 ++++++++++------------ 1 file changed, 43 insertions(+), 49 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index 36aa96e..f48446f 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -22,6 +22,7 @@ import java.sql.Timestamp; import java.text.SimpleDateFormat; import java.time.Instant; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.Date; import java.util.LinkedList; @@ -57,11 +58,11 @@ public class CamelSourceTask extends SourceTask { private static final String LOCAL_URL = "direct:end"; - private CamelMainSupport cms; private CamelSourceConnectorConfig config; private PollingConsumer consumer; private String topic; + private List<String> topics; private Long maxBatchPollSize; private Long maxPollDuration; private String camelMessageHeaderKey; @@ -94,14 +95,15 @@ public class CamelSourceTask extends SourceTask { dataformats.add(new CamelKafkaConnectDataformat(marshaller, CamelKafkaConnectDataformat.CamelKafkaConnectDataformatKind.MARSHALL)); } topic = config.getString(CamelSourceConnectorConfig.TOPIC_CONF); + topics = Arrays.asList(topic.split(",")); String localUrl = getLocalUrlWithPollingOptions(config); CamelContext camelContext = new DefaultCamelContext(); if (remoteUrl == null) { - remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(), - actualProps, config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), - CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, CAMEL_SOURCE_PATH_PROPERTIES_PREFIX); + remoteUrl = TaskHelper.buildUrl(camelContext.adapt(ExtendedCamelContext.class).getRuntimeCamelCatalog(), actualProps, + config.getString(CamelSourceConnectorConfig.CAMEL_SOURCE_COMPONENT_CONF), CAMEL_SOURCE_ENDPOINT_PROPERTIES_PREFIX, + CAMEL_SOURCE_PATH_PROPERTIES_PREFIX); } cms = new CamelMainSupport(actualProps, remoteUrl, localUrl, dataformats, 10, 500, camelContext); @@ -123,48 +125,40 @@ public class CamelSourceTask extends SourceTask { long collectedRecords = 0L; List<SourceRecord> records = new ArrayList<>(); - - while (collectedRecords < maxBatchPollSize - && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) { - Exchange exchange = consumer.receiveNoWait(); - - if (exchange != null) { - LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), - exchange.getMessage().getMessageId(), exchange.getFromEndpoint()); - - // TODO: see if there is a better way to use sourcePartition an sourceOffset - Map<String, String> sourcePartition = Collections.singletonMap("filename", - exchange.getFromEndpoint().toString()); - Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId()); - - final Object messageHeaderKey = camelMessageHeaderKey != null - ? exchange.getMessage().getHeader(camelMessageHeaderKey) - : null; - final Object messageBodyValue = exchange.getMessage().getBody(); - - final Schema messageKeySchema = messageHeaderKey != null - ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) - : null; - final Schema messageBodySchema = messageBodyValue != null - ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) - : null; - - SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, messageKeySchema, - messageHeaderKey, messageBodySchema, messageBodyValue); - if (exchange.getMessage().hasHeaders()) { - setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX); + while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) { + Exchange exchange = consumer.receiveNoWait(); + + if (exchange != null) { + LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint()); + + // TODO: see if there is a better way to use sourcePartition + // an sourceOffset + Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString()); + Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId()); + + final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null; + final Object messageBodyValue = exchange.getMessage().getBody(); + + final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null; + final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null; + + for (String singleTopic : topics) { + SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue); + if (exchange.getMessage().hasHeaders()) { + setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX); + } + if (exchange.hasProperties()) { + setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX); + } + + TaskHelper.logRecordContent(LOG, record, config); + records.add(record); + collectedRecords++; + } + } else { + break; } - if (exchange.hasProperties()) { - setAdditionalHeaders(record, exchange.getProperties(), PROPERTY_CAMEL_PREFIX); - } - - TaskHelper.logRecordContent(LOG, record, config); - records.add(record); - collectedRecords++; - } else { - break; } - } if (records.isEmpty()) { return Collections.EMPTY_LIST; @@ -172,7 +166,6 @@ public class CamelSourceTask extends SourceTask { return records; } - } @Override @@ -189,8 +182,9 @@ public class CamelSourceTask extends SourceTask { } try { /* - If the CamelMainSupport instance fails to be instantiated (ie.: due to missing classes or similar - issues) then it won't be assigned and de-referencing it could cause an NPE. + * If the CamelMainSupport instance fails to be instantiated (ie.: + * due to missing classes or similar issues) then it won't be + * assigned and de-referencing it could cause an NPE. */ if (cms != null) { cms.stop(); @@ -264,8 +258,8 @@ public class CamelSourceTask extends SourceTask { long pollingConsumerQueueSize = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_QUEUE_SIZE_CONF); long pollingConsumerBlockTimeout = config.getLong(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_TIMEOUT_CONF); boolean pollingConsumerBlockWhenFull = config.getBoolean(CamelSourceConnectorConfig.CAMEL_SOURCE_POLLING_CONSUMER_BLOCK_WHEN_FULL_CONF); - return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" - + pollingConsumerBlockTimeout + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull; + return LOCAL_URL + "?pollingConsumerQueueSize=" + pollingConsumerQueueSize + "&pollingConsumerBlockTimeout=" + pollingConsumerBlockTimeout + + "&pollingConsumerBlockWhenFull=" + pollingConsumerBlockWhenFull; } public CamelMainSupport getCms() {