This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch multiple-topics in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 25b1599b81a5cd3e0ca1cba4519c22be68c1dd10 Author: Andrea Cosentino <anco...@gmail.com> AuthorDate: Wed Sep 2 12:36:27 2020 +0200 Fixed CS --- .../camel/kafkaconnector/CamelSourceTask.java | 32 +++++++++++----------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java index f48446f..399e6d8 100644 --- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java +++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java @@ -125,24 +125,24 @@ public class CamelSourceTask extends SourceTask { long collectedRecords = 0L; List<SourceRecord> records = new ArrayList<>(); - while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) { - Exchange exchange = consumer.receiveNoWait(); + while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) { + Exchange exchange = consumer.receiveNoWait(); - if (exchange != null) { - LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint()); + if (exchange != null) { + LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint()); - // TODO: see if there is a better way to use sourcePartition - // an sourceOffset - Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString()); - Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId()); + // TODO: see if there is a better way to use sourcePartition + // an sourceOffset + Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString()); + Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId()); - final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null; - final Object messageBodyValue = exchange.getMessage().getBody(); + final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null; + final Object messageBodyValue = exchange.getMessage().getBody(); - final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null; - final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null; + final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null; + final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null; - for (String singleTopic : topics) { + for (String singleTopic : topics) { SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue); if (exchange.getMessage().hasHeaders()) { setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX); @@ -154,11 +154,11 @@ public class CamelSourceTask extends SourceTask { TaskHelper.logRecordContent(LOG, record, config); records.add(record); collectedRecords++; - } - } else { - break; } + } else { + break; } + } if (records.isEmpty()) { return Collections.EMPTY_LIST;