[CAMEL-8430] Fix "readSuffix" usage in camel-hdfs2 (cherry picked from commit f86bbd0e2ca5763deb4fb6122ad018786b3a7f03)
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/5f470c36 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/5f470c36 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/5f470c36 Branch: refs/heads/camel-2.14.x Commit: 5f470c36668182c5339e19e40214efeec0ebb970 Parents: d430125 Author: Grzegorz Grzybek <gr.grzy...@gmail.com> Authored: Tue Mar 3 14:22:54 2015 +0100 Committer: Grzegorz Grzybek <gr.grzy...@gmail.com> Committed: Wed Mar 4 08:38:05 2015 +0100 ---------------------------------------------------------------------- .../camel/component/hdfs2/HdfsInputStream.java | 4 +- .../camel/component/hdfs2/HdfsConsumerTest.java | 53 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/5f470c36/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java index 951e51e..8e761e1 100644 --- a/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java +++ b/components/camel-hdfs2/src/main/java/org/apache/camel/component/hdfs2/HdfsInputStream.java @@ -28,6 +28,7 @@ public class HdfsInputStream implements Closeable { private HdfsFileType fileType; private String actualPath; private String suffixedPath; + private String suffixedReadPath; private Closeable in; private boolean opened; private int chunkSize; @@ -42,6 +43,7 @@ public class HdfsInputStream implements Closeable { ret.fileType = configuration.getFileType(); ret.actualPath = hdfsPath; ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix(); + ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix(); ret.chunkSize = configuration.getChunkSize(); HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath); info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath)); @@ -55,7 +57,7 @@ public class HdfsInputStream implements Closeable { if (opened) { IOUtils.closeStream(in); HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath); - info.getFileSystem().rename(new Path(suffixedPath), new Path(actualPath + '.' + HdfsConstants.DEFAULT_READ_SUFFIX)); + info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath)); opened = false; } } http://git-wip-us.apache.org/repos/asf/camel/blob/5f470c36/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java index 8588030..48c155f 100644 --- a/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java +++ b/components/camel-hdfs2/src/test/java/org/apache/camel/component/hdfs2/HdfsConsumerTest.java @@ -18,9 +18,19 @@ package org.apache.camel.component.hdfs2; import java.io.File; import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -42,6 +52,7 @@ import org.junit.Before; import org.junit.Test; import static org.apache.hadoop.io.SequenceFile.CompressionType; +import static org.hamcrest.CoreMatchers.equalTo; public class HdfsConsumerTest extends HdfsTestSupport { @@ -97,6 +108,48 @@ public class HdfsConsumerTest extends HdfsTestSupport { } @Test + public void testReadWithReadSuffix() throws Exception { + if (!canTest()) { + return; + } + + final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath()); + Configuration conf = new Configuration(); + SequenceFile.Writer writer = createWriter(conf, file, NullWritable.class, BooleanWritable.class); + NullWritable keyWritable = NullWritable.get(); + BooleanWritable valueWritable = new BooleanWritable(); + valueWritable.set(true); + writer.append(keyWritable, valueWritable); + writer.sync(); + writer.close(); + + context.addRoutes(new RouteBuilder() { + public void configure() { + from("hdfs2:///" + file.getParent().toUri() + "?consumerProperties=#cprops&pattern=*&fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled").to("mock:result"); + } + }); + Map<String, Object> props = new HashMap<String, Object>(); + ScheduledExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(null, "unitTestPool", 1); + DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(pool); + props.put("scheduler", scheduler); + ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("cprops", props); + context.start(); + + MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); + resultEndpoint.expectedMessageCount(1); + resultEndpoint.assertIsSatisfied(); + + // synchronize on pool that was used to run hdfs consumer thread + scheduler.getScheduledExecutorService().shutdown(); + scheduler.getScheduledExecutorService().awaitTermination(5000, TimeUnit.MILLISECONDS); + + Set<String> files = new HashSet<String>(Arrays.asList(new File("target/test").list())); + assertThat(files.size(), equalTo(2)); + assertTrue(files.remove("test-camel-boolean.handled")); + assertTrue(files.remove(".test-camel-boolean.handled.crc")); + } + + @Test public void testReadBoolean() throws Exception { if (!canTest()) { return;