[CAMEL-8430] Fix "readSuffix" usage in camel-hdfs
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/109d8ecb Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/109d8ecb Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/109d8ecb Branch: refs/heads/master Commit: 109d8ecb40db0455c6471fd197fcf1583a386ea4 Parents: f86bbd0 Author: Grzegorz Grzybek <gr.grzy...@gmail.com> Authored: Tue Mar 3 14:26:48 2015 +0100 Committer: Grzegorz Grzybek <gr.grzy...@gmail.com> Committed: Tue Mar 3 14:27:20 2015 +0100 ---------------------------------------------------------------------- .../camel/component/hdfs/HdfsInputStream.java | 4 +- .../camel/component/hdfs/HdfsConsumerTest.java | 54 ++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/109d8ecb/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java index 7e9d2c2..ac1bff9 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java @@ -28,6 +28,7 @@ public class HdfsInputStream implements Closeable { private HdfsFileType fileType; private String actualPath; private String suffixedPath; + private String suffixedReadPath; private Closeable in; private boolean opened; private int chunkSize; @@ -42,6 +43,7 @@ public class HdfsInputStream implements Closeable { ret.fileType = configuration.getFileType(); ret.actualPath = hdfsPath; ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix(); + ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix(); ret.chunkSize = configuration.getChunkSize(); HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath); info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath)); @@ -55,7 +57,7 @@ public class HdfsInputStream implements Closeable { if (opened) { IOUtils.closeStream(in); HdfsInfo info = HdfsInfoFactory.newHdfsInfo(actualPath); - info.getFileSystem().rename(new Path(suffixedPath), new Path(actualPath + '.' + HdfsConstants.DEFAULT_READ_SUFFIX)); + info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath)); opened = false; } } http://git-wip-us.apache.org/repos/asf/camel/blob/109d8ecb/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java index 33a8f6d..368b1d8 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java @@ -17,9 +17,19 @@ package org.apache.camel.component.hdfs; import java.io.File; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.impl.DefaultScheduledPollConsumerScheduler; +import org.apache.camel.impl.JndiRegistry; +import org.apache.camel.impl.PropertyPlaceholderDelegateRegistry; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -41,6 +51,7 @@ import org.junit.Test; import static org.apache.hadoop.io.SequenceFile.CompressionType; import static org.apache.hadoop.io.SequenceFile.createWriter; +import static org.hamcrest.CoreMatchers.equalTo; public class HdfsConsumerTest extends HdfsTestSupport { @@ -96,6 +107,49 @@ public class HdfsConsumerTest extends HdfsTestSupport { } @Test + public void testReadWithReadSuffix() throws Exception { + if (!canTest()) { + return; + } + + final Path file = new Path(new File("target/test/test-camel-boolean").getAbsolutePath()); + Configuration conf = new Configuration(); + FileSystem fs1 = FileSystem.get(file.toUri(), conf); + SequenceFile.Writer writer = createWriter(fs1, conf, file, NullWritable.class, BooleanWritable.class); + NullWritable keyWritable = NullWritable.get(); + BooleanWritable valueWritable = new BooleanWritable(); + valueWritable.set(true); + writer.append(keyWritable, valueWritable); + writer.sync(); + writer.close(); + + context.addRoutes(new RouteBuilder() { + public void configure() { + from("hdfs:///" + file.getParent().toUri() + "?consumerProperties=#cprops&pattern=*&fileSystemType=LOCAL&fileType=SEQUENCE_FILE&initialDelay=0&readSuffix=handled").to("mock:result"); + } + }); + Map<String, Object> props = new HashMap<String, Object>(); + ScheduledExecutorService pool = context.getExecutorServiceManager().newScheduledThreadPool(null, "unitTestPool", 1); + DefaultScheduledPollConsumerScheduler scheduler = new DefaultScheduledPollConsumerScheduler(pool); + props.put("scheduler", scheduler); + ((JndiRegistry) ((PropertyPlaceholderDelegateRegistry) context.getRegistry()).getRegistry()).bind("cprops", props); + context.start(); + + MockEndpoint resultEndpoint = context.getEndpoint("mock:result", MockEndpoint.class); + resultEndpoint.expectedMessageCount(1); + resultEndpoint.assertIsSatisfied(); + + // synchronize on pool that was used to run hdfs consumer thread + scheduler.getScheduledExecutorService().shutdown(); + scheduler.getScheduledExecutorService().awaitTermination(5000, TimeUnit.MILLISECONDS); + + Set<String> files = new HashSet<String>(Arrays.asList(new File("target/test").list())); + assertThat(files.size(), equalTo(2)); + assertTrue(files.remove("test-camel-boolean.handled")); + assertTrue(files.remove(".test-camel-boolean.handled.crc")); + } + + @Test public void testReadBoolean() throws Exception { if (!canTest()) { return;