This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch multiple-topics
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 25b1599b81a5cd3e0ca1cba4519c22be68c1dd10
Author: Andrea Cosentino <anco...@gmail.com>
AuthorDate: Wed Sep 2 12:36:27 2020 +0200

    Fixed CS
---
 .../camel/kafkaconnector/CamelSourceTask.java      | 32 +++++++++++-----------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
index f48446f..399e6d8 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
@@ -125,24 +125,24 @@ public class CamelSourceTask extends SourceTask {
         long collectedRecords = 0L;
 
         List<SourceRecord> records = new ArrayList<>();
-            while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-                Exchange exchange = consumer.receiveNoWait();
+        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
+            Exchange exchange = consumer.receiveNoWait();
 
-                if (exchange != null) {
-                    LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
+            if (exchange != null) {
+                LOG.debug("Received Exchange {} with Message {} from Endpoint {}", exchange.getExchangeId(), exchange.getMessage().getMessageId(), exchange.getFromEndpoint());
 
-                    // TODO: see if there is a better way to use sourcePartition
-                    // an sourceOffset
-                    Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
-                    Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
+                // TODO: see if there is a better way to use sourcePartition
+                // an sourceOffset
+                Map<String, String> sourcePartition = Collections.singletonMap("filename", exchange.getFromEndpoint().toString());
+                Map<String, String> sourceOffset = Collections.singletonMap("position", exchange.getExchangeId());
 
-                    final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
-                    final Object messageBodyValue = exchange.getMessage().getBody();
+                final Object messageHeaderKey = camelMessageHeaderKey != null ? exchange.getMessage().getHeader(camelMessageHeaderKey) : null;
+                final Object messageBodyValue = exchange.getMessage().getBody();
 
-                    final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
-                    final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
+                final Schema messageKeySchema = messageHeaderKey != null ? SchemaHelper.buildSchemaBuilderForType(messageHeaderKey) : null;
+                final Schema messageBodySchema = messageBodyValue != null ? SchemaHelper.buildSchemaBuilderForType(messageBodyValue) : null;
 
-                    for (String singleTopic : topics) {
+                for (String singleTopic : topics) {
                     SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, singleTopic, messageKeySchema, messageHeaderKey, messageBodySchema, messageBodyValue);
                     if (exchange.getMessage().hasHeaders()) {
                         setAdditionalHeaders(record, exchange.getMessage().getHeaders(), HEADER_CAMEL_PREFIX);
@@ -154,11 +154,11 @@ public class CamelSourceTask extends SourceTask {
                     TaskHelper.logRecordContent(LOG, record, config);
                     records.add(record);
                     collectedRecords++;
-                    }
-                } else {
-                    break;
                 }
+            } else {
+                break;
             }
+        }
 
         if (records.isEmpty()) {
             return Collections.EMPTY_LIST;

Reply via email to