KKcorps commented on code in PR #12061:
URL: https://github.com/apache/pinot/pull/12061#discussion_r1429804967


##########
pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java:
##########
@@ -75,81 +71,68 @@ public PulsarMessageBatch 
fetchMessages(StreamPartitionMsgOffset startMsgOffset,
     final MessageId endMessageId =
         endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) 
endMsgOffset).getMessageId();
 
-    List<PulsarStreamMessage> messagesList = new ArrayList<>();
-    Future<PulsarMessageBatch> pulsarResultFuture =
-        _executorService.submit(() -> fetchMessages(startMessageId, 
endMessageId, messagesList));
+    final Collection<PulsarStreamMessage> messages = 
Collections.synchronizedList(new ArrayList<>());
+
+    CompletableFuture<PulsarMessageBatch> pulsarResultFuture = 
fetchMessagesAsync(startMessageId, endMessageId,
+        messages)
+        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
+        .handle((v, t) -> {
+          if (!(t instanceof TimeoutException)) {
+            LOGGER.warn("Error while fetching records from Pulsar", t);
+          }
+          return new PulsarMessageBatch(buildOffsetFilteringIterable(messages, 
startMessageId, endMessageId),
+              _enableKeyValueStitch);
+        });
 
     try {
-      return pulsarResultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
-    } catch (TimeoutException e) {
-      // The fetchMessages has thrown an exception. Most common cause is the 
timeout.
-      // We return the records fetched till now along with the next start 
offset.
-      pulsarResultFuture.cancel(true);
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId),
-          _enableKeyValueStitch);
+      return pulsarResultFuture.get();
     } catch (Exception e) {
       LOGGER.warn("Error while fetching records from Pulsar", e);
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId),
+      return new PulsarMessageBatch(buildOffsetFilteringIterable(messages, 
startMessageId, endMessageId),
           _enableKeyValueStitch);
     }
   }
 
-  public PulsarMessageBatch fetchMessages(MessageId startMessageId, MessageId 
endMessageId,
-      List<PulsarStreamMessage> messagesList) {
-    try {
-      _reader.seek(startMessageId);
-
-      while (_reader.hasMessageAvailable()) {
-        Message<byte[]> nextMessage = _reader.readNext();
-
-        if (endMessageId != null) {
-          if (nextMessage.getMessageId().compareTo(endMessageId) > 0) {
-            break;
-          }
-        }
-        messagesList.add(
-            PulsarUtils.buildPulsarStreamMessage(nextMessage, 
_enableKeyValueStitch, _pulsarMetadataExtractor));
+  public CompletableFuture<Void> fetchMessagesAsync(MessageId startMessageId, 
MessageId endMessageId,
+      Collection<PulsarStreamMessage> messages) {
+      CompletableFuture<Void> seekFut = _reader.seekAsync(startMessageId);
+      return seekFut.thenCompose((v) -> 
fetchNextMessageAndAddToCollection(endMessageId, messages));
+  }
 
-        if (Thread.interrupted()) {
-          break;
-        }
-      }
+  public CompletableFuture<Void> fetchNextMessageAndAddToCollection(MessageId 
endMessageId,
+      Collection<PulsarStreamMessage> messages) {
+    CompletableFuture<Boolean> hasMessagesFut = 
_reader.hasMessageAvailableAsync();
+    CompletableFuture<Message<byte[]>> messageFut = 
hasMessagesFut.thenCompose(msgAvailable ->
+        (msgAvailable) ? _reader.readNextAsync() : 
CompletableFuture.completedFuture(null));
+    CompletableFuture<Void> handleMessageFut = 
messageFut.thenCompose(messageOrNull ->
+        readMessageAndFetchNextOrComplete(endMessageId, messages, 
messageOrNull));
+    return handleMessageFut;
+  }
 
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId),
-          _enableKeyValueStitch);
-    } catch (PulsarClientException e) {
-      LOGGER.warn("Error consuming records from Pulsar topic", e);
-      return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, 
startMessageId, endMessageId),
-          _enableKeyValueStitch);
+  public CompletableFuture<Void> readMessageAndFetchNextOrComplete(MessageId 
endMessageId,
+      Collection<PulsarStreamMessage> messages, Message<byte[]> messageOrNull) 
{
+    if (messageOrNull == null) {
+      return CompletableFuture.completedFuture(null);
     }
+    if (endMessageId != null && 
messageOrNull.getMessageId().compareTo(endMessageId) > 0) {
+      return CompletableFuture.completedFuture(null);
+    }
+    messages.add(PulsarUtils.buildPulsarStreamMessage(messageOrNull, 
_enableKeyValueStitch, _pulsarMetadataExtractor));
+    return fetchNextMessageAndAddToCollection(endMessageId, messages);

Review Comment:
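   Can we avoid recursion here and use an iterative implementation instead? Each message chains another `fetchNextMessageAndAddToCollection` call via `thenCompose`, and when the Pulsar futures are already complete those stages run synchronously on the calling thread, so the stack grows with every message and a large backlog risks a `StackOverflowError`.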



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to