This is an automated email from the ASF dual-hosted git repository.

richardstartin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new c3d17b9ed7 Fix PulsarConsumerTest (#8691)
c3d17b9ed7 is described below

commit c3d17b9ed7616e88352b1813f301949777d8c4d7
Author: Xiang Fu <xiangfu.1...@gmail.com>
AuthorDate: Thu May 12 23:58:54 2022 -0700

    Fix PulsarConsumerTest (#8691)
---
 .../plugin/stream/pulsar/PulsarConsumerTest.java   | 77 ++++++++++++++--------
 1 file changed, 50 insertions(+), 27 deletions(-)

diff --git a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
index 12c0cf45ad..2b6a16e33d 100644
--- a/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
+++ b/pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import org.apache.pinot.spi.stream.MessageBatch;
 import org.apache.pinot.spi.stream.PartitionGroupConsumer;
@@ -164,22 +165,24 @@ public class PulsarConsumerTest {
       throws Exception {
     for (int p = 0; p < NUM_PARTITION; p++) {
       final int partition = p;
-      Producer<String> producer =
-          _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC).messageRouter(new MessageRouter() {
+      try (Producer<String> producer = _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC)
+          .messageRouter(new MessageRouter() {
             @Override
             public int choosePartition(Message<?> msg, TopicMetadata metadata) {
               return partition;
             }
-          }).create();
-
-      for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
-        MessageId messageId = producer.send(MESSAGE_PREFIX + "_" + i);
-        if (!_partitionToFirstMessageIdMap.containsKey(partition)) {
-          _partitionToFirstMessageIdMap.put(partition, messageId);
+          }).create()) {
+        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
+          MessageId messageId = producer.send(MESSAGE_PREFIX + "_" + i);
+          if (!_partitionToFirstMessageIdMap.containsKey(partition)) {
+            _partitionToFirstMessageIdMap.put(partition, messageId);
+          }
         }
+        producer.flush();
       }
-
-      producer.flush();
+      waitForCondition(input -> validatePartitionMessageCount(partition, NUM_RECORDS_PER_PARTITION), 15 * 1000L,
+          5 * 60 * 1000L, "Failed to consume " + NUM_RECORDS_PER_PARTITION + " messages from partition " + partition,
+          true);
     }
   }
 
@@ -187,29 +190,49 @@ public class PulsarConsumerTest {
       throws Exception {
     for (int p = 0; p < NUM_PARTITION; p++) {
       final int partition = p;
-      Producer<String> producer =
+      try (Producer<String> producer =
           _pulsarClient.newProducer(Schema.STRING).topic(TEST_TOPIC_BATCH).messageRouter(new MessageRouter() {
             @Override
             public int choosePartition(Message<?> msg, TopicMetadata metadata) {
               return partition;
             }
-          }).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1, TimeUnit.SECONDS).create();
-
-      for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
-        CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(MESSAGE_PREFIX + "_" + i);
-        messageIdCompletableFuture.thenAccept(messageId -> {
-
-          List<BatchMessageIdImpl> batchMessageIdList = _partitionToMessageIdMapping
-              .getOrDefault(partition, new ArrayList<>());
-          batchMessageIdList.add((BatchMessageIdImpl) messageId);
-          _partitionToMessageIdMapping.put(partition, batchMessageIdList);
-
-          if (!_partitionToFirstMessageIdMapBatch.containsKey(partition)) {
-            _partitionToFirstMessageIdMapBatch.put(partition, messageId);
-          }
-        });
+          }).batchingMaxMessages(BATCH_SIZE).batchingMaxPublishDelay(1, TimeUnit.SECONDS).create()) {
+        for (int i = 0; i < NUM_RECORDS_PER_PARTITION; i++) {
+          CompletableFuture<MessageId> messageIdCompletableFuture = producer.sendAsync(MESSAGE_PREFIX + "_" + i);
+          messageIdCompletableFuture.thenAccept(messageId -> {
+
+            List<BatchMessageIdImpl> batchMessageIdList = _partitionToMessageIdMapping
+                .getOrDefault(partition, new ArrayList<>());
+            batchMessageIdList.add((BatchMessageIdImpl) messageId);
+            _partitionToMessageIdMapping.put(partition, batchMessageIdList);
+
+            if (!_partitionToFirstMessageIdMapBatch.containsKey(partition)) {
+              _partitionToFirstMessageIdMapBatch.put(partition, messageId);
+            }
+          });
+        }
+        producer.flush();
       }
-      producer.flush();
+      waitForCondition(input -> validatePartitionMessageCount(partition, NUM_RECORDS_PER_PARTITION), 15 * 1000L,
+          5 * 60 * 1000L, "Failed to consume " + NUM_RECORDS_PER_PARTITION + " messages from partition " + partition,
+          true);
+    }
+  }
+
+  private boolean validatePartitionMessageCount(int partition, int expectedMsgCount) {
+    final PartitionGroupConsumer consumer = StreamConsumerFactoryProvider.create(getStreamConfig(TEST_TOPIC))
+        .createPartitionGroupConsumer(CLIENT_ID,
+            new PartitionGroupConsumptionStatus(partition, 1, new MessageIdStreamOffset(MessageId.earliest), null,
+                "CONSUMING"));
+    try {
+      final MessageBatch messageBatch = consumer.fetchMessages(new MessageIdStreamOffset(MessageId.earliest),
+          new MessageIdStreamOffset(getMessageIdForPartitionAndIndex(partition, expectedMsgCount)),
+          CONSUMER_FETCH_TIMEOUT_MILLIS);
+      System.out.println(
+          "Partition: " + partition + ", Consumed messageBatch count = " + messageBatch.getMessageCount());
+      return messageBatch.getMessageCount() == expectedMsgCount;
+    } catch (TimeoutException e) {
+      return false;
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org
For additional commands, e-mail: commits-h...@pinot.apache.org

Reply via email to