xiangfu0 commented on code in PR #8554: URL: https://github.com/apache/pinot/pull/8554#discussion_r851847264
########## pinot-plugins/pinot-stream-ingestion/pinot-pulsar/src/test/java/org/apache/pinot/plugin/stream/pulsar/PulsarConsumerTest.java: ########## @@ -57,45 +68,90 @@ public class PulsarConsumerTest { public static final int NUM_PARTITION = 1; public static final int NUM_RECORDS_PER_PARTITION = 1000; public static final int BATCH_SIZE = 10; + public static final int CONSUMER_FETCH_TIMEOUT_MILLIS = (int) Duration.ofMinutes(5).toMillis(); private PulsarClient _pulsarClient; - private PulsarStandaloneCluster _pulsarStandaloneCluster; + private PulsarContainer _pulsar = null; private HashMap<Integer, MessageId> _partitionToFirstMessageIdMap = new HashMap<>(); private HashMap<Integer, MessageId> _partitionToFirstMessageIdMapBatch = new HashMap<>(); @BeforeClass public void setUp() throws Exception { try { - _pulsarStandaloneCluster = new PulsarStandaloneCluster(); + _pulsar = new PulsarContainer(PULSAR_IMAGE).withStartupTimeout(Duration.ofMinutes(5)); + _pulsar.start(); - _pulsarStandaloneCluster.start(); + // Waiting for namespace to be created. + // There should be a better approach. + Thread.sleep(20 * 1000L); - PulsarAdmin admin = - PulsarAdmin.builder().serviceHttpUrl("http://localhost:" + _pulsarStandaloneCluster.getAdminPort()).build(); + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(_pulsar.getHttpServiceUrl()).build(); - String bootstrapServer = "pulsar://localhost:" + _pulsarStandaloneCluster.getBrokerPort(); + String bootstrapServer = _pulsar.getPulsarBrokerUrl(); _pulsarClient = PulsarClient.builder().serviceUrl(bootstrapServer).build(); - admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION); - admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITION); + createTopics(admin); publishRecords(); publishRecordsBatch(); + + waitForMessagesToPublish(admin, TEST_TOPIC); + waitForMessagesToPublish(admin, TEST_TOPIC_BATCH); + + admin.close(); } catch (Exception e) { - if (_pulsarStandaloneCluster != null) { - _pulsarStandaloneCluster.stop(); + if (_pulsar != null) { + _pulsar.stop(); + _pulsar = null; } throw new RuntimeException("Failed to setUp test environment", e); } } + private void createTopics(PulsarAdmin admin) + throws PulsarAdminException { + InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies(); + inactiveTopicPolicies.setDeleteWhileInactive(false); + admin.namespaces().setInactiveTopicPolicies("public/default", inactiveTopicPolicies); + + admin.topics().createPartitionedTopic(TEST_TOPIC, NUM_PARTITION); + admin.topics().createPartitionedTopic(TEST_TOPIC_BATCH, NUM_PARTITION); + } + + private void waitForMessagesToPublish(PulsarAdmin admin, String topicName) { + waitForCondition(new Function<Void, Boolean>() { Review Comment: We already have `TestUtils.waitForCondition`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org