This is an automated email from the ASF dual-hosted git repository. aldettinger pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
The following commit(s) were added to refs/heads/main by this push: new 14fcebe Salesforce: Expand Consumer integration tests : add tests for Streaming API 14fcebe is described below commit 14fcebea86410ed83479344902f3bc8ff1cc2c66 Author: Zineb Bendhiba <bendhiba.zi...@gmail.com> AuthorDate: Mon Aug 9 15:40:20 2021 +0200 Salesforce: Expand Consumer integration tests : add tests for Streaming API --- .../salesforce/deployment/SalesforceProcessor.java | 6 ++++ .../component/salesforce/SalesforceResource.java | 40 ++++++++++++++++++++++ .../component/salesforce/SalesforceRoutes.java | 16 +++++++++ .../salesforce/SalesforceIntegrationTest.java | 39 +++++++++++++++++++-- 4 files changed, 99 insertions(+), 2 deletions(-) diff --git a/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java b/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java index 5017515..3aacea1 100644 --- a/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java +++ b/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java @@ -23,6 +23,7 @@ import io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem; import io.quarkus.deployment.builditem.FeatureBuildItem; import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; import org.apache.camel.component.salesforce.api.dto.AbstractDTOBase; +import org.apache.camel.component.salesforce.internal.dto.PushTopic; import org.jboss.jandex.DotName; import org.jboss.jandex.IndexView; @@ -58,8 +59,13 @@ class SalesforceProcessor { .stream() .map(classInfo -> classInfo.name().toString()) .filter(className -> className.startsWith("org.apache.camel.component.salesforce.internal.dto")) + // it is registred below with fields accessible + 
.filter(className -> className != PushTopic.class.getName()) .toArray(String[]::new); reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, internalDtoClasses)); + + // enabling the search for private fields : related to issue https://issues.apache.org/jira/browse/CAMEL-16860 + reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, PushTopic.class)); } } diff --git a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java index 0520bdd..6f4d184 100644 --- a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java +++ b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java @@ -52,6 +52,8 @@ import org.apache.camel.component.salesforce.api.dto.bulk.ContentType; import org.apache.camel.component.salesforce.api.dto.bulk.JobInfo; import org.apache.camel.component.salesforce.api.dto.bulk.OperationEnum; import org.apache.camel.component.salesforce.api.utils.QueryHelper; +import org.apache.camel.component.salesforce.internal.dto.PushTopic; +import org.apache.camel.component.salesforce.internal.dto.QueryRecordsPushTopic; import org.apache.camel.quarkus.component.salesforce.generated.Account; import org.apache.camel.quarkus.component.salesforce.generated.QueryRecordsAccount; import org.apache.camel.quarkus.component.salesforce.model.GlobalObjectsAndHeaders; @@ -249,4 +251,42 @@ public class SalesforceResource { return template.to("salesforce:limits").request(Limits.class); } + @Path("streaming") + @GET + @Produces(MediaType.APPLICATION_JSON) + public String getSubscribedObjects() { + Account account = consumerTemplate.receiveBody("seda:CamelTestTopic", 10000, Account.class); + return account.getName(); + } + + @Path("streaming/raw") + @GET + 
@Produces(MediaType.APPLICATION_JSON) + public String getRawSubscribedObjects() { + return consumerTemplate.receiveBody("seda:RawPayloadCamelTestTopic", 10000, String.class); + } + + @Path("/topic/{id}") + @DELETE + public Response deleteTopic(@PathParam("id") String topicId) { + PushTopic topic = new PushTopic(); + topic.setId(topicId); + + template.to("salesforce:deleteSObject") + .withBody(topic) + .send(); + + return Response.noContent().build(); + } + + @Path("/topic") + @GET + public String getTopicId() { + QueryRecordsPushTopic queryRecordsPushTopic = template + .to("salesforce:query?sObjectQuery=SELECT Id FROM PushTopic WHERE Name = 'CamelTestTopic'&" + + "sObjectClass=" + QueryRecordsPushTopic.class.getName()) + .request(QueryRecordsPushTopic.class); + + return queryRecordsPushTopic.getRecords().get(0).getId(); + } } diff --git a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java index 387e701..66658b8 100644 --- a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java +++ b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java @@ -24,6 +24,7 @@ import javax.inject.Named; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.salesforce.AuthenticationType; import org.apache.camel.component.salesforce.SalesforceComponent; +import org.apache.camel.quarkus.component.salesforce.generated.Account; import org.eclipse.microprofile.config.ConfigProvider; import org.eclipse.microprofile.config.inject.ConfigProperty; @@ -69,8 +70,23 @@ public class SalesforceRoutes extends RouteBuilder { Optional<String> wireMockUrl = ConfigProvider.getConfig().getOptionalValue("wiremock.url", String.class); // Wiremock used only with Templates - this Route is used only with 
Salesforce credentials if (!wireMockUrl.isPresent()) { + + // Change Data Capture from("salesforce:/data/AccountChangeEvent?replayId=-1").routeId("cdc").autoStartup(false) .to("seda:events"); + + // Streaming API : topic consumer - getting Account object + from("salesforce:CamelTestTopic?notifyForFields=ALL&" + + "notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&" + + "sObjectClass=" + Account.class.getName() + "&updateTopic=true&sObjectQuery=SELECT Id, Name FROM Account") + .to("seda:CamelTestTopic"); + + // Streaming API : topic consumer with RAW Payload - getting json as String + from("salesforce:CamelTestTopic?rawPayload=true&notifyForFields=ALL&" + + "notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&" + + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM Account") + .to("seda:RawPayloadCamelTestTopic"); + } } } diff --git a/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java b/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java index 91c68d8..5dffd4e 100644 --- a/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java +++ b/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java @@ -23,7 +23,9 @@ import io.restassured.RestAssured; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNotNull; @EnabledIfEnvironmentVariable(named = "SALESFORCE_USERNAME", matches = ".+") @EnabledIfEnvironmentVariable(named = "SALESFORCE_PASSWORD", matches = ".+") @@ -33,8 +35,9 @@ import static org.hamcrest.Matchers.is; public class SalesforceIntegrationTest { @Test
- public void testChangeDataCaptureEvents() { + public void testCDCAndStreamingEvents() { String accountId = null; + String topicId = null; try { // Start the Salesforce CDC consumer RestAssured.post("/salesforce/cdc/start") @@ -42,7 +45,7 @@ public class SalesforceIntegrationTest { .statusCode(200); // Create an account - String accountName = "Camel Quarkus Account Test: " + UUID.randomUUID().toString(); + String accountName = "Camel Quarkus Account Test: " + UUID.randomUUID(); accountId = RestAssured.given() .body(accountName) .post("/salesforce/account") @@ -58,6 +61,30 @@ public class SalesforceIntegrationTest { .then() .statusCode(200) .body("Name", is(accountName)); + + // Verify we can stream the Account as Object + RestAssured.given() + .get("/salesforce/streaming") + .then() + .statusCode(200) + .body(equalTo(accountName)); + + // Verify we can stream the Account as Raw payload + RestAssured.given() + .get("/salesforce/streaming/raw") + .then() + .statusCode(200) + .body("Name", equalTo(accountName)); + + // Get the topic ID + topicId = RestAssured.given() + .get("/salesforce/topic") + .then() + .statusCode(200) + .extract() + .body() + .asString(); + assertNotNull(topicId); } finally { // Shut down the CDC consumer RestAssured.post("/salesforce/cdc/stop") @@ -70,6 +97,14 @@ public class SalesforceIntegrationTest { .then() .statusCode(204); } + + // delete the topic + if (topicId != null) { + RestAssured.delete("/salesforce/topic/" + topicId) + .then() + .statusCode(204); + } } } + }