This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch revert-3354-CAMEL-14146-new
in repository https://gitbox.apache.org/repos/asf/camel.git
commit 0e4d2eced12ebfd55833f527695d3ba8e35af68f
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Fri Nov 22 09:51:18 2019 +0100

    Revert "CAMEL-14146 : camel-hdfs - Consumer that splits with ChunkSize should add similar header info like the Splitter (#3354)"

    This reverts commit 5b60c2ea3b7b3d49f977c90c26f9f7420c55f595.
---
 .../camel/component/hdfs/DefaultHdfsFile.java      | 13 ++--
 .../component/hdfs/HdfsArrayFileTypeHandler.java   | 16 ++--
 ...leHandler.java => HdfsBloommapFileHandler.java} | 19 ++---
 .../apache/camel/component/hdfs/HdfsConsumer.java  | 86 ++++++++--------------
 .../apache/camel/component/hdfs/HdfsFileType.java  |  4 +-
 .../camel/component/hdfs/HdfsInputStream.java      | 69 ++---------------
 .../camel/component/hdfs/HdfsMapFileHandler.java   | 18 ++---
 .../component/hdfs/HdfsNormalFileHandler.java      |  9 ++-
 .../component/hdfs/HdfsSequenceFileHandler.java    | 24 +++---
 .../component/hdfs/HdfsWritableFactories.java      | 44 +++++------
 .../org/apache/camel/component/hdfs/Holder.java    | 10 +--
 .../camel/component/hdfs/HdfsInputStreamTest.java  |  7 --
 12 files changed, 102 insertions(+), 217 deletions(-)

diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
index 04f126d..cc0845e 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFile.java
@@ -24,7 +24,6 @@ import java.io.PrintStream;
 import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.util.IOHelper;
@@ -64,8 +63,11 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
     }
 
     protected final Writable getWritable(Object obj, Exchange exchange, Holder<Integer> size) {
-        Class<?> objCls = Optional.ofNullable(obj).orElse(new UnknownType()).getClass();
-        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.getOrDefault(objCls, new HdfsWritableFactories.HdfsObjectWritableFactory());
+        Class<?> objCls = obj == null ? null : obj.getClass();
+        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
+        if (objWritableFactory == null) {
+            objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
+        }
         return objWritableFactory.create(obj, exchange.getContext().getTypeConverter(), size);
     }
 
@@ -95,7 +97,7 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
             writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
             writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
-            writables.put(UnknownType.class, new HdfsWritableFactories.HdfsNullWritableFactory());
+            writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory());
         }
 
         static {
@@ -110,7 +112,4 @@ abstract class DefaultHdfsFile<T extends Closeable, U extends Closeable> impleme
             readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory());
         }
     }
-
-    private static final class UnknownType {
-    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
index 4db8bd7..2a0e68c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileTypeHandler.java
@@ -36,14 +36,8 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             HdfsInfo hdfsInfo = hdfsInfoFactory.newHdfsInfo(hdfsPath);
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new ArrayFile.Writer(
-                    hdfsInfo.getConfiguration(),
-                    hdfsInfo.getFileSystem(),
-                    hdfsPath,
-                    valueWritableClass,
-                    endpointConfig.getCompressionType(),
-                    () -> { }
-            );
+            rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
+                    endpointConfig.getCompressionType(), () -> { });
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -56,7 +50,7 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((ArrayFile.Writer) hdfsOutputStream.getOut()).append(valueWritable);
-            return valueSize.getValue();
+            return valueSize.value;
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -81,8 +75,8 @@ class HdfsArrayFileTypeHandler extends DefaultHdfsFile<ArrayFile.Writer, ArrayFi
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(valueWritable) != null) {
-                value.setValue(getObject(valueWritable, valueSize));
-                return valueSize.getValue();
+                value.value = getObject(valueWritable, valueSize);
+                return valueSize.value;
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
similarity index 86%
rename from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
rename to components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
index 44301ae..0b1f907 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloomMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileHandler.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
+class HdfsBloommapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, BloomMapFile.Reader> {
 
     @SuppressWarnings("rawtypes")
     @Override
@@ -39,14 +39,11 @@ class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new BloomMapFile.Writer(
-                    hdfsInfo.getConfiguration(),
-                    new Path(hdfsPath),
-                    MapFile.Writer.keyClass(keyWritableClass),
+            rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
                     MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> { })
-            );
+                    MapFile.Writer.progressable(() -> {
+                    }));
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -61,7 +58,7 @@ class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((BloomMapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-            return Long.sum(keySize.getValue(), valueSize.getValue());
+            return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -88,9 +85,9 @@ class HdfsBloomMapFileHandler extends DefaultHdfsFile<BloomMapFile.Writer, Bloom
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.setValue(getObject(keyWritable, keySize));
-                value.setValue(getObject(valueWritable, valueSize));
-                return Long.sum(keySize.getValue(), valueSize.getValue());
+                key.value = getObject(keyWritable, keySize);
+                value.value = getObject(valueWritable, valueSize);
+                return Long.sum(keySize.value, valueSize.value);
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 8e105cd..affad6c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -18,13 +18,11 @@ package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 import javax.security.auth.login.Configuration;
 
@@ -129,68 +127,54 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     }
 
     private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) {
-        final AtomicInteger totalMessageCount = new AtomicInteger(0);
+        final AtomicInteger messageCount = new AtomicInteger(0);
 
-        List<HdfsInputStream> hdfsFiles = Arrays.stream(fileStatuses)
+        Arrays.stream(fileStatuses)
                 .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info))
                 .filter(this::hasMatchingOwner)
                 .limit(endpointConfig.getMaxMessagesPerPoll())
-                .map(this::asHdfsFile)
+                .map(this::createInputStream)
                 .filter(Objects::nonNull)
-                .collect(Collectors.toList());
-
-        log.info("Processing [{}] valid files out of [{}] available.", hdfsFiles.size(), fileStatuses.length);
+                .forEach(hdfsInputStream -> {
+                    try {
+                        processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length);
+                    } finally {
+                        IOHelper.close(hdfsInputStream, "input stream", log);
+                    }
+                });
 
-        for (int i = 0; i < hdfsFiles.size(); i++) {
-            HdfsInputStream hdfsFile = hdfsFiles.get(i);
-            try {
-                int messageCount = processHdfsInputStream(hdfsFile, totalMessageCount);
-                log.debug("Processed [{}] files out of [{}].", i, hdfsFiles.size());
-                log.debug("File [{}] was split to [{}] messages.", i, messageCount);
-            } finally {
-                IOHelper.close(hdfsFile, "hdfs file", log);
-            }
-        }
-
-        return totalMessageCount.get();
+        return messageCount.get();
     }
 
-    private int processHdfsInputStream(HdfsInputStream hdfsFile, AtomicInteger totalMessageCount) {
-        final AtomicInteger messageCount = new AtomicInteger(0);
-        Holder<Object> currentKey = new Holder<>();
-        Holder<Object> currentValue = new Holder<>();
+    private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) {
+        Holder<Object> key = new Holder<>();
+        Holder<Object> value = new Holder<>();
 
-        while (hdfsFile.next(currentKey, currentValue) >= 0) {
-            processHdfsInputStream(hdfsFile, currentKey, currentValue, messageCount, totalMessageCount);
-            messageCount.incrementAndGet();
+        while (inputStream.next(key, value) >= 0) {
+            processHdfsInputStream(inputStream, key, value, messageCount, totalFiles);
         }
-
-        return messageCount.get();
     }
 
-    private void processHdfsInputStream(HdfsInputStream hdfsFile, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, AtomicInteger totalMessageCount) {
+    private void processHdfsInputStream(HdfsInputStream inputStream, Holder<Object> key, Holder<Object> value, AtomicInteger messageCount, int totalFiles) {
         Exchange exchange = this.getEndpoint().createExchange();
         Message message = exchange.getIn();
 
-        String fileName = StringUtils.substringAfterLast(hdfsFile.getActualPath(), "/");
+        String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/");
         message.setHeader(Exchange.FILE_NAME, fileName);
         message.setHeader(Exchange.FILE_NAME_CONSUMED, fileName);
-        message.setHeader("CamelFileAbsolutePath", hdfsFile.getActualPath());
-        if (key.getValue() != null) {
-            message.setHeader(HdfsHeader.KEY.name(), key.getValue());
+        message.setHeader("CamelFileAbsolutePath", inputStream.getActualPath());
+        if (key.value != null) {
+            message.setHeader(HdfsHeader.KEY.name(), key.value);
         }
 
-        if (hdfsFile.getNumOfReadBytes() >= 0) {
-            message.setHeader(Exchange.FILE_LENGTH, hdfsFile.getNumOfReadBytes());
+        if (inputStream.getNumOfReadBytes() >= 0) {
+            message.setHeader(Exchange.FILE_LENGTH, inputStream.getNumOfReadBytes());
         }
 
-        message.setBody(value.getValue());
-
-        updateNewExchange(exchange, messageCount.get(), hdfsFile);
+        message.setBody(value.value);
 
-        log.debug("Processing file [{}]", fileName);
+        log.debug("Processing file {}", fileName);
         try {
             processor.process(exchange);
-            totalMessageCount.incrementAndGet();
         } catch (Exception e) {
             exchange.setException(e);
         }
@@ -199,6 +183,9 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         if (exchange.getException() != null) {
             getExceptionHandler().handleException(exchange.getException());
         }
+
+        int count = messageCount.incrementAndGet();
+        log.debug("Processed [{}] files out of [{}]", count, totalFiles);
     }
 
     private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) {
@@ -225,7 +212,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         return true;
     }
 
-    private HdfsInputStream asHdfsFile(FileStatus fileStatus) {
+    private HdfsInputStream createInputStream(FileStatus fileStatus) {
         try {
             this.rwLock.writeLock().lock();
             return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), hdfsInfoFactory);
@@ -234,19 +221,4 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         }
     }
 
-    protected void updateNewExchange(Exchange exchange, int index, HdfsInputStream hdfsFile) {
-        // do not share unit of work
-        exchange.setUnitOfWork(null);
-
-        exchange.setProperty(Exchange.SPLIT_INDEX, index);
-
-        if (hdfsFile.hasNext()) {
-            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.FALSE);
-        } else {
-            exchange.setProperty(Exchange.SPLIT_COMPLETE, Boolean.TRUE);
-            // streaming mode, so set total size when we are complete based on the index
-            exchange.setProperty(Exchange.SPLIT_SIZE, index + 1);
-        }
-    }
-
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 8930587..430e20c 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -25,7 +25,7 @@ public enum HdfsFileType {
     NORMAL_FILE(new HdfsNormalFileHandler()),
     SEQUENCE_FILE(new HdfsSequenceFileHandler()),
     MAP_FILE(new HdfsMapFileHandler()),
-    BLOOMMAP_FILE(new HdfsBloomMapFileHandler()),
+    BLOOMMAP_FILE(new HdfsBloommapFileHandler()),
     ARRAY_FILE(new HdfsArrayFileTypeHandler());
 
     private final HdfsFile file;
@@ -46,7 +46,7 @@ public enum HdfsFileType {
         return this.file.createInputStream(hdfsPath, hdfsInfoFactory);
     }
 
-    public long next(HdfsInputStream hdfsInputStream, final Holder<Object> key, final Holder<Object> value) {
+    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         return this.file.next(hdfsInputStream, key, value);
     }
 
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 8673ac9..68c22f6 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -18,8 +18,6 @@ package org.apache.camel.component.hdfs;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.Objects;
-import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.RuntimeCamelException;
@@ -45,8 +43,6 @@ public class HdfsInputStream implements Closeable {
 
     private boolean streamDownload;
 
-    private EntryHolder cachedNextEntry;
-
     protected HdfsInputStream() {
     }
 
@@ -96,42 +92,20 @@ public class HdfsInputStream implements Closeable {
      * @param value
     * @return number of bytes read. 0 is correct number of bytes (empty file), -1 indicates no record was read
     */
-    public final long next(final Holder<Object> key, final Holder<Object> value) {
-        EntryHolder nextEntry = Optional.ofNullable(cachedNextEntry).orElseGet(() -> getNextFromStream(key, value));
-        cachedNextEntry = null;
-
-        key.setValue(nextEntry.getKey().getValue());
-        value.setValue(nextEntry.getValue().getValue());
-
-        return nextEntry.getByteCount();
-    }
-
-    private EntryHolder getNextFromStream(final Holder<Object> key, final Holder<Object> value) {
+    public final long next(Holder<Object> key, Holder<Object> value) {
         long nb = fileType.next(this, key, value);
         // when zero bytes was read from given type of file, we may still have a record (e.g., empty file)
         // null value.value is the only indication that no (new) record/chunk was read
-        if ((nb == 0 && numOfReadMessages.get() > 0) || Objects.isNull(value.getValue())) {
+        if (nb == 0 && numOfReadMessages.get() > 0) {
             // we've read all chunks from file, which size is exact multiple the chunk size
-            nb = -1;
-        } else {
+            return -1;
+        }
+        if (value.value != null) {
             numOfReadBytes.addAndGet(nb);
             numOfReadMessages.incrementAndGet();
+            return nb;
         }
-
-        return new EntryHolder(key, value, nb);
-    }
-
-    /**
-     */
-    public final boolean hasNext() {
-        if (Objects.isNull(cachedNextEntry)) {
-            Holder<Object> nextKey = new Holder<>();
-            Holder<Object> nextValue = new Holder<>();
-            long nextByteCount = next(nextKey, nextValue);
-            cachedNextEntry = new EntryHolder(nextKey, nextValue, nextByteCount);
-        }
-
-        return cachedNextEntry.hasNext();
+        return -1;
     }
 
     public final long getNumOfReadBytes() {
@@ -161,33 +135,4 @@ public class HdfsInputStream implements Closeable {
     public boolean isStreamDownload() {
         return streamDownload;
     }
-
-    private static class EntryHolder {
-
-        private long byteCount;
-        private Holder<Object> key;
-        private Holder<Object> value;
-
-        public EntryHolder(Holder<Object> key, Holder<Object> value, long byteCount) {
-            this.key = key;
-            this.value = value;
-            this.byteCount = byteCount;
-        }
-
-        public Holder<Object> getKey() {
-            return key;
-        }
-
-        public Holder<Object> getValue() {
-            return value;
-        }
-
-        public Boolean hasNext() {
-            return byteCount >= 0;
-        }
-
-        public long getByteCount() {
-            return byteCount;
-        }
-    }
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
index 4924ddf..b63463d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileHandler.java
@@ -38,14 +38,10 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<? extends WritableComparable> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<? extends WritableComparable> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = new MapFile.Writer(
-                    hdfsInfo.getConfiguration(),
-                    new Path(hdfsPath),
-                    MapFile.Writer.keyClass(keyWritableClass),
-                    MapFile.Writer.valueClass(valueWritableClass),
+            rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
                     MapFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> { })
-            );
+                    MapFile.Writer.progressable(() -> {
+                    }));
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -60,7 +56,7 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = getWritable(value, exchange, valueSize);
             ((MapFile.Writer) hdfsOutputStream.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-            return Long.sum(keySize.getValue(), valueSize.getValue());
+            return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -87,9 +83,9 @@ class HdfsMapFileHandler extends DefaultHdfsFile<MapFile.Writer, MapFile.Reader>
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.setValue(getObject(keyWritable, keySize));
-                value.setValue(getObject(valueWritable, valueSize));
-                return Long.sum(keySize.getValue(), valueSize.getValue());
+                key.value = getObject(keyWritable, keySize);
+                value.value = getObject(valueWritable, valueSize);
+                return Long.sum(keySize.value, valueSize.value);
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
index 084e6d6..e17dd1d 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileHandler.java
@@ -106,7 +106,8 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
 
     private long nextAsWrappedStream(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
         InputStream inputStream = (InputStream) hdfsInputStream.getIn();
-        value.setValue(inputStream);
+        key.value = null;
+        value.value = inputStream;
 
         if (consumed) {
             return 0;
@@ -123,11 +124,13 @@ class HdfsNormalFileHandler extends DefaultHdfsFile<OutputStream, InputStream> {
             int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
             if (bytesRead >= 0) {
                 outputStream.write(buf, 0, bytesRead);
-                value.setValue(outputStream);
+                key.value = null;
+                value.value = outputStream;
                 return bytesRead;
             } else {
+                key.value = null;
                 // indication that we may have read from empty file
-                value.setValue(outputStream);
+                value.value = outputStream;
                 return 0;
             }
         } catch (IOException ex) {
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
index 3c53dae..d755ebc 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileHandler.java
@@ -35,18 +35,12 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             HdfsConfiguration endpointConfig = hdfsInfoFactory.getEndpointConfig();
             Class<?> keyWritableClass = endpointConfig.getKeyType().getWritableClass();
             Class<?> valueWritableClass = endpointConfig.getValueType().getWritableClass();
-            rout = SequenceFile.createWriter(
-                    hdfsInfo.getConfiguration(),
-                    SequenceFile.Writer.file(hdfsInfo.getPath()),
-                    SequenceFile.Writer.keyClass(keyWritableClass),
-                    SequenceFile.Writer.valueClass(valueWritableClass),
-                    SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
-                    SequenceFile.Writer.replication(endpointConfig.getReplication()),
-                    SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
+            rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass),
+                    SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(endpointConfig.getBufferSize()),
+                    SequenceFile.Writer.replication(endpointConfig.getReplication()), SequenceFile.Writer.blockSize(endpointConfig.getBlockSize()),
                     SequenceFile.Writer.compression(endpointConfig.getCompressionType(), endpointConfig.getCompressionCodec().getCodec()),
-                    SequenceFile.Writer.progressable(() -> { }),
-                    SequenceFile.Writer.metadata(new SequenceFile.Metadata())
-            );
+                    SequenceFile.Writer.progressable(() -> {
+                    }), SequenceFile.Writer.metadata(new SequenceFile.Metadata()));
             return rout;
         } catch (IOException ex) {
             throw new RuntimeCamelException(ex);
@@ -63,7 +57,7 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             SequenceFile.Writer writer = (SequenceFile.Writer) hdfsOutputStream.getOut();
             writer.append(keyWritable, valueWritable);
             writer.sync();
-            return Long.sum(keySize.getValue(), valueSize.getValue());
+            return Long.sum(keySize.value, valueSize.value);
         } catch (Exception ex) {
             throw new RuntimeCamelException(ex);
         }
@@ -90,9 +84,9 @@ class HdfsSequenceFileHandler extends DefaultHdfsFile<SequenceFile.Writer, Seque
             Holder<Integer> valueSize = new Holder<>();
             Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
             if (reader.next(keyWritable, valueWritable)) {
-                key.setValue(getObject(keyWritable, keySize));
-                value.setValue(getObject(valueWritable, valueSize));
-                return Long.sum(keySize.getValue(), valueSize.getValue());
+                key.value = getObject(keyWritable, keySize);
+                value.value = getObject(valueWritable, valueSize);
+                return Long.sum(keySize.value, valueSize.value);
             } else {
                 return 0;
             }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
index 7f2de0f..07c5eb8 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
@@ -48,13 +48,13 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(0);
+            size.value = 0;
             return NullWritable.get();
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(0);
+            size.value = 0;
             return null;
         }
     }
@@ -65,7 +65,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             ByteWritable writable = new ByteWritable();
             writable.set(typeConverter.convertTo(Byte.class, value));
             return writable;
@@ -73,7 +73,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((ByteWritable) writable).get();
         }
     }
@@ -84,7 +84,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             BooleanWritable writable = new BooleanWritable();
             writable.set(typeConverter.convertTo(Boolean.class, value));
             return writable;
@@ -92,7 +92,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((BooleanWritable) writable).get();
         }
     }
@@ -104,15 +104,15 @@ public class HdfsWritableFactories {
             BytesWritable writable = new BytesWritable();
             ByteBuffer bb = (ByteBuffer) value;
             writable.set(bb.array(), 0, bb.array().length);
-            size.setValue(bb.array().length);
+            size.value = bb.array().length;
             return writable;
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(((BytesWritable) writable).getLength());
-            ByteBuffer bb = ByteBuffer.allocate(size.getValue());
-            bb.put(((BytesWritable) writable).getBytes(), 0, size.getValue());
+            size.value = ((BytesWritable) writable).getLength();
+            ByteBuffer bb = ByteBuffer.allocate(size.value);
+            bb.put(((BytesWritable) writable).getBytes(), 0, size.value);
             return bb;
         }
     }
@@ -123,7 +123,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             DoubleWritable writable = new DoubleWritable();
             writable.set(typeConverter.convertTo(Double.class, value));
             return writable;
@@ -131,7 +131,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((DoubleWritable) writable).get();
         }
     }
@@ -142,7 +142,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             FloatWritable writable = new FloatWritable();
             writable.set(typeConverter.convertTo(Float.class, value));
             return writable;
@@ -150,7 +150,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((FloatWritable) writable).get();
         }
     }
@@ -161,7 +161,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             IntWritable writable = new IntWritable();
             writable.set(typeConverter.convertTo(Integer.class, value));
             return writable;
@@ -169,7 +169,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((IntWritable) writable).get();
         }
     }
@@ -180,7 +180,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             LongWritable writable = new LongWritable();
             writable.set(typeConverter.convertTo(Long.class, value));
             return writable;
@@ -188,7 +188,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(SIZE);
+            size.value = SIZE;
             return ((LongWritable) writable).get();
         }
     }
@@ -199,13 +199,13 @@ public class HdfsWritableFactories {
         public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) {
             Text writable = new Text();
             writable.set(typeConverter.convertTo(String.class, value));
-            size.setValue(writable.getBytes().length);
+            size.value = writable.getBytes().length;
             return writable;
         }
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(((Text) writable).getLength());
+            size.value = ((Text) writable).getLength();
             return writable.toString();
         }
     }
@@ -219,7 +219,7 @@ public class HdfsWritableFactories {
                 IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, false);
                 BytesWritable writable = new BytesWritable();
                 writable.set(bos.toByteArray(), 0, bos.toByteArray().length);
-                size.setValue(bos.toByteArray().length);
+                size.value = bos.toByteArray().length;
                 return writable;
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
@@ -228,7 +228,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Object read(Writable writable, Holder<Integer> size) {
-            size.setValue(0);
+            size.value = 0;
             return null;
         }
     }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
index 458f8ea..8a02dcd 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/Holder.java
@@ -21,7 +21,7 @@ public final class Holder<T> {
     /**
     * The value contained in the holder.
     **/
-    private T value;
+    public T value;
 
     /**
     * Creates a new holder with a <code>null</code> value.
@@ -37,12 +37,4 @@ public final class Holder<T> {
     public Holder(T value) {
         this.value = value;
     }
-
-    public T getValue() {
-        return value;
-    }
-
-    public void setValue(T value) {
-        this.value = value;
-    }
 }
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
index 2841a14..ce6b334 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInputStreamTest.java
@@ -20,7 +20,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.junit.Before;
@@ -56,7 +55,6 @@ public class HdfsInputStreamTest {
         fileSystem = mock(FileSystem.class);
         configuration = mock(Configuration.class);
         Path path = mock(Path.class);
-        FileStatus fileStatus = mock(FileStatus.class);
 
         when(hdfsInfoFactory.newHdfsInfo(anyString())).thenReturn(hdfsInfo);
         when(hdfsInfoFactory.newHdfsInfoWithoutAuth(anyString())).thenReturn(hdfsInfo);
@@ -65,11 +63,6 @@ public class HdfsInputStreamTest {
         when(hdfsInfo.getFileSystem()).thenReturn(fileSystem);
         when(hdfsInfo.getConfiguration()).thenReturn(configuration);
         when(hdfsInfo.getPath()).thenReturn(path);
-
-        when(path.getFileSystem(configuration)).thenReturn(fileSystem);
-
-        when(fileSystem.getFileStatus(path)).thenReturn(fileStatus);
-        when(fileStatus.getLen()).thenReturn(1000L);
     }
 
     @Test
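
For context (an editor's summary, not part of the commit): the reverted change made the chunked
HDFS consumer stamp each exchange with the same properties the Camel Splitter EIP sets
(CamelSplitIndex, CamelSplitComplete, CamelSplitSize), as visible in the deleted
updateNewExchange method above. A minimal sketch of a route that would have observed those
properties — the endpoint URI, chunk size, class name, and log text below are illustrative
assumptions, not taken from the commit:

    import org.apache.camel.builder.RouteBuilder;

    public class HdfsChunkRoute extends RouteBuilder {
        @Override
        public void configure() throws Exception {
            // hypothetical HDFS endpoint; chunkSize splits the consumed file into 4 KB messages
            from("hdfs://localhost:9000/data/in?chunkSize=4096")
                // these exchange properties were populated by the reverted code
                // and are no longer set after this commit
                .log("chunk ${exchangeProperty.CamelSplitIndex}, "
                        + "complete: ${exchangeProperty.CamelSplitComplete}");
        }
    }

After the revert, routes consuming from this endpoint should not rely on those split properties
being present on the exchange.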