This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 73b1d20  CAMEL-14011 - Refactor camel-hdfs component to reduce issues reported by sonar (#3210)
73b1d20 is described below

commit 73b1d202c233cdd7e639d2e460e2ee963ad59646
Author: Marius Cornescu <marius_corne...@yahoo.com>
AuthorDate: Mon Sep 30 05:34:51 2019 +0200

    CAMEL-14011 - Refactor camel-hdfs component to reduce issues reported by sonar (#3210)
---
 .../camel/component/hdfs/DefaultHdfsFileType.java  | 114 +++++
 .../camel/component/hdfs/HdfsArrayFileType.java    |  87 ++++
 .../camel/component/hdfs/HdfsBloommapFileType.java |  98 ++++
 .../apache/camel/component/hdfs/HdfsComponent.java |  14 +-
 .../camel/component/hdfs/HdfsConfiguration.java    |  19 +-
 .../apache/camel/component/hdfs/HdfsConsumer.java  | 130 +++---
 .../org/apache/camel/component/hdfs/HdfsFile.java  |  33 ++
 .../camel/component/hdfs/HdfsFileSystemType.java   |   4 +-
 .../apache/camel/component/hdfs/HdfsFileType.java  | 497 +--------------------
 .../camel/component/hdfs/HdfsInputStream.java      |  36 +-
 .../camel/component/hdfs/HdfsMapFileType.java      |  96 ++++
 .../camel/component/hdfs/HdfsNormalFileType.java   | 145 ++++++
 .../apache/camel/component/hdfs/HdfsProducer.java  |  44 +-
 .../camel/component/hdfs/HdfsSequenceFileType.java |  97 ++++
 .../component/hdfs/HdfsWritableFactories.java      |   7 +-
 .../hdfs/kerberos/KerberosConfiguration.java       |  29 +-
 .../camel/component/hdfs/osgi/HdfsActivator.java   |   7 +-
 .../hdfs/kerberos/KerberosConfigurationTest.java   |   2 +-
 18 files changed, 861 insertions(+), 598 deletions(-)
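
The heart of the refactoring is that the HdfsFileType enum constants no longer carry large anonymous bodies; each constant now delegates to a small HdfsFile strategy class (HdfsNormalFileType, HdfsSequenceFileType, HdfsMapFileType, HdfsBloommapFileType, HdfsArrayFileType). A minimal, self-contained sketch of that enum-delegates-to-strategy shape follows; the names are illustrative only and are not the actual Camel classes.

    import java.io.Closeable;
    import java.io.IOException;

    public class EnumDelegationSketch {

        // Strategy interface, analogous to the new HdfsFile interface.
        interface FileStrategy {
            Closeable createOutputStream(String path);
        }

        // One strategy per file layout, analogous to HdfsNormalFileType, HdfsSequenceFileType, ...
        static class NormalFileStrategy implements FileStrategy {
            @Override
            public Closeable createOutputStream(String path) {
                // a real implementation would open an HDFS stream here
                return () -> System.out.println("closed normal stream for " + path);
            }
        }

        static class SequenceFileStrategy implements FileStrategy {
            @Override
            public Closeable createOutputStream(String path) {
                return () -> System.out.println("closed sequence stream for " + path);
            }
        }

        // The enum keeps its public API but forwards every call to its strategy,
        // mirroring the refactored HdfsFileType.
        enum FileType {
            NORMAL_FILE(new NormalFileStrategy()),
            SEQUENCE_FILE(new SequenceFileStrategy());

            private final FileStrategy strategy;

            FileType(FileStrategy strategy) {
                this.strategy = strategy;
            }

            public Closeable createOutputStream(String path) {
                return strategy.createOutputStream(path);
            }
        }

        public static void main(String[] args) throws IOException {
            try (Closeable out = FileType.NORMAL_FILE.createOutputStream("/tmp/example")) {
                // write through the stream here
            }
        }
    }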

diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFileType.java
new file mode 100644
index 0000000..4fdcbeb
--- /dev/null
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFileType.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.PrintStream;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.TypeConverter;
+import org.apache.camel.util.IOHelper;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.ByteWritable;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.DoubleWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+abstract class DefaultHdfsFileType implements HdfsFile {
+
+    protected final long copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException {
+        long numBytes = 0;
+        PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
+        byte[] buf = new byte[buffSize];
+        try {
+            int bytesRead = in.read(buf);
+            while (bytesRead >= 0) {
+                out.write(buf, 0, bytesRead);
+                numBytes += bytesRead;
+                if ((ps != null) && ps.checkError()) {
+                    throw new IOException("Unable to write to output stream.");
+                }
+                bytesRead = in.read(buf);
+            }
+        } finally {
+            if (close) {
+                IOHelper.close(out, in);
+            }
+        }
+        return numBytes;
+    }
+
+    protected final Writable getWritable(Object obj, TypeConverter typeConverter, Holder<Integer> size) {
+        Class<?> objCls = obj == null ? null : obj.getClass();
+        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
+        if (objWritableFactory == null) {
+            objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
+        }
+        return objWritableFactory.create(obj, typeConverter, size);
+    }
+
+    protected final Object getObject(Writable writable, Holder<Integer> size) {
+        Class<?> writableClass = NullWritable.class;
+        if (writable != null) {
+            writableClass = writable.getClass();
+        }
+        HdfsWritableFactories.HdfsWritableFactory writableObjectFactory = WritableCache.readables.get(writableClass);
+        return writableObjectFactory.read(writable, size);
+    }
+
+    @SuppressWarnings({"rawtypes"})
+    private static final class WritableCache {
+        private static Map<Class, HdfsWritableFactories.HdfsWritableFactory> writables = new HashMap<>();
+        private static Map<Class, HdfsWritableFactories.HdfsWritableFactory> readables = new HashMap<>();
+
+        private WritableCache() {
+        }
+
+        static {
+            writables.put(Boolean.class, new HdfsWritableFactories.HdfsBooleanWritableFactory());
+            writables.put(Byte.class, new HdfsWritableFactories.HdfsByteWritableFactory());
+            writables.put(ByteBuffer.class, new HdfsWritableFactories.HdfsBytesWritableFactory());
+            writables.put(Double.class, new HdfsWritableFactories.HdfsDoubleWritableFactory());
+            writables.put(Float.class, new HdfsWritableFactories.HdfsFloatWritableFactory());
+            writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
+            writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
+            writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
+            writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory());
+        }
+
+        static {
+            readables.put(BooleanWritable.class, new HdfsWritableFactories.HdfsBooleanWritableFactory());
+            readables.put(ByteWritable.class, new HdfsWritableFactories.HdfsByteWritableFactory());
+            readables.put(BytesWritable.class, new HdfsWritableFactories.HdfsBytesWritableFactory());
+            readables.put(DoubleWritable.class, new HdfsWritableFactories.HdfsDoubleWritableFactory());
+            readables.put(FloatWritable.class, new HdfsWritableFactories.HdfsFloatWritableFactory());
+            readables.put(IntWritable.class, new HdfsWritableFactories.HdfsIntWritableFactory());
+            readables.put(LongWritable.class, new HdfsWritableFactories.HdfsLongWritableFactory());
+            readables.put(Text.class, new HdfsWritableFactories.HdfsTextWritableFactory());
+            readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory());
+        }
+    }
+}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
new file mode 100644
index 0000000..4c4123c
--- /dev/null
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+class HdfsArrayFileType extends DefaultHdfsFileType {
+
+    @Override
+    public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+        try {
+            Holder<Integer> valueSize = new Holder<>();
+            Writable valueWritable = getWritable(value, typeConverter, valueSize);
+            ((ArrayFile.Writer) hdfsostr.getOut()).append(valueWritable);
+            return valueSize.value;
+        } catch (Exception ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+        try {
+            ArrayFile.Reader reader = (ArrayFile.Reader) hdfsistr.getIn();
+            Holder<Integer> valueSize = new Holder<>();
+            Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+            if (reader.next(valueWritable) != null) {
+                value.value = getObject(valueWritable, valueSize);
+                return valueSize.value;
+            } else {
+                return 0;
+            }
+        } catch (Exception ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+        try {
+            Closeable rout;
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
+            Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
+            rout = new ArrayFile.Writer(hdfsInfo.getConf(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
+                    configuration.getCompressionType(), () -> { });
+            return rout;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+        try {
+            Closeable rin;
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
+            rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConf());
+            return rin;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
new file mode 100644
index 0000000..be4c6d3
--- /dev/null
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BloomMapFile;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+class HdfsBloommapFileType extends DefaultHdfsFileType {
+
+    @Override
+    public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+        try {
+            Holder<Integer> keySize = new Holder<>();
+            Writable keyWritable = getWritable(key, typeConverter, keySize);
+            Holder<Integer> valueSize = new Holder<>();
+            Writable valueWritable = getWritable(value, typeConverter, valueSize);
+            ((BloomMapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
+            return Long.sum(keySize.value, valueSize.value);
+        } catch (Exception ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
+        try {
+            MapFile.Reader reader = (BloomMapFile.Reader) hdfsistr.getIn();
+            Holder<Integer> keySize = new Holder<>();
+            WritableComparable<?> keyWritable = (WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
+            Holder<Integer> valueSize = new Holder<>();
+            Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+            if (reader.next(keyWritable, valueWritable)) {
+                key.value = getObject(keyWritable, keySize);
+                value.value = getObject(valueWritable, valueSize);
+                return Long.sum(keySize.value, valueSize.value);
+            } else {
+                return 0;
+            }
+        } catch (Exception ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
+        try {
+            Closeable rout;
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
+            Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
+            Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
+            rout = new BloomMapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass),
+                    MapFile.Writer.valueClass(valueWritableClass),
+                    MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
+                    MapFile.Writer.progressable(() -> {
+                    }));
+            return rout;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
+        try {
+            Closeable rin;
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
+            rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
+            return rin;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java
index 07fe6b6..ebeafad 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java
@@ -16,13 +16,13 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.File;
 import java.net.URL;
 import java.util.Map;
 
 import javax.security.auth.login.Configuration;
 
 import org.apache.camel.Endpoint;
+import org.apache.camel.component.hdfs.kerberos.KerberosConfiguration;
 import org.apache.camel.spi.annotations.Component;
 import org.apache.camel.support.DefaultComponent;
 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
@@ -90,17 +90,7 @@ public class HdfsComponent extends DefaultComponent {
     * @param kerberosConfigFileLocation - kerb5.conf file (https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html)
     */
     public static void setKerberosConfigFile(String kerberosConfigFileLocation) {
-        if (!new File(kerberosConfigFileLocation).exists()) {
-            LOG.warn("Kerberos configuration file [{}}] could not be found.", 
kerberosConfigFileLocation);
-            return;
-        }
-
-        String krb5Conf = System.getProperty(KERBEROS_5_SYS_ENV);
-        if (krb5Conf == null || !krb5Conf.isEmpty()) {
-            System.setProperty(KERBEROS_5_SYS_ENV, kerberosConfigFileLocation);
-        } else if (!krb5Conf.equalsIgnoreCase(kerberosConfigFileLocation)) {
-            LOG.warn("[{}] was already configured with: [{}] config file", 
KERBEROS_5_SYS_ENV, krb5Conf);
-        }
+        
KerberosConfiguration.setKerberosConfigFile(kerberosConfigFileLocation);
     }
 
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
index ce03ef4..51e4884 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java
@@ -191,15 +191,15 @@ public class HdfsConfiguration {
         splitStrategy = getString(hdfsSettings, "splitStrategy", kerberosNamedNodes);
 
         if (isNotEmpty(splitStrategy)) {
-            String[] strstrategies = splitStrategy.split(",");
-            for (String strstrategy : strstrategies) {
-                String[] tokens = strstrategy.split(":");
+            String[] strategyElements = splitStrategy.split(",");
+            for (String strategyElement : strategyElements) {
+                String[] tokens = strategyElement.split(":");
                 if (tokens.length != 2) {
                     throw new IllegalArgumentException("Wrong Split Strategy 
[splitStrategy" + "=" + splitStrategy + "]");
                 }
-                HdfsProducer.SplitStrategyType sst = HdfsProducer.SplitStrategyType.valueOf(tokens[0]);
-                long ssv = Long.parseLong(tokens[1]);
-                strategies.add(new HdfsProducer.SplitStrategy(sst, ssv));
+                HdfsProducer.SplitStrategyType strategyType = HdfsProducer.SplitStrategyType.valueOf(tokens[0]);
+                long strategyValue = Long.parseLong(tokens[1]);
+                strategies.add(new HdfsProducer.SplitStrategy(strategyType, strategyValue));
             }
         }
         return strategies;
@@ -216,11 +216,12 @@ public class HdfsConfiguration {
     }
 
     public void checkConsumerOptions() {
+        // no validation required
     }
 
     public void checkProducerOptions() {
         if (isAppend()) {
-            if (!getSplitStrategies().isEmpty()) {
+            if (hasSplitStrategies()) {
                 throw new IllegalArgumentException("Split Strategies 
incompatible with append=true");
             }
             if (getFileType() != HdfsFileType.NORMAL_FILE) {
@@ -491,6 +492,10 @@ public class HdfsConfiguration {
         return splitStrategies;
     }
 
+    public boolean hasSplitStrategies() {
+        return !splitStrategies.isEmpty();
+    }
+
     public String getSplitStrategy() {
         return splitStrategy;
     }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
index 98e1c7d..7fb0288 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java
@@ -17,7 +17,10 @@
 package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Objects;
 import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -41,7 +44,6 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
     private final StringBuilder hdfsPath;
     private final Processor processor;
     private final ReadWriteLock rwlock = new ReentrantReadWriteLock();
-    private volatile HdfsInputStream inputStream;
 
     public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) {
         super(endpoint, processor);
@@ -66,7 +68,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         }
     }
 
-    private HdfsInfo setupHdfs(boolean onStartup) throws Exception {
+    private HdfsInfo setupHdfs(boolean onStartup) throws IOException {
         // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log
         if (onStartup) {
             log.info("Connecting to hdfs file-system {}:{}/{} (may take a 
while if connection is not available)", config.getHostName(), config.getPort(), 
hdfsPath);
@@ -100,7 +102,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
         }
     }
 
-    protected int doPoll() throws Exception {
+    protected int doPoll() throws IOException {
         class ExcludePathFilter implements PathFilter {
             @Override
             public boolean accept(Path path) {
@@ -108,8 +110,6 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
             }
         }
 
-        int numMessages = 0;
-
         HdfsInfo info = setupHdfs(false);
         FileStatus[] fileStatuses;
         if (info.getFileSystem().isFile(info.getPath())) {
@@ -121,78 +121,90 @@ public final class HdfsConsumer extends ScheduledPollConsumer {
 
         fileStatuses = Optional.ofNullable(fileStatuses).orElse(new FileStatus[0]);
 
-        for (FileStatus status : fileStatuses) {
+        return processFileStatuses(info, fileStatuses);
+    }
 
-            if (normalFileIsDirectoryNoSuccessFile(status, info)) {
-                continue;
-            }
+    private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) {
+        final AtomicInteger messageCount = new AtomicInteger(0);
 
-            if (config.getOwner() != null) {
-                // must match owner
-                if (!config.getOwner().equals(status.getOwner())) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Skipping file: {} as not matching owner: 
{}", status.getPath(), config.getOwner());
+        Arrays.stream(fileStatuses)
+                .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info))
+                .filter(this::hasMatchingOwner)
+                .map(this::createInputStream)
+                .filter(Objects::nonNull)
+                .forEach(hdfsInputStream -> {
+                    try {
+                        processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length);
+                    } finally {
+                        IOHelper.close(hdfsInputStream, "input stream", log);
                     }
-                    continue;
-                }
-            }
+                });
 
-            try {
-                this.rwlock.writeLock().lock();
-                this.inputStream = HdfsInputStream.createInputStream(status.getPath().toString(), this.config);
-                if (!this.inputStream.isOpened()) {
-                    if (log.isDebugEnabled()) {
-                        log.debug("Skipping file: {} because it doesn't exist 
anymore", status.getPath());
-                    }
-                    continue;
-                }
-            } finally {
-                this.rwlock.writeLock().unlock();
+        return messageCount.get();
+    }
+
+    private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) {
+        Holder<Object> key = new Holder<>();
+        Holder<Object> value = new Holder<>();
+        while (inputStream.next(key, value) >= 0) {
+            Exchange exchange = this.getEndpoint().createExchange();
+            Message message = exchange.getIn();
+            String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/");
+            message.setHeader(Exchange.FILE_NAME, fileName);
+            if (key.value != null) {
+                message.setHeader(HdfsHeader.KEY.name(), key.value);
             }
+            message.setBody(value.value);
 
+            log.debug("Processing file {}", fileName);
             try {
-                Holder<Object> key = new Holder<>();
-                Holder<Object> value = new Holder<>();
-                while (this.inputStream.next(key, value) >= 0) {
-                    Exchange exchange = this.getEndpoint().createExchange();
-                    Message message = exchange.getIn();
-                    String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/");
-                    message.setHeader(Exchange.FILE_NAME, fileName);
-                    if (key.value != null) {
-                        message.setHeader(HdfsHeader.KEY.name(), key.value);
-                    }
-                    message.setBody(value.value);
+                processor.process(exchange);
+            } catch (Exception e) {
+                exchange.setException(e);
+            }
 
-                    log.debug("Processing file {}", fileName);
-                    try {
-                        processor.process(exchange);
-                    } catch (Exception e) {
-                        exchange.setException(e);
-                    }
+            // in case of unhandled exceptions then let the exception handler handle them
+            if (exchange.getException() != null) {
+                getExceptionHandler().handleException(exchange.getException());
+            }
 
-                    // in case of unhandled exceptions then let the exception handler handle them
-                    if (exchange.getException() != null) {
-                        getExceptionHandler().handleException(exchange.getException());
-                    }
+            int count = messageCount.incrementAndGet();
+            log.debug("Processed [{}] files out of [{}]", count, totalFiles);
+        }
+    }
 
-                    numMessages++;
+    private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) {
+        if (config.getFileType().equals(HdfsFileType.NORMAL_FILE) && fileStatus.isDirectory()) {
+            try {
+                Path successPath = new Path(fileStatus.getPath().toString() + "/_SUCCESS");
+                if (!info.getFileSystem().exists(successPath)) {
+                    return false;
                 }
-            } finally {
-                IOHelper.close(inputStream, "input stream", log);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
             }
         }
-
-        return numMessages;
+        return true;
     }
 
-    private boolean normalFileIsDirectoryNoSuccessFile(FileStatus status, HdfsInfo info) throws IOException {
-        if (config.getFileType().equals(HdfsFileType.NORMAL_FILE) && status.isDirectory()) {
-            Path successPath = new Path(status.getPath().toString() + "/_SUCCESS");
-            if (!info.getFileSystem().exists(successPath)) {
-                return true;
+    private boolean hasMatchingOwner(FileStatus fileStatus) {
+        if (config.getOwner() != null && !config.getOwner().equals(fileStatus.getOwner())) {
+            if (log.isDebugEnabled()) {
+                log.debug("Skipping file: {} as not matching owner: {}", 
fileStatus.getPath(), config.getOwner());
             }
+            return true;
         }
         return false;
     }
 
+    private HdfsInputStream createInputStream(FileStatus fileStatus) {
+        try {
+            this.rwlock.writeLock().lock();
+
+            return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), this.config);
+        } finally {
+            this.rwlock.writeLock().unlock();
+        }
+    }
+
 }
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
new file mode 100644
index 0000000..f08cb18
--- /dev/null
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.Closeable;
+
+import org.apache.camel.TypeConverter;
+
+interface HdfsFile {
+
+    long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter);
+
+    long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value);
+
+    Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration);
+
+    Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration);
+
+}
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
index 9d8c79b..744fdab 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java
@@ -24,7 +24,7 @@ public enum HdfsFileSystemType {
             StringBuilder hpath = new StringBuilder();
             hpath.append("file://");
             hpath.append(config.getPath());
-            if (!config.getSplitStrategies().isEmpty()) {
+            if (config.hasSplitStrategies()) {
                 hpath.append('/');
             }
             return hpath;
@@ -40,7 +40,7 @@ public enum HdfsFileSystemType {
             hpath.append(':');
             hpath.append(config.getPort());
             hpath.append(config.getPath());
-            if (!config.getSplitStrategies().isEmpty()) {
+            if (config.hasSplitStrategies()) {
                 hpath.append('/');
             }
             return hpath;
diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
index 713e271..fe18006 100644
--- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java
@@ -16,499 +16,38 @@
  */
 package org.apache.camel.component.hdfs;
 
-import java.io.ByteArrayOutputStream;
 import java.io.Closeable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.util.HashMap;
-import java.util.Map;
 
-import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.TypeConverter;
-import org.apache.camel.util.IOHelper;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.ArrayFile;
-import org.apache.hadoop.io.BloomMapFile;
-import org.apache.hadoop.io.BooleanWritable;
-import org.apache.hadoop.io.ByteWritable;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.MapFile;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.SequenceFile.Reader;
-import org.apache.hadoop.io.SequenceFile.Writer;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.util.ReflectionUtils;
 
 public enum HdfsFileType {
 
-    NORMAL_FILE {
-        @Override
-        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
-            InputStream is = null;
-            try {
-                is = typeConverter.convertTo(InputStream.class, value);
-                return copyBytes(is, (FSDataOutputStream) hdfsostr.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false);
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            } finally {
-                IOHelper.close(is);
-            }
-        }
+    NORMAL_FILE(new HdfsNormalFileType()),
+    SEQUENCE_FILE(new HdfsSequenceFileType()),
+    MAP_FILE(new HdfsMapFileType()),
+    BLOOMMAP_FILE(new HdfsBloommapFileType()),
+    ARRAY_FILE(new HdfsArrayFileType());
 
-        @Override
-        public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
-            try {
-                ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsInputStream.getChunkSize());
-                byte[] buf = new byte[hdfsInputStream.getChunkSize()];
-                int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
-                if (bytesRead >= 0) {
-                    bos.write(buf, 0, bytesRead);
-                    key.value = null;
-                    value.value = bos;
-                    return bytesRead;
-                } else {
-                    key.value = null;
-                    // indication that we may have read from empty file
-                    value.value = bos;
-                    return 0;
-                }
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
+    private final HdfsFile file;
 
-        @Override
-        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                if (!configuration.isAppend()) {
-                    rout = hdfsInfo.getFileSystem().create(hdfsInfo.getPath(), configuration.isOverwrite(), configuration.getBufferSize(),
-                            configuration.getReplication(), configuration.getBlockSize(), () -> { });
-                } else {
-                    rout = hdfsInfo.getFileSystem().append(hdfsInfo.getPath(), configuration.getBufferSize(), () -> { });
-                }
-                return rout;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rin;
-                if (configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
-                    HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                    rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
-                } else {
-                    rin = new FileInputStream(getHfdsFileToTmpFile(hdfsPath, configuration));
-                }
-                return rin;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        private File getHfdsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/'));
-
-                // [CAMEL-13711] Files.createTempFile not equivalent to File.createTempFile
-
-                File outputDest;
-                try {
-
-                    // First trying: Files.createTempFile
-                    outputDest = Files.createTempFile(fname, ".hdfs").toFile();
-
-                } catch (Exception ex) {
-
-                    // Now trying: File.createTempFile
-                    outputDest = File.createTempFile(fname, ".hdfs");
-                }
-
-                if (outputDest.exists()) {
-                    outputDest.delete();
-                }
-
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                FileSystem fileSystem = hdfsInfo.getFileSystem();
-                FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest, false, fileSystem.getConf());
-                try {
-                    FileUtil.copyMerge(
-                            fileSystem, // src
-                            new Path(hdfsPath),
-                            FileSystem.getLocal(new Configuration()), // dest
-                            new Path(outputDest.toURI()),
-                            false, fileSystem.getConf(), null);
-                } catch (IOException e) {
-                    return outputDest;
-                }
-
-                return new File(outputDest, fname);
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-    },
-
-    SEQUENCE_FILE {
-        @Override
-        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
-            try {
-                Holder<Integer> keySize = new Holder<>();
-                Writable keyWritable = getWritable(key, typeConverter, keySize);
-                Holder<Integer> valueSize = new Holder<>();
-                Writable valueWritable = getWritable(value, typeConverter, valueSize);
-                Writer writer = (SequenceFile.Writer) hdfsostr.getOut();
-                writer.append(keyWritable, valueWritable);
-                writer.sync();
-                return keySize.value + valueSize.value;
-            } catch (Exception ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
-            try {
-                SequenceFile.Reader reader = (SequenceFile.Reader) hdfsistr.getIn();
-                Holder<Integer> keySize = new Holder<>();
-                Writable keyWritable = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
-                Holder<Integer> valueSize = new Holder<>();
-                Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
-                if (reader.next(keyWritable, valueWritable)) {
-                    key.value = getObject(keyWritable, keySize);
-                    value.value = getObject(valueWritable, valueSize);
-                    return keySize.value + valueSize.value;
-                } else {
-                    return 0;
-                }
-            } catch (Exception ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                Class<?> keyWritableClass = configuration.getKeyType().getWritableClass();
-                Class<?> valueWritableClass = configuration.getValueType().getWritableClass();
-                rout = SequenceFile.createWriter(hdfsInfo.getConf(), Writer.file(hdfsInfo.getPath()), Writer.keyClass(keyWritableClass),
-                        Writer.valueClass(valueWritableClass), Writer.bufferSize(configuration.getBufferSize()),
-                        Writer.replication(configuration.getReplication()), Writer.blockSize(configuration.getBlockSize()),
-                        Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
-                        Writer.progressable(() -> {
-                        }), Writer.metadata(new SequenceFile.Metadata()));
-                return rout;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rin;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                rin = new SequenceFile.Reader(hdfsInfo.getConf(), Reader.file(hdfsInfo.getPath()));
-                return rin;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-    },
-
-    MAP_FILE {
-        @Override
-        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
-            try {
-                Holder<Integer> keySize = new Holder<>();
-                Writable keyWritable = getWritable(key, typeConverter, keySize);
-                Holder<Integer> valueSize = new Holder<>();
-                Writable valueWritable = getWritable(value, typeConverter, valueSize);
-                ((MapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-                return keySize.value + valueSize.value;
-            } catch (Exception ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
-            try {
-                MapFile.Reader reader = (MapFile.Reader) hdfsInputStream.getIn();
-                Holder<Integer> keySize = new Holder<>();
-                WritableComparable<?> keyWritable = (WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
-                Holder<Integer> valueSize = new Holder<>();
-                Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
-                if (reader.next(keyWritable, valueWritable)) {
-                    key.value = getObject(keyWritable, keySize);
-                    value.value = getObject(valueWritable, valueSize);
-                    return keySize.value + valueSize.value;
-                } else {
-                    return 0;
-                }
-            } catch (Exception ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        @SuppressWarnings("rawtypes")
-        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
-                Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
-                rout = new MapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass),
-                    MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
-                    MapFile.Writer.progressable(() -> {
-                    }));
-                return rout;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rin;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
-                return rin;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-    },
-
-    BLOOMMAP_FILE {
-        @Override
-        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
-            try {
-                Holder<Integer> keySize = new Holder<>();
-                Writable keyWritable = getWritable(key, typeConverter, keySize);
-                Holder<Integer> valueSize = new Holder<>();
-                Writable valueWritable = getWritable(value, typeConverter, valueSize);
-                ((BloomMapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
-                return keySize.value + valueSize.value;
-            } catch (Exception ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
-            try {
-                MapFile.Reader reader = (BloomMapFile.Reader) hdfsistr.getIn();
-                Holder<Integer> keySize = new Holder<>();
-                WritableComparable<?> keyWritable = (WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
-                Holder<Integer> valueSize = new Holder<>();
-                Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
-                if (reader.next(keyWritable, valueWritable)) {
-                    key.value = getObject(keyWritable, keySize);
-                    value.value = getObject(valueWritable, valueSize);
-                    return keySize.value + valueSize.value;
-                } else {
-                    return 0;
-                }
-            } catch (Exception ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @SuppressWarnings("rawtypes")
-        @Override
-        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass();
-                Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
-                rout = new BloomMapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), org.apache.hadoop.io.MapFile.Writer.keyClass(keyWritableClass),
-                        org.apache.hadoop.io.MapFile.Writer.valueClass(valueWritableClass),
-                        org.apache.hadoop.io.MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()),
-                        org.apache.hadoop.io.MapFile.Writer.progressable(() -> {
-                        }));
-                return rout;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rin;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
-                return rin;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-    },
-
-    ARRAY_FILE {
-        @Override
-        public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
-            try {
-                Holder<Integer> valueSize = new Holder<>();
-                Writable valueWritable = getWritable(value, typeConverter, valueSize);
-                ((ArrayFile.Writer) hdfsostr.getOut()).append(valueWritable);
-                return valueSize.value;
-            } catch (Exception ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) {
-            try {
-                ArrayFile.Reader reader = (ArrayFile.Reader) hdfsistr.getIn();
-                Holder<Integer> valueSize = new Holder<>();
-                Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
-                if (reader.next(valueWritable) != null) {
-                    value.value = getObject(valueWritable, valueSize);
-                    return valueSize.value;
-                } else {
-                    return 0;
-                }
-            } catch (Exception ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @SuppressWarnings("rawtypes")
-        @Override
-        public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rout;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass();
-                rout = new ArrayFile.Writer(hdfsInfo.getConf(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass,
-                        configuration.getCompressionType(), () -> { });
-                return rout;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-
-        @Override
-        public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) {
-            try {
-                Closeable rin;
-                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration);
-                rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConf());
-                return rin;
-            } catch (IOException ex) {
-                throw new RuntimeCamelException(ex);
-            }
-        }
-    };
-
-    @SuppressWarnings({"rawtypes"})
-    private static final class WritableCache {
-
-        private static Map<Class, HdfsWritableFactories.HdfsWritableFactory> writables = new HashMap<>();
-        private static Map<Class, HdfsWritableFactories.HdfsWritableFactory> readables = new HashMap<>();
-
-        private WritableCache() {
-        }
-
-        static {
-            writables.put(Boolean.class, new HdfsWritableFactories.HdfsBooleanWritableFactory());
-            writables.put(Byte.class, new HdfsWritableFactories.HdfsByteWritableFactory());
-            writables.put(ByteBuffer.class, new HdfsWritableFactories.HdfsBytesWritableFactory());
-            writables.put(Double.class, new HdfsWritableFactories.HdfsDoubleWritableFactory());
-            writables.put(Float.class, new HdfsWritableFactories.HdfsFloatWritableFactory());
-            writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory());
-            writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory());
-            writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory());
-            writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory());
-        }
-
-        static {
-            readables.put(BooleanWritable.class, new HdfsWritableFactories.HdfsBooleanWritableFactory());
-            readables.put(ByteWritable.class, new HdfsWritableFactories.HdfsByteWritableFactory());
-            readables.put(BytesWritable.class, new HdfsWritableFactories.HdfsBytesWritableFactory());
-            readables.put(DoubleWritable.class, new HdfsWritableFactories.HdfsDoubleWritableFactory());
-            readables.put(FloatWritable.class, new HdfsWritableFactories.HdfsFloatWritableFactory());
-            readables.put(IntWritable.class, new HdfsWritableFactories.HdfsIntWritableFactory());
-            readables.put(LongWritable.class, new HdfsWritableFactories.HdfsLongWritableFactory());
-            readables.put(Text.class, new HdfsWritableFactories.HdfsTextWritableFactory());
-            readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory());
-        }
+    private HdfsFileType(HdfsFile file) {
+        this.file = file;
     }
 
-    private static Writable getWritable(Object obj, TypeConverter typeConverter, Holder<Integer> size) {
-        Class<?> objCls = obj == null ? null : obj.getClass();
-        HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls);
-        if (objWritableFactory == null) {
-            objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory();
-        }
-        return objWritableFactory.create(obj, typeConverter, size);
+    public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) {
+        return this.file.append(hdfsostr, key, value, typeConverter);
     }
 
-    private static Object getObject(Writable writable, Holder<Integer> size) {
-        Class<?> writableClass = NullWritable.class;
-        if (writable != null) {
-            writableClass = writable.getClass();
-        }
-        HdfsWritableFactories.HdfsWritableFactory writableObjectFactory = WritableCache.readables.get(writableClass);
-        return writableObjectFactory.read(writable, size);
+    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) {
+        return this.file.next(hdfsInputStream, key, value);
     }
 
-    public abstract long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter);
-
-    public abstract long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value);
-
-    public abstract Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration);
-
-    public abstract Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration);
+    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration 
configuration) {
+        return this.file.createOutputStream(hdfsPath, configuration);
+    }
 
-    public static long copyBytes(InputStream in, OutputStream out, int 
buffSize, boolean close) throws IOException {
-        long numBytes = 0;
-        PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null;
-        byte[] buf = new byte[buffSize];
-        try {
-            int bytesRead = in.read(buf);
-            while (bytesRead >= 0) {
-                out.write(buf, 0, bytesRead);
-                numBytes += bytesRead;
-                if ((ps != null) && ps.checkError()) {
-                    throw new IOException("Unable to write to output stream.");
-                }
-                bytesRead = in.read(buf);
-            }
-        } finally {
-            if (close) {
-                IOHelper.close(out, in);
-            }
-        }
-        return numBytes;
+    public Closeable createInputStream(String hdfsPath, HdfsConfiguration 
configuration) {
+        return this.file.createInputStream(hdfsPath, configuration);
     }
+
 }
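
The bulk of the per-type logic now lives in the dedicated HdfsFile implementations added by this commit, and HdfsFileType only delegates to them. A minimal sketch of the resulting wiring (the constant names and constructor arguments are assumptions based on the new file names, not taken from this diff):

    // Sketch only (same package assumed): how the enum constants are presumably
    // bound to the new per-type implementations.
    public enum HdfsFileType {

        NORMAL_FILE(new HdfsNormalFileType()),
        SEQUENCE_FILE(new HdfsSequenceFileType()),
        MAP_FILE(new HdfsMapFileType()),
        BLOOMMAP_FILE(new HdfsBloommapFileType()),
        ARRAY_FILE(new HdfsArrayFileType());

        private final HdfsFile file;

        HdfsFileType(HdfsFile file) {
            this.file = file;
        }

        // append/next/createOutputStream/createInputStream forward to 'file',
        // as shown in the hunk above
    }
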
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
index 58816d6..40212e8 100644
--- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java
@@ -18,13 +18,18 @@ package org.apache.camel.component.hdfs;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Optional;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class HdfsInputStream implements Closeable {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(HdfsInputStream.class);
+
     private HdfsFileType fileType;
     private String actualPath;
     private String suffixedPath;
@@ -40,21 +45,36 @@ public class HdfsInputStream implements Closeable {
     protected HdfsInputStream() {
     }
 
-    public static HdfsInputStream createInputStream(String hdfsPath, 
HdfsConfiguration configuration) throws IOException {
+    /**
+     * Opens the file at the given path for reading, after renaming it with the configured 'opened' suffix.
+     *
+     * @param hdfsPath      the path of the file to consume
+     * @param configuration the endpoint configuration (file type, chunk size, suffixes)
+     * @return a new {@link HdfsInputStream}, or <tt>null</tt> if the file could not be renamed (e.g. it does not exist)
+     */
+    public static HdfsInputStream createInputStream(String hdfsPath, 
HdfsConfiguration configuration) {
         HdfsInputStream ret = new HdfsInputStream();
         ret.fileType = configuration.getFileType();
         ret.actualPath = hdfsPath;
         ret.suffixedPath = ret.actualPath + '.' + 
configuration.getOpenedSuffix();
         ret.suffixedReadPath = ret.actualPath + '.' + 
configuration.getReadSuffix();
         ret.chunkSize = configuration.getChunkSize();
-        HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath, 
configuration);
-        if (info.getFileSystem().rename(new Path(ret.actualPath), new 
Path(ret.suffixedPath))) {
-            ret.in = ret.fileType.createInputStream(ret.suffixedPath, 
configuration);
-            ret.opened = true;
-        } else {
-            ret.opened = false;
+        try {
+            HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath, 
configuration);
+            if (info.getFileSystem().rename(new Path(ret.actualPath), new 
Path(ret.suffixedPath))) {
+                ret.in = ret.fileType.createInputStream(ret.suffixedPath, 
configuration);
+                ret.opened = true;
+                ret.config = configuration;
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Failed to open file [{}] because it doesn't 
exist", hdfsPath);
+                }
+                ret = null;
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
-        ret.config = configuration;
+
         return ret;
     }
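
Note that createInputStream no longer declares IOException (failures are wrapped in an unchecked RuntimeException) and that a missing file now yields null rather than an "unopened" stream. A hypothetical caller-side sketch, not part of this commit, of how the result would typically be handled:

    // Hypothetical caller (same package assumed); names other than
    // HdfsInputStream/HdfsConfiguration are illustrative only.
    boolean pollOnce(String hdfsPath, HdfsConfiguration configuration) throws java.io.IOException {
        HdfsInputStream in = HdfsInputStream.createInputStream(hdfsPath, configuration);
        if (in == null) {
            return false; // nothing to consume: the file does not exist (yet)
        }
        try {
            // read chunks or key/value pairs from 'in' here
            return true;
        } finally {
            in.close(); // HdfsInputStream implements Closeable
        }
    }
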
 
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
new file mode 100644
index 0000000..01868b7
--- /dev/null
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.MapFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+class HdfsMapFileType extends DefaultHdfsFileType {
+
+    @Override
+    public long append(HdfsOutputStream hdfsostr, Object key, Object value, 
TypeConverter typeConverter) {
+        try {
+            Holder<Integer> keySize = new Holder<>();
+            Writable keyWritable = getWritable(key, typeConverter, keySize);
+            Holder<Integer> valueSize = new Holder<>();
+            Writable valueWritable = getWritable(value, typeConverter, 
valueSize);
+            ((MapFile.Writer) 
hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable);
+            return Long.sum(keySize.value, valueSize.value);
+        } catch (Exception ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, 
Holder<Object> value) {
+        try {
+            MapFile.Reader reader = (MapFile.Reader) hdfsInputStream.getIn();
+            Holder<Integer> keySize = new Holder<>();
+            WritableComparable<?> keyWritable = (WritableComparable<?>) 
ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
+            Holder<Integer> valueSize = new Holder<>();
+            Writable valueWritable = (Writable) 
ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+            if (reader.next(keyWritable, valueWritable)) {
+                key.value = getObject(keyWritable, keySize);
+                value.value = getObject(valueWritable, valueSize);
+                return Long.sum(keySize.value, valueSize.value);
+            } else {
+                return 0;
+            }
+        } catch (Exception ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration 
configuration) {
+        try {
+            Closeable rout;
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, 
configuration);
+            Class<? extends WritableComparable> keyWritableClass = 
configuration.getKeyType().getWritableClass();
+            Class<? extends WritableComparable> valueWritableClass = 
configuration.getValueType().getWritableClass();
+            rout = new MapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), 
MapFile.Writer.keyClass(keyWritableClass), 
MapFile.Writer.valueClass(valueWritableClass),
+                    
MapFile.Writer.compression(configuration.getCompressionType(), 
configuration.getCompressionCodec().getCodec()),
+                    MapFile.Writer.progressable(() -> {
+                    }));
+            return rout;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public Closeable createInputStream(String hdfsPath, HdfsConfiguration 
configuration) {
+        try {
+            Closeable rin;
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, 
configuration);
+            rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf());
+            return rin;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+}
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileType.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileType.java
new file mode 100644
index 0000000..43df23a
--- /dev/null
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileType.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.ByteArrayOutputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
+import org.apache.camel.util.IOHelper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+
+class HdfsNormalFileType extends DefaultHdfsFileType {
+
+    @Override
+    public long append(HdfsOutputStream hdfsostr, Object key, Object value, 
TypeConverter typeConverter) {
+        InputStream is = null;
+        try {
+            is = typeConverter.convertTo(InputStream.class, value);
+            return copyBytes(is, (FSDataOutputStream) hdfsostr.getOut(), 
HdfsConstants.DEFAULT_BUFFERSIZE, false);
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        } finally {
+            IOHelper.close(is);
+        }
+    }
+
+    @Override
+    public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, 
Holder<Object> value) {
+        try {
+            ByteArrayOutputStream bos = new 
ByteArrayOutputStream(hdfsInputStream.getChunkSize());
+            byte[] buf = new byte[hdfsInputStream.getChunkSize()];
+            int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf);
+            if (bytesRead >= 0) {
+                bos.write(buf, 0, bytesRead);
+                key.value = null;
+                value.value = bos;
+                return bytesRead;
+            } else {
+                key.value = null;
+                // indication that we may have read from an empty file
+                value.value = bos;
+                return 0;
+            }
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration 
configuration) {
+        try {
+            Closeable rout;
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, 
configuration);
+            if (!configuration.isAppend()) {
+                rout = hdfsInfo.getFileSystem().create(hdfsInfo.getPath(), 
configuration.isOverwrite(), configuration.getBufferSize(),
+                        configuration.getReplication(), 
configuration.getBlockSize(), () -> { });
+            } else {
+                rout = hdfsInfo.getFileSystem().append(hdfsInfo.getPath(), 
configuration.getBufferSize(), () -> { });
+            }
+            return rout;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public Closeable createInputStream(String hdfsPath, HdfsConfiguration 
configuration) {
+        try {
+            Closeable rin;
+            if 
(configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) {
+                HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, 
configuration);
+                rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath());
+            } else {
+                rin = new FileInputStream(getHdfsFileToTmpFile(hdfsPath, configuration));
+            }
+            return rin;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    private File getHdfsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) {
+        try {
+            String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/'));
+
+            // [CAMEL-13711] Files.createTempFile not equivalent to 
File.createTempFile
+
+            File outputDest;
+            try {
+                // First trying: Files.createTempFile
+                outputDest = Files.createTempFile(fname, ".hdfs").toFile();
+
+            } catch (Exception ex) {
+                // Now trying: File.createTempFile
+                outputDest = File.createTempFile(fname, ".hdfs");
+            }
+
+            if (outputDest.exists()) {
+                outputDest.delete();
+            }
+
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, 
configuration);
+            FileSystem fileSystem = hdfsInfo.getFileSystem();
+            FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest, false, 
fileSystem.getConf());
+            try {
+                FileUtil.copyMerge(
+                        fileSystem, // src
+                        new Path(hdfsPath),
+                        FileSystem.getLocal(new Configuration()), // dest
+                        new Path(outputDest.toURI()),
+                        false, fileSystem.getConf(), null);
+            } catch (IOException e) {
+                return outputDest;
+            }
+
+            return new File(outputDest, fname);
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+}
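
The [CAMEL-13711] comment above refers to the fact that the NIO and legacy temp-file calls are not interchangeable, which is why the code tries Files.createTempFile first and falls back to File.createTempFile. A stand-alone sketch of that fallback (the exact failure mode is an assumption; see the CAMEL-13711 issue for details):

    import java.io.File;
    import java.io.IOException;
    import java.nio.file.Files;

    // Prefer the NIO variant and fall back to the legacy variant when it fails,
    // mirroring the pattern used in the method above.
    final class TempFileSketch {

        static File createTempFile(String prefix, String suffix) throws IOException {
            try {
                return Files.createTempFile(prefix, suffix).toFile();
            } catch (Exception ex) {
                return File.createTempFile(prefix, suffix);
            }
        }

        private TempFileSketch() {
        }
    }
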
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
index 6c21580..568ee71 100644
--- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.hdfs;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -108,17 +109,11 @@ public class HdfsProducer extends DefaultProducer {
                 ostream = setupHdfs(true);
             }
 
-            SplitStrategy idleStrategy = null;
-            for (SplitStrategy strategy : config.getSplitStrategies()) {
-                if (strategy.type == SplitStrategyType.IDLE) {
-                    idleStrategy = strategy;
-                    break;
-                }
-            }
-            if (idleStrategy != null) {
+            Optional<SplitStrategy> idleStrategy = 
tryFindIdleStrategy(config.getSplitStrategies());
+            if (idleStrategy.isPresent()) {
                 scheduler = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
 "HdfsIdleCheck");
                 log.debug("Creating IdleCheck task scheduled to run every {} 
millis", config.getCheckIdleInterval());
-                scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy), 
config.getCheckIdleInterval(), config.getCheckIdleInterval(), 
TimeUnit.MILLISECONDS);
+                scheduler.scheduleAtFixedRate(new 
IdleCheck(idleStrategy.get()), config.getCheckIdleInterval(), 
config.getCheckIdleInterval(), TimeUnit.MILLISECONDS);
             }
         } catch (Exception e) {
             LOG.warn("Failed to start the HDFS producer. Caused by: [{}]", 
e.getMessage());
@@ -129,13 +124,13 @@ public class HdfsProducer extends DefaultProducer {
         }
     }
 
-    private synchronized HdfsOutputStream setupHdfs(boolean onStartup) throws 
Exception {
+    private synchronized HdfsOutputStream setupHdfs(boolean onStartup) throws 
IOException {
         if (ostream != null) {
             return ostream;
         }
 
         StringBuilder actualPath = new StringBuilder(hdfsPath);
-        if (config.getSplitStrategies().size() > 0) {
+        if (config.hasSplitStrategies()) {
             actualPath = newFileName();
         }
 
@@ -161,6 +156,15 @@ public class HdfsProducer extends DefaultProducer {
         return answer;
     }
 
+    private Optional<SplitStrategy> tryFindIdleStrategy(List<SplitStrategy> 
strategies) {
+        for (SplitStrategy strategy : strategies) {
+            if (strategy.type == SplitStrategyType.IDLE) {
+                return Optional.of(strategy);
+            }
+        }
+        return Optional.empty();
+    }
+
     @Override
     protected void doStop() throws Exception {
         super.doStop();
@@ -185,7 +189,7 @@ public class HdfsProducer extends DefaultProducer {
         }
     }
 
-    void doProcess(Exchange exchange) throws Exception {
+    void doProcess(Exchange exchange) throws IOException {
         Object body = exchange.getIn().getBody();
         Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name());
 
@@ -201,13 +205,7 @@ public class HdfsProducer extends DefaultProducer {
             ostream = setupHdfs(false);
         }
 
-        boolean split = false;
-        List<SplitStrategy> strategies = config.getSplitStrategies();
-        for (SplitStrategy splitStrategy : strategies) {
-            split |= splitStrategy.getType().split(ostream, 
splitStrategy.value, this);
-        }
-
-        if (split) {
+        if (isSplitRequired(config.getSplitStrategies())) {
             if (ostream != null) {
                 IOHelper.close(ostream, "output stream", log);
             }
@@ -260,6 +258,14 @@ public class HdfsProducer extends DefaultProducer {
         return actualPath.append(fileName);
     }
 
+    private boolean isSplitRequired(List<SplitStrategy> strategies) {
+        boolean split = false;
+        for (SplitStrategy splitStrategy : strategies) {
+            split |= splitStrategy.getType().split(ostream, 
splitStrategy.value, this);
+        }
+        return split;
+    }
+
     private StringBuilder newFileName() {
         StringBuilder actualPath = new StringBuilder(hdfsPath);
         
actualPath.append(StringHelper.sanitize(getEndpoint().getCamelContext().getUuidGenerator().generateUuid()));
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
new file mode 100644
index 0000000..0bfde2e
--- /dev/null
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.hdfs;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.TypeConverter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
+
+class HdfsSequenceFileType extends DefaultHdfsFileType {
+
+    @Override
+    public long append(HdfsOutputStream hdfsostr, Object key, Object value, 
TypeConverter typeConverter) {
+        try {
+            Holder<Integer> keySize = new Holder<>();
+            Writable keyWritable = getWritable(key, typeConverter, keySize);
+            Holder<Integer> valueSize = new Holder<>();
+            Writable valueWritable = getWritable(value, typeConverter, 
valueSize);
+            SequenceFile.Writer writer = (SequenceFile.Writer) 
hdfsostr.getOut();
+            writer.append(keyWritable, valueWritable);
+            writer.sync();
+            return Long.sum(keySize.value, valueSize.value);
+        } catch (Exception ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public long next(HdfsInputStream hdfsistr, Holder<Object> key, 
Holder<Object> value) {
+        try {
+            SequenceFile.Reader reader = (SequenceFile.Reader) 
hdfsistr.getIn();
+            Holder<Integer> keySize = new Holder<>();
+            Writable keyWritable = (Writable) 
ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration());
+            Holder<Integer> valueSize = new Holder<>();
+            Writable valueWritable = (Writable) 
ReflectionUtils.newInstance(reader.getValueClass(), new Configuration());
+            if (reader.next(keyWritable, valueWritable)) {
+                key.value = getObject(keyWritable, keySize);
+                value.value = getObject(valueWritable, valueSize);
+                return Long.sum(keySize.value, valueSize.value);
+            } else {
+                return 0;
+            }
+        } catch (Exception ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public Closeable createOutputStream(String hdfsPath, HdfsConfiguration 
configuration) {
+        try {
+            Closeable rout;
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, 
configuration);
+            Class<?> keyWritableClass = 
configuration.getKeyType().getWritableClass();
+            Class<?> valueWritableClass = 
configuration.getValueType().getWritableClass();
+            rout = SequenceFile.createWriter(hdfsInfo.getConf(), 
SequenceFile.Writer.file(hdfsInfo.getPath()), 
SequenceFile.Writer.keyClass(keyWritableClass),
+                    SequenceFile.Writer.valueClass(valueWritableClass), 
SequenceFile.Writer.bufferSize(configuration.getBufferSize()),
+                    
SequenceFile.Writer.replication(configuration.getReplication()), 
SequenceFile.Writer.blockSize(configuration.getBlockSize()),
+                    
SequenceFile.Writer.compression(configuration.getCompressionType(), 
configuration.getCompressionCodec().getCodec()),
+                    SequenceFile.Writer.progressable(() -> {
+                    }), SequenceFile.Writer.metadata(new 
SequenceFile.Metadata()));
+            return rout;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+
+    @Override
+    public Closeable createInputStream(String hdfsPath, HdfsConfiguration 
configuration) {
+        try {
+            Closeable rin;
+            HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, 
configuration);
+            rin = new SequenceFile.Reader(hdfsInfo.getConf(), 
SequenceFile.Reader.file(hdfsInfo.getPath()));
+            return rin;
+        } catch (IOException ex) {
+            throw new RuntimeCamelException(ex);
+        }
+    }
+}
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
index 910aa2b..07c5eb8 100644
--- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.TypeConverter;
-import org.apache.camel.util.IOHelper;
 import org.apache.hadoop.io.BooleanWritable;
 import org.apache.hadoop.io.ByteWritable;
 import org.apache.hadoop.io.BytesWritable;
@@ -215,9 +214,7 @@ public class HdfsWritableFactories {
 
         @Override
         public Writable create(Object value, TypeConverter typeConverter, 
Holder<Integer> size) {
-            InputStream is = null;
-            try {
-                is = typeConverter.convertTo(InputStream.class, value);
+            try (InputStream is = typeConverter.convertTo(InputStream.class, 
value)) {
                 ByteArrayOutputStream bos = new ByteArrayOutputStream();
                 IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, 
false);
                 BytesWritable writable = new BytesWritable();
@@ -226,8 +223,6 @@ public class HdfsWritableFactories {
                 return writable;
             } catch (IOException ex) {
                 throw new RuntimeCamelException(ex);
-            } finally {
-                IOHelper.close(is);
             }
         }
 
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java
index 0cff2f5..4422a08 100644
--- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java
@@ -24,12 +24,13 @@ import java.util.stream.Collectors;
 
 import static java.lang.String.format;
 
-import org.apache.camel.component.hdfs.HdfsComponent;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import 
org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
@@ -37,8 +38,12 @@ import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
 
 public class KerberosConfiguration extends Configuration {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(KerberosConfiguration.class);
+
     private static final String HFDS_NAMED_SERVICE = "hfdsNamedService";
 
+    private static final String KERBEROS_5_SYS_ENV = "java.security.krb5.conf";
+
     private static final String AUTHENTICATION_MODE = 
"hadoop.security.authentication";
     private static final String HFDS_FS = "fs.defaultFS";
 
@@ -63,7 +68,7 @@ public class KerberosConfiguration extends Configuration {
                                  String kerberosConfigFileLocation,
                                  int replicationFactor) {
 
-        HdfsComponent.setKerberosConfigFile(kerberosConfigFileLocation);
+        setKerberosConfigFile(kerberosConfigFileLocation);
         setupHdfsConfiguration(namedNodes, replicationFactor);
     }
 
@@ -106,6 +111,26 @@ public class KerberosConfiguration extends Configuration {
         UserGroupInformation.loginUserFromKeytab(username, keyTabFileLocation);
     }
 
+    /**
+     * To use Kerberos authentication, set the value of the 'java.security.krb5.conf' system property to an existing krb5.conf file.
+     * If the property is already set, a warning is logged when it points to a different file than the specified parameter.
+     *
+     * @param kerberosConfigFileLocation location of the krb5.conf file (https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html)
+     */
+    public static void setKerberosConfigFile(String 
kerberosConfigFileLocation) {
+        if (!new File(kerberosConfigFileLocation).exists()) {
+            LOG.warn("Kerberos configuration file [{}}] could not be found.", 
kerberosConfigFileLocation);
+            return;
+        }
+
+        String krb5Conf = System.getProperty(KERBEROS_5_SYS_ENV);
+        if (krb5Conf == null || krb5Conf.isEmpty()) {
+            System.setProperty(KERBEROS_5_SYS_ENV, kerberosConfigFileLocation);
+        } else if (!krb5Conf.equalsIgnoreCase(kerberosConfigFileLocation)) {
+            LOG.warn("[{}] was already configured with: [{}] config file", 
KERBEROS_5_SYS_ENV, krb5Conf);
+        }
+    }
+
     private String nodeToString(String nodeName) {
         return nodeName.replaceAll(":[0-9]*", "").replaceAll("\\.", "_");
     }
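
The javadoc above describes the intended behaviour: point the JVM at a krb5.conf file through the 'java.security.krb5.conf' system property unless one is already configured, and only warn when a different file is already in place. A stand-alone sketch of that logic (logging replaced by stderr so the snippet has no dependencies):

    import java.io.File;

    // Sketch of the krb5.conf handling described above; not the committed code.
    final class Krb5ConfSketch {

        private static final String KRB5_CONF_PROPERTY = "java.security.krb5.conf";

        static void setKerberosConfigFile(String location) {
            if (!new File(location).exists()) {
                System.err.printf("Kerberos configuration file [%s] could not be found.%n", location);
                return;
            }
            String current = System.getProperty(KRB5_CONF_PROPERTY);
            if (current == null || current.isEmpty()) {
                System.setProperty(KRB5_CONF_PROPERTY, location);
            } else if (!current.equalsIgnoreCase(location)) {
                System.err.printf("[%s] is already configured with [%s]; not overriding it.%n",
                        KRB5_CONF_PROPERTY, current);
            }
        }

        private Krb5ConfSketch() {
        }
    }
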
diff --git 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/osgi/HdfsActivator.java
 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/osgi/HdfsActivator.java
index 811afa3..3a01814 100644
--- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/osgi/HdfsActivator.java
+++ 
b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/osgi/HdfsActivator.java
@@ -24,21 +24,22 @@ public class HdfsActivator implements BundleActivator {
 
     @Override
     public void start(BundleContext context) throws Exception {
+        // no action needed
     }
 
     @Override
     public void stop(BundleContext context) throws Exception {
-        // There's problem inside OSGi when framwork is being shutdown
+        // There's a problem inside OSGi when the framework is being shut down
         // hadoop.fs code registers some JVM shutdown hooks throughout the 
code and this ordered
         // list of hooks is run in shutdown thread.
         // At that time bundle class loader / bundle wiring is no longer valid 
(bundle is stopped)
         // so ShutdownHookManager can't load additional classes. But there are 
some inner classes
         // loaded when iterating over registered hadoop shutdown hooks.
-        // Let's explicitely load these inner classes when bundle is stopped, 
as there's last chance
+        // Let's explicitly load these inner classes when the bundle is stopped, as it is the last chance
         // to use valid bundle class loader.
         // This is based on the knowledge of what's contained in SMX bundle
         // org.apache.servicemix.bundles.hadoop-client-*.jar
-        // the above is just a warning that hadopp may have some quirks when 
running inside OSGi
+        // the above is just a warning that hadoop may have some quirks when 
running inside OSGi
         ClassLoader hadoopCl = ShutdownHookManager.class.getClassLoader();
         if (hadoopCl != null) {
             String shm = ShutdownHookManager.class.getName();
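
For readers unfamiliar with the workaround described in the comment above: the activator eagerly loads ShutdownHookManager's nested classes while the bundle class loader is still usable, so Hadoop's JVM shutdown hooks do not hit ClassNotFoundException after the bundle has stopped. A hedged sketch of that idea (the "$1"/"$2" suffixes are placeholders, not the names used by the real activator):

    import org.apache.hadoop.util.ShutdownHookManager;

    // Force-load nested classes of ShutdownHookManager while the class loader
    // that defined it is still valid; "$1"/"$2" stand in for the actual names.
    final class ShutdownHookPreloadSketch {

        static void preload() {
            ClassLoader hadoopCl = ShutdownHookManager.class.getClassLoader();
            if (hadoopCl == null) {
                return;
            }
            String shm = ShutdownHookManager.class.getName();
            for (String inner : new String[] {"$1", "$2"}) {
                try {
                    Class.forName(shm + inner, true, hadoopCl);
                } catch (ClassNotFoundException ignored) {
                    // that nested class may not exist in the hadoop-client version in use
                }
            }
        }

        private ShutdownHookPreloadSketch() {
        }
    }
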
diff --git 
a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationTest.java
 
b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationTest.java
index 08f34a2..9c9678b 100644
--- 
a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationTest.java
+++ 
b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationTest.java
@@ -66,7 +66,7 @@ public class KerberosConfigurationTest {
         underTest = new KerberosConfiguration(namedNodes, 
kerberosConfigFileLocation, replicationFactor);
 
         // then
-        /* exception was thrown */
+        /* a warning is logged instead of an exception being thrown */
     }
 
     @Test(expected = FileNotFoundException.class)
