Author: davsclaus Date: Mon Jun 11 09:51:54 2012 New Revision: 1348778 URL: http://svn.apache.org/viewvc?rev=1348778&view=rev Log: [CAMEL-5314] Added converter that converts files, streams etc. to payload for blobstores.
Added: camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java - copied unchanged from r1345633, camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java camel/branches/camel-2.9.x/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter - copied unchanged from r1345633, camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter Removed: camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/SimpleObject.java Modified: camel/branches/camel-2.9.x/ (props changed) camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java camel/branches/camel-2.9.x/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Merged /camel/trunk:r1345633 Propchange: camel/branches/camel-2.9.x/ ------------------------------------------------------------------------------ Binary property 'svnmerge-integrated' - no diff available. 
Modified: camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java?rev=1348778&r1=1348777&r2=1348778&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java (original) +++ camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java Mon Jun 11 09:51:54 2012 @@ -16,19 +16,23 @@ */ package org.apache.camel.component.jclouds; +import java.io.InputStream; import java.util.LinkedList; import java.util.Queue; +import com.google.common.base.Strings; import org.apache.camel.BatchConsumer; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.ShutdownRunningTask; +import org.apache.camel.converter.stream.CachedOutputStream; import org.apache.camel.spi.ShutdownAware; -import org.apache.camel.spi.Synchronization; import org.apache.camel.util.CastUtils; +import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.StorageMetadata; +import org.jclouds.blobstore.domain.StorageType; import org.jclouds.blobstore.options.ListContainerOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,21 +60,42 @@ public class JcloudsBlobStoreConsumer ex } @Override + protected void doStart() throws Exception { + super.doStart(); + String container = endpoint.getContainer(); + String locationId = endpoint.getLocationId(); + JcloudsBlobStoreHelper.ensureContainerExists(blobStore, container, locationId); + } + + @Override protected int poll() throws Exception { 
shutdownRunningTask = null; pendingExchanges = 0; Queue<Exchange> queue = new LinkedList<Exchange>(); + String directory = endpoint.getDirectory(); ListContainerOptions opt = new ListContainerOptions(); - for (StorageMetadata md : blobStore.list(container, opt.maxResults(maxMessagesPerPoll))) { + if (!Strings.isNullOrEmpty(directory)) { + opt = opt.inDirectory(directory); + } + + for (StorageMetadata md : blobStore.list(container, opt.maxResults(maxMessagesPerPoll).recursive())) { String blobName = md.getName(); - Object body = JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName, Thread.currentThread().getContextClassLoader()); - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setBody(body); - exchange.setProperty(JcloudsConstants.BLOB_NAME, blobName); - queue.add(exchange); + if (md.getType().equals(StorageType.BLOB)) { + if (!Strings.isNullOrEmpty(blobName)) { + InputStream body = JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName); + if (body != null) { + Exchange exchange = endpoint.createExchange(); + CachedOutputStream cos = new CachedOutputStream(exchange); + IOHelper.copy(body, cos); + exchange.getIn().setBody(cos.getStreamCache()); + exchange.setProperty(JcloudsConstants.BLOB_NAME, blobName); + queue.add(exchange); + } + } + } } return queue.isEmpty() ? 
0 : processBatch(CastUtils.cast(queue)); } @@ -94,20 +119,14 @@ public class JcloudsBlobStoreConsumer ex // update pending number of exchanges pendingExchanges = total - index - 1; - // add on completion to handle after work when the exchange is done - exchange.addOnCompletion(new Synchronization() { - public void onComplete(Exchange exchange) { - String blobName = (String) exchange.getProperty(JcloudsConstants.BLOB_NAME); - blobStore.removeBlob(container, blobName); - } - - public void onFailure(Exchange exchange) { - //empty method - } - }); - LOG.trace("Processing exchange [{}]...", exchange); getProcessor().process(exchange); + if (exchange.getException() != null) { + // if we failed then throw exception + throw exchange.getException(); + } + + blobStore.removeBlob(container, exchange.getProperty(JcloudsConstants.BLOB_NAME, String.class)); } return total; Modified: camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java?rev=1348778&r1=1348777&r2=1348778&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java (original) +++ camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java Mon Jun 11 09:51:54 2012 @@ -24,8 +24,10 @@ import org.jclouds.blobstore.BlobStore; public class JcloudsBlobStoreEndpoint extends JcloudsEndpoint { - private String blobName; + private String locationId; private String container; + private String directory; + private String blobName; private String operation; private BlobStore blobStore; @@ -52,6 +54,14 @@ public class JcloudsBlobStoreEndpoint ex return new 
JcloudsBlobStoreConsumer(this, processor, blobStore); } + public String getLocationId() { + return locationId; + } + + public void setLocationId(String locationId) { + this.locationId = locationId; + } + public String getContainer() { return container; } @@ -60,12 +70,12 @@ public class JcloudsBlobStoreEndpoint ex this.container = container; } - public String getOperation() { - return operation; + public String getDirectory() { + return directory; } - public void setOperation(String operation) { - this.operation = operation; + public void setDirectory(String directory) { + this.directory = directory; } public String getBlobName() { @@ -75,4 +85,12 @@ public class JcloudsBlobStoreEndpoint ex public void setBlobName(String blobName) { this.blobName = blobName; } + + public String getOperation() { + return operation; + } + + public void setOperation(String operation) { + this.operation = operation; + } } Modified: camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java?rev=1348778&r1=1348777&r2=1348778&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java (original) +++ camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java Mon Jun 11 09:51:54 2012 @@ -17,17 +17,17 @@ package org.apache.camel.component.jclouds; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectStreamClass; -import org.apache.camel.util.IOHelper; +import javax.ws.rs.core.MediaType; +import 
com.google.common.base.Strings; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.util.BlobStoreUtils; +import org.jclouds.domain.Location; +import org.jclouds.io.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.jclouds.blobstore.options.PutOptions.Builder.multipart; public final class JcloudsBlobStoreHelper { @@ -38,31 +38,62 @@ public final class JcloudsBlobStoreHelpe } /** - * Writes payload to the the blobstore. + * Creates all directories that are part of the blobName. * * @param blobStore * @param container * @param blobName - * @param payload */ - public static void writeBlob(BlobStore blobStore, String container, String blobName, Object payload) { - if (blobName != null && payload != null) { - Blob blob = blobStore.blobBuilder(blobName).build(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = null; - try { - oos = new ObjectOutputStream(baos); - oos.writeObject(payload); - blob.setPayload(baos.toByteArray()); - blobStore.putBlob(container, blob); - } catch (IOException e) { - LOG.error("Error while writing blob", e); - } finally { - IOHelper.close(oos); - IOHelper.close(baos); + public static void mkDirs(BlobStore blobStore, String container, String blobName) { + if (blobStore != null && !Strings.isNullOrEmpty(blobName) && blobName.contains("/")) { + String directory = BlobStoreUtils.parseDirectoryFromPath(blobName); + blobStore.createDirectory(container, directory); + } + } + + /** + * Checks if container exists and creates one if not. + * + * @param blobStore The {@link BlobStore} to use. + * @param container The container name to check against. + * @param locationId The locationId to create the container if not found. 
+ */ + public static void ensureContainerExists(BlobStore blobStore, String container, String locationId) { + if (blobStore != null && !Strings.isNullOrEmpty(container) && !blobStore.containerExists(container)) { + blobStore.createContainerInLocation(getLocationById(blobStore, locationId), container); + } + } + + /** + * Returns the {@link Location} that matches the locationId. + * + * @param blobStore + * @param locationId + * @return + */ + public static Location getLocationById(BlobStore blobStore, String locationId) { + if (blobStore != null && !Strings.isNullOrEmpty(locationId)) { + for (Location location : blobStore.listAssignableLocations()) { + if (locationId.equals(location.getId())) { + return location; + } } } + return null; + } + /** + * Writes {@link Payload} to the the {@link BlobStore}. + * + * @param blobStore + * @param container + * @param blobName + * @param payload + */ + public static void writeBlob(BlobStore blobStore, String container, String blobName, Payload payload) { + mkDirs(blobStore, container, blobName); + Blob blob = blobStore.blobBuilder(blobName).payload(payload).contentType(MediaType.APPLICATION_OCTET_STREAM).contentDisposition(blobName).build(); + blobStore.putBlob(container, blob, multipart()); } /** @@ -72,35 +103,14 @@ public final class JcloudsBlobStoreHelpe * @param blobName * @return */ - public static Object readBlob(BlobStore blobStore, String container, String blobName, final ClassLoader classLoader) { - Object result = null; - ObjectInputStream ois = null; - blobStore.createContainerInLocation(null, container); - - InputStream is = blobStore.getBlob(container, blobName).getPayload().getInput(); - - try { - ois = new ObjectInputStream(is) { - @Override - public Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { - try { - return classLoader.loadClass(desc.getName()); - } catch (Exception e) { - } - return super.resolveClass(desc); - } - }; - result = ois.readObject(); - } catch 
(IOException - e) { - e.printStackTrace(); - } catch (ClassNotFoundException - e) { - e.printStackTrace(); - } finally { - IOHelper.close(ois); - IOHelper.close(is); + public static InputStream readBlob(BlobStore blobStore, String container, String blobName) { + InputStream is = null; + if (!Strings.isNullOrEmpty(blobName)) { + Blob blob = blobStore.getBlob(container, blobName); + if (blob != null && blob.getPayload() != null) { + is = blobStore.getBlob(container, blobName).getPayload().getInput(); + } } - return result; + return is; } } Modified: camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java?rev=1348778&r1=1348777&r2=1348778&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java (original) +++ camel/branches/camel-2.9.x/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java Mon Jun 11 09:51:54 2012 @@ -19,6 +19,7 @@ package org.apache.camel.component.jclou import org.apache.camel.Exchange; import org.jclouds.blobstore.BlobStore; +import org.jclouds.io.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,11 +27,21 @@ public class JcloudsBlobStoreProducer ex private static final Logger LOG = LoggerFactory.getLogger(JcloudsBlobStoreProducer.class); + private final JcloudsBlobStoreEndpoint endpoint; private BlobStore blobStore; - public JcloudsBlobStoreProducer(JcloudsEndpoint endpoint, BlobStore blobStore) { + public JcloudsBlobStoreProducer(JcloudsBlobStoreEndpoint endpoint, BlobStore blobStore) { super(endpoint); this.blobStore = blobStore; + this.endpoint = endpoint; + } + + @Override 
+ protected void doStart() throws Exception { + super.doStart(); + String container = endpoint.getContainer(); + String locationId = endpoint.getLocationId(); + JcloudsBlobStoreHelper.ensureContainerExists(blobStore, container, locationId); } @Override @@ -40,10 +51,10 @@ public class JcloudsBlobStoreProducer ex String operation = getOperation(exchange); LOG.trace("Processing {} operation on '{}'", operation, container + "/" + blobName); - Object body = exchange.getIn().getBody(); if (JcloudsConstants.GET.equals(operation)) { - exchange.getOut().setBody(JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName, Thread.currentThread().getContextClassLoader())); + exchange.getOut().setBody(JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName)); } else { + Payload body = exchange.getIn().getBody(Payload.class); JcloudsBlobStoreHelper.writeBlob(blobStore, container, blobName, body); } } @@ -90,4 +101,19 @@ public class JcloudsBlobStoreProducer ex } return operation; } + + /** + * Retrieves the locationId from the URI or from the exchange headers. The header will take precedence over the URI. 
+ * + * @param exchange + * @return + */ + public String getLocationId(Exchange exchange) { + String operation = ((JcloudsBlobStoreEndpoint) getEndpoint()).getLocationId(); + + if (exchange.getIn().getHeader(JcloudsConstants.LOCATION_ID) != null) { + operation = (String) exchange.getIn().getHeader(JcloudsConstants.LOCATION_ID); + } + return operation; + } } Modified: camel/branches/camel-2.9.x/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java?rev=1348778&r1=1348777&r2=1348778&view=diff ============================================================================== --- camel/branches/camel-2.9.x/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java (original) +++ camel/branches/camel-2.9.x/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java Mon Jun 11 09:51:54 2012 @@ -17,19 +17,31 @@ package org.apache.camel.component.jclouds; +import java.io.ByteArrayInputStream; +import javax.xml.transform.TransformerException; +import javax.xml.transform.sax.SAXSource; +import org.xml.sax.InputSource; import com.google.common.collect.Lists; +import org.apache.camel.Exchange; +import org.apache.camel.StreamCache; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.converter.stream.StreamCacheConverter; +import org.apache.camel.impl.DefaultExchange; import org.apache.camel.test.junit4.CamelTestSupport; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.BlobStoreContextFactory; import org.junit.Test; + + public class JcloudsBlobStoreProducerTest extends CamelTestSupport { private static final String TEST_CONTAINER = 
"testContainer"; private static final String TEST_BLOB = "testBlob"; + private static final String TEST_BLOB_IN_DIR = "/dir/testBlob"; + private static final String MESSAGE = "<test>This is a test</test>"; BlobStoreContextFactory contextFactory = new BlobStoreContextFactory(); BlobStoreContext blobStoreContext = contextFactory.createContext("transient", "identity", "credential"); @@ -47,26 +59,36 @@ public class JcloudsBlobStoreProducerTes public void testBlobStorePutAndGet() throws InterruptedException { String message = "Some message"; template.sendBody("direct:put-and-get", message); - Object result = template.requestBodyAndHeader("direct:put-and-get", null, JcloudsConstants.OPERATION, JcloudsConstants.GET); + Object result = template.requestBodyAndHeader("direct:put-and-get", null, JcloudsConstants.OPERATION, JcloudsConstants.GET, String.class); assertEquals(message, result); } + @Test + public void testBlobStorePutWithStreamAndGet() throws InterruptedException, TransformerException { + ByteArrayInputStream inputStream = new ByteArrayInputStream(MESSAGE.getBytes()); + Exchange exchange = new DefaultExchange(context); + StreamCache streamCache = StreamCacheConverter.convertToStreamCache(new SAXSource(new InputSource(inputStream)), exchange); + template.sendBody("direct:put-and-get", streamCache); + Object result = template.requestBodyAndHeader("direct:put-and-get", null, JcloudsConstants.OPERATION, JcloudsConstants.GET, String.class); + assertEquals(MESSAGE, result); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { blobStore.createContainerInLocation(null, TEST_CONTAINER); - ((JcloudsComponent)context.getComponent("jclouds")).setBlobStores(Lists.newArrayList(blobStore)); + ((JcloudsComponent) context.getComponent("jclouds")).setBlobStores(Lists.newArrayList(blobStore)); return new RouteBuilder() { public void configure() { from("direct:put") - .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB)) + 
.setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB_IN_DIR)) .setHeader(JcloudsConstants.CONTAINER_NAME, constant(TEST_CONTAINER)) .to("jclouds:blobstore:transient").to("mock:results"); from("direct:put-and-get") - .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB)) + .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB_IN_DIR)) .setHeader(JcloudsConstants.CONTAINER_NAME, constant(TEST_CONTAINER)) .to("jclouds:blobstore:transient"); } Modified: camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java?rev=1348778&r1=1348777&r2=1348778&view=diff ============================================================================== --- camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java (original) +++ camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java Mon Jun 11 09:51:54 2012 @@ -74,8 +74,8 @@ public class BlobStoreBlueprintRouteTest MockEndpoint mock = ctx.getEndpoint("mock:results", MockEndpoint.class); ProducerTemplate template = ctx.createProducerTemplate(); mock.expectedMessageCount(2); - template.sendBodyAndHeader("direct:start", new SimpleObject("1", "Test 1"), JcloudsConstants.BLOB_NAME, "blob1"); - template.sendBodyAndHeader("direct:start", new SimpleObject("2", "Test 2"), JcloudsConstants.BLOB_NAME, "blob2"); + template.sendBodyAndHeader("direct:start", "Test 1", JcloudsConstants.BLOB_NAME, "blob1"); + template.sendBodyAndHeader("direct:start", "Test 2", JcloudsConstants.BLOB_NAME, "blob2"); assertMockEndpointsSatisfied(); } @@ -88,7 +88,6 @@ public class BlobStoreBlueprintRouteTest @Override public InputStream customizeTestProbe(InputStream 
testProbe) { return modifyBundle(testProbe) - .add(SimpleObject.class) .add("META-INF/persistence.xml", BlobStoreBlueprintRouteTest.class.getResource("/META-INF/persistence.xml")) .add("OSGI-INF/blueprint/test.xml", BlobStoreBlueprintRouteTest.class.getResource("blueprintCamelContext.xml")) .set(Constants.BUNDLE_SYMBOLICNAME, "CamelBlueprintJcloudsTestBundle") Modified: camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java URL: http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java?rev=1348778&r1=1348777&r2=1348778&view=diff ============================================================================== --- camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java (original) +++ camel/branches/camel-2.9.x/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java Mon Jun 11 09:51:54 2012 @@ -64,8 +64,8 @@ public class BlobStoreRouteTest extends public void testProducerAndConsumer() throws Exception { MockEndpoint mock = getMockEndpoint("mock:results"); mock.expectedMessageCount(2); - template.sendBodyAndHeader("direct:start", new SimpleObject("1", "Test 1"), JcloudsConstants.BLOB_NAME, "blob1"); - template.sendBodyAndHeader("direct:start", new SimpleObject("2", "Test 2"), JcloudsConstants.BLOB_NAME, "blob2"); + template.sendBodyAndHeader("direct:start", "Test 1", JcloudsConstants.BLOB_NAME, "blob1"); + template.sendBodyAndHeader("direct:start", "Test 2", JcloudsConstants.BLOB_NAME, "blob2"); assertMockEndpointsSatisfied(); }