KKcorps commented on code in PR #12061: URL: https://github.com/apache/pinot/pull/12061#discussion_r1429804967
########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java: ########## @@ -75,81 +71,68 @@ public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, final MessageId endMessageId = endMsgOffset == null ? MessageId.latest : ((MessageIdStreamOffset) endMsgOffset).getMessageId(); - List<PulsarStreamMessage> messagesList = new ArrayList<>(); - Future<PulsarMessageBatch> pulsarResultFuture = - _executorService.submit(() -> fetchMessages(startMessageId, endMessageId, messagesList)); + final Collection<PulsarStreamMessage> messages = Collections.synchronizedList(new ArrayList<>()); + + CompletableFuture<PulsarMessageBatch> pulsarResultFuture = fetchMessagesAsync(startMessageId, endMessageId, + messages) + .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) + .handle((v, t) -> { + if (!(t instanceof TimeoutException)) { + LOGGER.warn("Error while fetching records from Pulsar", t); + } + return new PulsarMessageBatch(buildOffsetFilteringIterable(messages, startMessageId, endMessageId), + _enableKeyValueStitch); + }); try { - return pulsarResultFuture.get(timeoutMillis, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - // The fetchMessages has thrown an exception. Most common cause is the timeout. - // We return the records fetched till now along with the next start offset. 
- pulsarResultFuture.cancel(true); - return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId), - _enableKeyValueStitch); + return pulsarResultFuture.get(); } catch (Exception e) { LOGGER.warn("Error while fetching records from Pulsar", e); - return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId), + return new PulsarMessageBatch(buildOffsetFilteringIterable(messages, startMessageId, endMessageId), _enableKeyValueStitch); } } - public PulsarMessageBatch fetchMessages(MessageId startMessageId, MessageId endMessageId, - List<PulsarStreamMessage> messagesList) { - try { - _reader.seek(startMessageId); - - while (_reader.hasMessageAvailable()) { - Message<byte[]> nextMessage = _reader.readNext(); - - if (endMessageId != null) { - if (nextMessage.getMessageId().compareTo(endMessageId) > 0) { - break; - } - } - messagesList.add( - PulsarUtils.buildPulsarStreamMessage(nextMessage, _enableKeyValueStitch, _pulsarMetadataExtractor)); + public CompletableFuture<Void> fetchMessagesAsync(MessageId startMessageId, MessageId endMessageId, + Collection<PulsarStreamMessage> messages) { + CompletableFuture<Void> seekFut = _reader.seekAsync(startMessageId); + return seekFut.thenCompose((v) -> fetchNextMessageAndAddToCollection(endMessageId, messages)); + } - if (Thread.interrupted()) { - break; - } - } + public CompletableFuture<Void> fetchNextMessageAndAddToCollection(MessageId endMessageId, + Collection<PulsarStreamMessage> messages) { + CompletableFuture<Boolean> hasMessagesFut = _reader.hasMessageAvailableAsync(); + CompletableFuture<Message<byte[]>> messageFut = hasMessagesFut.thenCompose(msgAvailable -> + (msgAvailable) ? 
_reader.readNextAsync() : CompletableFuture.completedFuture(null)); + CompletableFuture<Void> handleMessageFut = messageFut.thenCompose(messageOrNull -> + readMessageAndFetchNextOrComplete(endMessageId, messages, messageOrNull)); + return handleMessageFut; + } - return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId), - _enableKeyValueStitch); - } catch (PulsarClientException e) { - LOGGER.warn("Error consuming records from Pulsar topic", e); - return new PulsarMessageBatch(buildOffsetFilteringIterable(messagesList, startMessageId, endMessageId), - _enableKeyValueStitch); + public CompletableFuture<Void> readMessageAndFetchNextOrComplete(MessageId endMessageId, + Collection<PulsarStreamMessage> messages, Message<byte[]> messageOrNull) { + if (messageOrNull == null) { + return CompletableFuture.completedFuture(null); } + if (endMessageId != null && messageOrNull.getMessageId().compareTo(endMessageId) > 0) { + return CompletableFuture.completedFuture(null); + } + messages.add(PulsarUtils.buildPulsarStreamMessage(messageOrNull, _enableKeyValueStitch, _pulsarMetadataExtractor)); + return fetchNextMessageAndAddToCollection(endMessageId, messages); Review Comment: Can we avoid recursion here and use an iterative implementation instead? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org