Jackie-Jiang commented on code in PR #12812: URL: https://github.com/apache/pinot/pull/12812#discussion_r1558332571
########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/main/java/org/apache/pinot/plugin/stream/pulsar/PulsarPartitionLevelConsumer.java: ########## @@ -45,93 +40,66 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnectionHandler implements PartitionGroupConsumer { private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class); + private final Reader<byte[]> _reader; + private MessageId _nextMessageId = null; - private final Reader _reader; - - // TODO: Revisit the logic of using a separate executor to manage the request timeout. Currently it is not thread safe - private final ExecutorService _executorService = Executors.newSingleThreadExecutor(); - - public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partitionId) { + public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig, int partition) { super(clientId, streamConfig); + String topicName = _config.getPulsarTopicName(); try { - _reader = createReaderForPartition(partitionId); + List<String> partitions = _pulsarClient.getPartitionsForTopic(topicName).get(); + _reader = _pulsarClient.newReader().topic(partitions.get(partition)).startMessageId(MessageId.earliest) + .startMessageIdInclusive().create(); } catch (Exception e) { - throw new RuntimeException("Caught exception while creating Pulsar reader", e); + throw new RuntimeException( + String.format("Caught exception while creating Pulsar reader for topic: %s, partition: %d", topicName, + partition), e); } - LOGGER.info("Created Pulsar reader with topic: {}, partition: {}, initial message id: {}", - _config.getPulsarTopicName(), partitionId, _config.getInitialMessageId()); + LOGGER.info("Created Pulsar reader for topic: {}, partition: {}", topicName, partition); } - /** - * Fetch records from the Pulsar stream between the start and end StreamPartitionMsgOffset - * Used {@link org.apache.pulsar.client.api.Reader} to read the messaged from 
pulsar partitioned topic - * The reader seeks to the startMsgOffset and starts reading records in a loop until endMsgOffset or timeout is - * reached. - */ @Override - public PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset startMsgOffset, int timeoutMs) { - MessageIdStreamOffset startOffset = (MessageIdStreamOffset) startMsgOffset; + public synchronized PulsarMessageBatch fetchMessages(StreamPartitionMsgOffset startOffset, int timeoutMs) { + MessageId startMessageId = ((MessageIdStreamOffset) startOffset).getMessageId(); + long endTimeMs = System.currentTimeMillis() + timeoutMs; List<BytesStreamMessage> messages = new ArrayList<>(); - Future<PulsarMessageBatch> pulsarResultFuture = _executorService.submit(() -> fetchMessages(startOffset, messages)); - try { - return pulsarResultFuture.get(timeoutMs, TimeUnit.MILLISECONDS); - } catch (TimeoutException e) { - // The fetchMessages has thrown an exception. Most common cause is the timeout. - // We return the records fetched till now along with the next start offset. - pulsarResultFuture.cancel(true); - } catch (Exception e) { - LOGGER.warn("Error while fetching records from Pulsar", e); + + // Seek to the start message id if necessary + // NOTE: Use Objects.equals() to check reference first for performance. 
+ if (!Objects.equals(startMessageId, _nextMessageId)) { + try { + _reader.seek(startMessageId); + } catch (PulsarClientException e) { + throw new RuntimeException("Caught exception while seeking to message id: " + startMessageId, e); + } } - return buildPulsarMessageBatch(startOffset, messages); - } - private PulsarMessageBatch fetchMessages(MessageIdStreamOffset startOffset, List<BytesStreamMessage> messages) { + // Read messages until all available messages are read, or we run out of time try { - MessageId startMessageId = startOffset.getMessageId(); - _reader.seek(startMessageId); - while (_reader.hasMessageAvailable()) { - Message<byte[]> message = _reader.readNext(); - messages.add(PulsarUtils.buildPulsarStreamMessage(message, _config)); - if (Thread.interrupted()) { - break; - } + while (_reader.hasMessageAvailable() && System.currentTimeMillis() < endTimeMs) { + messages.add(PulsarUtils.buildPulsarStreamMessage(_reader.readNext(), _config)); Review Comment: `readNext()` shouldn't block. We call `readNext()` only when `hasMessageAvailable()` returns true. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org