This is an automated email from the ASF dual-hosted git repository.

aldettinger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git


The following commit(s) were added to refs/heads/main by this push:
     new 14fcebe  Salesforce: Expand Consumer integration tests : add tests for 
Streaming API
14fcebe is described below

commit 14fcebea86410ed83479344902f3bc8ff1cc2c66
Author: Zineb Bendhiba <bendhiba.zi...@gmail.com>
AuthorDate: Mon Aug 9 15:40:20 2021 +0200

    Salesforce: Expand Consumer integration tests : add tests for Streaming API
---
 .../salesforce/deployment/SalesforceProcessor.java |  6 ++++
 .../component/salesforce/SalesforceResource.java   | 40 ++++++++++++++++++++++
 .../component/salesforce/SalesforceRoutes.java     | 16 +++++++++
 .../salesforce/SalesforceIntegrationTest.java      | 39 +++++++++++++++++++--
 4 files changed, 99 insertions(+), 2 deletions(-)

diff --git 
a/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java
 
b/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java
index 5017515..3aacea1 100644
--- 
a/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java
+++ 
b/extensions/salesforce/deployment/src/main/java/org/apache/camel/quarkus/component/salesforce/deployment/SalesforceProcessor.java
@@ -23,6 +23,7 @@ import 
io.quarkus.deployment.builditem.ExtensionSslNativeSupportBuildItem;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
 import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
 import org.apache.camel.component.salesforce.api.dto.AbstractDTOBase;
+import org.apache.camel.component.salesforce.internal.dto.PushTopic;
 import org.jboss.jandex.DotName;
 import org.jboss.jandex.IndexView;
 
@@ -58,8 +59,13 @@ class SalesforceProcessor {
                 .stream()
                 .map(classInfo -> classInfo.name().toString())
                 .filter(className -> 
className.startsWith("org.apache.camel.component.salesforce.internal.dto"))
+                // PushTopic is registered separately below with its fields accessible
+                .filter(className -> !PushTopic.class.getName().equals(className))
                 .toArray(String[]::new);
 
         reflectiveClass.produce(new ReflectiveClassBuildItem(true, false, 
internalDtoClasses));
+
+        // enabling the search for private fields : related to issue 
https://issues.apache.org/jira/browse/CAMEL-16860
+        reflectiveClass.produce(new ReflectiveClassBuildItem(true, true, 
PushTopic.class));
     }
 }
diff --git 
a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java
 
b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java
index 0520bdd..6f4d184 100644
--- 
a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java
+++ 
b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceResource.java
@@ -52,6 +52,8 @@ import 
org.apache.camel.component.salesforce.api.dto.bulk.ContentType;
 import org.apache.camel.component.salesforce.api.dto.bulk.JobInfo;
 import org.apache.camel.component.salesforce.api.dto.bulk.OperationEnum;
 import org.apache.camel.component.salesforce.api.utils.QueryHelper;
+import org.apache.camel.component.salesforce.internal.dto.PushTopic;
+import 
org.apache.camel.component.salesforce.internal.dto.QueryRecordsPushTopic;
 import org.apache.camel.quarkus.component.salesforce.generated.Account;
 import 
org.apache.camel.quarkus.component.salesforce.generated.QueryRecordsAccount;
 import 
org.apache.camel.quarkus.component.salesforce.model.GlobalObjectsAndHeaders;
@@ -249,4 +251,42 @@ public class SalesforceResource {
         return template.to("salesforce:limits").request(Limits.class);
     }
 
+    /**
+     * Polls {@code seda:CamelTestTopic} for up to 10000 ms and returns the name of the received
+     * {@link Account}.
+     *
+     * @return the account name, or {@code null} when no Account was received before the timeout
+     */
+    @Path("streaming")
+    @GET
+    @Produces(MediaType.APPLICATION_JSON)
+    public String getSubscribedObjects() {
+        Account account = consumerTemplate.receiveBody("seda:CamelTestTopic", 10000, Account.class);
+        // receiveBody yields null when the timeout elapses; do not dereference it blindly
+        return account == null ? null : account.getName();
+    }
+
+    /**
+     * Polls {@code seda:RawPayloadCamelTestTopic} with a 10000 ms timeout and hands back the
+     * received body converted to a String.
+     */
+    @GET
+    @Path("streaming/raw")
+    @Produces(MediaType.APPLICATION_JSON)
+    public String getRawSubscribedObjects() {
+        final String rawPayload = consumerTemplate.receiveBody("seda:RawPayloadCamelTestTopic", 10000, String.class);
+        return rawPayload;
+    }
+
+    /**
+     * Sends a {@link PushTopic} carrying the given id to the {@code salesforce:deleteSObject} endpoint.
+     *
+     * @param  topicId id of the PushTopic to delete
+     * @return         a response built with {@code Response.noContent()}
+     */
+    @DELETE
+    @Path("/topic/{id}")
+    public Response deleteTopic(@PathParam("id") String topicId) {
+        final PushTopic pushTopic = new PushTopic();
+        pushTopic.setId(topicId);
+        template.to("salesforce:deleteSObject").withBody(pushTopic).send();
+        return Response.noContent().build();
+    }
+
+    /**
+     * Queries Salesforce for the PushTopic named {@code CamelTestTopic} and returns its id.
+     *
+     * @return the id of the first matching PushTopic, or {@code null} when the query yields no record
+     */
+    @Path("/topic")
+    @GET
+    public String getTopicId() {
+        QueryRecordsPushTopic queryRecordsPushTopic = template
+                .to("salesforce:query?sObjectQuery=SELECT Id FROM PushTopic WHERE Name = 'CamelTestTopic'&"
+                        + "sObjectClass=" + QueryRecordsPushTopic.class.getName())
+                .request(QueryRecordsPushTopic.class);
+
+        // The topic may not exist; guard against a missing or empty result instead of failing on get(0)
+        if (queryRecordsPushTopic == null || queryRecordsPushTopic.getRecords() == null
+                || queryRecordsPushTopic.getRecords().isEmpty()) {
+            return null;
+        }
+        return queryRecordsPushTopic.getRecords().get(0).getId();
+    }
 }
