JiriOndrusek commented on code in PR #5184: URL: https://github.com/apache/camel-quarkus/pull/5184#discussion_r1294304159
########## integration-tests/splunk/src/test/java/org/apache/camel/quarkus/component/splunk/it/SplunkTest.java: ########## @@ -16,138 +16,153 @@ */ package org.apache.camel.quarkus.component.splunk.it; -import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; +import java.util.function.Consumer; import io.quarkus.test.common.QuarkusTestResource; import io.quarkus.test.junit.QuarkusTest; import io.restassured.RestAssured; -import io.restassured.common.mapper.TypeRef; import io.restassured.http.ContentType; +import org.apache.camel.component.splunk.ProducerType; import org.apache.camel.util.CollectionHelper; import org.eclipse.microprofile.config.ConfigProvider; -import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import org.testcontainers.shaded.org.apache.commons.lang3.RandomStringUtils; +import org.testcontainers.shaded.org.awaitility.Awaitility; import static org.hamcrest.Matchers.anyOf; -import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; @QuarkusTest @QuarkusTestResource(SplunkTestResource.class) class SplunkTest { @Test - public void testWriteTcpAndReadNormal() { - write("_normal", SplunkTestResource.TEST_INDEX, "tcp"); - - List<Map<String, String>> result = RestAssured.given() - .contentType(ContentType.TEXT) - .body(String.format( - "search index=%s sourcetype=%s | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"", - SplunkTestResource.TEST_INDEX, SplunkResource.SOURCE_TYPE)) - .post("/splunk/normal") - .then() - .statusCode(200) - .extract().as(new TypeRef<>() { - }); + public void testNormalSearchWithSubmitWithRawData() { + String suffix = "_normalSearchOfSubmit"; - Assertions.assertEquals(3, result.size()); - Assertions.assertEquals("Irma_normal", 
result.get(0).get("name")); - Assertions.assertEquals("Earth\"", result.get(0).get("from")); - Assertions.assertEquals("Leonard_normal", result.get(1).get("name")); - Assertions.assertEquals("Earth 2.0\"", result.get(1).get("from")); - Assertions.assertEquals("Sheldon_normal", result.get(2).get("name")); - Assertions.assertEquals("Alpha Centauri\"", result.get(2).get("from")); - } + write(suffix, ProducerType.SUBMIT, 0, true); - @Test - public void testWriteSubmitAndReadRealtime() throws InterruptedException, ExecutionException { + Awaitility.await().pollInterval(1000, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until( + () -> { - RestAssured.given() - .body(String.format( - "search index=%s sourcetype=%s | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"", - SplunkTestResource.TEST_INDEX, SplunkResource.SOURCE_TYPE)) - .post("/splunk/startRealtimePolling"); - - //wait some time to start polling - TimeUnit.SECONDS.sleep(3); - write("_realtime1", SplunkTestResource.TEST_INDEX, "submit"); - TimeUnit.SECONDS.sleep(1); - write("_realtime2", SplunkTestResource.TEST_INDEX, "submit"); - TimeUnit.SECONDS.sleep(1); - write("_realtime3", SplunkTestResource.TEST_INDEX, "submit"); - //wait some time to gather the pulls from splunk server - TimeUnit.SECONDS.sleep(3); - //there should be some data from realtime search in direct (concrete values depends on the speed of writing into index) - //test is asserting that there are some - RestAssured.get("/splunk/directRealtimePolling") - .then() - .statusCode(200) - .body(containsString("_realtime")); + String result = RestAssured.given() + .contentType(ContentType.TEXT) + .post("/splunk/results/normalSearch") + .then() + .statusCode(200) + .extract().asString(); + + return result.contains("Name: Sheldon" + suffix) + && result.contains("Name: Leonard" + suffix) + && result.contains("Name: Irma" + suffix); + }); } @Test - public void testWriteStreamAndReadSaved() throws InterruptedException { - int defaultPort = 
RestAssured.port; - String defaultUri = RestAssured.baseURI; - + public void testSavedSearchWithTcp() throws InterruptedException { + String suffix = "_SavedSearchOfTcp"; //create saved search RestAssured.given() .baseUri("http://localhost") .port(ConfigProvider.getConfig().getValue(SplunkResource.PARAM_REMOTE_PORT, Integer.class)) .contentType(ContentType.JSON) - .param("name", SplunkTestResource.SAVED_SEARCH_NAME) + .param("name", SplunkResource.SAVED_SEARCH_NAME) .param("disabled", "0") - .param("description", "descritionText") + .param("description", "descriptionText") .param("search", - "index=" + SplunkTestResource.TEST_INDEX + " sourcetype=" + SplunkResource.SOURCE_TYPE) + "sourcetype=\"TCP\" | rex field=_raw \"Name: (?<name>.*) From: (?<from>.*)\"") .post("/services/saved/searches") .then() .statusCode(anyOf(is(201), is(409))); - write("_s", SplunkTestResource.TEST_INDEX, "stream"); - RestAssured.given() - .contentType(ContentType.TEXT) - .body(SplunkTestResource.SAVED_SEARCH_NAME) - .post("/splunk/savedSearch") - .then() - .statusCode(200) - .body(containsString("Name: Sheldon_s")) - .body(containsString("Name: Leonard_s")) - .body(containsString("Name: Irma_s")); - } - - private void write(String suffix, String index, String endpoint) { - write(CollectionHelper.mapOf("entity", "Name: Sheldon" + suffix + " From: Alpha Centauri"), "submit", - index); - write(CollectionHelper.mapOf("entity", "Name: Leonard" + suffix + " From: Earth 2.0"), "submit", - index); - write(CollectionHelper.mapOf("entity", "Name: Irma" + suffix + " From: Earth"), "submit", index); + //write data via tcp + write(suffix, ProducerType.TCP, 0, false); + + //there might by delay in receiving the data + Awaitility.await().pollInterval(1000, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until( + () -> { + String result = RestAssured.given() + .contentType(ContentType.TEXT) + .post("/splunk/results/savedSearch") + .then() + .statusCode(200) + .extract().asString(); + + return 
result.contains("Name: Sheldon" + suffix) + && result.contains("Name: Leonard" + suffix) + && result.contains("Name: Irma" + suffix); + }); } - private void write(Map<String, String> data, String endpoint, String index) { + @Test + public void testStreamForRealtime() throws InterruptedException, ExecutionException { + String suffix = "_RealtimeSearchOfStream"; + //there is a buffer for stream writing, therefore about 1MB of data has to be written into Splunk + + //data are written in separated thread + ExecutorService executor = Executors.newSingleThreadExecutor(); + //execute component server to wait for the result + Future futureResult = executor.submit( + () -> { + for (int i = 0; i < 5000; i++) { + write(suffix + i, ProducerType.STREAM, 100, false); + } + }); - String expectedResult = expectedResult(data); + try { + Awaitility.await().pollInterval(1000, TimeUnit.MILLISECONDS).atMost(60, TimeUnit.SECONDS).until( + () -> { + + String result = RestAssured.given() + .contentType(ContentType.TEXT) + .post("/splunk/results/realtimeSearch") + .then() + .statusCode(200) + .extract().asString(); + + return result.contains("Name: Sheldon" + suffix) + && result.contains("Name: Leonard" + suffix) + && result.contains("Name: Irma" + suffix); + }); + } finally { + futureResult.cancel(true); + } + } - RestAssured.given() + private void write(String suffix, ProducerType producerType, int lengthOfRandomString, boolean raw) { + Consumer<Map> write = data -> RestAssured.given() .contentType(ContentType.JSON) - .queryParam("index", index) + .queryParam("index", SplunkTestResource.TEST_INDEX) .body(data) - .post("/splunk/" + endpoint) + .post("/splunk/write/" + producerType.name()) .then() - .statusCode(201) - .body(containsString(expectedResult)); + .statusCode(201); + + Map data[] = new Map[3]; Review Comment: TBH I just thought that it is not necessary to verify it, as the status code 201 means that the insert was done (the data is then verified in the tests, but I can add it
back) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@camel.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org