orpiske commented on a change in pull request #428:
URL: 
https://github.com/apache/camel-kafka-connector/pull/428#discussion_r484650459



##########
File path: 
core/src/main/java/org/apache/camel/kafkaconnector/CamelSourceTask.java
##########
@@ -119,53 +119,63 @@ public void start(Map<String, String> props) {
         }
     }
 
+    private long remaining(long startPollEpochMilli, long maxPollDuration)  {
+        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
+    }
+
+
     @Override
     public synchronized List<SourceRecord> poll() {
-        long startPollEpochMilli = Instant.now().toEpochMilli();
+        final long startPollEpochMilli = Instant.now().toEpochMilli();
+
+        long remaining = remaining(startPollEpochMilli, maxPollDuration);
         long collectedRecords = 0L;
 
-        List<SourceRecord> records = new ArrayList<>();
-        while (collectedRecords < maxBatchPollSize && (Instant.now().toEpochMilli() - startPollEpochMilli) < maxPollDuration) {
-            Exchange exchange = consumer.receiveNoWait();
+        List<SourceRecord> records = null;
+        while (collectedRecords < maxBatchPollSize && remaining > 0) {
+            Exchange exchange = consumer.receive(remaining);
+            if (exchange == null) {

Review comment:
   > What is the point of this check? If the exchange is `null`, then since `receive` waits for the whole remaining time, the time budget is already used up, so there is no point in updating `remaining` early and continuing.
   
   Great catch, man! I think you are right!
   
   > 
   > I would rather just check that the exchange is not `null` and, in that case, do all the work except updating the remaining time, i.e. something like:
   > 
   > ```java
   > while (...) {
   >     Exchange exchange = consumer.receive(remaining);
   >     if (exchange != null) {
   >         // whole code here
   >     }
   >     remaining = remaining(startPollEpochMilli, maxPollDuration);
   > }
   > ...
   > ```
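
For reference, here is a minimal, self-contained sketch of the loop shape suggested above. It is not the code from this PR: `PollLoopSketch`, the `String` records, and the `BlockingQueue`-backed `receive` helper are hypothetical stand-ins for `CamelSourceTask`, `SourceRecord`, and `consumer.receive(remaining)`. It also returns an empty list rather than `null` when nothing arrives, which is a simplification.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class PollLoopSketch {

    // Time budget left for this poll, same arithmetic as remaining() in the diff.
    static long remaining(long startPollEpochMilli, long maxPollDuration) {
        return maxPollDuration - (Instant.now().toEpochMilli() - startPollEpochMilli);
    }

    // Hypothetical stand-in for consumer.receive(timeout):
    // blocks for at most timeoutMillis and returns null on timeout.
    static String receive(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }

    static List<String> poll(BlockingQueue<String> queue, long maxBatchPollSize, long maxPollDuration) {
        final long startPollEpochMilli = Instant.now().toEpochMilli();
        long remaining = remaining(startPollEpochMilli, maxPollDuration);

        List<String> records = new ArrayList<>();
        while (records.size() < maxBatchPollSize && remaining > 0) {
            String exchange = receive(queue, remaining);
            if (exchange != null) {
                // "whole code here": convert the exchange and collect it
                records.add(exchange);
            }
            // Single place where the budget is refreshed. After a timed-out
            // receive this is <= 0 (or nearly so), and the loop condition ends the poll.
            remaining = remaining(startPollEpochMilli, maxPollDuration);
        }
        return records;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("a", "b", "c"));
        System.out.println(poll(queue, 2, 200));  // stops at the batch size limit
        System.out.println(poll(queue, 10, 200)); // takes the last item, then waits out the budget
    }
}
```

With this shape there is no separate `null` branch: a timed-out `receive` simply falls through to the one `remaining` update, and the `while` condition decides whether to continue.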




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

