This is an automated email from the ASF dual-hosted git repository. richardstartin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push: new c3d17b9ed7 Fix PulsarConsumerTest (#8691) c3d17b9ed7 is described below commit c3d17b9ed7616e88352b1813f301949777d8c4d7 Author: Xiang Fu <xiangfu.1...@gmail.com> AuthorDate: Thu May 12 23:58:54 2022 -0700 Fix PulsarConsumerTest (#8691) --- .../plugin/stream/pulsar/PulsarConsumerTest.java | 77 ++++++++++++++-------- 1 file changed, 50 insertions(+), 27 deletions(-) diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java index 12c0cf45ad..2b6a16e33d 100644 --- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java +++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import org.apache.pinot.spi.stream.MessageBatch; import org.apache.pinot.spi.stream.PartitionGroupConsumer; @@ -164,22 +165,24 @@ public class PulsarConsumerTest { throws Exception { for (int p = 0; p < NUM_PARTITION; p++) { final int partition = p; - Producer<String> producer = - _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC).messageRouter(new MessageRouter() { + try (Producer<String> producer = _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC) + .messageRouter(new MessageRouter() { @Override public int choosePartition(Message<?> msg, TopicMetadata metadata) { return partition; } - }).create(); - - for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) { - MessageId messageId = producer.send(MESSAGE_PREFIX + "_" + i); - if (!_partitionToFirstMessageIdMap.containsKey(partition)) { - _partitionToFirstMessageIdMap.put(partition, messageId); + }).create()) { + for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) { + MessageId messageId = producer.send(MESSAGE_PREFIX + "_" + i); + if (!_partitionToFirstMessageIdMap.containsKey(partition)) { + _partitionToFirstMessageIdMap.put(partition, messageId); + } } + producer.flush(); } - - producer.flush(); + waitForCondition(input -> validatePartitionMessageCount(partition, NUM_RECORDS_PER_PARTITION), 15 * 1000L, + 5 * 60 * 1000L, "Failed to consume " + NUM_RECORDS_PER_PARTITION + " messages from partition " + partition, + true); } } @@ -187,29 +190,49 @@ public class PulsarConsumerTest { throws Exception { for (int p = 0; p < NUM_PARTITION; p++) { final int partition = p; - Producer<String> producer = + try (Producer<String> producer = _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC_BATCH).messageRouter(new MessageRouter() { @Override public int choosePartition(Message<?> msg, TopicMetadata metadata) { return partition; } - }).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1, TimeUnit.SECONDS).create(); - - for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) { - CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(MESSAGE_PREFIX + "_" + i); - messageIdCompletableFuture.thenAccept(messageId -> { - - List<BatchMessageIdImpl> batchMessageIdList = _partitionToMessageIdMapping - .getOrDefault(partition, new ArrayList<>()); - batchMessageIdList.add((BatchMessageIdImpl) messageId); - _partitionToMessageIdMapping.put(partition, batchMessageIdList); - - if (!_partitionToFirstMessageIdMapBatch.containsKey(partition)) { - _partitionToFirstMessageIdMapBatch.put(partition, messageId); - } - }); + }).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1, TimeUnit.SECONDS).create()) { + for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) { + CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(MESSAGE_PREFIX + "_" + i); + messageIdCompletableFuture.thenAccept(messageId -> { + + List<BatchMessageIdImpl> batchMessageIdList = _partitionToMessageIdMapping + .getOrDefault(partition, new ArrayList<>()); + batchMessageIdList.add((BatchMessageIdImpl) messageId); + _partitionToMessageIdMapping.put(partition, batchMessageIdList); + + if (!_partitionToFirstMessageIdMapBatch.containsKey(partition)) { + _partitionToFirstMessageIdMapBatch.put(partition, messageId); + } + }); + } + producer.flush(); } - producer.flush(); + waitForCondition(input -> validatePartitionMessageCount(partition, NUM_RECORDS_PER_PARTITION), 15 * 1000L, + 5 * 60 * 1000L, "Failed to consume " + NUM_RECORDS_PER_PARTITION + " messages from partition " + partition, + true); + } + } + + private boolean validatePartitionMessageCount(int partition, int expectedMsgCount) { + final PartitionGroupConsumer consumer = StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC)) + .createPartitionGroupConsumer(CLIENT_ID, + new PartitionGroupConsumptionStatus(partition, 1, new MessageIdStreamOffset(MessageId.earliest), null, + "CONSUMING")); + try { + final MessageBatch messageBatch = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest), + new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, expectedMsgCount)), + CONSUMER_FETCH_TIMEOUT_MILLIS); + System.out.println( + "Partition: " + partition + ", Consumed messageBatch count = " + messageBatch.getMessageCount()); + return messageBatch.getMessageCount() == expectedMsgCount; + } catch (TimeoutException e) { + return false; } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org