Updated Branches: refs/heads/camel-2.12.x 0c73ad9fd -> f6e22cd92
CAMEL-6867 reworked the camel-hdfs split mode filenames to use UUID generator Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f6e22cd9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f6e22cd9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f6e22cd9 Branch: refs/heads/camel-2.12.x Commit: f6e22cd92a7d4257e1909e9a533856a1898aa9fb Parents: 0c73ad9 Author: boday <bo...@apache.org> Authored: Fri Oct 18 14:25:07 2013 -0700 Committer: boday <bo...@apache.org> Committed: Sun Oct 20 20:42:05 2013 -0700 ---------------------------------------------------------------------- .../component/hdfs/HdfsFileSystemType.java | 2 - .../camel/component/hdfs/HdfsProducer.java | 5 +-- .../hdfs/HdfsProducerConsumerTest.java | 11 +++--- .../component/hdfs/HdfsProducerSplitTest.java | 41 ++++++++------------ 4 files changed, 25 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/f6e22cd9/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java index 97dcff2..891c4eb 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java @@ -26,7 +26,6 @@ public enum HdfsFileSystemType { hpath.append(config.getPath()); if (config.getSplitStrategies().size() > 0) { hpath.append('/'); - hpath.append(HdfsConstants.DEFAULT_SEGMENT_PREFIX); } return hpath; } @@ -43,7 +42,6 @@ public enum HdfsFileSystemType { hpath.append(config.getPath()); if (config.getSplitStrategies().size() > 0) { hpath.append('/'); - hpath.append(HdfsConstants.DEFAULT_SEGMENT_PREFIX); } return hpath; } http://git-wip-us.apache.org/repos/asf/camel/blob/f6e22cd9/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java index 2c3c0f5..36a065e 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java @@ -27,6 +27,7 @@ import javax.security.auth.login.Configuration; import org.apache.camel.Exchange; import org.apache.camel.impl.DefaultProducer; import org.apache.camel.util.IOHelper; +import org.apache.camel.util.StringHelper; public class HdfsProducer extends DefaultProducer { @@ -35,7 +36,6 @@ public class HdfsProducer extends DefaultProducer { private final AtomicBoolean idle = new AtomicBoolean(false); private volatile ScheduledExecutorService scheduler; private volatile HdfsOutputStream ostream; - private long splitNum; public static final class SplitStrategy { private SplitStrategyType type; @@ -229,8 +229,7 @@ public class HdfsProducer extends DefaultProducer { private StringBuilder newFileName() { StringBuilder actualPath = new StringBuilder(hdfsPath); - actualPath.append(splitNum); - splitNum++; + actualPath.append(StringHelper.sanitize(getEndpoint().getCamelContext().getUuidGenerator().generateUuid())); return actualPath; } http://git-wip-us.apache.org/repos/asf/camel/blob/f6e22cd9/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java index b169073..4f93257 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java @@ -17,6 +17,7 @@ package org.apache.camel.component.hdfs; import java.io.File; +import java.util.ArrayList; import java.util.List; import org.apache.camel.Exchange; @@ -55,13 +56,15 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport { @Override public void configure() { from("direct:start").to("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&splitStrategy=BYTES:5,IDLE:1000"); - from("hdfs:///" + file.toUri() + "?pattern=seg*&initialDelay=2000&fileSystemType=LOCAL&chunkSize=5").to("mock:result"); + from("hdfs:///" + file.toUri() + "?initialDelay=2000&fileSystemType=LOCAL&chunkSize=5").to("mock:result"); } }); context.start(); + List<String> expectedResults = new ArrayList<String>(); for (int i = 0; i < 10; ++i) { template.sendBody("direct:start", "CIAO" + i); + expectedResults.add("CIAO" + i); } MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); @@ -69,11 +72,9 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport { resultEndpoint.expectedMessageCount(10); resultEndpoint.assertIsSatisfied(); - int i = 0; List<Exchange> exchanges = resultEndpoint.getExchanges(); - for (Exchange exchange : exchanges) { - assertEquals("CIAO" + i++, exchange.getIn().getBody(String.class)); - } + assertEquals(10, exchanges.size()); + resultEndpoint.expectedBodiesReceivedInAnyOrder(expectedResults); } @Override http://git-wip-us.apache.org/repos/asf/camel/blob/f6e22cd9/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java index 44bb590..7496a11 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java @@ -16,16 +16,15 @@ */ package org.apache.camel.component.hdfs; -import java.io.ByteArrayOutputStream; +import java.io.BufferedReader; import java.io.File; -import java.io.InputStream; -import java.net.URL; +import java.io.InputStreamReader; import org.apache.camel.builder.RouteBuilder; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.IOUtils; import org.junit.Before; import org.junit.Test; @@ -65,16 +64,13 @@ public class HdfsProducerSplitTest extends HdfsTestSupport { // stop Camel to flush and close file stream stopCamelContext(); - for (int i = 0; i < 3; ++i) { - InputStream in = null; - try { - in = new URL("file:///" + BASE_FILE.toUri() + "3/" + HdfsConstants.DEFAULT_SEGMENT_PREFIX + i).openStream(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - IOUtils.copyBytes(in, bos, 4096, false); - assertEquals("CIAO" + i, new String(bos.toByteArray())); - } finally { - IOUtils.closeStream(in); - } + FileSystem fs = FileSystem.get(new Configuration()); + FileStatus[] status = fs.listStatus(new Path("file:///" + BASE_FILE.toUri() + "3")); + assertEquals(3, status.length); + for (int i = 0; i < 3; i++) { + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath()))); + assertTrue(br.readLine().startsWith("CIAO")); + assertNull(br.readLine()); } } @@ -98,16 +94,13 @@ public class HdfsProducerSplitTest extends HdfsTestSupport { } stopCamelContext(); - for (int i = 0; i < 10; ++i) { - InputStream in = null; - try { - in = new URL("file:///" + BASE_FILE.toUri() + routeNr + '/' + HdfsConstants.DEFAULT_SEGMENT_PREFIX + i).openStream(); - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - IOUtils.copyBytes(in, bos, 4096, false); - assertEquals("CIAO" + i, new String(bos.toByteArray())); - } finally { - IOUtils.closeStream(in); - } + FileSystem fs = FileSystem.get(new Configuration()); + FileStatus[] status = fs.listStatus(new Path("file:///" + BASE_FILE.toUri() + routeNr)); + assertEquals(10, status.length); + for (int i = 0; i < status.length; i++) { + BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(status[i].getPath()))); + assertTrue(br.readLine().startsWith("CIAO")); + assertNull(br.readLine()); } }