This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
     new 5b60c2e  CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)
5b60c2e is described below

commit 5b60c2ea3b7b3d49f977c90c26f9f7420c55f595
Author: Marius Cornescu <marius_corne...@yahoo.com>
AuthorDate: Fri Nov 22 09:50:34 2019 +0100

    CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)

    CAMEL-14146 - camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter
---
 .../camel/component/hdfs/DefaultHdfsFile.java      | 13 ++--
 .../component/hdfs/HdfsArrayFileTypeHandler.java   | 16 ++--
 ...leHandler.java => HdfsBloomMapFileHandler.java} | 19 +++--
 .../apache/camel/component/hdfs/HdfsConsumer.java  | 86 ++++++++++++++--------
 .../apache/camel/component/hdfs/HdfsFileType.java  |  4 +-
 .../camel/component/hdfs/HdfsInputStream.java      | 69 +++++++++++++++--
 .../camel/component/hdfs/HdfsMapFileHandler.java   | 18 +++--
 .../component/hdfs/HdfsNormalFileHandler.java      |  9 +--
 .../component/hdfs/HdfsSequenceFileHandler.java    | 24 +++---
 .../component/hdfs/HdfsWritableFactories.java      | 44 +++++------
 .../org/apache/camel/component/hdfs/Holder.java    | 10 ++-
 .../camel/component/hdfs/HdfsInputStreamTest.java  |  7 ++
 12 files changed, 217 insertions(+), 102 deletions(-)
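In short: a consumer configured with chunkSize now marks each chunk exchange with the
same split properties the Splitter EIP sets. A minimal sketch of a route reading them
(the endpoint URI, host, and path below are illustrative only, not part of this commit):

    import org.apache.camel.Exchange;
    import org.apache.camel.builder.RouteBuilder;

    public class HdfsChunkHeadersRoute extends RouteBuilder {
        @Override
        public void configure() {
            // Hypothetical endpoint: read each file in 4096-byte chunks.
            from("hdfs://localhost:9000/data/input?chunkSize=4096")
                .process(exchange -> {
                    // Properties set by HdfsConsumer.updateNewExchange(...) below.
                    Integer index = exchange.getProperty(Exchange.SPLIT_INDEX, Integer.class);
                    Boolean complete = exchange.getProperty(Exchange.SPLIT_COMPLETE, Boolean.class);
                    if (Boolean.TRUE.equals(complete)) {
                        // In streaming mode the total is only known at the last chunk.
                        Integer total = exchange.getProperty(Exchange.SPLIT_SIZE, Integer.class);
                    }
                })
                .to("log:chunks");
        }
    }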
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index cc0845e..04f126d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -24,6 +24,7 @@ import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.util.IOHelper;
@@ -63,11 +64,8 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
     }
 
     protected final Writable getWritable(Object obj, Exchange exchange, Holder<Integer> size) {
-        Class<?> objCls = obj == null ? null : obj.getClass();
-        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
-        if (objWritableFactory == null) {
-            objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
-        }
+        Class<?> objCls = Optional.ofNullable(obj).orElse(new UnknownType()).getClass();
+        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.getOrDefault(objCls, new HdfsWritableFactories.HdfsObjectWritableFactory());
         return objWritableFactory.create(obj, exchange.getContext().getTypeConverter(), size);
     }
 
@@ -97,7 +95,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
             writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
             writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
-            writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory());
+            writables.put(UnknownType.class, new HdfsWritableFactories.HdfsNullWritableFactory());
         }
 
         static {
@@ -112,4 +110,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory());
         }
     }
+
+    private static final class UnknownType {
+    }
 }
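The UnknownType nested class above replaces the old null map key with a sentinel type,
so the factory lookup never touches null. A self-contained sketch of that pattern
(class and factory names here are hypothetical, not from camel-hdfs):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Optional;

    final class SentinelLookupDemo {
        // Sentinel standing in for "value was null", so the map needs no null key.
        private static final class UnknownType {
        }

        private static final Map<Class<?>, String> FACTORIES = new HashMap<>();
        static {
            FACTORIES.put(Integer.class, "int-factory");
            FACTORIES.put(UnknownType.class, "null-factory");
        }

        static String factoryFor(Object obj) {
            // Same shape as getWritable(): map null to the sentinel, then look up.
            Class<?> cls = Optional.ofNullable(obj).orElse(new UnknownType()).getClass();
            return FACTORIES.getOrDefault(cls, "object-factory");
        }

        public static void main(String[] args) {
            System.out.println(factoryFor(42));   // int-factory
            System.out.println(factoryFor(null)); // null-factory
            System.out.println(factoryFor(3.14)); // object-factory
        }
    }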
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
index 2a0e68c..4db8bd7 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
@@ -36,8 +36,14 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
-                    endpointConfig.getCompressionType(), () -> { });
+            rout = new ArrayFile.Writer(
+                    hdfsInfo.getConfiguration(),
+                    hdfsInfo.getFileSystem(),
+                    hdfsPath,
+                    valueWritableClass,
+                    endpointConfig.getCompressionType(),
+                    () -> { }
+            );
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -50,7 +56,7 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
-            return valueSize.value;
+            return valueSize.getValue();
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -75,8 +81,8 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(valueWritable) != null) {
-                value.value = getObject(valueWritable, valueSize);
-                return valueSize.value;
+                value.setValue(getObject(valueWritable, valueSize));
+                return valueSize.getValue();
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
similarity index 86%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
index 0b1f907..44301ae 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
+class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
 
     @SuppressWarnings("rawtypes")
     @Override
@@ -39,11 +39,14 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
+            rout = new BloomMapFile.Writer(
+                    hdfsInfo.getConfiguration(),
+                    new Path(hdfsPath),
+                    MapFile.Writer.keyClass(keyWritableClass),
                     MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> {
-                    }));
+                    MapFile.Writer.progressable(() -> { })
+            );
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -58,7 +61,7 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-            return Long.sum(keySize.value, valueSize.value);
+            return Long.sum(keySize.getValue(), valueSize.getValue());
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -85,9 +88,9 @@ class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.value = getObject(keyWritable, keySize);
-                value.value = getObject(valueWritable, valueSize);
-                return Long.sum(keySize.value, valueSize.value);
+                key.setValue(getObject(keyWritable, keySize));
+                value.setValue(getObject(valueWritable, valueSize));
+                return Long.sum(keySize.getValue(), valueSize.getValue());
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index affad6c..8e105cd 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -18,11 +18,13 @@ package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import javax.security.auth.login.Configuration;
 
@@ -127,54 +129,68 @@
     }
 
     private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) {
-        final AtomicInteger messageCount = new AtomicInteger(0);
+        final AtomicInteger totalMessageCount = new AtomicInteger(0);
 
-        Arrays.stream(fileStatuses)
+        List<HdfsInputStream> hdfsFiles = Arrays.stream(fileStatuses)
                 .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info))
                 .filter(this::hasMatchingOwner)
                 .limit(endpointConfig.getMaxMessagesPerPoll())
-                .map(this::createInputStream)
+                .map(this::asHdfsFile)
                 .filter(Objects::nonNull)
-                .forEach(hdfsInputStream -> {
-                    try {
-                        processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length);
-                    } finally {
-                        IOHelper.close(hdfsInputStream, "input stream", log);
-                    }
-                });
+                .collect(Collectors.toList());
 
-        return messageCount.get();
+        log.info("Processing [{}] valid files out of [{}] available.", hdfsFiles.size(), fileStatuses.length);
+
+        for (int i = 0; i < hdfsFiles.size(); i++) {
+            HdfsInputStream hdfsFile = hdfsFiles.get(i);
+            try {
+                int messageCount = processHdfsInputStream(hdfsFile, totalMessageCount);
+                log.debug("Processed [{}] files out of [{}].", i, hdfsFiles.size());
+                log.debug("File [{}] was split to [{}] messages.", i, messageCount);
+            } finally {
+                IOHelper.close(hdfsFile, "hdfs file", log);
+            }
+        }
+
+        return totalMessageCount.get();
     }
 
-    private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) {
-        Holder<Object> key = new Holder<>();
-        Holder<Object> value = new Holder<>();
+    private int processHdfsInputStream(HdfsInputStream hdfsFile, AtomicInteger totalMessageCount) {
+        final AtomicInteger messageCount = new AtomicInteger(0);
+        Holder<Object> currentKey = new Holder<>();
+        Holder<Object> currentValue = new Holder<>();
 
-        while (inputStream.next(key, value) >= 0) {
-            processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
+        while (hdfsFile.next(currentKey, currentValue) >= 0) {
+            processHdfsInputStream(hdfsFile, currentKey, currentValue, messageCount, totalMessageCount);
+            messageCount.incrementAndGet();
         }
+
+        return messageCount.get();
     }
 
-    private void processHdfsInputStream(HdfsInputStream inputStream, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, int totalFiles) {
+    private void processHdfsInputStream(HdfsInputStream hdfsFile, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, AtomicInteger totalMessageCount) {
         Exchange exchange = this.getEndpoint().createExchange();
         Message message = exchange.getIn();
 
-        String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/");
+        String fileName = StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
         message.setHeader(Exchange.FILE_NAME, fileName);
         message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
-        message.setHeader("CamelFileAbsolutePath", inputStream.getActualPath());
-        if (key.value != null) {
-            message.setHeader(HdfsHeader.KEY.name(), key.value);
+        message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath());
+        if (key.getValue() != null) {
+            message.setHeader(HdfsHeader.KEY.name(), key.getValue());
         }
-        if (inputStream.getNumOfReadBytes() >= 0) {
-            message.setHeader(Exchange.FILE_LENGTH, inputStream.getNumOfReadBytes());
+        if (hdfsFile.getNumOfReadBytes() >= 0) {
+            message.setHeader(Exchange.FILE_LENGTH, hdfsFile.getNumOfReadBytes());
         }
-        message.setBody(value.value);
+        message.setBody(value.getValue());
+
+        updateNewExchange(exchange, messageCount.get(), hdfsFile);
 
-        log.debug("Processing file {}", fileName);
+        log.debug("Processing file [{}]", fileName);
         try {
             processor.process(exchange);
+            totalMessageCount.incrementAndGet();
         } catch (Exception e) {
             exchange.setException(e);
         }
@@ -183,9 +199,6 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         if (exchange.getException() != null) {
             getExceptionHandler().handleException(exchange.getException());
         }
-
-        int count = messageCount.incrementAndGet();
-        log.debug("Processed [{}] files out of [{}]", count, totalFiles);
     }
 
     private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) {
@@ -212,7 +225,7 @@
         return true;
     }
 
-    private HdfsInputStream createInputStream(FileStatus fileStatus) {
+    private HdfsInputStream asHdfsFile(FileStatus fileStatus) {
         try {
             this.rwLock.writeLock().lock();
             return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), hdfsInfoFactory);
@@ -221,4 +234,19 @@
         }
     }
 
+    protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) {
+        // do not share unit of work
+        exchange.setUnitOfWork(null);
+
+        exchange.setProperty(Exchange.SPLIT_INDEX, index);
+
+        if (hdfsFile.hasNext()) {
+            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
+        } else {
+            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
+            // streaming mode, so set total size when we are complete based on the index
+            exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
+        }
+    }
+
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 430e20c..8930587 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -25,7 +25,7 @@ public enum HdfsFileType {
     NORMAL_FILE(new HdfsNormalFileHandler()),
     SEQUENCE_FILE(new HdfsSequenceFileHandler()),
     MAP_FILE(new HdfsMapFileHandler()),
-    BLOOMMAP_FILE(new HdfsBloommapFileHandler()),
+    BLOOMMAP_FILE(new HdfsBloomMapFileHandler()),
     ARRAY_FILE(new HdfsArrayFileTypeHandler());
 
     private final HdfsFile file;
@@ -46,7 +46,7 @@ public enum HdfsFileType {
         return this.file.createInputStream(hdfsPath, hdfsInfoFactory);
     }
 
-    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+    public long next(HdfsInputStream hdfsInputStream, final Holder<Object> key, final Holder<Object> value) {
         return this.file.next(hdfsInputStream, key, value);
     }
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 68c22f6..8673ac9 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -18,6 +18,8 @@ package org.apache.camel.component.hdfs;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Objects;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.RuntimeCamelException;
@@ -43,6 +45,8 @@
 
     private boolean streamDownload;
 
+    private EntryHolder cachedNextEntry;
+
     protected HdfsInputStream() {
     }
 
@@ -92,20 +96,42 @@
      * @param value
      * @return number of bytes read. 0 is correct number of bytes (empty file), -1 indicates no record was read
      */
-    public final long next(Holder<Object> key, Holder<Object> value) {
+    public final long next(final Holder<Object> key, final Holder<Object> value) {
+        EntryHolder nextEntry = Optional.ofNullable(cachedNextEntry).orElseGet(() -> getNextFromStream(key, value));
+        cachedNextEntry = null;
+
+        key.setValue(nextEntry.getKey().getValue());
+        value.setValue(nextEntry.getValue().getValue());
+
+        return nextEntry.getByteCount();
+    }
+
+    private EntryHolder getNextFromStream(final Holder<Object> key, final Holder<Object> value) {
         long nb = fileType.next(this, key, value);
         // when zero bytes was read from given type of file, we may still have a record (e.g., empty file)
         // null value.value is the only indication that no (new) record/chunk was read
-        if (nb == 0 && numOfReadMessages.get() > 0) {
+        if ((nb == 0 && numOfReadMessages.get() > 0) || Objects.isNull(value.getValue())) {
             // we've read all chunks from file, which size is exact multiple the chunk size
-            return -1;
-        }
-        if (value.value != null) {
+            nb = -1;
+        } else {
             numOfReadBytes.addAndGet(nb);
             numOfReadMessages.incrementAndGet();
-            return nb;
         }
-        return -1;
+
+        return new EntryHolder(key, value, nb);
+    }
+
+    /**
+     */
+    public final boolean hasNext() {
+        if (Objects.isNull(cachedNextEntry)) {
+            Holder<Object> nextKey = new Holder<>();
+            Holder<Object> nextValue = new Holder<>();
+            long nextByteCount = next(nextKey, nextValue);
+            cachedNextEntry = new EntryHolder(nextKey, nextValue, nextByteCount);
+        }
+
+        return cachedNextEntry.hasNext();
     }
 
     public final long getNumOfReadBytes() {
@@ -135,4 +161,33 @@
     public boolean isStreamDownload() {
         return streamDownload;
     }
+
+    private static class EntryHolder {
+
+        private long byteCount;
+        private Holder<Object> key;
+        private Holder<Object> value;
+
+        public EntryHolder(Holder<Object> key, Holder<Object> value, long byteCount) {
+            this.key = key;
+            this.value = value;
+            this.byteCount = byteCount;
+        }
+
+        public Holder<Object> getKey() {
+            return key;
+        }
+
+        public Holder<Object> getValue() {
+            return value;
+        }
+
+        public Boolean hasNext() {
+            return byteCount >= 0;
+        }
+
+        public long getByteCount() {
+            return byteCount;
+        }
+    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
index b63463d..4924ddf 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
@@ -38,10 +38,14 @@
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
+            rout = new MapFile.Writer(
+                    hdfsInfo.getConfiguration(),
+                    new Path(hdfsPath),
+                    MapFile.Writer.keyClass(keyWritableClass),
+                    MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> {
-                    }));
+                    MapFile.Writer.progressable(() -> { })
+            );
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -56,7 +60,7 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-            return Long.sum(keySize.value, valueSize.value);
+            return Long.sum(keySize.getValue(), valueSize.getValue());
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -83,9 +87,9 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.value = getObject(keyWritable, keySize);
-                value.value = getObject(valueWritable, valueSize);
-                return Long.sum(keySize.value, valueSize.value);
+                key.setValue(getObject(keyWritable, keySize));
+                value.setValue(getObject(valueWritable, valueSize));
+                return Long.sum(keySize.getValue(), valueSize.getValue());
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
index e17dd1d..084e6d6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
@@ -106,8 +106,7 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
 
     private long nextAsWrappedStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         InputStream inputStream = (InputStream) hdfsInputStream.getIn();
-        key.value = null;
-        value.value = inputStream;
+        value.setValue(inputStream);
 
         if (consumed) {
             return 0;
@@ -124,13 +123,11 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
             int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
             if (bytesRead >= 0) {
                 outputStream.write(buf, 0, bytesRead);
-                key.value = null;
-                value.value = outputStream;
+                value.setValue(outputStream);
                 return bytesRead;
             } else {
-                key.value = null;
                 // indication that we may have read from empty file
-                value.value = outputStream;
+                value.setValue(outputStream);
                 return 0;
             }
         } catch (IOException ex) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
index d755ebc..3c53dae 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
@@ -35,12 +35,18 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
-                    SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
-                    SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
+            rout = SequenceFile.createWriter(
+                    hdfsInfo.getConfiguration(),
+                    SequenceFile.Writer.file(hdfsInfo.getPath()),
+                    SequenceFile.Writer.keyClass(keyWritableClass),
+                    SequenceFile.Writer.valueClass(valueWritableClass),
+                    SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
+                    SequenceFile.Writer.replication(endpointConfig.getReplication()),
+                    SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
                     SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    SequenceFile.Writer.progressable(() -> {
-                    }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
+                    SequenceFile.Writer.progressable(() -> { }),
+                    SequenceFile.Writer.metadata(new SequenceFile.Metadata())
+            );
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -57,7 +63,7 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut();
             writer.append(keyWritable, valueWritable);
             writer.sync();
-            return Long.sum(keySize.value, valueSize.value);
+            return Long.sum(keySize.getValue(), valueSize.getValue());
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -84,9 +90,9 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.value = getObject(keyWritable, keySize);
-                value.value = getObject(valueWritable, valueSize);
-                return Long.sum(keySize.value, valueSize.value);
+                key.setValue(getObject(keyWritable, keySize));
+                value.setValue(getObject(valueWritable, valueSize));
+                return Long.sum(keySize.getValue(), valueSize.getValue());
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
index 07c5eb8..7f2de0f 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
@@ -48,13 +48,13 @@
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = 0;
+            size.setValue(0);
             return NullWritable.get();
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = 0;
+            size.setValue(0);
             return null;
         }
     }
@@ -65,7 +65,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             ByteWritable writable = new ByteWritable();
             writable.set(typeConverter.convertTo(Byte.class, value));
             return writable;
@@ -73,7 +73,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((ByteWritable) writable).get();
         }
     }
@@ -84,7 +84,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             BooleanWritable writable = new BooleanWritable();
             writable.set(typeConverter.convertTo(Boolean.class, value));
             return writable;
@@ -92,7 +92,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((BooleanWritable) writable).get();
         }
     }
@@ -104,15 +104,15 @@ public class HdfsWritableFactories {
             BytesWritable writable = new BytesWritable();
             ByteBuffer bb = (ByteBuffer) value;
             writable.set(bb.array(), 0, bb.array().length);
-            size.value = bb.array().length;
+            size.setValue(bb.array().length);
             return writable;
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = ((BytesWritable) writable).getLength();
-            ByteBuffer bb = ByteBuffer.allocate(size.value);
-            bb.put(((BytesWritable) writable).getBytes(), 0, size.value);
+            size.setValue(((BytesWritable) writable).getLength());
+            ByteBuffer bb = ByteBuffer.allocate(size.getValue());
+            bb.put(((BytesWritable) writable).getBytes(), 0, size.getValue());
             return bb;
         }
     }
@@ -123,7 +123,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             DoubleWritable writable = new DoubleWritable();
             writable.set(typeConverter.convertTo(Double.class, value));
             return writable;
@@ -131,7 +131,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((DoubleWritable) writable).get();
         }
     }
@@ -142,7 +142,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             FloatWritable writable = new FloatWritable();
             writable.set(typeConverter.convertTo(Float.class, value));
             return writable;
@@ -150,7 +150,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((FloatWritable) writable).get();
         }
     }
@@ -161,7 +161,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             IntWritable writable = new IntWritable();
             writable.set(typeConverter.convertTo(Integer.class, value));
             return writable;
@@ -169,7 +169,7 @@ public class HdfsWritableFactories {
 
         @Override
        public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((IntWritable) writable).get();
         }
     }
@@ -180,7 +180,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             LongWritable writable = new LongWritable();
             writable.set(typeConverter.convertTo(Long.class, value));
             return writable;
@@ -188,7 +188,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = SIZE;
+            size.setValue(SIZE);
             return ((LongWritable) writable).get();
         }
     }
@@ -199,13 +199,13 @@ public class HdfsWritableFactories {
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
             Text writable = new Text();
             writable.set(typeConverter.convertTo(String.class, value));
-            size.value = writable.getBytes().length;
+            size.setValue(writable.getBytes().length);
             return writable;
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = ((Text) writable).getLength();
+            size.setValue(((Text) writable).getLength());
             return writable.toString();
         }
     }
@@ -219,7 +219,7 @@ public class HdfsWritableFactories {
                 IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, false);
                 BytesWritable writable = new BytesWritable();
                 writable.set(bos.toByteArray(), 0, bos.toByteArray().length);
-                size.value = bos.toByteArray().length;
+                size.setValue(bos.toByteArray().length);
                 return writable;
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
@@ -228,7 +228,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.value = 0;
+            size.setValue(0);
             return null;
         }
     }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
index 8a02dcd..458f8ea 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
@@ -21,7 +21,7 @@ public final class Holder<T> {
     /**
      * The value contained in the holder.
      **/
-    public T value;
+    private T value;
 
     /**
      * Creates a new holder with a <code>null</code> value.
@@ -37,4 +37,12 @@ public final class Holder<T> {
     public Holder(T value) {
         this.value = value;
     }
+
+    public T getValue() {
+        return value;
+    }
+
+    public void setValue(T value) {
+        this.value = value;
+    }
 }
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
index ce6b334..2841a14 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
@@ -55,6 +56,7 @@ public class HdfsInputStreamTest {
         fileSystem = mock(FileSystem.class);
         configuration = mock(Configuration.class);
         Path path = mock(Path.class);
+        FileStatus fileStatus = mock(FileStatus.class);
 
         when(hdfsInfoFactory.newHdfsInfo(anyString())).thenReturn(hdfsInfo);
         when(hdfsInfoFactory.newHdfsInfoWithoutAuth(anyString())).thenReturn(hdfsInfo);
@@ -63,6 +65,11 @@
         when(hdfsInfo.getFileSystem()).thenReturn(fileSystem);
         when(hdfsInfo.getConfiguration()).thenReturn(configuration);
         when(hdfsInfo.getPath()).thenReturn(path);
+
+        when(path.getFileSystem(configuration)).thenReturn(fileSystem);
+
+        when(fileSystem.getFileStatus(path)).thenReturn(fileStatus);
+        when(fileStatus.getLen()).thenReturn(1000L);
     }
 
     @Test
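A note on the hasNext()/cachedNextEntry change in HdfsInputStream above: hasNext() has
to read one record ahead to know whether one exists, so it parks the result in an
EntryHolder and the following next() hands it back instead of touching the stream
again. The same read-ahead pattern in isolation (a generic sketch, not camel-hdfs code):

    import java.util.Iterator;
    import java.util.NoSuchElementException;

    // Generic read-ahead wrapper: peek one element to answer hasNext(),
    // cache it, and let the next next() call consume the cache.
    final class LookAhead<T> {
        private final Iterator<T> source;
        private T cached;            // the pre-fetched element, if any
        private boolean hasCached;   // distinguishes "no cache" from a cached null

        LookAhead(Iterator<T> source) {
            this.source = source;
        }

        boolean hasNext() {
            if (!hasCached && source.hasNext()) {
                cached = source.next(); // read ahead and park the element
                hasCached = true;
            }
            return hasCached;
        }

        T next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            hasCached = false;          // hand the cached element out exactly once
            return cached;
        }
    }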