This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new e4e7147 CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile (#3310) e4e7147 is described below commit e4e71473d62e378adce63d6a76a891f379b6affe Author: Marius Cornescu <marius_corne...@yahoo.com> AuthorDate: Mon Nov 4 08:48:51 2019 +0100 CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile (#3310) * hdfs stream * hdfs stream * Migrate towards a more hdfs object based design * Migrate towards a more hdfs object based design * CAMEL-14132 - camel-hdfs: Producer throws exception when body is RemoteFile --- .../camel/component/hdfs/DefaultHdfsFile.java | 9 +-- .../component/hdfs/HdfsArrayFileTypeHandler.java | 63 +++++++++-------- .../component/hdfs/HdfsBloommapFileHandler.java | 74 ++++++++++---------- .../apache/camel/component/hdfs/HdfsConsumer.java | 12 +--- .../org/apache/camel/component/hdfs/HdfsFile.java | 12 ++-- .../apache/camel/component/hdfs/HdfsFileType.java | 6 +- .../camel/component/hdfs/HdfsInputStream.java | 18 ++--- .../camel/component/hdfs/HdfsMapFileHandler.java | 72 ++++++++++---------- .../component/hdfs/HdfsNormalFileHandler.java | 76 +++++++++++++-------- .../camel/component/hdfs/HdfsOutputStream.java | 6 +- .../apache/camel/component/hdfs/HdfsProducer.java | 5 +- .../component/hdfs/HdfsSequenceFileHandler.java | 78 +++++++++++----------- .../camel/component/hdfs/HdfsConsumerTest.java | 68 +++++++++++++++++++ 13 files changed, 293 insertions(+), 206 deletions(-) diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java index 76942c8..cc0845e 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java @@ -16,6 +16,7 @@ */ package org.apache.camel.component.hdfs; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; @@ -24,7 +25,7 @@ import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; -import org.apache.camel.TypeConverter; +import org.apache.camel.Exchange; import org.apache.camel.util.IOHelper; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.ByteWritable; @@ -37,7 +38,7 @@ import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; -abstract class DefaultHdfsFile implements HdfsFile { +abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> implements HdfsFile<T, U, Object, Object> { protected final long copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException { long numBytes = 0; @@ -61,13 +62,13 @@ abstract class DefaultHdfsFile implements HdfsFile { return numBytes; } - protected final Writable getWritable(Object obj, TypeConverter typeConverter, Holder<Integer> size) { + protected final Writable getWritable(Object obj, Exchange exchange, Holder<Integer> size) { Class<?> objCls = obj == null ? null : obj.getClass(); HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls); if (objWritableFactory == null) { objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory(); } - return objWritableFactory.create(obj, typeConverter, size); + return objWritableFactory.create(obj, exchange.getContext().getTypeConverter(), size); } protected final Object getObject(Writable writable, Holder<Integer> size) { diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java index 35bb5a9..2a0e68c 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java @@ -16,72 +16,71 @@ */ package org.apache.camel.component.hdfs; -import java.io.Closeable; import java.io.IOException; +import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.TypeConverter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.ArrayFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; -class HdfsArrayFileTypeHandler extends DefaultHdfsFile { +class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFile.Reader> { + @SuppressWarnings("rawtypes") @Override - public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) { + public ArrayFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { try { - Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = getWritable(value, typeConverter, valueSize); - ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable); - return valueSize.value; - } catch (Exception ex) { + ArrayFile.Writer rout; + HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); + HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); + Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass(); + rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass, + endpointConfig.getCompressionType(), () -> { }); + return rout; + } catch (IOException ex) { throw new RuntimeCamelException(ex); } } @Override - public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { + public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) { try { - ArrayFile.Reader reader = (ArrayFile.Reader) hdfsInputStream.getIn(); Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); - if (reader.next(valueWritable) != null) { - value.value = getObject(valueWritable, valueSize); - return valueSize.value; - } else { - return 0; - } + Writable valueWritable = getWritable(value, exchange, valueSize); + ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable); + return valueSize.value; } catch (Exception ex) { throw new RuntimeCamelException(ex); } } - @SuppressWarnings("rawtypes") @Override - public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + public ArrayFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { try { - Closeable rout; + ArrayFile.Reader rin; HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); - HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); - Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass(); - rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass, - endpointConfig.getCompressionType(), () -> { }); - return rout; + rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration()); + return rin; } catch (IOException ex) { throw new RuntimeCamelException(ex); } } @Override - public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { try { - Closeable rin; - HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); - rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration()); - return rin; - } catch (IOException ex) { + ArrayFile.Reader reader = (ArrayFile.Reader) hdfsInputStream.getIn(); + Holder<Integer> valueSize = new Holder<>(); + Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); + if (reader.next(valueWritable) != null) { + value.value = getObject(valueWritable, valueSize); + return valueSize.value; + } else { + return 0; + } + } catch (Exception ex) { throw new RuntimeCamelException(ex); } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java index 422e4e6..0b1f907 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java @@ -16,11 +16,10 @@ */ package org.apache.camel.component.hdfs; -import java.io.Closeable; import java.io.IOException; +import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.TypeConverter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.BloomMapFile; @@ -29,15 +28,35 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; -class HdfsBloommapFileHandler extends DefaultHdfsFile { +class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> { + + @SuppressWarnings("rawtypes") + @Override + public BloomMapFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + try { + BloomMapFile.Writer rout; + HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); + HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); + Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass(); + Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass(); + rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), + MapFile.Writer.valueClass(valueWritableClass), + MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()), + MapFile.Writer.progressable(() -> { + })); + return rout; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } @Override - public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) { + public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) { try { Holder<Integer> keySize = new Holder<>(); - Writable keyWritable = getWritable(key, typeConverter, keySize); + Writable keyWritable = getWritable(key, exchange, keySize); Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = getWritable(value, typeConverter, valueSize); + Writable valueWritable = getWritable(value, exchange, valueSize); ((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable); return Long.sum(keySize.value, valueSize.value); } catch (Exception ex) { @@ -46,6 +65,18 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile { } @Override + public BloomMapFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + try { + BloomMapFile.Reader rin; + HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); + rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration()); + return rin; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) { try { MapFile.Reader reader = (BloomMapFile.Reader) hdfsistr.getIn(); @@ -65,35 +96,4 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile { } } - @SuppressWarnings("rawtypes") - @Override - public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { - try { - Closeable rout; - HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); - HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); - Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass(); - Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass(); - rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), - MapFile.Writer.valueClass(valueWritableClass), - MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()), - MapFile.Writer.progressable(() -> { - })); - return rout; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { - try { - Closeable rin; - HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); - rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration()); - return rin; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java index 7b3f4e4..4998d7a 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java @@ -29,6 +29,7 @@ import javax.security.auth.login.Configuration; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.Processor; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.support.ScheduledPollConsumer; import org.apache.camel.util.IOHelper; import org.apache.commons.lang.StringUtils; @@ -148,15 +149,8 @@ public final class HdfsConsumer extends ScheduledPollConsumer { Holder<Object> key = new Holder<>(); Holder<Object> value = new Holder<>(); - if (this.endpointConfig.isStreamDownload()) { - key.value = null; - value.value = inputStream; - // use the input stream as the body + while (inputStream.next(key, value) >= 0) { processHdfsInputStream(inputStream, key, value, messageCount, totalFiles); - } else { - while (inputStream.next(key, value) >= 0) { - processHdfsInputStream(inputStream, key, value, messageCount, totalFiles); - } } } @@ -201,7 +195,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer { return false; } } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeCamelException(e); } } return true; diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java index 1e2d63f..9979adf 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java @@ -18,16 +18,16 @@ package org.apache.camel.component.hdfs; import java.io.Closeable; -import org.apache.camel.TypeConverter; +import org.apache.camel.Exchange; -interface HdfsFile { +interface HdfsFile<T extends Closeable, U extends Closeable, K, V> { - Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory); + T createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory); - long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter); + long append(HdfsOutputStream hdfsOutputStream, K key, V value, Exchange exchange); - Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory); + U createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory); - long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value); + long next(HdfsInputStream hdfsInputStream, Holder<K> key, Holder<V> value); } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java index 7d4a239..430e20c 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java @@ -18,7 +18,7 @@ package org.apache.camel.component.hdfs; import java.io.Closeable; -import org.apache.camel.TypeConverter; +import org.apache.camel.Exchange; public enum HdfsFileType { @@ -38,8 +38,8 @@ public enum HdfsFileType { return this.file.createOutputStream(hdfsPath, hdfsInfoFactory); } - public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) { - return this.file.append(hdfsOutputStream, key, value, typeConverter); + public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) { + return this.file.append(hdfsOutputStream, key, value, exchange); } public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java index 8f12aef..68c22f6 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java @@ -20,6 +20,7 @@ import java.io.Closeable; import java.io.IOException; import java.util.concurrent.atomic.AtomicLong; +import org.apache.camel.RuntimeCamelException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.slf4j.Logger; @@ -30,6 +31,7 @@ public class HdfsInputStream implements Closeable { private static final Logger LOG = LoggerFactory.getLogger(HdfsInputStream.class); private HdfsFileType fileType; + private HdfsInfo info; private String actualPath; private String suffixedPath; private String suffixedReadPath; @@ -39,7 +41,7 @@ public class HdfsInputStream implements Closeable { private final AtomicLong numOfReadBytes = new AtomicLong(0L); private final AtomicLong numOfReadMessages = new AtomicLong(0L); - private HdfsConfiguration config; + private boolean streamDownload; protected HdfsInputStream() { } @@ -49,7 +51,6 @@ public class HdfsInputStream implements Closeable { * @param hdfsPath * @param hdfsInfoFactory * @return - * @throws IOException */ public static HdfsInputStream createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); @@ -59,18 +60,18 @@ public class HdfsInputStream implements Closeable { iStream.suffixedPath = iStream.actualPath + '.' + endpointConfig.getOpenedSuffix(); iStream.suffixedReadPath = iStream.actualPath + '.' + endpointConfig.getReadSuffix(); iStream.chunkSize = endpointConfig.getChunkSize(); + iStream.streamDownload = endpointConfig.isStreamDownload(); try { - HdfsInfo info = hdfsInfoFactory.newHdfsInfo(iStream.actualPath); - if (info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) { + iStream.info = hdfsInfoFactory.newHdfsInfo(iStream.actualPath); + if (iStream.info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) { iStream.in = iStream.fileType.createInputStream(iStream.suffixedPath, hdfsInfoFactory); iStream.opened = true; - iStream.config = endpointConfig; } else { LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath); iStream = null; } } catch (IOException e) { - throw new RuntimeException(e); + throw new RuntimeCamelException(e); } return iStream; @@ -80,8 +81,6 @@ public class HdfsInputStream implements Closeable { public final void close() throws IOException { if (opened) { IOUtils.closeStream(in); - HdfsInfoFactory hdfsInfoFactory = new HdfsInfoFactory(config); - HdfsInfo info = hdfsInfoFactory.newHdfsInfo(actualPath); info.getFileSystem().rename(new Path(suffixedPath), new Path(suffixedReadPath)); opened = false; } @@ -133,4 +132,7 @@ public class HdfsInputStream implements Closeable { return opened; } + public boolean isStreamDownload() { + return streamDownload; + } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java index 1c19162..b63463d 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java @@ -16,11 +16,10 @@ */ package org.apache.camel.component.hdfs; -import java.io.Closeable; import java.io.IOException; +import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.TypeConverter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.MapFile; @@ -28,15 +27,34 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.util.ReflectionUtils; -class HdfsMapFileHandler extends DefaultHdfsFile { +class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader> { @Override - public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) { + @SuppressWarnings("rawtypes") + public MapFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + try { + MapFile.Writer rout; + HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); + HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); + Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass(); + Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass(); + rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass), + MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()), + MapFile.Writer.progressable(() -> { + })); + return rout; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) { try { Holder<Integer> keySize = new Holder<>(); - Writable keyWritable = getWritable(key, typeConverter, keySize); + Writable keyWritable = getWritable(key, exchange, keySize); Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = getWritable(value, typeConverter, valueSize); + Writable valueWritable = getWritable(value, exchange, valueSize); ((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable); return Long.sum(keySize.value, valueSize.value); } catch (Exception ex) { @@ -45,6 +63,18 @@ class HdfsMapFileHandler extends DefaultHdfsFile { } @Override + public MapFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + try { + MapFile.Reader rin; + HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); + rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration()); + return rin; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { try { MapFile.Reader reader = (MapFile.Reader) hdfsInputStream.getIn(); @@ -64,34 +94,4 @@ class HdfsMapFileHandler extends DefaultHdfsFile { } } - @Override - @SuppressWarnings("rawtypes") - public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { - try { - Closeable rout; - HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); - HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); - Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass(); - Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass(); - rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass), - MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()), - MapFile.Writer.progressable(() -> { - })); - return rout; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { - try { - Closeable rin; - HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); - rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration()); - return rin; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java index 952bde8..e17dd1d 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java @@ -17,15 +17,15 @@ package org.apache.camel.component.hdfs; import java.io.ByteArrayOutputStream; -import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.Files; +import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.TypeConverter; import org.apache.camel.util.IOHelper; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; @@ -33,22 +33,24 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; -class HdfsNormalFileHandler extends DefaultHdfsFile { +class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> { + + private boolean consumed; @Override - public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + public OutputStream createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { try { - FSDataOutputStream rout; + OutputStream outputStream; HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); if (endpointConfig.isAppend()) { - rout = hdfsInfo.getFileSystem().append( + outputStream = hdfsInfo.getFileSystem().append( hdfsInfo.getPath(), endpointConfig.getBufferSize(), () -> { } ); } else { - rout = hdfsInfo.getFileSystem().create( + outputStream = hdfsInfo.getFileSystem().create( hdfsInfo.getPath(), endpointConfig.isOverwrite(), endpointConfig.getBufferSize(), @@ -57,37 +59,37 @@ class HdfsNormalFileHandler extends DefaultHdfsFile { () -> { } ); } - return rout; + return outputStream; } catch (IOException ex) { throw new RuntimeCamelException(ex); } } @Override - public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) { - InputStream is = null; + public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) { + InputStream inputStream = null; try { - is = typeConverter.convertTo(InputStream.class, value); - return copyBytes(is, (FSDataOutputStream) hdfsOutputStream.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false); + inputStream = exchange.getContext().getTypeConverter().convertTo(InputStream.class, exchange, value); + return copyBytes(inputStream, (FSDataOutputStream) hdfsOutputStream.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false); } catch (IOException ex) { throw new RuntimeCamelException(ex); } finally { - IOHelper.close(is); + IOHelper.close(inputStream); } } @Override - public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + public InputStream createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { try { - Closeable rin; + InputStream inputStream; HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); if (endpointConfig.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) { HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); - rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath()); + inputStream = hdfsInfo.getFileSystem().open(hdfsInfo.getPath()); } else { - rin = new FileInputStream(getHdfsFileToTmpFile(hdfsPath, endpointConfig)); + inputStream = new FileInputStream(getHdfsFileToTmpFile(hdfsPath, endpointConfig)); } - return rin; + return inputStream; } catch (IOException ex) { throw new RuntimeCamelException(ex); } @@ -95,19 +97,40 @@ class HdfsNormalFileHandler extends DefaultHdfsFile { @Override public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { + if (hdfsInputStream.isStreamDownload()) { + return nextAsWrappedStream(hdfsInputStream, key, value); + } else { + return nextAsOutputStream(hdfsInputStream, key, value); + } + } + + private long nextAsWrappedStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { + InputStream inputStream = (InputStream) hdfsInputStream.getIn(); + key.value = null; + value.value = inputStream; + + if (consumed) { + return 0; + } else { + consumed = true; + return 1; + } + } + + private long nextAsOutputStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsInputStream.getChunkSize()); + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(hdfsInputStream.getChunkSize()); byte[] buf = new byte[hdfsInputStream.getChunkSize()]; int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf); if (bytesRead >= 0) { - bos.write(buf, 0, bytesRead); + outputStream.write(buf, 0, bytesRead); key.value = null; - value.value = bos; + value.value = outputStream; return bytesRead; } else { key.value = null; // indication that we may have read from empty file - value.value = bos; + value.value = outputStream; return 0; } } catch (IOException ex) { @@ -117,18 +140,17 @@ class HdfsNormalFileHandler extends DefaultHdfsFile { private File getHdfsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) { try { - String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/')); + String fileName = hdfsPath.substring(hdfsPath.lastIndexOf('/')); // [CAMEL-13711] Files.createTempFile not equivalent to File.createTempFile - File outputDest; try { // First trying: Files.createTempFile - outputDest = Files.createTempFile(fname, ".hdfs").toFile(); + outputDest = Files.createTempFile(fileName, ".hdfs").toFile(); } catch (Exception ex) { // Now trying: File.createTempFile - outputDest = File.createTempFile(fname, ".hdfs"); + outputDest = File.createTempFile(fileName, ".hdfs"); } if (outputDest.exists()) { @@ -150,7 +172,7 @@ class HdfsNormalFileHandler extends DefaultHdfsFile { return outputDest; } - return new File(outputDest, fname); + return new File(outputDest, fileName); } catch (IOException ex) { throw new RuntimeCamelException(ex); } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java index b93b93e..c192d8c 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java @@ -21,8 +21,8 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.TypeConverter; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; @@ -84,10 +84,10 @@ public class HdfsOutputStream implements Closeable { } } - public void append(Object key, Object value, TypeConverter typeConverter) { + public void append(Object key, Object value, Exchange exchange) { try { busy.set(true); - long nb = fileType.append(this, key, value, typeConverter); + long nb = fileType.append(this, key, value, exchange); numOfWrittenBytes.addAndGet(nb); numOfWrittenMessages.incrementAndGet(); lastAccess.set(System.currentTimeMillis()); diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java index 8f407d8..0877eef 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java @@ -27,6 +27,7 @@ import javax.security.auth.login.Configuration; import org.apache.camel.Exchange; import org.apache.camel.Expression; +import org.apache.camel.RuntimeCamelException; import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.IOHelper; import org.apache.camel.util.StringHelper; @@ -114,7 +115,7 @@ public class HdfsProducer extends DefaultProducer { } catch (Exception e) { log.warn("Failed to start the HDFS producer. Caused by: [{}]", e.getMessage()); log.debug("", e); - throw new RuntimeException(e); + throw new RuntimeCamelException(e); } finally { HdfsComponent.setJAASConfiguration(auth); } @@ -211,7 +212,7 @@ public class HdfsProducer extends DefaultProducer { String path = oStream.getActualPath(); log.trace("Writing body to hdfs-file {}", path); - oStream.append(key, body, exchange.getContext().getTypeConverter()); + oStream.append(key, body, exchange); idle.set(false); diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java index 936a2a8..d755ebc 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java @@ -16,25 +16,44 @@ */ package org.apache.camel.component.hdfs; -import java.io.Closeable; import java.io.IOException; +import org.apache.camel.Exchange; import org.apache.camel.RuntimeCamelException; -import org.apache.camel.TypeConverter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.SequenceFile; import org.apache.hadoop.io.Writable; import org.apache.hadoop.util.ReflectionUtils; -class HdfsSequenceFileHandler extends DefaultHdfsFile { +class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, SequenceFile.Reader> { @Override - public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, TypeConverter typeConverter) { + public SequenceFile.Writer createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + try { + SequenceFile.Writer rout; + HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); + HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); + Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass(); + Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass(); + rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass), + SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()), + SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()), + SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()), + SequenceFile.Writer.progressable(() -> { + }), SequenceFile.Writer.metadata(new SequenceFile.Metadata())); + return rout; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public long append(HdfsOutputStream hdfsOutputStream, Object key, Object value, Exchange exchange) { try { Holder<Integer> keySize = new Holder<>(); - Writable keyWritable = getWritable(key, typeConverter, keySize); + Writable keyWritable = getWritable(key, exchange, keySize); Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = getWritable(value, typeConverter, valueSize); + Writable valueWritable = getWritable(value, exchange, valueSize); SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut(); writer.append(keyWritable, valueWritable); writer.sync(); @@ -45,9 +64,21 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile { } @Override - public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) { + public SequenceFile.Reader createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { + try { + SequenceFile.Reader rin; + HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); + rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath())); + return rin; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { try { - SequenceFile.Reader reader = (SequenceFile.Reader) hdfsistr.getIn(); + SequenceFile.Reader reader = (SequenceFile.Reader) hdfsInputStream.getIn(); Holder<Integer> keySize = new Holder<>(); Writable keyWritable = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration()); Holder<Integer> valueSize = new Holder<>(); @@ -64,35 +95,4 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile { } } - @Override - public Closeable createOutputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { - try { - Closeable rout; - HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); - HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig(); - Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass(); - Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass(); - rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass), - SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()), - SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()), - SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()), - SequenceFile.Writer.progressable(() -> { - }), SequenceFile.Writer.metadata(new SequenceFile.Metadata())); - return rout; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public Closeable createInputStream(String hdfsPath, HdfsInfoFactory hdfsInfoFactory) { - try { - Closeable rin; - HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath); - rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath())); - return rin; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } } diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java index cc45053..c744901 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java @@ -16,7 +16,11 @@ */ package org.apache.camel.component.hdfs; +import java.io.ByteArrayOutputStream; +import java.io.InputStream; + import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.support.DefaultExchange; @@ -27,11 +31,14 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.junit.Before; import org.junit.Test; +import org.mockito.ArgumentCaptor; import static org.apache.camel.component.hdfs.HdfsConstants.DEFAULT_OPENED_SUFFIX; import static org.apache.camel.component.hdfs.HdfsConstants.DEFAULT_READ_SUFFIX; import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.hamcrest.CoreMatchers.startsWith; import static org.junit.Assert.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; @@ -129,6 +136,7 @@ public class HdfsConsumerTest { when(endpointConfig.getOwner()).thenReturn("spiderman"); when(endpointConfig.isConnectOnStartup()).thenReturn(true); when(endpointConfig.getFileSystemLabel(anyString())).thenReturn("TEST_FS_LABEL"); + when(endpointConfig.getChunkSize()).thenReturn(100 * 1000); when(endpoint.getCamelContext()).thenReturn(context); when(endpoint.createExchange()).thenReturn(new DefaultExchange(context)); when(endpoint.getEndpointUri()).thenReturn(hdfsPath); @@ -149,6 +157,8 @@ public class HdfsConsumerTest { when(fileSystem.rename(any(Path.class), any(Path.class))).thenReturn(true); when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream); + ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); + underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath)); // when @@ -156,6 +166,64 @@ public class HdfsConsumerTest { // then assertThat(actual, is(1)); + verify(processor, times(1)).process(exchangeCaptor.capture()); + Exchange exchange = exchangeCaptor.getValue(); + assertThat(exchange, notNullValue()); + + ByteArrayOutputStream body = exchange.getIn().getBody(ByteArrayOutputStream.class); + assertThat(body, notNullValue()); + assertThat(body.toString(), startsWith("Lorem ipsum dolor sit amet, consectetur adipiscing elit. Nullam eget fermentum arcu, vel dignissim ipsum.")); + + } + + @Test + public void doPollFromExistingLocalFileWithStreamDownload() throws Exception { + // given + String hdfsPath = "hdfs://localhost/target/test/multiple-consumers"; + when(endpointConfig.getFileSystemType()).thenReturn(HdfsFileSystemType.LOCAL); + when(endpointConfig.getFileType()).thenReturn(HdfsFileType.NORMAL_FILE); + when(endpointConfig.getPath()).thenReturn(hdfsPath); + when(endpointConfig.getOwner()).thenReturn("spiderman"); + when(endpointConfig.isConnectOnStartup()).thenReturn(true); + when(endpointConfig.getFileSystemLabel(anyString())).thenReturn("TEST_FS_LABEL"); + when(endpointConfig.getChunkSize()).thenReturn(100 * 1000); + when(endpointConfig.isStreamDownload()).thenReturn(true); + when(endpoint.getCamelContext()).thenReturn(context); + when(endpoint.createExchange()).thenReturn(new DefaultExchange(context)); + when(endpoint.getEndpointUri()).thenReturn(hdfsPath); + + when(fileSystem.isFile(any(Path.class))).thenReturn(true); + + FileStatus[] fileStatuses = new FileStatus[1]; + FileStatus fileStatus = mock(FileStatus.class); + fileStatuses[0] = fileStatus; + when(fileSystem.globStatus(any(Path.class))).thenReturn(fileStatuses); + when(fileStatus.getPath()).thenReturn(new Path(hdfsPath)); + when(fileStatus.isFile()).thenReturn(true); + when(fileStatus.isDirectory()).thenReturn(false); + when(fileStatus.getOwner()).thenReturn("spiderman"); + + String normalFile = CWD.getAbsolutePath() + "/src/test/resources/hdfs/normal_file.txt"; + FSDataInputStream fsDataInputStream = new FSDataInputStream(new MockDataInputStream(normalFile)); + when(fileSystem.rename(any(Path.class), any(Path.class))).thenReturn(true); + when(fileSystem.open(any(Path.class))).thenReturn(fsDataInputStream); + + ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class); + + underTest = new HdfsConsumer(endpoint, processor, endpointConfig, hdfsInfoFactory, new StringBuilder(hdfsPath)); + + // when + int actual = underTest.doPoll(); + + // then + assertThat(actual, is(1)); + verify(processor, times(1)).process(exchangeCaptor.capture()); + Exchange exchange = exchangeCaptor.getValue(); + assertThat(exchange, notNullValue()); + + InputStream body = (InputStream) exchange.getIn().getBody(); + assertThat(body, notNullValue()); + } } \ No newline at end of file