Updated Branches: refs/heads/master ce0091e22 -> de4cb2b98
CAMEL-6028 add CamelFileName support to camel-hdfs producer Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/de4cb2b9 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/de4cb2b9 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/de4cb2b9 Branch: refs/heads/master Commit: de4cb2b98259fb7c9777e8c0ca79f841956b9994 Parents: ce0091e Author: boday <bo...@apache.org> Authored: Wed Oct 16 22:50:54 2013 -0700 Committer: boday <bo...@apache.org> Committed: Wed Oct 16 22:50:54 2013 -0700 ---------------------------------------------------------------------- .../camel/component/hdfs/HdfsProducer.java | 12 ++- .../camel/component/hdfs/HdfsProducerTest.java | 86 +++++++++++++------- 2 files changed, 68 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/de4cb2b9/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java index 2c3c0f5..11a73e0 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java @@ -180,8 +180,16 @@ public class HdfsProducer extends DefaultProducer { Object body = exchange.getIn().getBody(); Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name()); - // must have ostream - if (ostream == null) { + // if an explicit filename is specified, close any existing stream and append the filename to the hdfsPath + if (exchange.getIn().getHeader(Exchange.FILE_NAME) != null) { + if (ostream != null) { + IOHelper.close(ostream, "output stream", log); + } + StringBuilder actualPath = new StringBuilder(hdfsPath); + actualPath.append(exchange.getIn().getHeader(Exchange.FILE_NAME, String.class)); + ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config); + } else if (ostream == null) { + // must have ostream ostream = setupHdfs(false); } http://git-wip-us.apache.org/repos/asf/camel/blob/de4cb2b9/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java index 09f67a2..937958b 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java @@ -16,8 +16,12 @@ */ package org.apache.camel.component.hdfs; +import java.io.ByteArrayOutputStream; import java.io.File; +import java.io.InputStream; +import java.net.URL; +import org.apache.camel.Exchange; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.util.IOHelper; import org.apache.hadoop.conf.Configuration; @@ -29,6 +33,7 @@ import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.DoubleWritable; import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.MapFile; @@ -59,7 +64,7 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBody("direct:start1", "PAPPO"); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel1"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -81,7 +86,7 @@ public class HdfsProducerTest extends HdfsTestSupport { } Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel1"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel1"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -106,7 +111,7 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBody("direct:write_boolean", aBoolean); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-boolean"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-boolean"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -127,7 +132,7 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBody("direct:write_byte", aByte); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-byte"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-byte"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -148,7 +153,7 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBody("direct:write_int", anInt); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-int"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-int"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -169,7 +174,7 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBody("direct:write_float", aFloat); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-float"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-float"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -190,7 +195,7 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBody("direct:write_double", aDouble); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-double"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-double"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -211,7 +216,7 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBody("direct:write_long", aLong); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-long"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-long"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -232,7 +237,7 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBody("direct:write_text1", txt); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text1"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text1"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -254,7 +259,7 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBodyAndHeader("direct:write_text2", txtValue, "KEY", txtKey); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text2"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text2"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); SequenceFile.Reader reader = new SequenceFile.Reader(fs1, file1, conf); Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf); @@ -276,9 +281,9 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBodyAndHeader("direct:write_text3", txtValue, "KEY", txtKey); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text3"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text3"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); - MapFile.Reader reader = new MapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "test-camel-text3", conf); + MapFile.Reader reader = new MapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text3", conf); Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf); reader.next(key, value); @@ -297,9 +302,9 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBody("direct:write_text4", txtValue); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text4"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text4"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); - ArrayFile.Reader reader = new ArrayFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "test-camel-text4", conf); + ArrayFile.Reader reader = new ArrayFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text4", conf); Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf); reader.next(value); assertEquals(value.toString(), txtValue); @@ -317,9 +322,9 @@ public class HdfsProducerTest extends HdfsTestSupport { template.sendBodyAndHeader("direct:write_text5", txtValue, "KEY", txtKey); Configuration conf = new Configuration(); - Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "test-camel-text5"); + Path file1 = new Path("file:///" + TEMP_DIR.toUri() + "/test-camel-text5"); FileSystem fs1 = FileSystem.get(file1.toUri(), conf); - BloomMapFile.Reader reader = new BloomMapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "test-camel-text5", conf); + BloomMapFile.Reader reader = new BloomMapFile.Reader(fs1, "file:///" + TEMP_DIR.toUri() + "/test-camel-text5", conf); Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf); Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf); reader.next(key, value); @@ -329,6 +334,29 @@ public class HdfsProducerTest extends HdfsTestSupport { IOHelper.close(reader); } + @Test + public void testWriteTextWithDynamicFilename() throws Exception { + if (!canTest()) { + return; + } + + for (int i = 0; i < 5; i++) { + template.sendBodyAndHeader("direct:write_dynamic_filename", "CIAO" + i, Exchange.FILE_NAME, "file" + i); + } + + for (int i = 0; i < 5; i++) { + InputStream in = null; + try { + in = new URL("file:///" + TEMP_DIR.toUri() + "/test-camel-dynamic/file" + i).openStream(); + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + IOUtils.copyBytes(in, bos, 4096, false); + assertEquals("CIAO" + i, new String(bos.toByteArray())); + } finally { + IOHelper.close(in); + } + } + } + @Override public void tearDown() throws Exception { if (!canTest()) { @@ -349,33 +377,35 @@ public class HdfsProducerTest extends HdfsTestSupport { @Override public void configure() throws Exception { - from("direct:start1").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE"); + from("direct:start1").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE"); /* For testing writables */ - from("direct:write_boolean").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-boolean?fileSystemType=LOCAL&valueType=BOOLEAN&fileType=SEQUENCE_FILE"); + from("direct:write_boolean").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-boolean?fileSystemType=LOCAL&valueType=BOOLEAN&fileType=SEQUENCE_FILE"); - from("direct:write_byte").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-byte?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE"); + from("direct:write_byte").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-byte?fileSystemType=LOCAL&valueType=BYTE&fileType=SEQUENCE_FILE"); - from("direct:write_int").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-int?fileSystemType=LOCAL&valueType=INT&fileType=SEQUENCE_FILE"); + from("direct:write_int").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-int?fileSystemType=LOCAL&valueType=INT&fileType=SEQUENCE_FILE"); - from("direct:write_float").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-float?fileSystemType=LOCAL&valueType=FLOAT&fileType=SEQUENCE_FILE"); + from("direct:write_float").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-float?fileSystemType=LOCAL&valueType=FLOAT&fileType=SEQUENCE_FILE"); - from("direct:write_long").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-long?fileSystemType=LOCAL&valueType=LONG&fileType=SEQUENCE_FILE"); + from("direct:write_long").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-long?fileSystemType=LOCAL&valueType=LONG&fileType=SEQUENCE_FILE"); - from("direct:write_double").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-double?fileSystemType=LOCAL&valueType=DOUBLE&fileType=SEQUENCE_FILE"); + from("direct:write_double").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-double?fileSystemType=LOCAL&valueType=DOUBLE&fileType=SEQUENCE_FILE"); - from("direct:write_text1").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE"); + from("direct:write_text1").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text1?fileSystemType=LOCAL&valueType=TEXT&fileType=SEQUENCE_FILE"); /* For testing key and value writing */ - from("direct:write_text2").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text2?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=SEQUENCE_FILE"); + from("direct:write_text2").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text2?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=SEQUENCE_FILE"); - from("direct:write_text3").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text3?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=MAP_FILE"); + from("direct:write_text3").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text3?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=MAP_FILE"); /* For testing ArrayFile */ - from("direct:write_text4").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text4?fileSystemType=LOCAL&valueType=TEXT&fileType=ARRAY_FILE"); + from("direct:write_text4").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text4?fileSystemType=LOCAL&valueType=TEXT&fileType=ARRAY_FILE"); /* For testing BloomMapFile */ - from("direct:write_text5").to("hdfs:///" + TEMP_DIR.toUri() + "test-camel-text5?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=BLOOMMAP_FILE"); + from("direct:write_text5").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-text5?fileSystemType=LOCAL&keyType=TEXT&valueType=TEXT&fileType=BLOOMMAP_FILE"); + + from("direct:write_dynamic_filename").to("hdfs:///" + TEMP_DIR.toUri() + "/test-camel-dynamic/?fileSystemType=LOCAL&valueType=TEXT"); } }; }