This is an automated email from the ASF dual-hosted git repository.

fjtiradosarti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git


The following commit(s) were added to refs/heads/main by this push:
     new 9675f4e20 [Fix_#2188] Enhance readability of waitForEvent method in Kafka IT class (#2189)
9675f4e20 is described below

commit 9675f4e20a06b617a96a03e789faa3d138a6a442
Author: Gonzalo Muñoz <[email protected]>
AuthorDate: Fri Feb 14 17:21:08 2025 +0100

    [Fix_#2188] Enhance readability of waitForEvent method in Kafka IT class (#2189)
---
 .../kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java  | 17 ++++-------------
 1 file changed, 4 insertions(+), 13 deletions(-)

diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java
index 20ab7530b..b9303dd3a 100644
--- a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java
+++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java
@@ -18,9 +18,8 @@
  */
 package org.kie.kogito.it.jobs;
 
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
 
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.junit.jupiter.api.AfterEach;
@@ -30,8 +29,6 @@ import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
 
 import io.restassured.path.json.JsonPath;
 
-import static org.assertj.core.api.Assertions.assertThat;
-
 public class KafkaBaseSwitchStateTimeoutsIT extends BaseSwitchStateTimeoutsIT {
 
     private KafkaTestClient kafkaClient;
@@ -49,15 +46,9 @@ public class KafkaBaseSwitchStateTimeoutsIT extends BaseSwitchStateTimeoutsIT {
     }
 
     private static JsonPath waitForEvent(KafkaTestClient kafkaClient, String topic, long seconds) throws Exception {
-        final CountDownLatch countDownLatch = new CountDownLatch(1);
-        final AtomicReference<String> cloudEvent = new AtomicReference<>();
-        kafkaClient.consume(topic, rawCloudEvent -> {
-            cloudEvent.set(rawCloudEvent);
-            countDownLatch.countDown();
-        });
-        // give some time to consume the event.
-        assertThat(countDownLatch.await(seconds, TimeUnit.SECONDS)).isTrue();
-        return new JsonPath(cloudEvent.get());
+       CompletableFuture<String> cloudEvent = new CompletableFuture<>();
+        kafkaClient.consume(topic, cloudEvent::complete);
+        return new JsonPath(cloudEvent.orTimeout(seconds, TimeUnit.SECONDS).get());
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to