Author: iocanel Date: Sun Jun 3 09:15:00 2012 New Revision: 1345633 URL: http://svn.apache.org/viewvc?rev=1345633&view=rev Log: [CAMEL-5314] Added a converter that converts files, streams, etc. to payloads for blobstores.
Added: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter Removed: camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/SimpleObject.java Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java?rev=1345633&r1=1345632&r2=1345633&view=diff ============================================================================== --- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java (original) +++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumer.java Sun Jun 3 09:15:00 2012 @@ -16,17 +16,22 @@ */ package 
org.apache.camel.component.jclouds; +import java.io.InputStream; import java.util.LinkedList; import java.util.Queue; +import com.google.common.base.Strings; import org.apache.camel.Exchange; import org.apache.camel.Processor; +import org.apache.camel.converter.stream.CachedOutputStream; import org.apache.camel.impl.ScheduledBatchPollingConsumer; import org.apache.camel.spi.Synchronization; import org.apache.camel.util.CastUtils; +import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.StorageMetadata; +import org.jclouds.blobstore.domain.StorageType; import org.jclouds.blobstore.options.ListContainerOptions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,21 +52,42 @@ public class JcloudsBlobStoreConsumer ex } @Override + protected void doStart() throws Exception { + super.doStart(); + String container = endpoint.getContainer(); + String locationId = endpoint.getLocationId(); + JcloudsBlobStoreHelper.ensureContainerExists(blobStore, container, locationId); + } + + @Override protected int poll() throws Exception { shutdownRunningTask = null; pendingExchanges = 0; Queue<Exchange> queue = new LinkedList<Exchange>(); + String directory = endpoint.getDirectory(); ListContainerOptions opt = new ListContainerOptions(); - for (StorageMetadata md : blobStore.list(container, opt.maxResults(maxMessagesPerPoll))) { + if (!Strings.isNullOrEmpty(directory)) { + opt = opt.inDirectory(directory); + } + + for (StorageMetadata md : blobStore.list(container, opt.maxResults(maxMessagesPerPoll).recursive())) { String blobName = md.getName(); - Object body = JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName, Thread.currentThread().getContextClassLoader()); - Exchange exchange = endpoint.createExchange(); - exchange.getIn().setBody(body); - exchange.setProperty(JcloudsConstants.BLOB_NAME, blobName); - queue.add(exchange); + if 
(md.getType().equals(StorageType.BLOB)) { + if (!Strings.isNullOrEmpty(blobName)) { + InputStream body = JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName); + if (body != null) { + Exchange exchange = endpoint.createExchange(); + CachedOutputStream cos = new CachedOutputStream(exchange); + IOHelper.copy(body, cos); + exchange.getIn().setBody(cos.getStreamCache()); + exchange.setProperty(JcloudsConstants.BLOB_NAME, blobName); + queue.add(exchange); + } + } + } } return queue.isEmpty() ? 0 : processBatch(CastUtils.cast(queue)); } @@ -80,20 +106,14 @@ public class JcloudsBlobStoreConsumer ex // update pending number of exchanges pendingExchanges = total - index - 1; - // add on completion to handle after work when the exchange is done - exchange.addOnCompletion(new Synchronization() { - public void onComplete(Exchange exchange) { - String blobName = (String) exchange.getProperty(JcloudsConstants.BLOB_NAME); - blobStore.removeBlob(container, blobName); - } - - public void onFailure(Exchange exchange) { - //empty method - } - }); - LOG.trace("Processing exchange [{}]...", exchange); getProcessor().process(exchange); + if (exchange.getException() != null) { + // if we failed then throw exception + throw exchange.getException(); + } + + blobStore.removeBlob(container, exchange.getProperty(JcloudsConstants.BLOB_NAME, String.class)); } return total; Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java?rev=1345633&r1=1345632&r2=1345633&view=diff ============================================================================== --- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java (original) +++ 
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreEndpoint.java Sun Jun 3 09:15:00 2012 @@ -24,8 +24,10 @@ import org.jclouds.blobstore.BlobStore; public class JcloudsBlobStoreEndpoint extends JcloudsEndpoint { - private String blobName; + private String locationId; private String container; + private String directory; + private String blobName; private String operation; private BlobStore blobStore; @@ -52,6 +54,14 @@ public class JcloudsBlobStoreEndpoint ex return new JcloudsBlobStoreConsumer(this, processor, blobStore); } + public String getLocationId() { + return locationId; + } + + public void setLocationId(String locationId) { + this.locationId = locationId; + } + public String getContainer() { return container; } @@ -60,12 +70,12 @@ public class JcloudsBlobStoreEndpoint ex this.container = container; } - public String getOperation() { - return operation; + public String getDirectory() { + return directory; } - public void setOperation(String operation) { - this.operation = operation; + public void setDirectory(String directory) { + this.directory = directory; } public String getBlobName() { @@ -75,4 +85,12 @@ public class JcloudsBlobStoreEndpoint ex public void setBlobName(String blobName) { this.blobName = blobName; } + + public String getOperation() { + return operation; + } + + public void setOperation(String operation) { + this.operation = operation; + } } Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java?rev=1345633&r1=1345632&r2=1345633&view=diff ============================================================================== --- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java (original) +++ 
camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreHelper.java Sun Jun 3 09:15:00 2012 @@ -17,17 +17,17 @@ package org.apache.camel.component.jclouds; -import java.io.ByteArrayOutputStream; -import java.io.IOException; import java.io.InputStream; -import java.io.ObjectInputStream; -import java.io.ObjectOutputStream; -import java.io.ObjectStreamClass; -import org.apache.camel.util.IOHelper; +import javax.ws.rs.core.MediaType; +import com.google.common.base.Strings; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.domain.Blob; +import org.jclouds.blobstore.util.BlobStoreUtils; +import org.jclouds.domain.Location; +import org.jclouds.io.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.jclouds.blobstore.options.PutOptions.Builder.multipart; public final class JcloudsBlobStoreHelper { @@ -38,30 +38,64 @@ public final class JcloudsBlobStoreHelpe } /** - * Writes payload to the the blobstore. + * Creates all directories that are part of the blobName. 
* * @param blobStore * @param container * @param blobName - * @param payload */ - public static void writeBlob(BlobStore blobStore, String container, String blobName, Object payload) { - if (blobName != null && payload != null) { - Blob blob = blobStore.blobBuilder(blobName).build(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - ObjectOutputStream oos = null; - try { - oos = new ObjectOutputStream(baos); - oos.writeObject(payload); - blob.setPayload(baos.toByteArray()); - blobStore.putBlob(container, blob); - } catch (IOException e) { - LOG.error("Error while writing blob", e); - } finally { - IOHelper.close(oos, baos); + public static void mkDirs(BlobStore blobStore, String container, String blobName) { + if (blobStore != null && !Strings.isNullOrEmpty(blobName) && blobName.contains("/")) { + String directory = BlobStoreUtils.parseDirectoryFromPath(blobName); + blobStore.createDirectory(container, directory); + } + } + + /** + * Checks if container exists and creates one if not. + * + * @param blobStore The {@link BlobStore} to use. + * @param container The container name to check against. + * @param locationId The locationId to create the container if not found. + */ + public static void ensureContainerExists(BlobStore blobStore, String container, String locationId) { + if (blobStore != null && !Strings.isNullOrEmpty(container) && !blobStore.containerExists(container)) { + blobStore.createContainerInLocation(getLocationById(blobStore, locationId), container); + } + } + + /** + * Returns the {@link Location} that matches the locationId. 
+ * + * @param blobStore + * @param locationId + * @return + */ + public static Location getLocationById(BlobStore blobStore, String locationId) { + if (blobStore != null && !Strings.isNullOrEmpty(locationId)) { + for (Location location : blobStore.listAssignableLocations()) { + if (locationId.equals(location.getId())) { + return location; + } } } + return null; + } + /** + * Writes {@link Payload} to the the {@link BlobStore}. + * + * @param blobStore + * @param container + * @param blobName + * @param payload + */ + public static void writeBlob(BlobStore blobStore, String container, String blobName, Payload payload) { + if (blobName != null && payload != null) { + mkDirs(blobStore, container, blobName); + Blob blob = blobStore.blobBuilder(blobName).payload(payload).contentType(MediaType.APPLICATION_OCTET_STREAM).contentDisposition(blobName).build(); + blobStore.putBlob(container, blob, multipart()); + } } /** @@ -71,34 +105,14 @@ public final class JcloudsBlobStoreHelpe * @param blobName * @return */ - public static Object readBlob(BlobStore blobStore, String container, String blobName, final ClassLoader classLoader) { - Object result = null; - ObjectInputStream ois = null; - blobStore.createContainerInLocation(null, container); - - InputStream is = blobStore.getBlob(container, blobName).getPayload().getInput(); - - try { - ois = new ObjectInputStream(is) { - @Override - public Class<?> resolveClass(ObjectStreamClass desc) throws IOException, ClassNotFoundException { - try { - return classLoader.loadClass(desc.getName()); - } catch (Exception e) { - } - return super.resolveClass(desc); - } - }; - result = ois.readObject(); - } catch (IOException - e) { - e.printStackTrace(); - } catch (ClassNotFoundException - e) { - e.printStackTrace(); - } finally { - IOHelper.close(ois, is); + public static InputStream readBlob(BlobStore blobStore, String container, String blobName) { + InputStream is = null; + if (!Strings.isNullOrEmpty(blobName)) { + Blob blob = 
blobStore.getBlob(container, blobName); + if (blob != null && blob.getPayload() != null) { + is = blobStore.getBlob(container, blobName).getPayload().getInput(); + } } - return result; + return is; } } Modified: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java?rev=1345633&r1=1345632&r2=1345633&view=diff ============================================================================== --- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java (original) +++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducer.java Sun Jun 3 09:15:00 2012 @@ -19,6 +19,7 @@ package org.apache.camel.component.jclou import org.apache.camel.Exchange; import org.jclouds.blobstore.BlobStore; +import org.jclouds.io.Payload; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,11 +27,21 @@ public class JcloudsBlobStoreProducer ex private static final Logger LOG = LoggerFactory.getLogger(JcloudsBlobStoreProducer.class); + private final JcloudsBlobStoreEndpoint endpoint; private BlobStore blobStore; - public JcloudsBlobStoreProducer(JcloudsEndpoint endpoint, BlobStore blobStore) { + public JcloudsBlobStoreProducer(JcloudsBlobStoreEndpoint endpoint, BlobStore blobStore) { super(endpoint); this.blobStore = blobStore; + this.endpoint = endpoint; + } + + @Override + protected void doStart() throws Exception { + super.doStart(); + String container = endpoint.getContainer(); + String locationId = endpoint.getLocationId(); + JcloudsBlobStoreHelper.ensureContainerExists(blobStore, container, locationId); } @Override @@ -40,10 +51,10 @@ public class JcloudsBlobStoreProducer ex String operation = getOperation(exchange); LOG.trace("Processing {} operation on '{}'", 
operation, container + "/" + blobName); - Object body = exchange.getIn().getBody(); if (JcloudsConstants.GET.equals(operation)) { - exchange.getOut().setBody(JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName, Thread.currentThread().getContextClassLoader())); + exchange.getOut().setBody(JcloudsBlobStoreHelper.readBlob(blobStore, container, blobName)); } else { + Payload body = exchange.getIn().getBody(Payload.class); JcloudsBlobStoreHelper.writeBlob(blobStore, container, blobName, body); } } @@ -90,4 +101,19 @@ public class JcloudsBlobStoreProducer ex } return operation; } + + /** + * Retrieves the locationId from the URI or from the exchange headers. The header will take precedence over the URI. + * + * @param exchange + * @return + */ + public String getLocationId(Exchange exchange) { + String operation = ((JcloudsBlobStoreEndpoint) getEndpoint()).getLocationId(); + + if (exchange.getIn().getHeader(JcloudsConstants.LOCATION_ID) != null) { + operation = (String) exchange.getIn().getHeader(JcloudsConstants.LOCATION_ID); + } + return operation; + } } Added: camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java?rev=1345633&view=auto ============================================================================== --- camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java (added) +++ camel/trunk/components/camel-jclouds/src/main/java/org/apache/camel/component/jclouds/JcloudsPayloadConverter.java Sun Jun 3 09:15:00 2012 @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.jclouds; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import javax.xml.transform.Source; +import javax.xml.transform.stream.StreamSource; +import com.google.common.io.ByteStreams; +import com.google.common.io.InputSupplier; +import org.apache.camel.Converter; +import org.apache.camel.Exchange; +import org.apache.camel.FallbackConverter; +import org.apache.camel.StreamCache; +import org.apache.camel.TypeConverter; +import org.apache.camel.component.file.GenericFile; +import org.apache.camel.converter.stream.CachedOutputStream; +import org.apache.camel.converter.stream.StreamSourceCache; +import org.apache.camel.spi.TypeConverterRegistry; +import org.apache.camel.util.IOHelper; +import org.jclouds.io.Payload; +import org.jclouds.io.payloads.ByteArrayPayload; +import org.jclouds.io.payloads.FilePayload; +import org.jclouds.io.payloads.InputStreamPayload; +import org.jclouds.io.payloads.StringPayload; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Converter +public final class JcloudsPayloadConverter { + + private static final Logger LOG = LoggerFactory.getLogger(JcloudsPayloadConverter.class); + + private JcloudsPayloadConverter() { + //Utility Class + } + + @Converter + public static Payload toPayload(byte[] bytes) { + 
return new ByteArrayPayload(bytes); + } + + @Converter + public static Payload toPayload(String str) { + return new StringPayload(str); + } + + @Converter + public static Payload toPayload(File file) { + return new FilePayload(file); + } + + @Converter + public static Payload toPayload(InputStream is, Exchange exchange) throws IOException { + if (is.markSupported()) { + InputStreamPayload payload = new InputStreamPayload(is); + long contentLength = ByteStreams.length(payload); + is.reset(); + payload.getContentMetadata().setContentLength(contentLength); + return payload; + } else { + CachedOutputStream cos = new CachedOutputStream(exchange); + return toPayload(cos.getWrappedInputStream(), exchange); + } + } + + @Converter + public static Payload toPayload(StreamSource source, Exchange exchange) throws IOException { + return toPayload(new StreamSourceCache(source, exchange)); + } + + @Converter + public static Payload toPayload(final StreamSourceCache cache) throws IOException { + long contentLength = ByteStreams.length(new InputSupplier<InputStream>() { + @Override + public InputStream getInput() throws IOException { + return cache.getInputStream(); + } + }); + cache.reset(); + InputStreamPayload payload = new InputStreamPayload(cache.getInputStream()); + payload.getContentMetadata().setContentLength(contentLength); + return payload; + } + + @FallbackConverter + public static <T extends Payload> T convertTo(Class<T> type, Exchange exchange, Object value, TypeConverterRegistry registry) throws IOException { + Class sourceType = value.getClass(); + if (GenericFile.class.isAssignableFrom(sourceType)) { + GenericFile genericFile = (GenericFile) value; + if (genericFile.getFile() != null) { + Class genericFileType = genericFile.getFile().getClass(); + TypeConverter converter = registry.lookup(Payload.class, genericFileType); + if (converter != null) { + return (T) converter.convertTo(Payload.class, genericFile.getFile()); + } + } + } + return null; + } +} Added: 
camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter?rev=1345633&view=auto ============================================================================== --- camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter (added) +++ camel/trunk/components/camel-jclouds/src/main/resources/META-INF/services/org/apache/camel/TypeConverter Sun Jun 3 09:15:00 2012 @@ -0,0 +1,17 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +org.apache.camel.component.jclouds.JcloudsPayloadConverter \ No newline at end of file Modified: camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java?rev=1345633&r1=1345632&r2=1345633&view=diff ============================================================================== --- camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java (original) +++ camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreConsumerTest.java Sun Jun 3 09:15:00 2012 @@ -24,6 +24,7 @@ import org.apache.camel.test.junit4.Came import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.BlobStoreContextFactory; +import org.jclouds.io.payloads.StringPayload; import org.junit.Test; public class JcloudsBlobStoreConsumerTest extends CamelTestSupport { @@ -32,6 +33,10 @@ public class JcloudsBlobStoreConsumerTes private static final String TEST_BLOB1 = "testBlob1"; private static final String TEST_BLOB2 = "testBlob2"; + private static final String TEST_CONTAINER_WITH_DIR = "testContainerWithDirectories"; + private static final String TEST_BLOB_IN_DIR = "dir/testBlob"; + private static final String TEST_BLOB_IN_OTHER = "other/testBlob"; + BlobStoreContextFactory contextFactory = new BlobStoreContextFactory(); BlobStoreContext blobStoreContext = contextFactory.createContext("transient", "identity", "credential"); BlobStore blobStore = blobStoreContext.getBlobStore(); @@ -39,7 +44,7 @@ public class JcloudsBlobStoreConsumerTes @Test public void testBlobStoreGetOneBlob() throws InterruptedException { String message = "Some message"; - JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB1, message); + 
JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB1, new StringPayload(message)); MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results", MockEndpoint.class); mockEndpoint.expectedMessageCount(1); @@ -53,30 +58,61 @@ public class JcloudsBlobStoreConsumerTes @Test public void testBlobStoreGetTwoBlobs() throws InterruptedException { String message1 = "Blob 1"; - JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB1, message1); + JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB1, new StringPayload(message1)); String message2 = "Blob 2"; - JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB2, message2); + JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER, TEST_BLOB2, new StringPayload(message2)); MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results", MockEndpoint.class); mockEndpoint.expectedMessageCount(2); mockEndpoint.expectedBodiesReceived(message1, message2); mockEndpoint.assertIsSatisfied(); + } + + @Test + public void testBlobStoreWithDirectory() throws InterruptedException { + String message1 = "Blob in directory"; + JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER_WITH_DIR, TEST_BLOB_IN_DIR, new StringPayload(message1)); + MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results-in-dir", MockEndpoint.class); + mockEndpoint.expectedMessageCount(1); + mockEndpoint.expectedBodiesReceived(message1); + + mockEndpoint.assertIsSatisfied(); + } + + @Test + public void testBlobStoreWithMultipleDirectories() throws InterruptedException { + String message1 = "Blob in directory"; + String message2 = "Blob in other directory"; + JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER_WITH_DIR, TEST_BLOB_IN_DIR, new StringPayload(message1)); + JcloudsBlobStoreHelper.writeBlob(blobStore, TEST_CONTAINER_WITH_DIR, TEST_BLOB_IN_OTHER, new StringPayload(message2)); + + MockEndpoint mockEndpoint = resolveMandatoryEndpoint("mock:results-in-dir", 
MockEndpoint.class); + mockEndpoint.expectedMessageCount(1); + mockEndpoint.expectedBodiesReceived(message1); + + mockEndpoint.assertIsSatisfied(); } @Override protected RouteBuilder createRouteBuilder() throws Exception { blobStore.createContainerInLocation(null, TEST_CONTAINER); + blobStore.createContainerInLocation(null, TEST_CONTAINER_WITH_DIR); ((JcloudsComponent) context.getComponent("jclouds")).setBlobStores(Lists.newArrayList(blobStore)); return new RouteBuilder() { public void configure() { from("jclouds:blobstore:transient?container=" + TEST_CONTAINER) + .convertBodyTo(String.class) .to("mock:results"); + + from("jclouds:blobstore:transient?container=" + TEST_CONTAINER_WITH_DIR + "&directory=dir") + .convertBodyTo(String.class) + .to("mock:results-in-dir"); } }; } Modified: camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java?rev=1345633&r1=1345632&r2=1345633&view=diff ============================================================================== --- camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java (original) +++ camel/trunk/components/camel-jclouds/src/test/java/org/apache/camel/component/jclouds/JcloudsBlobStoreProducerTest.java Sun Jun 3 09:15:00 2012 @@ -17,19 +17,31 @@ package org.apache.camel.component.jclouds; +import java.io.ByteArrayInputStream; +import javax.xml.transform.TransformerException; +import javax.xml.transform.sax.SAXSource; +import org.xml.sax.InputSource; import com.google.common.collect.Lists; +import org.apache.camel.Exchange; +import org.apache.camel.StreamCache; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.converter.stream.StreamCacheConverter; +import 
org.apache.camel.impl.DefaultExchange; import org.apache.camel.test.junit4.CamelTestSupport; import org.jclouds.blobstore.BlobStore; import org.jclouds.blobstore.BlobStoreContext; import org.jclouds.blobstore.BlobStoreContextFactory; import org.junit.Test; + + public class JcloudsBlobStoreProducerTest extends CamelTestSupport { private static final String TEST_CONTAINER = "testContainer"; private static final String TEST_BLOB = "testBlob"; + private static final String TEST_BLOB_IN_DIR = "/dir/testBlob"; + private static final String MESSAGE = "<test>This is a test</test>"; BlobStoreContextFactory contextFactory = new BlobStoreContextFactory(); BlobStoreContext blobStoreContext = contextFactory.createContext("transient", "identity", "credential"); @@ -47,26 +59,36 @@ public class JcloudsBlobStoreProducerTes public void testBlobStorePutAndGet() throws InterruptedException { String message = "Some message"; template.sendBody("direct:put-and-get", message); - Object result = template.requestBodyAndHeader("direct:put-and-get", null, JcloudsConstants.OPERATION, JcloudsConstants.GET); + Object result = template.requestBodyAndHeader("direct:put-and-get", null, JcloudsConstants.OPERATION, JcloudsConstants.GET, String.class); assertEquals(message, result); } + @Test + public void testBlobStorePutWithStreamAndGet() throws InterruptedException, TransformerException { + ByteArrayInputStream inputStream = new ByteArrayInputStream(MESSAGE.getBytes()); + Exchange exchange = new DefaultExchange(context); + StreamCache streamCache = StreamCacheConverter.convertToStreamCache(new SAXSource(new InputSource(inputStream)), exchange); + template.sendBody("direct:put-and-get", streamCache); + Object result = template.requestBodyAndHeader("direct:put-and-get", null, JcloudsConstants.OPERATION, JcloudsConstants.GET, String.class); + assertEquals(MESSAGE, result); + } + @Override protected RouteBuilder createRouteBuilder() throws Exception { blobStore.createContainerInLocation(null, 
TEST_CONTAINER); - ((JcloudsComponent)context.getComponent("jclouds")).setBlobStores(Lists.newArrayList(blobStore)); + ((JcloudsComponent) context.getComponent("jclouds")).setBlobStores(Lists.newArrayList(blobStore)); return new RouteBuilder() { public void configure() { from("direct:put") - .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB)) + .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB_IN_DIR)) .setHeader(JcloudsConstants.CONTAINER_NAME, constant(TEST_CONTAINER)) .to("jclouds:blobstore:transient").to("mock:results"); from("direct:put-and-get") - .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB)) + .setHeader(JcloudsConstants.BLOB_NAME, constant(TEST_BLOB_IN_DIR)) .setHeader(JcloudsConstants.CONTAINER_NAME, constant(TEST_CONTAINER)) .to("jclouds:blobstore:transient"); } Modified: camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java?rev=1345633&r1=1345632&r2=1345633&view=diff ============================================================================== --- camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java (original) +++ camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreBlueprintRouteTest.java Sun Jun 3 09:15:00 2012 @@ -70,8 +70,8 @@ public class BlobStoreBlueprintRouteTest MockEndpoint mock = ctx.getEndpoint("mock:results", MockEndpoint.class); ProducerTemplate template = ctx.createProducerTemplate(); mock.expectedMessageCount(2); - template.sendBodyAndHeader("direct:start", new SimpleObject("1", "Test 1"), JcloudsConstants.BLOB_NAME, "blob1"); - template.sendBodyAndHeader("direct:start", new SimpleObject("2", "Test 2"), JcloudsConstants.BLOB_NAME, "blob2"); + template.sendBodyAndHeader("direct:start", "Test 
1", JcloudsConstants.BLOB_NAME, "blob1"); + template.sendBodyAndHeader("direct:start", "Test 2", JcloudsConstants.BLOB_NAME, "blob2"); assertMockEndpointsSatisfied(); } @@ -81,7 +81,6 @@ public class BlobStoreBlueprintRouteTest getDefaultCamelKarafOptions(), //Helper.setLogLevel("INFO"), provision(newBundle() - .add(SimpleObject.class) .add("META-INF/persistence.xml", BlobStoreBlueprintRouteTest.class.getResource("/META-INF/persistence.xml")) .add("OSGI-INF/blueprint/test.xml", BlobStoreBlueprintRouteTest.class.getResource("blueprintCamelContext.xml")) .set(Constants.BUNDLE_SYMBOLICNAME, "CamelBlueprintJcloudsTestBundle") Modified: camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java URL: http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java?rev=1345633&r1=1345632&r2=1345633&view=diff ============================================================================== --- camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java (original) +++ camel/trunk/tests/camel-itest-osgi/src/test/java/org/apache/camel/itest/osgi/jclouds/BlobStoreRouteTest.java Sun Jun 3 09:15:00 2012 @@ -63,8 +63,8 @@ public class BlobStoreRouteTest extends public void testProducerAndConsumer() throws Exception { MockEndpoint mock = getMockEndpoint("mock:results"); mock.expectedMessageCount(2); - template.sendBodyAndHeader("direct:start", new SimpleObject("1", "Test 1"), JcloudsConstants.BLOB_NAME, "blob1"); - template.sendBodyAndHeader("direct:start", new SimpleObject("2", "Test 2"), JcloudsConstants.BLOB_NAME, "blob2"); + template.sendBodyAndHeader("direct:start", "Test 1", JcloudsConstants.BLOB_NAME, "blob1"); + template.sendBodyAndHeader("direct:start", "Test 2", JcloudsConstants.BLOB_NAME, "blob2"); assertMockEndpointsSatisfied(); }