Fixed the producer which now should be handling the async and sync flows in an improved way.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d0bd0f74 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d0bd0f74 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d0bd0f74 Branch: refs/heads/master Commit: d0bd0f7449f63893b0f33a3444d91a4a8e0f1acb Parents: 8757400 Author: gilfernandes <gil.fernan...@gmail.com> Authored: Tue Nov 22 15:04:17 2016 +0000 Committer: Andrea Cosentino <anco...@gmail.com> Committed: Fri Nov 25 10:05:36 2016 +0100 ---------------------------------------------------------------------- .../component/firebase/FirebaseEndpoint.java | 14 ++++----- .../component/firebase/FirebaseProducer.java | 31 +++++++++---------- .../firebase/FirebaseProducerTest.java | 32 +++++++++++++++----- 3 files changed, 47 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d0bd0f74/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java index 709785e..41d7a61 100644 --- a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java +++ b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseEndpoint.java @@ -52,10 +52,10 @@ public class FirebaseEndpoint extends DefaultEndpoint { @Metadata(required = "false") private String keyName = "firebaseKey"; - @UriParam(defaultValue = "async", description = "If true, the save or update request (set value in Firebase terms) " + @UriParam(defaultValue = "reply", description = "If true, the save or update request (set value in Firebase terms) " + "is fired and the reply will be ignored, else the routing thread will wait and the reply will be saved in the exchange message") @Metadata(required = "false") - private boolean async; + private boolean reply; public FirebaseEndpoint(String uri, FirebaseComponent firebaseComponent, FirebaseConfig firebaseConfig) { super(uri, firebaseComponent); @@ -64,7 +64,7 @@ public class FirebaseEndpoint extends DefaultEndpoint { this.setServiceAccountFile(firebaseConfig.getServiceAccountFile()); this.databaseUrl = firebaseConfig.getDatabaseUrl(); final String keyName = firebaseConfig.getKeyName(); - this.setAsync(firebaseConfig.isAsync()); + this.setReply(firebaseConfig.isAsync()); if (keyName != null) { this.setKeyName(keyName); } @@ -110,12 +110,12 @@ public class FirebaseEndpoint extends DefaultEndpoint { this.keyName = keyName; } - public boolean isAsync() { - return async; + public boolean isReply() { + return reply; } - public void setAsync(boolean async) { - this.async = async; + public void setReply(boolean reply) { + this.reply = reply; } public FirebaseApp getFirebaseApp() { http://git-wip-us.apache.org/repos/asf/camel/blob/d0bd0f74/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java index ef8f8a6..33ba39b 100644 --- a/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java +++ b/components/camel-firebase/src/main/java/org/apache/camel/component/firebase/FirebaseProducer.java @@ -22,7 +22,6 @@ import com.google.firebase.database.FirebaseDatabase; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; -import org.apache.camel.Processor; import org.apache.camel.component.firebase.exception.DatabaseErrorException; import org.apache.camel.impl.DefaultAsyncProducer; import org.slf4j.Logger; @@ -43,35 +42,35 @@ public class FirebaseProducer extends DefaultAsyncProducer { rootReference = endpoint.getRootReference(); } - /** - * Processes the message exchange. - * Similar to {@link Processor#process}, but the caller supports having the exchange asynchronously processed. - * <p/> - * If there was a failure processing then the caused {@link Exception} would be set on the {@link Exchange}. - * - * @param exchange the message exchange - * @param callback the {@link AsyncCallback} will be invoked when the processing of the exchange is completed. - * If the exchange is completed synchronously, then the callback is also invoked synchronously. - * The callback should therefore be careful of starting recursive loop. - * @return (doneSync) <tt>true</tt> to continue execute synchronously, <tt>false</tt> to continue being executed asynchronously - */ @Override public boolean process(Exchange exchange, AsyncCallback callback) { final Message in = exchange.getIn(); + final Message out = exchange.getOut(); String firebaseKey = (String) in.getHeader(endpoint.getKeyName()); Object value = in.getBody(); DatabaseReference ref = FirebaseDatabase .getInstance(endpoint.getFirebaseApp()) .getReference(rootReference).child(firebaseKey); + final boolean reply = endpoint.isReply(); + out.setHeaders(in.getHeaders()); + if (reply) { // Wait for reply + processReply(exchange, callback, value, ref); + } else { // Fire and forget + ref.setValue(value); + out.setBody(in.getBody()); + callback.done(true); + } + return !reply; + } + + private void processReply(Exchange exchange, AsyncCallback callback, Object value, DatabaseReference ref) { ref.setValue(value, (DatabaseError databaseError, DatabaseReference databaseReference) -> { if (databaseError != null) { exchange.setException(new DatabaseErrorException(databaseError)); - exchange.getOut().setFault(true); } else { exchange.getOut().setBody(databaseReference); } - callback.done(endpoint.isAsync()); + callback.done(false); }); - return endpoint.isAsync(); } } http://git-wip-us.apache.org/repos/asf/camel/blob/d0bd0f74/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java index f0f2cc3..973b2ee 100644 --- a/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java +++ b/components/camel-firebase/src/test/java/org/apache/camel/component/firebase/FirebaseProducerTest.java @@ -22,6 +22,9 @@ import java.nio.file.Paths; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; +import static java.util.stream.IntStream.range; +import static junit.framework.TestCase.fail; + import com.google.firebase.database.DatabaseReference; import org.apache.camel.CamelContext; import org.apache.camel.Message; @@ -52,16 +55,28 @@ public class FirebaseProducerTest { } @Test - public void whenFirebaseSetShouldReceiveMessagesSync() throws Exception { - startRoute(false, DatabaseReference.class); + public void whenFirebaseSetShouldReceiveMessageAsDBReference() throws Exception { + startRoute(true, DatabaseReference.class); } @Test - public void whenFirebaseSetShouldReceiveMessagesAsync() throws Exception { - startRoute(true, String.class); + public void whenFirebaseSetShouldReceiveMessageAsDbString() throws Exception { + startRoute(false, String.class); } - private void startRoute(final boolean async, final Class<?> expectedBodyClass) throws Exception { + @Test + public void whenMultipleFirebaseSetShouldReceiveExpectedMessages() { + range(0, 10).forEach(i -> { + try { + startRoute(true, DatabaseReference.class); + startRoute(false, String.class); + } catch (Exception e) { + fail("Multiple test fails: " + e); + } + }); + } + + private void startRoute(final boolean reply, final Class<?> expectedBodyClass) throws Exception { sampleInputProvider.copySampleFile(); CamelContext context = new DefaultCamelContext(); context.addRoutes(new RouteBuilder() { @@ -78,11 +93,14 @@ public class FirebaseProducerTest { out.setHeader("firebaseKey", keyValue[0]); out.setBody(keyValue[1].trim()); }) - .to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&async=%b", - ConfigurationProvider.createDatabaseUrl(), rootReference, serviceAccountFile, async)) + .to(String.format("firebase://%s?rootReference=%s&serviceAccountFile=%s&reply=%b", + ConfigurationProvider.createDatabaseUrl(), rootReference, serviceAccountFile, reply)) .to("log:whenFirebaseSet?level=WARN") .process(exchange1 -> { assertThat(exchange1.getIn().getBody().getClass()).isEqualTo(expectedBodyClass); + if (reply) { + assertThat(exchange1.getIn().getHeader("firebaseKey")).isNotNull(); + } try { reentrantLock.lock(); wake.signal();