[CAMEL-8434] Allow consuming empty files in camel-hdfs2
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4e314d26 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4e314d26 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4e314d26 Branch: refs/heads/master Commit: 4e314d2691416e2e01be3a4457db3e21f90be41c Parents: a547de1 Author: Grzegorz Grzybek <gr.grzy...@gmail.com> Authored: Wed Mar 4 11:19:00 2015 +0100 Committer: Grzegorz Grzybek <gr.grzy...@gmail.com> Committed: Wed Mar 4 11:58:12 2015 +0100 ---------------------------------------------------------------------- .../camel/component/hdfs2/HdfsConsumer.java | 2 +- .../camel/component/hdfs2/HdfsFileType.java | 3 +- .../camel/component/hdfs2/HdfsInputStream.java | 17 +++- .../camel/component/hdfs2/HdfsConsumerTest.java | 83 ++++++++++++++++++++ 4 files changed, 101 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java index 5b52f8b..28192fa 100644 --- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java +++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsConsumer.java @@ -145,7 +145,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer { try { Holder<Object> key = new Holder<Object>(); Holder<Object> value = new Holder<Object>(); - while (this.istream.next(key, value) != 0) { + while (this.istream.next(key, value) >= 0) { Exchange exchange = this.getEndpoint().createExchange(); Message message = new DefaultMessage(); String fileName = 
StringUtils.substringAfterLast(status.getPath().toString(), "/"); http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java index bbe0172..e6ec656 100644 --- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java +++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsFileType.java @@ -85,7 +85,8 @@ public enum HdfsFileType { return bytesRead; } else { key.value = null; - value.value = null; + // indication that we may have read from an empty file + value.value = bos; return 0; } } catch (IOException ex) { http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java index 8e761e1..50970dd 100644 --- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java +++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java @@ -62,13 +62,26 @@ public class HdfsInputStream implements Closeable { } } + /** + * Reads the next record/chunk specific to the given file type. + * @param key + * @param value + * @return number of bytes read.
0 is a valid number of bytes (empty file); -1 indicates that no record was read + */ public final long next(Holder<Object> key, Holder<Object> value) { long nb = fileType.next(this, key, value); - if (nb > 0) { + // when zero bytes were read from the given type of file, we may still have a record (e.g., an empty file) + // null value.value is the only indication that no (new) record/chunk was read + if (nb == 0 && numOfReadBytes.get() > 0) { + // we've read all chunks from a file whose size is an exact multiple of the chunk size + return -1; + } + if (value.value != null) { numOfReadBytes.addAndGet(nb); numOfReadMessages.incrementAndGet(); + return nb; } - return nb; + return -1; } public final long getNumOfReadBytes() { http://git-wip-us.apache.org/repos/asf/camel/blob/4e314d26/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java index 48c155f..16aedc2 100644 --- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java +++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.hdfs2; +import java.io.ByteArrayOutputStream; import java.io.File; import java.io.IOException; import java.util.Arrays; @@ -108,6 +109,88 @@ public class HdfsConsumerTest extends HdfsTestSupport { } @Test + public void testSimpleConsumerWithEmptyFile() throws Exception { + if (!canTest()) { + return; + } + + final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath()); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(file.toUri(), conf); + FSDataOutputStream out = fs.create(file); + out.close(); + + MockEndpoint
resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); + resultEndpoint.expectedMessageCount(1); + + context.addRoutes(new RouteBuilder() { + public void configure() { + from("hdfs2:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0").to("mock:result"); + } + }); + context.start(); + + resultEndpoint.assertIsSatisfied(); + assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(0)); + } + + @Test + public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception { + if (!canTest()) { + return; + } + + final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath()); + Configuration conf = new Configuration(); + FileSystem fs = FileSystem.get(file.toUri(), conf); + FSDataOutputStream out = fs.create(file); + // size = 5 times chunk size = 210 bytes + for (int i = 0; i < 42; ++i) { + out.write(new byte[] { 0x61, 0x62, 0x63, 0x64, 0x65 }); + out.flush(); + } + out.close(); + + MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); + resultEndpoint.expectedMessageCount(5); + + context.addRoutes(new RouteBuilder() { + public void configure() { + from("hdfs2:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=42&initialDelay=0").to("mock:result"); + } + }); + context.start(); + + resultEndpoint.assertIsSatisfied(); + assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(42)); + } + + @Test + public void testSimpleConsumerWithEmptySequenceFile() throws Exception { + if (!canTest()) { + return; + } + + final Path file = new Path(new File("target/test/test-camel-sequence-file").getAbsolutePath()); + Configuration conf = new Configuration(); + SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class); + writer.sync(); + writer.close(); + + MockEndpoint resultEndpoint = 
context.getEndpoint("mock:result", MockEndpoint.class); + resultEndpoint.expectedMessageCount(0); + + context.addRoutes(new RouteBuilder() { + public void configure() { + from("hdfs2:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0").to("mock:result"); + } + }); + context.start(); + + resultEndpoint.assertIsSatisfied(); + } + + @Test public void testReadWithReadSuffix() throws Exception { if (!canTest()) { return;