valdar commented on a change in pull request #428:
URL: https://github.com/apache/camel-kafka-connector/pull/428#discussion_r484557883
##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
         }
     }
+    private long remaining(long startPollEpochMilli, long maxPollDuration) {
+        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
+    }
+
+
     @Override
     public synchronized List<SourceRecord> poll() {
-        long startPollEpochMilli = Instant.now().toEpochMilli();
+        final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+        long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
-        List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
+        List<SourceRecord> records = null;
+        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+            Exchange exchange = consumer.receive(remaining);
+            if (exchange == null) {

Review comment:
       What is the point of this check? If the exchange is `null`, then, since `receive` waits for the whole remaining time, the time has already run out, so there is no point in updating `remaining` early and calling `continue`.
       I would rather just check that the exchange is not `null` and, in that case, do all the work inside that branch except updating the remaining time, i.e. something like:
       ```java
       while(...) {
           Exchange exchange = consumer.receive(remaining);
           if(exchange != null) {
               //whole code here
           }
           remaining = remaining(startPollEpochMilli, maxPollDuration);
       }
       ...
       ```

##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
         }
     }
+    private long remaining(long startPollEpochMilli, long maxPollDuration) {
+        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
+    }
+
+
     @Override
     public synchronized List<SourceRecord> poll() {
-        long startPollEpochMilli = Instant.now().toEpochMilli();
+        final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+        long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
-        List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
+        List<SourceRecord> records = null;
+        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+            Exchange exchange = consumer.receive(remaining);
+            if (exchange == null) {

Review comment:
       I might have completely overlooked something here and misunderstood your motivations, so feel free to disagree and prove me wrong.
       After all, I was the author of the buggy/non-performant previous version of this section of the codebase :P

##########
File path: core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
         }
     }
+    private long remaining(long startPollEpochMilli, long maxPollDuration) {
+        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
+    }
+
+
     @Override
     public synchronized List<SourceRecord> poll() {
-        long startPollEpochMilli = Instant.now().toEpochMilli();
+        final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+        long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
-        List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
+        List<SourceRecord> records = null;
+        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+            Exchange exchange = consumer.receive(remaining);
+            if (exchange == null) {
+                remaining = remaining(startPollEpochMilli, maxPollDuration);
+                continue;
+            }
+
+            if (records == null) {

Review comment:
       For readability, I would rather initialize `records` with an empty list and check at the end whether it is empty in order to return `null`, than get a (small?) gain in performance...

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
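
For readers of the archive, the two suggestions in the review comments above (refresh `remaining` in a single place per iteration, and start from an empty list that is turned into `null` at the end) can be sketched together. This is only an illustration, not the code from the pull request: the class `PollSketch`, the generic item type, and the `receive` function parameter are hypothetical stand-ins for `CamelSourceTask`, `SourceRecord`, and `consumer.receive(remaining)`.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongFunction;

public class PollSketch {

    // Time left in the poll window, mirroring the remaining() helper in the diff.
    static long remaining(long startPollEpochMilli, long maxPollDuration) {
        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
    }

    // 'receive' is a hypothetical stand-in for consumer.receive(timeout):
    // it returns the next item, or null if nothing arrived within the timeout.
    static <T> List<T> poll(LongFunction<T> receive, int maxBatchPollSize, long maxPollDuration) {
        final long startPollEpochMilli = Instant.now().toEpochMilli();
        long remaining = remaining(startPollEpochMilli, maxPollDuration);
        List<T> records = new ArrayList<>();

        while (records.size() < maxBatchPollSize && remaining > 0) {
            T item = receive.apply(remaining);
            if (item != null) {
                records.add(item);
            }
            // Single update point, reached whether or not an item arrived.
            remaining = remaining(startPollEpochMilli, maxPollDuration);
        }

        // Empty list becomes null, as suggested in the last review comment.
        return records.isEmpty() ? null : records;
    }

    public static void main(String[] args) {
        Iterator<String> source = Arrays.asList("a", "b", "c").iterator();
        // Fake receiver for the demo: ignores the timeout and returns immediately.
        List<String> batch = poll(timeout -> source.hasNext() ? source.next() : null, 2, 1000L);
        System.out.println(batch); // prints [a, b]
    }
}
```

With a batch size of 2 the loop stops after two items even though a third is available; with a receiver that never yields anything, the method returns `null` once the poll window has elapsed.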