Repository: camel
Updated Branches:
  refs/heads/master a547de166 -> d84f9b6e4


[CAMEL-8434] Allow consuming empty files in camel-hdfs


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d84f9b6e
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d84f9b6e
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d84f9b6e

Branch: refs/heads/master
Commit: d84f9b6e44c05c9954d57630903af6fde59336ac
Parents: 4e314d2
Author: Grzegorz Grzybek <gr.grzy...@gmail.com>
Authored: Wed Mar 4 11:57:59 2015 +0100
Committer: Grzegorz Grzybek <gr.grzy...@gmail.com>
Committed: Wed Mar 4 11:58:12 2015 +0100

----------------------------------------------------------------------
 .../camel/component/hdfs/HdfsConsumer.java      |  2 +-
 .../camel/component/hdfs/HdfsFileType.java      |  3 +-
 .../camel/component/hdfs/HdfsInputStream.java   | 11 ++-
 .../camel/component/hdfs/HdfsConsumerTest.java  | 84 ++++++++++++++++++++
 4 files changed, 96 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d84f9b6e/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 367f418..f718238 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -145,7 +145,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
             try {
                 Holder<Object> key = new Holder<Object>();
                 Holder<Object> value = new Holder<Object>();
-                while (this.istream.next(key, value) != 0) {
+                while (this.istream.next(key, value) >= 0) {
                     Exchange exchange = this.getEndpoint().createExchange();
                     Message message = new DefaultMessage();
                     String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/");

http://git-wip-us.apache.org/repos/asf/camel/blob/d84f9b6e/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 97c174f..ff4036d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -84,7 +84,8 @@ public enum HdfsFileType {
                     return bytesRead;
                 } else {
                     key.value = null;
-                    value.value = null;
+                    // indication that we may have read from empty file
+                    value.value = bos;
                     return 0;
                 }
             } catch (IOException ex) {

http://git-wip-us.apache.org/repos/asf/camel/blob/d84f9b6e/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index ac1bff9..24342f6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -64,11 +64,18 @@ public class HdfsInputStream implements Closeable {
 
     public final long next(Holder<Object> key, Holder<Object> value) {
         long nb = fileType.next(this, key, value);
-        if (nb > 0) {
+        // when zero bytes was read from given type of file, we may still have a record (e.g., empty file)
+        // null value.value is the only indication that no (new) record/chunk was read
+        if (nb == 0 && numOfReadBytes.get() > 0) {
+            // we've read all chunks from file, which size is exact multiple the chunk size
+            return -1;
+        }
+        if (value.value != null) {
             numOfReadBytes.addAndGet(nb);
             numOfReadMessages.incrementAndGet();
+            return nb;
         }
-        return nb;
+        return -1;
     }
 
     public final long getNumOfReadBytes() {

http://git-wip-us.apache.org/repos/asf/camel/blob/d84f9b6e/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
index 368b1d8..1cda88c 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.hdfs;
 
+import java.io.ByteArrayOutputStream;
 import java.io.File;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -107,6 +108,89 @@ public class HdfsConsumerTest extends HdfsTestSupport {
     }
 
     @Test
+    public void testSimpleConsumerWithEmptyFile() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.get(file.toUri(), conf);
+        FSDataOutputStream out = fs.create(file);
+        out.close();
+
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(1);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=4096&initialDelay=0").to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.assertIsSatisfied();
+        assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(0));
+    }
+
+    @Test
+    public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        final Path file = new Path(new File("target/test/test-camel-normal-file").getAbsolutePath());
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.get(file.toUri(), conf);
+        FSDataOutputStream out = fs.create(file);
+        // size = 5 times chunk size = 210 bytes
+        for (int i = 0; i < 42; ++i) {
+            out.write(new byte[] { 0x61, 0x62, 0x63, 0x64, 0x65 });
+            out.flush();
+        }
+        out.close();
+
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(5);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&chunkSize=42&initialDelay=0").to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.assertIsSatisfied();
+        assertThat(resultEndpoint.getReceivedExchanges().get(0).getIn().getBody(ByteArrayOutputStream.class).toByteArray().length, equalTo(42));
+    }
+
+    @Test
+    public void testSimpleConsumerWithEmptySequenceFile() throws Exception {
+        if (!canTest()) {
+            return;
+        }
+
+        final Path file = new Path(new File("target/test/test-camel-sequence-file").getAbsolutePath());
+        Configuration conf = new Configuration();
+        FileSystem fs1 = FileSystem.get(file.toUri(), conf);
+        SequenceFile.Writer writer = createWriter(fs1, conf, file, NullWritable.class, BooleanWritable.class);
+        writer.sync();
+        writer.close();
+
+        MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(0);
+
+        context.addRoutes(new RouteBuilder() {
+            public void configure() {
+                from("hdfs:///" + file.toUri() + "?fileSystemType=LOCAL&fileType=SEQUENCE_FILE&chunkSize=4096&initialDelay=0").to("mock:result");
+            }
+        });
+        context.start();
+
+        resultEndpoint.assertIsSatisfied();
+    }
+
+    @Test
     public void testReadWithReadSuffix() throws Exception {
         if (!canTest()) {
             return;

Reply via email to