This is an automated email from the ASF dual-hosted git repository.
fjtiradosarti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-kie-kogito-apps.git
The following commit(s) were added to refs/heads/main by this push:
new 9675f4e20 [Fix_#2188] Enhance readability of waitForEvent method in Kafka IT class (#2189)
9675f4e20 is described below
commit 9675f4e20a06b617a96a03e789faa3d138a6a442
Author: Gonzalo Muñoz <[email protected]>
AuthorDate: Fri Feb 14 17:21:08 2025 +0100
[Fix_#2188] Enhance readability of waitForEvent method in Kafka IT class (#2189)
---
.../kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java | 17 ++++-------------
1 file changed, 4 insertions(+), 13 deletions(-)
diff --git a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java
index 20ab7530b..b9303dd3a 100644
--- a/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java
+++ b/apps-integration-tests/integration-tests-jobs-service/integration-tests-jobs-service-quarkus/integration-tests-jobs-service-common-quarkus/src/test/java/org/kie/kogito/it/jobs/KafkaBaseSwitchStateTimeoutsIT.java
@@ -18,9 +18,8 @@
*/
package org.kie.kogito.it.jobs;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.microprofile.config.ConfigProvider;
import org.junit.jupiter.api.AfterEach;
@@ -30,8 +29,6 @@ import org.kie.kogito.testcontainers.quarkus.KafkaQuarkusTestResource;
import io.restassured.path.json.JsonPath;
-import static org.assertj.core.api.Assertions.assertThat;
-
public class KafkaBaseSwitchStateTimeoutsIT extends BaseSwitchStateTimeoutsIT {
private KafkaTestClient kafkaClient;
@@ -49,15 +46,9 @@ public class KafkaBaseSwitchStateTimeoutsIT extends BaseSwitchStateTimeoutsIT {
}
private static JsonPath waitForEvent(KafkaTestClient kafkaClient, String
topic, long seconds) throws Exception {
- final CountDownLatch countDownLatch = new CountDownLatch(1);
- final AtomicReference<String> cloudEvent = new AtomicReference<>();
- kafkaClient.consume(topic, rawCloudEvent -> {
- cloudEvent.set(rawCloudEvent);
- countDownLatch.countDown();
- });
- // give some time to consume the event.
- assertThat(countDownLatch.await(seconds, TimeUnit.SECONDS)).isTrue();
- return new JsonPath(cloudEvent.get());
+ CompletableFuture<String> cloudEvent = new CompletableFuture<>();
+ kafkaClient.consume(topic, cloudEvent::complete);
+ return new JsonPath(cloudEvent.orTimeout(seconds,
TimeUnit.SECONDS).get());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]