KKcorps commented on code in PR #8538: URL: https://github.com/apache/pinot/pull/8538#discussion_r851301660
########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java: ########## @@ -273,13 +269,31 @@ public void testPartitionLevelConsumerBatchMessages() } } - public MessageId getMessageIdForPartitionAndIndex(int partitionNum, int index) { + private static MessageBatch consumeMessageBatch(PartitionGroupConsumer consumer, MessageId startMsgId, + MessageId endMsgId, int expectedMsgCount) + throws TimeoutException { + int retryCount = 0; + MessageBatch messageBatch = null; + while (retryCount < DEFAULT_RETRY_COUNT) { + retryCount++; + messageBatch = consumer.fetchMessages(new MessageIdStreamOffset(startMsgId), Review Comment: This method was added to handle the scenarios where messages are published in batches which actually changes the format of MessageIds in Pulsar. The only way to publish in batch is to enable async producer. In sync mode, all messages are flushed as soon as they are received. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org