diff --git 
a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java
 
b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java
index 387e701..66658b8 100644
--- 
a/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java
+++ 
b/integration-tests/salesforce/src/main/java/org/apache/camel/quarkus/component/salesforce/SalesforceRoutes.java
@@ -24,6 +24,7 @@ import javax.inject.Named;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.salesforce.AuthenticationType;
 import org.apache.camel.component.salesforce.SalesforceComponent;
+import org.apache.camel.quarkus.component.salesforce.generated.Account;
 import org.eclipse.microprofile.config.ConfigProvider;
 import org.eclipse.microprofile.config.inject.ConfigProperty;
 
@@ -69,8 +70,23 @@ public class SalesforceRoutes extends RouteBuilder {
         Optional<String> wireMockUrl = 
ConfigProvider.getConfig().getOptionalValue("wiremock.url", String.class);
         // Wiremock used only with Templates - this Route is used only with 
Salesforce credentials
         if (!wireMockUrl.isPresent()) {
+
+            // Change Data Capture
             
from("salesforce:/data/AccountChangeEvent?replayId=-1").routeId("cdc").autoStartup(false)
                     .to("seda:events");
+
+            // Streaming API : topic consumer - getting Account object
+            from("salesforce:CamelTestTopic?notifyForFields=ALL&"
+                    + 
"notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&"
+                    + "sObjectClass=" + Account.class.getName() + 
"&updateTopic=true&sObjectQuery=SELECT Id, Name FROM Account")
+                            .to("seda:CamelTestTopic");
+
+            // Streaming API : topic consumer with RAW Payload - getting json 
as String
+            
from("salesforce:CamelTestTopic?rawPayload=true&notifyForFields=ALL&"
+                    + 
"notifyForOperationCreate=true&notifyForOperationDelete=true&notifyForOperationUpdate=true&"
+                    + "updateTopic=true&sObjectQuery=SELECT Id, Name FROM 
Account")
+                            .to("seda:RawPayloadCamelTestTopic");
+
         }
     }
 }
diff --git 
a/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java
 
b/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java
index 91c68d8..5dffd4e 100644
--- 
a/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java
+++ 
b/integration-tests/salesforce/src/test/java/org/apache/camel/quarkus/component/salesforce/SalesforceIntegrationTest.java
@@ -23,7 +23,9 @@ import io.restassured.RestAssured;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
 
+import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 
 @EnabledIfEnvironmentVariable(named = "SALESFORCE_USERNAME", matches = ".+")
 @EnabledIfEnvironmentVariable(named = "SALESFORCE_PASSWORD", matches = ".+")
@@ -33,8 +35,9 @@ import static org.hamcrest.Matchers.is;
 public class SalesforceIntegrationTest {
 
     @Test
-    public void testChangeDataCaptureEvents() {
+    public void testCDCAndStreamingEvents() {
         String accountId = null;
+        String topicId = null;
         try {
             // Start the Salesforce CDC consumer
             RestAssured.post("/salesforce/cdc/start")
@@ -42,7 +45,7 @@ public class SalesforceIntegrationTest {
                     .statusCode(200);
 
             // Create an account
-            String accountName = "Camel Quarkus Account Test: " + 
UUID.randomUUID().toString();
+            String accountName = "Camel Quarkus Account Test: " + 
UUID.randomUUID();
             accountId = RestAssured.given()
                     .body(accountName)
                     .post("/salesforce/account")
@@ -58,6 +61,30 @@ public class SalesforceIntegrationTest {
                     .then()
                     .statusCode(200)
                     .body("Name", is(accountName));
+
+            // Verify we can stream the Account as Object
+            RestAssured.given()
+                    .get("/salesforce/streaming")
+                    .then()
+                    .statusCode(200)
+                    .body(equalTo(accountName));
+
+            // Verify we can stream the Account as Raw payload
+            RestAssured.given()
+                    .get("/salesforce/streaming/raw")
+                    .then()
+                    .statusCode(200)
+                    .body("Name", equalTo(accountName));
+
+            // Get the topic ID
+            topicId = RestAssured.given()
+                    .get("/salesforce/topic")
+                    .then()
+                    .statusCode(200)
+                    .extract()
+                    .body()
+                    .asString();
+            assertNotNull(topicId);
         } finally {
             // Shut down the CDC consumer
             RestAssured.post("/salesforce/cdc/stop")
@@ -70,6 +97,14 @@ public class SalesforceIntegrationTest {
                         .then()
                         .statusCode(204);
             }
+
+            // delete the topic
+            if (topicId != null) {
+                RestAssured.delete("/salesforce/topic/" + topicId)
+                        .then()
+                        .statusCode(204);
+            }
         }
     }
+
 }

Reply via email to