Repository: camel Updated Branches: refs/heads/master a547de166 -> d84f9b6e4
[CAMEL-8434] Allow consuming empty files in camel-hdfs Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d84f9b6e Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d84f9b6e Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d84f9b6e Branch: refs/heads/master Commit: d84f9b6e44c05c9954d57630903af6fde59336ac Parents: 4e314d2 Author: Grzegorz Grzybek <gr.grzy...@gmail.com> Authored: Wed Mar 4 11:57:59 2015 +0100 Committer: Grzegorz Grzybek <gr.grzy...@gmail.com> Committed: Wed Mar 4 11:58:12 2015 +0100 ---------------------------------------------------------------------- .../camel/component/hdfs/HdfsConsumer.java | 2 +- .../camel/component/hdfs/HdfsFileType.java | 3 +- .../camel/component/hdfs/HdfsInputStream.java | 11 ++- .../camel/component/hdfs/HdfsConsumerTest.java | 84 ++++++++++++++++++++ 4 files changed, 96 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d84f9b6e/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java index 367f418..f718238 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java @@ -145,7 +145,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer { try { Holder<Object> key = new Holder<Object>(); Holder<Object> value = new Holder<Object>(); - while (this.istream.next(key, value) != 0) { + while (this.istream.next(key, value) >= 0) { Exchange exchange = this.getEndpoint().createExchange(); Message message = new DefaultMessage(); String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/"); http://git-wip-us.apache.org/repos/asf/camel/blob/d84f9b6e/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java index 97c174f..ff4036d 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java @@ -84,7 +84,8 @@ public enum HdfsFileType { return bytesRead; } else { key.value = null; - value.value = null; + // indication that we may have read from empty file + value.value = bos; return 0; } } catch (IOException ex) { http://git-wip-us.apache.org/repos/asf/camel/blob/d84f9b6e/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java index ac1bff9..24342f6 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java @@ -64,11 +64,18 @@ public class HdfsInputStream implements Closeable { public final long next(Holder<Object> key, Holder<Object> value) { long nb = fileType.next(this, key, value); - if (nb > 0) { + // when zero bytes was read from given type of file, we may still have a record (e.g., empty file) + // null value.value is the only indication that no (new) record/chunk was read + if (nb == 0 && numOfReadBytes.get() > 0) { + // we've read all chunks from file, which size is exact multiple the chunk size + return -1; + } + if (value.value != null) { numOfReadBytes.addAndGet(nb); numOfReadMessages.incrementAndGet(); + return nb; } - return nb; + return -1; } public final long getNumOfReadBytes() { http://git-wip-us.apache.org/repos/asf/camel/blob/d84f9b6e/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java index 368b1d8..1cda88c 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.hdfs; +import java.io.ByteArrayOutputStream; import java.io.File; import java.util.Arrays; import java.util.HashMap; @@ -107,6 +108,89 @@ public class HdfsConsumerTest extends HdfsTestSupport { } @Test + public void testSimpleConsumerWithEmptyFile() throws Exception { + if (!canTest()) { + return; + } + + final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath()); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(file.toUri(), conf); + FSDataOutputStream out = fs.create(file); + out.close(); + + MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); + resultEndpoint.expectedMessageCount(1); + + context.addRoutes(new RouteBuilder() { + public void configure() { + from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0").to("mock:result"); + } + }); + context.start(); + + resultEndpoint.assertIsSatisfied(); + assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(0)); + } + + @Test + public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception { + if (!canTest()) { + return; + } + + final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath()); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(file.toUri(), conf); + FSDataOutputStream out = fs.create(file); + // size = 5 times chunk size = 210 bytes + for (int i = 0; i < 42; ++i) { + out.write(new byte[] { 0x61, 0x62, 0x63, 0x64, 0x65 }); + out.flush(); + } + out.close(); + + MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); + resultEndpoint.expectedMessageCount(5); + + context.addRoutes(new RouteBuilder() { + public void configure() { + from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=42&initialDelay=0").to("mock:result"); + } + }); + context.start(); + + resultEndpoint.assertIsSatisfied(); + assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(42)); + } + + @Test + public void testSimpleConsumerWithEmptySequenceFile() throws Exception { + if (!canTest()) { + return; + } + + final Path file = new Path(new File("target/test/test-camel-sequence-file").getAbsolutePath()); + Configuration conf = new Configuration(); + FileSystem fs1 = FileSystem.get(file.toUri(), conf); + SequenceFile.Writer writer = createWriter(fs1, conf, file, NullWritable.class, BooleanWritable.class); + writer.sync(); + writer.close(); + + MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); + resultEndpoint.expectedMessageCount(0); + + context.addRoutes(new RouteBuilder() { + public void configure() { + from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0").to("mock:result"); + } + }); + context.start(); + + resultEndpoint.assertIsSatisfied(); + } + + @Test public void testReadWithReadSuffix() throws Exception { if (!canTest()) { return;