Author: davsclaus Date: Fri Apr 5 13:09:04 2013 New Revision: 1464963 URL: http://svn.apache.org/r1464963 Log: CAMEL-6242: Krati component - Should preserve headers
Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java?rev=1464963&r1=1464962&r2=1464963&view=diff ============================================================================== --- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java (original) +++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiConsumer.java Fri Apr 5 13:09:04 2013 @@ -40,7 +40,6 @@ public class KratiConsumer extends Sched protected final KratiEndpoint endpoint; protected DataStore<Object, Object> dataStore; - protected int maxMessagesPerPoll = 10; public KratiConsumer(KratiEndpoint endpoint, Processor processor, DataStore<Object, Object> dataStore) { super(endpoint, processor); @@ -88,12 +87,12 @@ public class KratiConsumer extends Sched try { dataStore.delete(exchange.getProperty(KratiConstants.KEY)); } catch (Exception e) { - LOG.warn("Failed to remove from datastore.", e); + LOG.warn("Failed to remove from datastore. This exception is ignored.", e); } } public void onFailure(Exchange exchange) { - //emtpy + // noop } }); Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java?rev=1464963&r1=1464962&r2=1464963&view=diff ============================================================================== --- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java (original) +++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiDataStoreRegistration.java Fri Apr 5 13:09:04 2013 @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.krati; import java.io.IOException; @@ -42,7 +41,7 @@ public class KratiDataStoreRegistration try { dataStore.close(); } catch (IOException e) { - LOG.warn("Error while closing datastore.", e); + LOG.warn("Error while closing datastore. This exception is ignored.", e); } return true; } else { Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java?rev=1464963&r1=1464962&r2=1464963&view=diff ============================================================================== --- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java (original) +++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiHelper.java Fri Apr 5 13:09:04 2013 @@ -82,7 +82,7 @@ public final class KratiHelper { try { result = new DynamicDataSet(homeDir, initialCapacity, segmentFactory); } catch (Exception e) { - throw new RuntimeCamelException("Failed to create Krati DataSet.", e); + throw new RuntimeCamelException("Failed to create Krati DataSet. This exception is ignored.", e); } return result; } Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java?rev=1464963&r1=1464962&r2=1464963&view=diff ============================================================================== --- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java (original) +++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/KratiProducer.java Fri Apr 5 13:09:04 2013 @@ -28,8 +28,8 @@ import org.slf4j.LoggerFactory; public class KratiProducer extends DefaultProducer { private static final transient Logger LOG = LoggerFactory.getLogger(KratiProducer.class); - protected KratiEndpoint endpoint; - protected DataStore<Object, Object> dataStore; + protected final KratiEndpoint endpoint; + protected final DataStore<Object, Object> dataStore; public KratiProducer(KratiEndpoint endpoint, DataStore<Object, Object> dataStore) { super(endpoint); @@ -43,6 +43,9 @@ public class KratiProducer extends Defau LOG.trace("Processing {} operation on '[{}]'", operation, exchange); if (KratiConstants.KRATI_OPERATION_GET.equals(operation) && key != null) { + // preserve headers and attachments + exchange.getOut().setHeaders(exchange.getIn().getHeaders()); + exchange.getOut().setAttachments(exchange.getIn().getAttachments()); exchange.getOut().setBody(dataStore.get(key)); } else if (KratiConstants.KRATI_OPERATION_DELETE.equals(operation) && key != null) { boolean status; @@ -51,16 +54,23 @@ public class KratiProducer extends Defau dataStore.persist(); } if (status) { + exchange.getOut().setHeaders(exchange.getIn().getHeaders()); + exchange.getOut().setAttachments(exchange.getIn().getAttachments()); exchange.getOut().setHeader(KratiConstants.KRATI_OPERATION_STATUS, KratiConstants.KRATI_OPERATION_SUCESSFUL); } else { + exchange.getOut().setHeaders(exchange.getIn().getHeaders()); + exchange.getOut().setAttachments(exchange.getIn().getAttachments()); exchange.getOut().setHeader(KratiConstants.KRATI_OPERATION_STATUS, KratiConstants.KRATI_OPERATION_FAILURE); } } else if (KratiConstants.KRATI_OPERATION_DELETEALL.equals(operation)) { try { dataStore.clear(); + exchange.getOut().setHeaders(exchange.getIn().getHeaders()); + exchange.getOut().setAttachments(exchange.getIn().getAttachments()); exchange.getOut().setHeader(KratiConstants.KRATI_OPERATION_STATUS, KratiConstants.KRATI_OPERATION_SUCESSFUL); } catch (Exception e) { LOG.warn("Error clearing all entries from store", e); + // This is not so good to ignore exceptions, the end user have not access the exception, and cannot use Camel error handling exchange.getOut().setHeader(KratiConstants.KRATI_OPERATION_STATUS, KratiConstants.KRATI_OPERATION_FAILURE); } } else { @@ -74,9 +84,6 @@ public class KratiProducer extends Defau /** * Retrieves the operation from the URI or from the exchange headers. The header will take precedence over the URI. - * - * @param exchange - * @return */ public String getOperation(Exchange exchange) { String operation = ((KratiEndpoint) getEndpoint()).getOperation(); @@ -90,9 +97,6 @@ public class KratiProducer extends Defau /** * Retrieves the key from the URI or from the exchange headers. The header will take precedence over the URI. - * - * @param exchange - * @return */ public Object getKey(Exchange exchange) { Object key = ((KratiEndpoint) getEndpoint()).getKey(); @@ -105,9 +109,6 @@ public class KratiProducer extends Defau /** * Retrieves the value from the URI or from the exchange headers/body. The header/body will take precedence over the URI. - * - * @param exchange - * @return */ public Object getValue(Exchange exchange) { Object value = ((KratiEndpoint) getEndpoint()).getValue(); Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java?rev=1464963&r1=1464962&r2=1464963&view=diff ============================================================================== --- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java (original) +++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/processor/idempotent/KratiIdempotentRepository.java Fri Apr 5 13:09:04 2013 @@ -57,7 +57,7 @@ public class KratiIdempotentRepository e } } catch (Exception e) { - LOG.warn("Error adding item to krati idempotent repository. This exception is ignored.", e); + LOG.warn("Error adding item to Krati idempotent repository. This exception is ignored.", e); return false; } } @@ -69,7 +69,7 @@ public class KratiIdempotentRepository e try { return dataSet.has(bytes); } catch (Exception e) { - LOG.warn("Error checking item exists in krati idempotent repository. This exception is ignored.", e); + LOG.warn("Error checking item exists in Krati idempotent repository. This exception is ignored.", e); return false; } } @@ -81,7 +81,7 @@ public class KratiIdempotentRepository e try { return dataSet.delete(bytes); } catch (Exception e) { - LOG.warn("Error removing item from krati idempotent repository. This exception is ignored.", e); + LOG.warn("Error removing item from Krati idempotent repository. This exception is ignored.", e); return false; } } Modified: camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java?rev=1464963&r1=1464962&r2=1464963&view=diff ============================================================================== --- camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java (original) +++ camel/trunk/components/camel-krati/src/main/java/org/apache/camel/component/krati/serializer/KratiDefaultSerializer.java Fri Apr 5 13:09:04 2013 @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.krati.serializer; import java.io.ByteArrayInputStream; @@ -77,6 +76,7 @@ public class KratiDefaultSerializer<T ex return null; } + // TODO: should use Camel's ClassResolver for classloading ByteArrayInputStream bais = new ByteArrayInputStream(binary); final ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); try { Modified: camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java?rev=1464963&r1=1464962&r2=1464963&view=diff ============================================================================== --- camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java (original) +++ camel/trunk/components/camel-krati/src/test/java/org/apache/camel/component/krati/KratiProducerTest.java Fri Apr 5 13:09:04 2013 @@ -17,6 +17,8 @@ package org.apache.camel.component.krati; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; @@ -48,6 +50,25 @@ public class KratiProducerTest extends C } @Test + public void testPutAndGetPreserveHeaders() throws InterruptedException { + ProducerTemplate template = context.createProducerTemplate(); + template.sendBodyAndHeader("direct:put", new ValueObject("TEST1"), KratiConstants.KEY, new KeyObject("1")); + template.sendBodyAndHeader("direct:put", new ValueObject("TEST2"), KratiConstants.KEY, new KeyObject("2")); + template.sendBodyAndHeader("direct:put", new ValueObject("TEST3"), KratiConstants.KEY, new KeyObject("3")); + + Exchange out = template.send("direct:get", new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + exchange.getIn().setHeader("foo", 123); + exchange.getIn().setHeader(KratiConstants.KEY, new KeyObject("3")); + } + }); + assertNotNull(out); + assertEquals(123, out.getOut().getHeader("foo")); + assertEquals(new ValueObject("TEST3"), out.getOut().getBody()); + } + + @Test public void testPutDeleteAndGet() throws InterruptedException { ProducerTemplate template = context.createProducerTemplate(); template.sendBodyAndHeader("direct:put", "TEST1", KratiConstants.KEY, "1");