This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
     new 73b1d20  CAMEL-14011 - Refactor camel-hdfs component to reduce issues reported by sonar (#3210)
73b1d20 is described below

commit 73b1d202c233cdd7e639d2e460e2ee963ad59646
Author: Marius Cornescu <marius_corne...@yahoo.com>
AuthorDate: Mon Sep 30 05:34:51 2019 +0200

    CAMEL-14011 - Refactor camel-hdfs component to reduce issues reported by sonar (#3210)
---
 .../camel/component/hdfs/DefaultHdfsFileType.java  | 114 +++++
 .../camel/component/hdfs/HdfsArrayFileType.java    |  87 ++++
 .../camel/component/hdfs/HdfsBloommapFileType.java |  98 ++++
 .../apache/camel/component/hdfs/HdfsComponent.java |  14 +-
 .../camel/component/hdfs/HdfsConfiguration.java    |  19 +-
 .../apache/camel/component/hdfs/HdfsConsumer.java  | 130 +++---
 .../org/apache/camel/component/hdfs/HdfsFile.java  |  33 ++
 .../camel/component/hdfs/HdfsFileSystemType.java   |   4 +-
 .../apache/camel/component/hdfs/HdfsFileType.java  | 497 +--------------------
 .../camel/component/hdfs/HdfsInputStream.java      |  36 +-
 .../camel/component/hdfs/HdfsMapFileType.java      |  96 ++++
 .../camel/component/hdfs/HdfsNormalFileType.java   | 145 ++++++
 .../apache/camel/component/hdfs/HdfsProducer.java  |  44 +-
 .../camel/component/hdfs/HdfsSequenceFileType.java |  97 ++++
 .../component/hdfs/HdfsWritableFactories.java      |   7 +-
 .../hdfs/kerberos/KerberosConfiguration.java       |  29 +-
 .../camel/component/hdfs/osgi/HdfsActivator.java   |   7 +-
 .../hdfs/kerberos/KerberosConfigurationTest.java   |   2 +-
 18 files changed, 861 insertions(+), 598 deletions(-)

diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFileType.java
new file mode 100644
index 0000000..4fdcbeb
--- /dev/null
+++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/DefaultHdfsFileType.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.camel.component.hdfs; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.TypeConverter; +import org.apache.camel.util.IOHelper; +import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.ByteWritable; +import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DoubleWritable; +import org.apache.hadoop.io.FloatWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +abstract class DefaultHdfsFileType implements HdfsFile { + + protected final long copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException { + long numBytes = 0; + PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null; + byte[] buf = new byte[buffSize]; + try { + int bytesRead = in.read(buf); + while (bytesRead >= 0) { + out.write(buf, 0, bytesRead); + numBytes += bytesRead; + if ((ps != null) && ps.checkError()) { + throw new IOException("Unable to write to output stream."); + } + bytesRead = in.read(buf); + } + } finally { + if (close) { + IOHelper.close(out, in); + } + } + return numBytes; + } + + protected final Writable getWritable(Object obj, TypeConverter typeConverter, Holder<Integer> size) { + Class<?> objCls = obj == null ? null : obj.getClass(); + HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls); + if (objWritableFactory == null) { + objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory(); + } + return objWritableFactory.create(obj, typeConverter, size); + } + + protected final Object getObject(Writable writable, Holder<Integer> size) { + Class<?> writableClass = NullWritable.class; + if (writable != null) { + writableClass = writable.getClass(); + } + HdfsWritableFactories.HdfsWritableFactory writableObjectFactory = WritableCache.readables.get(writableClass); + return writableObjectFactory.read(writable, size); + } + + @SuppressWarnings({"rawtypes"}) + private static final class WritableCache { + private static Map<Class, HdfsWritableFactories.HdfsWritableFactory> writables = new HashMap<>(); + private static Map<Class, HdfsWritableFactories.HdfsWritableFactory> readables = new HashMap<>(); + + private WritableCache() { + } + + static { + writables.put(Boolean.class, new HdfsWritableFactories.HdfsBooleanWritableFactory()); + writables.put(Byte.class, new HdfsWritableFactories.HdfsByteWritableFactory()); + writables.put(ByteBuffer.class, new HdfsWritableFactories.HdfsBytesWritableFactory()); + writables.put(Double.class, new HdfsWritableFactories.HdfsDoubleWritableFactory()); + writables.put(Float.class, new HdfsWritableFactories.HdfsFloatWritableFactory()); + writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory()); + writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory()); + writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory()); + writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory()); + } + + static { + readables.put(BooleanWritable.class, new HdfsWritableFactories.HdfsBooleanWritableFactory()); + readables.put(ByteWritable.class, new HdfsWritableFactories.HdfsByteWritableFactory()); + readables.put(BytesWritable.class, new 
HdfsWritableFactories.HdfsBytesWritableFactory()); + readables.put(DoubleWritable.class, new HdfsWritableFactories.HdfsDoubleWritableFactory()); + readables.put(FloatWritable.class, new HdfsWritableFactories.HdfsFloatWritableFactory()); + readables.put(IntWritable.class, new HdfsWritableFactories.HdfsIntWritableFactory()); + readables.put(LongWritable.class, new HdfsWritableFactories.HdfsLongWritableFactory()); + readables.put(Text.class, new HdfsWritableFactories.HdfsTextWritableFactory()); + readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory()); + } + } +} diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java new file mode 100644 index 0000000..4c4123c --- /dev/null +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.hdfs; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.TypeConverter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.ArrayFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.ReflectionUtils; + +class HdfsArrayFileType extends DefaultHdfsFileType { + + @Override + public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { + try { + Holder<Integer> valueSize = new Holder<>(); + Writable valueWritable = getWritable(value, typeConverter, valueSize); + ((ArrayFile.Writer) hdfsostr.getOut()).append(valueWritable); + return valueSize.value; + } catch (Exception ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) { + try { + ArrayFile.Reader reader = (ArrayFile.Reader) hdfsistr.getIn(); + Holder<Integer> valueSize = new Holder<>(); + Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); + if (reader.next(valueWritable) != null) { + value.value = getObject(valueWritable, valueSize); + return valueSize.value; + } else { + return 0; + } + } catch (Exception ex) { + throw new RuntimeCamelException(ex); + } + } + + @SuppressWarnings("rawtypes") + @Override + public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rout; + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + Class<? 
extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass(); + rout = new ArrayFile.Writer(hdfsInfo.getConf(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass, + configuration.getCompressionType(), () -> { }); + return rout; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rin; + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConf()); + return rin; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } +} diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java new file mode 100644 index 0000000..be4c6d3 --- /dev/null +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.hdfs; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.TypeConverter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.BloomMapFile; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.ReflectionUtils; + +class HdfsBloommapFileType extends DefaultHdfsFileType { + + @Override + public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { + try { + Holder<Integer> keySize = new Holder<>(); + Writable keyWritable = getWritable(key, typeConverter, keySize); + Holder<Integer> valueSize = new Holder<>(); + Writable valueWritable = getWritable(value, typeConverter, valueSize); + ((BloomMapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable); + return Long.sum(keySize.value, valueSize.value); + } catch (Exception ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) { + try { + MapFile.Reader reader = (BloomMapFile.Reader) hdfsistr.getIn(); + Holder<Integer> keySize = new Holder<>(); + WritableComparable<?> keyWritable = (WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration()); + Holder<Integer> valueSize = new Holder<>(); + Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); + if (reader.next(keyWritable, valueWritable)) { + key.value = getObject(keyWritable, keySize); + value.value = getObject(valueWritable, valueSize); + return Long.sum(keySize.value, valueSize.value); + } else { + return 0; + } + } catch (Exception ex) { + throw new RuntimeCamelException(ex); + } + } + + @SuppressWarnings("rawtypes") + @Override + public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rout; + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass(); + Class<? 
extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass(); + rout = new BloomMapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), + MapFile.Writer.valueClass(valueWritableClass), + MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()), + MapFile.Writer.progressable(() -> { + })); + return rout; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rin; + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf()); + return rin; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } +} diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java index 07fe6b6..ebeafad 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsComponent.java @@ -16,13 +16,13 @@ */ package org.apache.camel.component.hdfs; -import java.io.File; import java.net.URL; import java.util.Map; import javax.security.auth.login.Configuration; import org.apache.camel.Endpoint; +import org.apache.camel.component.hdfs.kerberos.KerberosConfiguration; import org.apache.camel.spi.annotations.Component; import org.apache.camel.support.DefaultComponent; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; @@ -90,17 +90,7 @@ public class HdfsComponent extends DefaultComponent { * @param kerberosConfigFileLocation - kerb5.conf file (https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html) */ public static void setKerberosConfigFile(String kerberosConfigFileLocation) { - if (!new File(kerberosConfigFileLocation).exists()) { - LOG.warn("Kerberos configuration file [{}}] could not be found.", kerberosConfigFileLocation); - return; - } - - String krb5Conf = System.getProperty(KERBEROS_5_SYS_ENV); - if (krb5Conf == null || !krb5Conf.isEmpty()) { - System.setProperty(KERBEROS_5_SYS_ENV, kerberosConfigFileLocation); - } else if (!krb5Conf.equalsIgnoreCase(kerberosConfigFileLocation)) { - LOG.warn("[{}] was already configured with: [{}] config file", KERBEROS_5_SYS_ENV, krb5Conf); - } + KerberosConfiguration.setKerberosConfigFile(kerberosConfigFileLocation); } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java index ce03ef4..51e4884 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java @@ -191,15 +191,15 @@ public class HdfsConfiguration { splitStrategy = getString(hdfsSettings, "splitStrategy", kerberosNamedNodes); if (isNotEmpty(splitStrategy)) { - String[] strstrategies = splitStrategy.split(","); - for (String strstrategy : strstrategies) { - String[] tokens = strstrategy.split(":"); + String[] strategyElements = splitStrategy.split(","); + for (String strategyElement : strategyElements) { + String[] tokens = strategyElement.split(":"); if (tokens.length != 2) { throw new 
IllegalArgumentException("Wrong Split Strategy [splitStrategy" + "=" + splitStrategy + "]"); } - HdfsProducer.SplitStrategyType sst = HdfsProducer.SplitStrategyType.valueOf(tokens[0]); - long ssv = Long.parseLong(tokens[1]); - strategies.add(new HdfsProducer.SplitStrategy(sst, ssv)); + HdfsProducer.SplitStrategyType strategyType = HdfsProducer.SplitStrategyType.valueOf(tokens[0]); + long strategyValue = Long.parseLong(tokens[1]); + strategies.add(new HdfsProducer.SplitStrategy(strategyType, strategyValue)); } } return strategies; @@ -216,11 +216,12 @@ public class HdfsConfiguration { } public void checkConsumerOptions() { + // no validation required } public void checkProducerOptions() { if (isAppend()) { - if (!getSplitStrategies().isEmpty()) { + if (hasSplitStrategies()) { throw new IllegalArgumentException("Split Strategies incompatible with append=true"); } if (getFileType() != HdfsFileType.NORMAL_FILE) { @@ -491,6 +492,10 @@ public class HdfsConfiguration { return splitStrategies; } + public boolean hasSplitStrategies() { + return !splitStrategies.isEmpty(); + } + public String getSplitStrategy() { return splitStrategy; } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java index 98e1c7d..7fb0288 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java @@ -17,7 +17,10 @@ package org.apache.camel.component.hdfs; import java.io.IOException; +import java.util.Arrays; +import java.util.Objects; import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -41,7 +44,6 @@ public final class HdfsConsumer extends ScheduledPollConsumer { private final StringBuilder hdfsPath; private final Processor processor; private final ReadWriteLock rwlock = new ReentrantReadWriteLock(); - private volatile HdfsInputStream inputStream; public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) { super(endpoint, processor); @@ -66,7 +68,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer { } } - private HdfsInfo setupHdfs(boolean onStartup) throws Exception { + private HdfsInfo setupHdfs(boolean onStartup) throws IOException { // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log if (onStartup) { log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), hdfsPath); @@ -100,7 +102,7 @@ public final class HdfsConsumer extends ScheduledPollConsumer { } } - protected int doPoll() throws Exception { + protected int doPoll() throws IOException { class ExcludePathFilter implements PathFilter { @Override public boolean accept(Path path) { @@ -108,8 +110,6 @@ public final class HdfsConsumer extends ScheduledPollConsumer { } } - int numMessages = 0; - HdfsInfo info = setupHdfs(false); FileStatus[] fileStatuses; if (info.getFileSystem().isFile(info.getPath())) { @@ -121,78 +121,90 @@ public final class HdfsConsumer extends ScheduledPollConsumer { fileStatuses = Optional.ofNullable(fileStatuses).orElse(new FileStatus[0]); - for (FileStatus status : fileStatuses) { + return processFileStatuses(info, fileStatuses); + } - if 
(normalFileIsDirectoryNoSuccessFile(status, info)) { - continue; - } + private int processFileStatuses(HdfsInfo info, FileStatus[] fileStatuses) { + final AtomicInteger messageCount = new AtomicInteger(0); - if (config.getOwner() != null) { - // must match owner - if (!config.getOwner().equals(status.getOwner())) { - if (log.isDebugEnabled()) { - log.debug("Skipping file: {} as not matching owner: {}", status.getPath(), config.getOwner()); + Arrays.stream(fileStatuses) + .filter(status -> normalFileIsDirectoryHasSuccessFile(status, info)) + .filter(this::hasMatchingOwner) + .map(this::createInputStream) + .filter(Objects::nonNull) + .forEach(hdfsInputStream -> { + try { + processHdfsInputStream(hdfsInputStream, messageCount, fileStatuses.length); + } finally { + IOHelper.close(hdfsInputStream, "input stream", log); } - continue; - } - } + }); - try { - this.rwlock.writeLock().lock(); - this.inputStream = HdfsInputStream.createInputStream(status.getPath().toString(), this.config); - if (!this.inputStream.isOpened()) { - if (log.isDebugEnabled()) { - log.debug("Skipping file: {} because it doesn't exist anymore", status.getPath()); - } - continue; - } - } finally { - this.rwlock.writeLock().unlock(); + return messageCount.get(); + } + + private void processHdfsInputStream(HdfsInputStream inputStream, AtomicInteger messageCount, int totalFiles) { + Holder<Object> key = new Holder<>(); + Holder<Object> value = new Holder<>(); + while (inputStream.next(key, value) >= 0) { + Exchange exchange = this.getEndpoint().createExchange(); + Message message = exchange.getIn(); + String fileName = StringUtils.substringAfterLast(inputStream.getActualPath(), "/"); + message.setHeader(Exchange.FILE_NAME, fileName); + if (key.value != null) { + message.setHeader(HdfsHeader.KEY.name(), key.value); } + message.setBody(value.value); + log.debug("Processing file {}", fileName); try { - Holder<Object> key = new Holder<>(); - Holder<Object> value = new Holder<>(); - while (this.inputStream.next(key, value) >= 0) { - Exchange exchange = this.getEndpoint().createExchange(); - Message message = exchange.getIn(); - String fileName = StringUtils.substringAfterLast(status.getPath().toString(), "/"); - message.setHeader(Exchange.FILE_NAME, fileName); - if (key.value != null) { - message.setHeader(HdfsHeader.KEY.name(), key.value); - } - message.setBody(value.value); + processor.process(exchange); + } catch (Exception e) { + exchange.setException(e); + } - log.debug("Processing file {}", fileName); - try { - processor.process(exchange); - } catch (Exception e) { - exchange.setException(e); - } + // in case of unhandled exceptions then let the exception handler handle them + if (exchange.getException() != null) { + getExceptionHandler().handleException(exchange.getException()); + } - // in case of unhandled exceptions then let the exception handler handle them - if (exchange.getException() != null) { - getExceptionHandler().handleException(exchange.getException()); - } + int count = messageCount.incrementAndGet(); + log.debug("Processed [{}] files out of [{}]", count, totalFiles); + } + } - numMessages++; + private boolean normalFileIsDirectoryHasSuccessFile(FileStatus fileStatus, HdfsInfo info) { + if (config.getFileType().equals(HdfsFileType.NORMAL_FILE) && fileStatus.isDirectory()) { + try { + Path successPath = new Path(fileStatus.getPath().toString() + "/_SUCCESS"); + if (!info.getFileSystem().exists(successPath)) { + return false; } - } finally { - IOHelper.close(inputStream, "input stream", log); + } catch 
(IOException e) { + throw new RuntimeException(e); } } - - return numMessages; + return true; } - private boolean normalFileIsDirectoryNoSuccessFile(FileStatus status, HdfsInfo info) throws IOException { - if (config.getFileType().equals(HdfsFileType.NORMAL_FILE) && status.isDirectory()) { - Path successPath = new Path(status.getPath().toString() + "/_SUCCESS"); - if (!info.getFileSystem().exists(successPath)) { - return true; + private boolean hasMatchingOwner(FileStatus fileStatus) { + if (config.getOwner() != null && !config.getOwner().equals(fileStatus.getOwner())) { + if (log.isDebugEnabled()) { + log.debug("Skipping file: {} as not matching owner: {}", fileStatus.getPath(), config.getOwner()); } + return true; } return false; } + private HdfsInputStream createInputStream(FileStatus fileStatus) { + try { + this.rwlock.writeLock().lock(); + + return HdfsInputStream.createInputStream(fileStatus.getPath().toString(), this.config); + } finally { + this.rwlock.writeLock().unlock(); + } + } + } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java new file mode 100644 index 0000000..f08cb18 --- /dev/null +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFile.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.hdfs; + +import java.io.Closeable; + +import org.apache.camel.TypeConverter; + +interface HdfsFile { + + long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter); + + long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value); + + Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration); + + Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration); + +} diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java index 9d8c79b..744fdab 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileSystemType.java @@ -24,7 +24,7 @@ public enum HdfsFileSystemType { StringBuilder hpath = new StringBuilder(); hpath.append("file://"); hpath.append(config.getPath()); - if (!config.getSplitStrategies().isEmpty()) { + if (config.hasSplitStrategies()) { hpath.append('/'); } return hpath; @@ -40,7 +40,7 @@ public enum HdfsFileSystemType { hpath.append(':'); hpath.append(config.getPort()); hpath.append(config.getPath()); - if (!config.getSplitStrategies().isEmpty()) { + if (config.hasSplitStrategies()) { hpath.append('/'); } return hpath; diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java index 713e271..fe18006 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsFileType.java @@ -16,499 +16,38 @@ */ package org.apache.camel.component.hdfs; -import java.io.ByteArrayOutputStream; import java.io.Closeable; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.io.PrintStream; -import java.nio.ByteBuffer; -import java.nio.file.Files; -import java.util.HashMap; -import java.util.Map; -import org.apache.camel.RuntimeCamelException; import org.apache.camel.TypeConverter; -import org.apache.camel.util.IOHelper; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.io.ArrayFile; -import org.apache.hadoop.io.BloomMapFile; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.ByteWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DoubleWritable; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.MapFile; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.SequenceFile; -import org.apache.hadoop.io.SequenceFile.Reader; -import org.apache.hadoop.io.SequenceFile.Writer; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableComparable; -import org.apache.hadoop.util.ReflectionUtils; public enum HdfsFileType { - NORMAL_FILE { - @Override - public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { - 
InputStream is = null; - try { - is = typeConverter.convertTo(InputStream.class, value); - return copyBytes(is, (FSDataOutputStream) hdfsostr.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false); - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } finally { - IOHelper.close(is); - } - } + NORMAL_FILE(new HdfsNormalFileType()), + SEQUENCE_FILE(new HdfsSequenceFileType()), + MAP_FILE(new HdfsMapFileType()), + BLOOMMAP_FILE(new HdfsBloommapFileType()), + ARRAY_FILE(new HdfsArrayFileType()); - @Override - public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { - try { - ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsInputStream.getChunkSize()); - byte[] buf = new byte[hdfsInputStream.getChunkSize()]; - int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf); - if (bytesRead >= 0) { - bos.write(buf, 0, bytesRead); - key.value = null; - value.value = bos; - return bytesRead; - } else { - key.value = null; - // indication that we may have read from empty file - value.value = bos; - return 0; - } - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } + private final HdfsFile file; - @Override - public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rout; - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - if (!configuration.isAppend()) { - rout = hdfsInfo.getFileSystem().create(hdfsInfo.getPath(), configuration.isOverwrite(), configuration.getBufferSize(), - configuration.getReplication(), configuration.getBlockSize(), () -> { }); - } else { - rout = hdfsInfo.getFileSystem().append(hdfsInfo.getPath(), configuration.getBufferSize(), () -> { }); - } - return rout; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rin; - if (configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) { - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath()); - } else { - rin = new FileInputStream(getHfdsFileToTmpFile(hdfsPath, configuration)); - } - return rin; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - - private File getHfdsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) { - try { - String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/')); - - // [CAMEL-13711] Files.createTempFile not equivalent to File.createTempFile - - File outputDest; - try { - - // First trying: Files.createTempFile - outputDest = Files.createTempFile(fname, ".hdfs").toFile(); - - } catch (Exception ex) { - - // Now trying: File.createTempFile - outputDest = File.createTempFile(fname, ".hdfs"); - } - - if (outputDest.exists()) { - outputDest.delete(); - } - - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - FileSystem fileSystem = hdfsInfo.getFileSystem(); - FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest, false, fileSystem.getConf()); - try { - FileUtil.copyMerge( - fileSystem, // src - new Path(hdfsPath), - FileSystem.getLocal(new Configuration()), // dest - new Path(outputDest.toURI()), - false, fileSystem.getConf(), null); - } catch (IOException e) { - return outputDest; - } - - return new File(outputDest, fname); - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - }, - - SEQUENCE_FILE { - @Override - public long 
append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { - try { - Holder<Integer> keySize = new Holder<>(); - Writable keyWritable = getWritable(key, typeConverter, keySize); - Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = getWritable(value, typeConverter, valueSize); - Writer writer = (SequenceFile.Writer) hdfsostr.getOut(); - writer.append(keyWritable, valueWritable); - writer.sync(); - return keySize.value + valueSize.value; - } catch (Exception ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) { - try { - SequenceFile.Reader reader = (SequenceFile.Reader) hdfsistr.getIn(); - Holder<Integer> keySize = new Holder<>(); - Writable keyWritable = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration()); - Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); - if (reader.next(keyWritable, valueWritable)) { - key.value = getObject(keyWritable, keySize); - value.value = getObject(valueWritable, valueSize); - return keySize.value + valueSize.value; - } else { - return 0; - } - } catch (Exception ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rout; - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - Class<?> keyWritableClass = configuration.getKeyType().getWritableClass(); - Class<?> valueWritableClass = configuration.getValueType().getWritableClass(); - rout = SequenceFile.createWriter(hdfsInfo.getConf(), Writer.file(hdfsInfo.getPath()), Writer.keyClass(keyWritableClass), - Writer.valueClass(valueWritableClass), Writer.bufferSize(configuration.getBufferSize()), - Writer.replication(configuration.getReplication()), Writer.blockSize(configuration.getBlockSize()), - Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()), - Writer.progressable(() -> { - }), Writer.metadata(new SequenceFile.Metadata())); - return rout; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rin; - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - rin = new SequenceFile.Reader(hdfsInfo.getConf(), Reader.file(hdfsInfo.getPath())); - return rin; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - }, - - MAP_FILE { - @Override - public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { - try { - Holder<Integer> keySize = new Holder<>(); - Writable keyWritable = getWritable(key, typeConverter, keySize); - Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = getWritable(value, typeConverter, valueSize); - ((MapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable); - return keySize.value + valueSize.value; - } catch (Exception ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { - try { - MapFile.Reader reader = (MapFile.Reader) hdfsInputStream.getIn(); - Holder<Integer> keySize = new Holder<>(); - WritableComparable<?> keyWritable = 
(WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration()); - Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); - if (reader.next(keyWritable, valueWritable)) { - key.value = getObject(keyWritable, keySize); - value.value = getObject(valueWritable, valueSize); - return keySize.value + valueSize.value; - } else { - return 0; - } - } catch (Exception ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - @SuppressWarnings("rawtypes") - public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rout; - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass(); - Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass(); - rout = new MapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass), - MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()), - MapFile.Writer.progressable(() -> { - })); - return rout; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rin; - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf()); - return rin; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - }, - - BLOOMMAP_FILE { - @Override - public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { - try { - Holder<Integer> keySize = new Holder<>(); - Writable keyWritable = getWritable(key, typeConverter, keySize); - Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = getWritable(value, typeConverter, valueSize); - ((BloomMapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable); - return keySize.value + valueSize.value; - } catch (Exception ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) { - try { - MapFile.Reader reader = (BloomMapFile.Reader) hdfsistr.getIn(); - Holder<Integer> keySize = new Holder<>(); - WritableComparable<?> keyWritable = (WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration()); - Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); - if (reader.next(keyWritable, valueWritable)) { - key.value = getObject(keyWritable, keySize); - value.value = getObject(valueWritable, valueSize); - return keySize.value + valueSize.value; - } else { - return 0; - } - } catch (Exception ex) { - throw new RuntimeCamelException(ex); - } - } - - @SuppressWarnings("rawtypes") - @Override - public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rout; - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass(); - Class<? 
extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass(); - rout = new BloomMapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), org.apache.hadoop.io.MapFile.Writer.keyClass(keyWritableClass), - org.apache.hadoop.io.MapFile.Writer.valueClass(valueWritableClass), - org.apache.hadoop.io.MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()), - org.apache.hadoop.io.MapFile.Writer.progressable(() -> { - })); - return rout; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rin; - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf()); - return rin; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - }, - - ARRAY_FILE { - @Override - public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { - try { - Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = getWritable(value, typeConverter, valueSize); - ((ArrayFile.Writer) hdfsostr.getOut()).append(valueWritable); - return valueSize.value; - } catch (Exception ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) { - try { - ArrayFile.Reader reader = (ArrayFile.Reader) hdfsistr.getIn(); - Holder<Integer> valueSize = new Holder<>(); - Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); - if (reader.next(valueWritable) != null) { - value.value = getObject(valueWritable, valueSize); - return valueSize.value; - } else { - return 0; - } - } catch (Exception ex) { - throw new RuntimeCamelException(ex); - } - } - - @SuppressWarnings("rawtypes") - @Override - public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rout; - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - Class<? 
extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass(); - rout = new ArrayFile.Writer(hdfsInfo.getConf(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass, - configuration.getCompressionType(), () -> { }); - return rout; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - - @Override - public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { - try { - Closeable rin; - HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConf()); - return rin; - } catch (IOException ex) { - throw new RuntimeCamelException(ex); - } - } - }; - - @SuppressWarnings({"rawtypes"}) - private static final class WritableCache { - - private static Map<Class, HdfsWritableFactories.HdfsWritableFactory> writables = new HashMap<>(); - private static Map<Class, HdfsWritableFactories.HdfsWritableFactory> readables = new HashMap<>(); - - private WritableCache() { - } - - static { - writables.put(Boolean.class, new HdfsWritableFactories.HdfsBooleanWritableFactory()); - writables.put(Byte.class, new HdfsWritableFactories.HdfsByteWritableFactory()); - writables.put(ByteBuffer.class, new HdfsWritableFactories.HdfsBytesWritableFactory()); - writables.put(Double.class, new HdfsWritableFactories.HdfsDoubleWritableFactory()); - writables.put(Float.class, new HdfsWritableFactories.HdfsFloatWritableFactory()); - writables.put(Integer.class, new HdfsWritableFactories.HdfsIntWritableFactory()); - writables.put(Long.class, new HdfsWritableFactories.HdfsLongWritableFactory()); - writables.put(String.class, new HdfsWritableFactories.HdfsTextWritableFactory()); - writables.put(null, new HdfsWritableFactories.HdfsNullWritableFactory()); - } - - static { - readables.put(BooleanWritable.class, new HdfsWritableFactories.HdfsBooleanWritableFactory()); - readables.put(ByteWritable.class, new HdfsWritableFactories.HdfsByteWritableFactory()); - readables.put(BytesWritable.class, new HdfsWritableFactories.HdfsBytesWritableFactory()); - readables.put(DoubleWritable.class, new HdfsWritableFactories.HdfsDoubleWritableFactory()); - readables.put(FloatWritable.class, new HdfsWritableFactories.HdfsFloatWritableFactory()); - readables.put(IntWritable.class, new HdfsWritableFactories.HdfsIntWritableFactory()); - readables.put(LongWritable.class, new HdfsWritableFactories.HdfsLongWritableFactory()); - readables.put(Text.class, new HdfsWritableFactories.HdfsTextWritableFactory()); - readables.put(NullWritable.class, new HdfsWritableFactories.HdfsNullWritableFactory()); - } + private HdfsFileType(HdfsFile file) { + this.file = file; } - private static Writable getWritable(Object obj, TypeConverter typeConverter, Holder<Integer> size) { - Class<?> objCls = obj == null ? 
null : obj.getClass(); - HdfsWritableFactories.HdfsWritableFactory objWritableFactory = WritableCache.writables.get(objCls); - if (objWritableFactory == null) { - objWritableFactory = new HdfsWritableFactories.HdfsObjectWritableFactory(); - } - return objWritableFactory.create(obj, typeConverter, size); + public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { + return this.file.append(hdfsostr, key, value, typeConverter); } - private static Object getObject(Writable writable, Holder<Integer> size) { - Class<?> writableClass = NullWritable.class; - if (writable != null) { - writableClass = writable.getClass(); - } - HdfsWritableFactories.HdfsWritableFactory writableObjectFactory = WritableCache.readables.get(writableClass); - return writableObjectFactory.read(writable, size); + public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { + return this.file.next(hdfsInputStream, key, value); } - public abstract long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter); - - public abstract long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value); - - public abstract Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration); - - public abstract Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration); + public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { + return this.file.createOutputStream(hdfsPath, configuration); + } - public static long copyBytes(InputStream in, OutputStream out, int buffSize, boolean close) throws IOException { - long numBytes = 0; - PrintStream ps = out instanceof PrintStream ? (PrintStream) out : null; - byte[] buf = new byte[buffSize]; - try { - int bytesRead = in.read(buf); - while (bytesRead >= 0) { - out.write(buf, 0, bytesRead); - numBytes += bytesRead; - if ((ps != null) && ps.checkError()) { - throw new IOException("Unable to write to output stream."); - } - bytesRead = in.read(buf); - } - } finally { - if (close) { - IOHelper.close(out, in); - } - } - return numBytes; + public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { + return this.file.createInputStream(hdfsPath, configuration); } + } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java index 58816d6..40212e8 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java @@ -18,13 +18,18 @@ package org.apache.camel.component.hdfs; import java.io.Closeable; import java.io.IOException; +import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class HdfsInputStream implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(HdfsInputStream.class); + private HdfsFileType fileType; private String actualPath; private String suffixedPath; @@ -40,21 +45,36 @@ public class HdfsInputStream implements Closeable { protected HdfsInputStream() { } - public static HdfsInputStream createInputStream(String hdfsPath, HdfsConfiguration configuration) throws IOException { + /** + * + * @param hdfsPath + * @param configuration + * 
@return + * @throws IOException + */ + public static HdfsInputStream createInputStream(String hdfsPath, HdfsConfiguration configuration) { HdfsInputStream ret = new HdfsInputStream(); ret.fileType = configuration.getFileType(); ret.actualPath = hdfsPath; ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix(); ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix(); ret.chunkSize = configuration.getChunkSize(); - HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath, configuration); - if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) { - ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration); - ret.opened = true; - } else { - ret.opened = false; + try { + HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath, configuration); + if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) { + ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration); + ret.opened = true; + ret.config = configuration; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath); + } + ret = null; + } + } catch (IOException e) { + throw new RuntimeException(e); } - ret.config = configuration; + return ret; } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java new file mode 100644 index 0000000..01868b7 --- /dev/null +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.hdfs; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.TypeConverter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.MapFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; +import org.apache.hadoop.util.ReflectionUtils; + +class HdfsMapFileType extends DefaultHdfsFileType { + + @Override + public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { + try { + Holder<Integer> keySize = new Holder<>(); + Writable keyWritable = getWritable(key, typeConverter, keySize); + Holder<Integer> valueSize = new Holder<>(); + Writable valueWritable = getWritable(value, typeConverter, valueSize); + ((MapFile.Writer) hdfsostr.getOut()).append((WritableComparable<?>) keyWritable, valueWritable); + return Long.sum(keySize.value, valueSize.value); + } catch (Exception ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { + try { + MapFile.Reader reader = (MapFile.Reader) hdfsInputStream.getIn(); + Holder<Integer> keySize = new Holder<>(); + WritableComparable<?> keyWritable = (WritableComparable<?>) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration()); + Holder<Integer> valueSize = new Holder<>(); + Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); + if (reader.next(keyWritable, valueWritable)) { + key.value = getObject(keyWritable, keySize); + value.value = getObject(valueWritable, valueSize); + return Long.sum(keySize.value, valueSize.value); + } else { + return 0; + } + } catch (Exception ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + @SuppressWarnings("rawtypes") + public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rout; + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass(); + Class<? 
extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass(); + rout = new MapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass), + MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()), + MapFile.Writer.progressable(() -> { + })); + return rout; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rin; + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf()); + return rin; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } +} diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileType.java new file mode 100644 index 0000000..43df23a --- /dev/null +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsNormalFileType.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.hdfs; + +import java.io.ByteArrayOutputStream; +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.TypeConverter; +import org.apache.camel.util.IOHelper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.Path; + +class HdfsNormalFileType extends DefaultHdfsFileType { + + @Override + public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { + InputStream is = null; + try { + is = typeConverter.convertTo(InputStream.class, value); + return copyBytes(is, (FSDataOutputStream) hdfsostr.getOut(), HdfsConstants.DEFAULT_BUFFERSIZE, false); + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } finally { + IOHelper.close(is); + } + } + + @Override + public long next(HdfsInputStream hdfsInputStream, Holder<Object> key, Holder<Object> value) { + try { + ByteArrayOutputStream bos = new ByteArrayOutputStream(hdfsInputStream.getChunkSize()); + byte[] buf = new byte[hdfsInputStream.getChunkSize()]; + int bytesRead = ((InputStream) hdfsInputStream.getIn()).read(buf); + if (bytesRead >= 0) { + bos.write(buf, 0, bytesRead); + key.value = null; + value.value = bos; + return bytesRead; + } else { + key.value = null; + // indication that we may have read from empty file + value.value = bos; + return 0; + } + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rout; + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + if (!configuration.isAppend()) { + rout = hdfsInfo.getFileSystem().create(hdfsInfo.getPath(), configuration.isOverwrite(), configuration.getBufferSize(), + configuration.getReplication(), configuration.getBlockSize(), () -> { }); + } else { + rout = hdfsInfo.getFileSystem().append(hdfsInfo.getPath(), configuration.getBufferSize(), () -> { }); + } + return rout; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rin; + if (configuration.getFileSystemType().equals(HdfsFileSystemType.LOCAL)) { + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + rin = hdfsInfo.getFileSystem().open(hdfsInfo.getPath()); + } else { + rin = new FileInputStream(getHfdsFileToTmpFile(hdfsPath, configuration)); + } + return rin; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + private File getHfdsFileToTmpFile(String hdfsPath, HdfsConfiguration configuration) { + try { + String fname = hdfsPath.substring(hdfsPath.lastIndexOf('/')); + + // [CAMEL-13711] Files.createTempFile not equivalent to File.createTempFile + + File outputDest; + try { + // First trying: Files.createTempFile + outputDest = Files.createTempFile(fname, ".hdfs").toFile(); + + } catch (Exception ex) { + // Now trying: File.createTempFile + outputDest = File.createTempFile(fname, ".hdfs"); + } + + if (outputDest.exists()) { + outputDest.delete(); + } + + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + FileSystem fileSystem = 
hdfsInfo.getFileSystem(); + FileUtil.copy(fileSystem, new Path(hdfsPath), outputDest, false, fileSystem.getConf()); + try { + FileUtil.copyMerge( + fileSystem, // src + new Path(hdfsPath), + FileSystem.getLocal(new Configuration()), // dest + new Path(outputDest.toURI()), + false, fileSystem.getConf(), null); + } catch (IOException e) { + return outputDest; + } + + return new File(outputDest, fname); + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } +} diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java index 6c21580..568ee71 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java @@ -18,6 +18,7 @@ package org.apache.camel.component.hdfs; import java.io.IOException; import java.util.List; +import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -108,17 +109,11 @@ public class HdfsProducer extends DefaultProducer { ostream = setupHdfs(true); } - SplitStrategy idleStrategy = null; - for (SplitStrategy strategy : config.getSplitStrategies()) { - if (strategy.type == SplitStrategyType.IDLE) { - idleStrategy = strategy; - break; - } - } - if (idleStrategy != null) { + Optional<SplitStrategy> idleStrategy = tryFindIdleStrategy(config.getSplitStrategies()); + if (idleStrategy.isPresent()) { scheduler = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this, "HdfsIdleCheck"); log.debug("Creating IdleCheck task scheduled to run every {} millis", config.getCheckIdleInterval()); - scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy), config.getCheckIdleInterval(), config.getCheckIdleInterval(), TimeUnit.MILLISECONDS); + scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy.get()), config.getCheckIdleInterval(), config.getCheckIdleInterval(), TimeUnit.MILLISECONDS); } } catch (Exception e) { LOG.warn("Failed to start the HDFS producer. 
Caused by: [{}]", e.getMessage()); @@ -129,13 +124,13 @@ public class HdfsProducer extends DefaultProducer { } } - private synchronized HdfsOutputStream setupHdfs(boolean onStartup) throws Exception { + private synchronized HdfsOutputStream setupHdfs(boolean onStartup) throws IOException { if (ostream != null) { return ostream; } StringBuilder actualPath = new StringBuilder(hdfsPath); - if (config.getSplitStrategies().size() > 0) { + if (config.hasSplitStrategies()) { actualPath = newFileName(); } @@ -161,6 +156,15 @@ public class HdfsProducer extends DefaultProducer { return answer; } + private Optional<SplitStrategy> tryFindIdleStrategy(List<SplitStrategy> strategies) { + for (SplitStrategy strategy : strategies) { + if (strategy.type == SplitStrategyType.IDLE) { + return Optional.of(strategy); + } + } + return Optional.empty(); + } + @Override protected void doStop() throws Exception { super.doStop(); @@ -185,7 +189,7 @@ public class HdfsProducer extends DefaultProducer { } } - void doProcess(Exchange exchange) throws Exception { + void doProcess(Exchange exchange) throws IOException { Object body = exchange.getIn().getBody(); Object key = exchange.getIn().getHeader(HdfsHeader.KEY.name()); @@ -201,13 +205,7 @@ public class HdfsProducer extends DefaultProducer { ostream = setupHdfs(false); } - boolean split = false; - List<SplitStrategy> strategies = config.getSplitStrategies(); - for (SplitStrategy splitStrategy : strategies) { - split |= splitStrategy.getType().split(ostream, splitStrategy.value, this); - } - - if (split) { + if (isSplitRequired(config.getSplitStrategies())) { if (ostream != null) { IOHelper.close(ostream, "output stream", log); } @@ -260,6 +258,14 @@ public class HdfsProducer extends DefaultProducer { return actualPath.append(fileName); } + private boolean isSplitRequired(List<SplitStrategy> strategies) { + boolean split = false; + for (SplitStrategy splitStrategy : strategies) { + split |= splitStrategy.getType().split(ostream, splitStrategy.value, this); + } + return split; + } + private StringBuilder newFileName() { StringBuilder actualPath = new StringBuilder(hdfsPath); actualPath.append(StringHelper.sanitize(getEndpoint().getCamelContext().getUuidGenerator().generateUuid())); diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java new file mode 100644 index 0000000..0bfde2e --- /dev/null +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.hdfs; + +import java.io.Closeable; +import java.io.IOException; + +import org.apache.camel.RuntimeCamelException; +import org.apache.camel.TypeConverter; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.util.ReflectionUtils; + +class HdfsSequenceFileType extends DefaultHdfsFileType { + + @Override + public long append(HdfsOutputStream hdfsostr, Object key, Object value, TypeConverter typeConverter) { + try { + Holder<Integer> keySize = new Holder<>(); + Writable keyWritable = getWritable(key, typeConverter, keySize); + Holder<Integer> valueSize = new Holder<>(); + Writable valueWritable = getWritable(value, typeConverter, valueSize); + SequenceFile.Writer writer = (SequenceFile.Writer) hdfsostr.getOut(); + writer.append(keyWritable, valueWritable); + writer.sync(); + return Long.sum(keySize.value, valueSize.value); + } catch (Exception ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public long next(HdfsInputStream hdfsistr, Holder<Object> key, Holder<Object> value) { + try { + SequenceFile.Reader reader = (SequenceFile.Reader) hdfsistr.getIn(); + Holder<Integer> keySize = new Holder<>(); + Writable keyWritable = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), new Configuration()); + Holder<Integer> valueSize = new Holder<>(); + Writable valueWritable = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), new Configuration()); + if (reader.next(keyWritable, valueWritable)) { + key.value = getObject(keyWritable, keySize); + value.value = getObject(valueWritable, valueSize); + return Long.sum(keySize.value, valueSize.value); + } else { + return 0; + } + } catch (Exception ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public Closeable createOutputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rout; + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + Class<?> keyWritableClass = configuration.getKeyType().getWritableClass(); + Class<?> valueWritableClass = configuration.getValueType().getWritableClass(); + rout = SequenceFile.createWriter(hdfsInfo.getConf(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass), + SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(configuration.getBufferSize()), + SequenceFile.Writer.replication(configuration.getReplication()), SequenceFile.Writer.blockSize(configuration.getBlockSize()), + SequenceFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()), + SequenceFile.Writer.progressable(() -> { + }), SequenceFile.Writer.metadata(new SequenceFile.Metadata())); + return rout; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } + + @Override + public Closeable createInputStream(String hdfsPath, HdfsConfiguration configuration) { + try { + Closeable rin; + HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); + rin = new SequenceFile.Reader(hdfsInfo.getConf(), SequenceFile.Reader.file(hdfsInfo.getPath())); + return rin; + } catch (IOException ex) { + throw new RuntimeCamelException(ex); + } + } +} diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java index 910aa2b..07c5eb8 100644 --- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsWritableFactories.java @@ -23,7 +23,6 @@ import java.nio.ByteBuffer; import org.apache.camel.RuntimeCamelException; import org.apache.camel.TypeConverter; -import org.apache.camel.util.IOHelper; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.BytesWritable; @@ -215,9 +214,7 @@ public class HdfsWritableFactories { @Override public Writable create(Object value, TypeConverter typeConverter, Holder<Integer> size) { - InputStream is = null; - try { - is = typeConverter.convertTo(InputStream.class, value); + try (InputStream is = typeConverter.convertTo(InputStream.class, value)) { ByteArrayOutputStream bos = new ByteArrayOutputStream(); IOUtils.copyBytes(is, bos, HdfsConstants.DEFAULT_BUFFERSIZE, false); BytesWritable writable = new BytesWritable(); @@ -226,8 +223,6 @@ public class HdfsWritableFactories { return writable; } catch (IOException ex) { throw new RuntimeCamelException(ex); - } finally { - IOHelper.close(is); } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java index 0cff2f5..4422a08 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfiguration.java @@ -24,12 +24,13 @@ import java.util.stream.Collectors; import static java.lang.String.format; -import org.apache.camel.component.hdfs.HdfsComponent; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider; import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX; @@ -37,8 +38,12 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; public class KerberosConfiguration extends Configuration { + private static final Logger LOG = LoggerFactory.getLogger(KerberosConfiguration.class); + private static final String HFDS_NAMED_SERVICE = "hfdsNamedService"; + private static final String KERBEROS_5_SYS_ENV = "java.security.krb5.conf"; + private static final String AUTHENTICATION_MODE = "hadoop.security.authentication"; private static final String HFDS_FS = "fs.defaultFS"; @@ -63,7 +68,7 @@ public class KerberosConfiguration extends Configuration { String kerberosConfigFileLocation, int replicationFactor) { - HdfsComponent.setKerberosConfigFile(kerberosConfigFileLocation); + setKerberosConfigFile(kerberosConfigFileLocation); setupHdfsConfiguration(namedNodes, replicationFactor); } @@ -106,6 +111,26 @@ public class KerberosConfiguration extends Configuration { UserGroupInformation.loginUserFromKeytab(username, keyTabFileLocation); } + /** + * To use kerberos authentication, set the value of the 'java.security.krb5.conf' environment variable to an existing file. 
+ * If it is already set, the existing value is kept and a warning is logged when it differs from the specified location. + * + * @param kerberosConfigFileLocation - krb5.conf file (https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html) + */ + public static void setKerberosConfigFile(String kerberosConfigFileLocation) { + if (!new File(kerberosConfigFileLocation).exists()) { + LOG.warn("Kerberos configuration file [{}] could not be found.", kerberosConfigFileLocation); + return; + } + + String krb5Conf = System.getProperty(KERBEROS_5_SYS_ENV); + if (krb5Conf == null || krb5Conf.isEmpty()) { + System.setProperty(KERBEROS_5_SYS_ENV, kerberosConfigFileLocation); + } else if (!krb5Conf.equalsIgnoreCase(kerberosConfigFileLocation)) { + LOG.warn("[{}] was already configured with: [{}] config file", KERBEROS_5_SYS_ENV, krb5Conf); + } + } + + private String nodeToString(String nodeName) { + return nodeName.replaceAll(":[0-9]*", "").replaceAll("\\.", "_"); + } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/osgi/HdfsActivator.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/osgi/HdfsActivator.java index 811afa3..3a01814 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/osgi/HdfsActivator.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/osgi/HdfsActivator.java @@ -24,21 +24,22 @@ public class HdfsActivator implements BundleActivator { @Override public void start(BundleContext context) throws Exception { + // no action needed } @Override public void stop(BundleContext context) throws Exception { - // There's problem inside OSGi when framwork is being shutdown + // There's a problem inside OSGi when the framework is being shut down // hadoop.fs code registers some JVM shutdown hooks throughout the code and this ordered // list of hooks is run in shutdown thread. // At that time bundle class loader / bundle wiring is no longer valid (bundle is stopped) // so ShutdownHookManager can't load additional classes. But there are some inner classes // loaded when iterating over registered hadoop shutdown hooks. - // Let's explicitely load these inner classes when bundle is stopped, as there's last chance + // Let's explicitly load these inner classes when the bundle is stopped, as this is the last chance + // to use valid bundle class loader. 
// This is based on the knowledge of what's contained in SMX bundle // org.apache.servicemix.bundles.hadoop-client-*.jar - // the above is just a warning that hadopp may have some quirks when running inside OSGi + // the above is just a warning that hadoop may have some quirks when running inside OSGi ClassLoader hadoopCl = ShutdownHookManager.class.getClassLoader(); if (hadoopCl != null) { String shm = ShutdownHookManager.class.getName(); diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationTest.java index 08f34a2..9c9678b 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationTest.java @@ -66,7 +66,7 @@ public class KerberosConfigurationTest { underTest = new KerberosConfiguration(namedNodes, kerberosConfigFileLocation, replicationFactor); // then - /* exception was thrown */ + /* message is printed in the logs */ } @Test(expected = FileNotFoundException.class)
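A few illustrative sketches of the patterns introduced above follow. They are not part of the commit, and every class name in them is invented for illustration. The first one shows the chunked read pattern used by HdfsNormalFileType#next: read at most chunkSize bytes per call, hand the bytes back through a holder, and return 0 at end of stream, so an empty file still produces an empty body. ChunkedReadSketch and its simplified Holder stand-in are assumptions; the real component reads from an HdfsInputStream and lets Camel's type converters turn the ByteArrayOutputStream into the message body.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStream;

    public class ChunkedReadSketch {

        // minimal stand-in for the component's Holder<T>
        static class Holder<T> {
            T value;
        }

        // same idea as HdfsNormalFileType#next: fill a ByteArrayOutputStream with
        // at most chunkSize bytes and return 0 once the stream is exhausted
        static long next(InputStream in, int chunkSize, Holder<Object> value) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream(chunkSize);
            byte[] buf = new byte[chunkSize];
            int bytesRead = in.read(buf);
            if (bytesRead >= 0) {
                bos.write(buf, 0, bytesRead);
                value.value = bos;
                return bytesRead;
            }
            value.value = bos; // an empty stream still yields an (empty) body
            return 0;
        }

        public static void main(String[] args) throws IOException {
            InputStream in = new ByteArrayInputStream("hello hdfs".getBytes());
            Holder<Object> value = new Holder<>();
            long read;
            while ((read = next(in, 4, value)) > 0) {
                System.out.println(read + " bytes: " + value.value);
            }
        }
    }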
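HdfsProducer now resolves the IDLE split strategy through an Optional (tryFindIdleStrategy) and only schedules the IdleCheck task when one is present. Below is a dependency-free sketch of the same lookup, using a stream where the commit uses a plain loop; the SplitStrategy and SplitStrategyType types are simplified stand-ins for the component's classes, not copies of them.

    import java.util.List;
    import java.util.Optional;

    public class IdleStrategyLookupSketch {

        enum SplitStrategyType { BYTES, MESSAGES, IDLE }

        static class SplitStrategy {
            final SplitStrategyType type;
            final long value;

            SplitStrategy(SplitStrategyType type, long value) {
                this.type = type;
                this.value = value;
            }
        }

        // same contract as HdfsProducer#tryFindIdleStrategy: first IDLE strategy, if any
        static Optional<SplitStrategy> findIdleStrategy(List<SplitStrategy> strategies) {
            return strategies.stream()
                    .filter(s -> s.type == SplitStrategyType.IDLE)
                    .findFirst();
        }

        public static void main(String[] args) {
            List<SplitStrategy> strategies = List.of(
                    new SplitStrategy(SplitStrategyType.BYTES, 1024),
                    new SplitStrategy(SplitStrategyType.IDLE, 5000));

            findIdleStrategy(strategies)
                    .ifPresent(s -> System.out.println("schedule idle check every " + s.value + " ms"));
        }
    }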
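HdfsSequenceFileType builds its writer through the option-based SequenceFile.createWriter API (Writer.file, Writer.keyClass, Writer.valueClass, buffer size, replication, compression, ...) and reads entries back key/value pair by pair. The following is a minimal round trip against the local file system, assuming hadoop-common is on the classpath; the /tmp path and the Text/IntWritable key and value types are arbitrary choices for the example, whereas the component derives them from the endpoint configuration.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class SequenceFileRoundTripSketch {

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();            // local file system by default
            Path path = new Path("/tmp/camel-hdfs-sample.seq");  // hypothetical location

            // write one key/value pair, as HdfsSequenceFileType#createOutputStream + append do
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(path),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(IntWritable.class))) {
                writer.append(new Text("answer"), new IntWritable(42));
            }

            // read it back, as HdfsSequenceFileType#next does
            try (SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path))) {
                Text key = new Text();
                IntWritable value = new IntWritable();
                while (reader.next(key, value)) {
                    System.out.println(key + " = " + value.get());
                }
            }
        }
    }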
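Finally, the guard that KerberosConfiguration#setKerberosConfigFile places around the java.security.krb5.conf system property: set it only when it is not already configured, otherwise keep the existing value and warn when a different file is requested. The stand-alone sketch below drops the Hadoop and SLF4J dependencies; Krb5ConfGuardSketch and the System.err messages are stand-ins for the component's logger output, and /etc/krb5.conf is only a placeholder path.

    import java.io.File;

    public class Krb5ConfGuardSketch {

        private static final String KRB5_CONF_PROPERTY = "java.security.krb5.conf";

        static void setKerberosConfigFile(String location) {
            if (!new File(location).exists()) {
                System.err.println("Kerberos configuration file not found: " + location);
                return;
            }
            String current = System.getProperty(KRB5_CONF_PROPERTY);
            if (current == null || current.isEmpty()) {
                // not configured yet, point the JVM at the given krb5.conf
                System.setProperty(KRB5_CONF_PROPERTY, location);
            } else if (!current.equalsIgnoreCase(location)) {
                // already configured elsewhere, keep it and warn
                System.err.println(KRB5_CONF_PROPERTY + " already set to " + current);
            }
        }

        public static void main(String[] args) {
            setKerberosConfigFile("/etc/krb5.conf"); // hypothetical location
            System.out.println(KRB5_CONF_PROPERTY + " = " + System.getProperty(KRB5_CONF_PROPERTY));
        }
    }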