OmniaGM commented on code in PR #16757:
URL: https://github.com/apache/kafka/pull/16757#discussion_r1700375840


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -153,6 +154,49 @@ public void start() {
             producerProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
         }
         producer = new KafkaProducer<>(producerProps, new ByteArraySerializer(), new ByteArraySerializer());
+
+        verifyClusterReadiness();
+    }
+
+    /**
+     * Perform an extended check to ensure that the primary APIs of the cluster are available, including:
+     * <ul>
+     *     <li>Ability to create a topic</li>
+     *     <li>Ability to produce to a topic</li>
+     *     <li>Ability to form a consumer group</li>
+     *     <li>Ability to consume from a topic</li>
+     * </ul>
+     * If this method completes successfully, all resources created to verify the cluster health
+     * (such as topics and consumer groups) will be cleaned up before it returns.
+     * <p>
+     * This provides extra guarantees compared to other cluster readiness checks such as
+     * {@link ConnectAssertions#assertExactlyNumBrokersAreUp(int, String)} and
+     * {@link KafkaClusterTestKit#waitForReadyBrokers()}, which verify that brokers have
+     * completed startup and joined the cluster, but do not verify that the internal consumer
+     * offsets topic has been created or that it's actually possible for users to create and
+     * interact with topics.
+     */
+    public void verifyClusterReadiness() {
+        String consumerGroupId = UUID.randomUUID().toString();
+        Map<String, Object> consumerConfig = Collections.singletonMap(GROUP_ID_CONFIG, consumerGroupId);
+        String topic = "consumer-warmup-" + consumerGroupId;
+
+        createTopic(topic);
+        produce(topic, "warmup message key", "warmup message value");
+
+        try (Consumer<?, ?> consumer = createConsumerAndSubscribeTo(consumerConfig, topic)) {
+            ConsumerRecords<?, ?> records = consumer.poll(Duration.ofMillis(GROUP_COORDINATOR_AVAILABILITY_DURATION_MS));
+            if (records.isEmpty()) {
+                throw new AssertionError("Failed to verify availability of group coordinator and produce/consume APIs on Kafka cluster in time");
+            }
+        }
+
+        try (Admin admin = createAdminClient()) {
+            admin.deleteConsumerGroups(Collections.singleton(consumerGroupId)).all().get(30, TimeUnit.SECONDS);
+            admin.deleteTopics(Collections.singleton(topic)).all().get(30, TimeUnit.SECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            throw new AssertionError("Failed to clean up cluster health check resource(s)", e);

Review Comment:
   By the way, the same can be done for `produce`, similar to
   `DeleteOffsetsConsumerGroupCommandIntegrationTest::produceRecord`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
