This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 3032641 CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes (#3254) 3032641 is described below commit 3032641514838fcab5e5a37ee01dfa82a9f59cb7 Author: Marius Cornescu <marius_corne...@yahoo.com> AuthorDate: Thu Oct 17 05:17:57 2019 +0200 CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes (#3254) * CAMEL-14073 - camel-hdfs - Cleanup HA/Cluster related classes * CAMEL-14073 : camel-hdfs - Cleanup HA/Cluster related classes * Update hdfs-component.adoc * Update hdfs-component.adoc --- components/camel-hdfs/pom.xml | 5 ++ .../camel-hdfs/src/main/docs/hdfs-component.adoc | 18 ++++++ .../component/hdfs/HaConfigurationBuilder.java | 52 ++++++++++++--- .../camel/component/hdfs/HdfsArrayFileType.java | 4 +- .../camel/component/hdfs/HdfsBloommapFileType.java | 4 +- .../camel/component/hdfs/HdfsConfiguration.java | 19 +++++- .../apache/camel/component/hdfs/HdfsConsumer.java | 21 +++--- .../org/apache/camel/component/hdfs/HdfsInfo.java | 56 +++------------- .../camel/component/hdfs/HdfsInfoFactory.java | 63 ++++++++++++++++-- .../camel/component/hdfs/HdfsInputStream.java | 31 ++++----- .../camel/component/hdfs/HdfsMapFileType.java | 4 +- .../camel/component/hdfs/HdfsOsgiHelper.java | 8 +-- .../camel/component/hdfs/HdfsOutputStream.java | 46 ++++++------- .../apache/camel/component/hdfs/HdfsProducer.java | 75 ++++++++++------------ .../camel/component/hdfs/HdfsSequenceFileType.java | 4 +- .../kerberos/KerberosConfigurationBuilder.java | 7 +- .../camel/component/hdfs/FromFileToHdfsTest.java | 8 +-- .../component/hdfs/HaConfigurationBuilderTest.java | 41 ++++++++++-- .../camel/component/hdfs/HdfsConsumerTest.java | 36 +++++------ .../apache/camel/component/hdfs/HdfsInfoTest.java} | 36 +++++++---- .../component/hdfs/HdfsProducerConsumerTest.java | 6 +- .../component/hdfs/HdfsProducerSplitTest.java | 8 +-- .../camel/component/hdfs/HdfsProducerTest.java | 34 +++++----- .../camel/component/hdfs/HdfsTestSupport.java | 44 +++++++++++-- .../hdfs/kerberos/KerberosAuthenticationTest.java | 11 ++-- .../kerberos/KerberosConfigurationBuilderTest.java | 16 ++--- 26 files changed, 394 insertions(+), 263 deletions(-) diff --git a/components/camel-hdfs/pom.xml b/components/camel-hdfs/pom.xml index b7bbc17e..d2c9532 100644 --- a/components/camel-hdfs/pom.xml +++ b/components/camel-hdfs/pom.xml @@ -139,6 +139,11 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-core</artifactId> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> <scope>test</scope> diff --git a/components/camel-hdfs/src/main/docs/hdfs-component.adoc b/components/camel-hdfs/src/main/docs/hdfs-component.adoc index 423f915..ae3703c 100644 --- a/components/camel-hdfs/src/main/docs/hdfs-component.adoc +++ b/components/camel-hdfs/src/main/docs/hdfs-component.adoc @@ -327,6 +327,24 @@ resource with bundle that contains blueprint definition. This way Hadoop 2.x will have correct mapping of URI schemes to filesystem implementations. +=== Using this component with a High Availability configuration + +In an HA setup, there will be multiple nodes (_configured through the *namedNodes* parameter_). +The "hostname" and "port" portion of the endpoint URI will no longer have a _"host"_ meaning, but it will represent the name given to the cluster.
+ +You can choose whatever name you want for the cluster (_the name should follow the [a-zA-Z0-9] convention_). +This name will be sanitized by replacing the _dirty_ characters with underscores. This is done so that a host name or IP could potentially be used, if it makes sense to you. + +The cluster name will be mapped to the HA filesystem with a corresponding proxy, with failover, and the _works_. + +[source,java] +------------------------------------------------------------------------------------------------------ + +from("hdfs://node1_and_2_cluster/dir1/dir2?namedNodes=node1.example.org:8020,node2.example.org:8020").routeId(...) +... +------------------------------------------------------------------------------------------------------ + + === Using this component with Kerberos authentication The kerberos config file is read when the camel component is created, not when the endpoint is created. diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java index 1d432b3..ca5daad 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HaConfigurationBuilder.java @@ -19,6 +19,7 @@ package org.apache.camel.component.hdfs; import java.util.List; import java.util.stream.Collectors; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; @@ -31,6 +32,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY; final class HaConfigurationBuilder { private static final String HFDS_NAMED_SERVICE = "hfdsNamedService"; + private static final String HFDS_NAMED_SERVICE_SEPARATOR = "_"; private static final String HFDS_FS = "fs.defaultFS"; private HaConfigurationBuilder() { } @@ -49,31 +51,63 @@ final class HaConfigurationBuilder { * configuration.set("dfs.client.failover.proxy.provider.hfdsNamedService", "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); * <p> * - * @param namedNodes - All named nodes from the hadoop cluster - * @param replicationFactor - dfs replication factor + * @param configuration - hdfs configuration that will be set up with the HA settings + * @param endpointConfig - configuration with the HA settings configured on the endpoint */ - static void withClusterConfiguration(Configuration configuration, List<String> namedNodes, int replicationFactor) { + static void withClusterConfiguration(Configuration configuration, HdfsConfiguration endpointConfig) { + String haNamedService = getSanitizedClusterName(endpointConfig.getHostName()); + withClusterConfiguration(configuration, haNamedService, endpointConfig.getNamedNodeList(), endpointConfig.getReplication()); + } + + /** + * Generates the correct HA configuration (normally read from xml) based on the namedNodes: + * All named nodes have to be qualified: configuration.set("dfs.ha.namenodes.hfdsNamedService","namenode1,namenode2"); + * For each named node, the following entry is added + * <p> + * configuration.set("dfs.namenode.rpc-address.hfdsNamedService.namenode1", "namenode1:1234"); + * <p> + * Finally the proxy provider has to be specified: + * <p> + * configuration.set("dfs.client.failover.proxy.provider.hfdsNamedService",
"org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"); + * <p> + * + * @param configuration - hdfs configuration that will be setup with the HA settings + * @param haNamedService - how the ha named service that represents the cluster will be named (used to resolve the FS) + * @param namedNodes - All named nodes from the hadoop cluster + * @param replicationFactor - dfs replication factor + */ + static void withClusterConfiguration(Configuration configuration, String haNamedService, List<String> namedNodes, int replicationFactor) { configuration.set(DFSConfigKeys.DFS_REPLICATION_KEY, Integer.toString(replicationFactor)); - configuration.set(DFSConfigKeys.DFS_NAMESERVICES, HFDS_NAMED_SERVICE); + configuration.set(DFSConfigKeys.DFS_NAMESERVICES, haNamedService); configuration.set( - DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, HFDS_NAMED_SERVICE), + DFSUtil.addKeySuffixes(DFS_HA_NAMENODES_KEY_PREFIX, haNamedService), nodeToString(namedNodes.stream().map(HaConfigurationBuilder::nodeToString).collect(Collectors.joining(","))) ); namedNodes.forEach(nodeName -> configuration.set( - DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, HFDS_NAMED_SERVICE, nodeToString(nodeName)), + DFSUtil.addKeySuffixes(DFS_NAMENODE_RPC_ADDRESS_KEY, haNamedService, nodeToString(nodeName)), nodeName) ); - configuration.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + HFDS_NAMED_SERVICE, ConfiguredFailoverProxyProvider.class.getName()); + configuration.set(DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX + "." + haNamedService, ConfiguredFailoverProxyProvider.class.getName()); + + configuration.set(HFDS_FS, "hdfs://" + haNamedService); + + } + + static String getSanitizedClusterName(String rawClusterName) { + String clusterName = HFDS_NAMED_SERVICE; - configuration.set(HFDS_FS, "hdfs://" + HFDS_NAMED_SERVICE); + if (StringUtils.isNotEmpty(rawClusterName)) { + clusterName = rawClusterName.replaceAll("\\.", HFDS_NAMED_SERVICE_SEPARATOR); + } + return clusterName; } private static String nodeToString(String nodeName) { - return nodeName.replaceAll(":[0-9]*", "").replaceAll("\\.", "_"); + return nodeName.replaceAll(":[0-9]*", "").replaceAll("\\.", HFDS_NAMED_SERVICE_SEPARATOR); } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java index 4c4123c..8b5aa8d 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsArrayFileType.java @@ -65,7 +65,7 @@ class HdfsArrayFileType extends DefaultHdfsFileType { Closeable rout; HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); Class<? 
extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass(); - rout = new ArrayFile.Writer(hdfsInfo.getConf(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass, + rout = new ArrayFile.Writer(hdfsInfo.getConfiguration(), hdfsInfo.getFileSystem(), hdfsPath, valueWritableClass, configuration.getCompressionType(), () -> { }); return rout; } catch (IOException ex) { @@ -78,7 +78,7 @@ class HdfsArrayFileType extends DefaultHdfsFileType { try { Closeable rin; HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConf()); + rin = new ArrayFile.Reader(hdfsInfo.getFileSystem(), hdfsPath, hdfsInfo.getConfiguration()); return rin; } catch (IOException ex) { throw new RuntimeCamelException(ex); diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java index be4c6d3..fa0c0ab 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsBloommapFileType.java @@ -73,7 +73,7 @@ class HdfsBloommapFileType extends DefaultHdfsFileType { HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass(); Class<? extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass(); - rout = new BloomMapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), + rout = new BloomMapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass), MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()), MapFile.Writer.progressable(() -> { @@ -89,7 +89,7 @@ class HdfsBloommapFileType extends DefaultHdfsFileType { try { Closeable rin; HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf()); + rin = new BloomMapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration()); return rin; } catch (IOException ex) { throw new RuntimeCamelException(ex); diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java index be5bdb3..c5d3dc4 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConfiguration.java @@ -98,6 +98,7 @@ public class HdfsConfiguration { private String kerberosKeytabLocation; public HdfsConfiguration() { + // default constructor } private Boolean getBoolean(Map<String, Object> hdfsSettings, String param, Boolean dflt) { @@ -207,7 +208,7 @@ public class HdfsConfiguration { private List<String> getNamedNodeList(Map<String, Object> hdfsSettings) { namedNodes = getString(hdfsSettings, "namedNodes", namedNodes); - + if (isNotEmpty(namedNodes)) { return Arrays.stream(namedNodes.split(",")).distinct().collect(Collectors.toList()); } @@ -560,6 +561,10 @@ public class HdfsConfiguration { return namedNodeList; } + public boolean hasClusterConfiguration() { + 
return !getNamedNodeList().isEmpty(); + } + public String getKerberosConfigFileLocation() { return kerberosConfigFileLocation; } @@ -595,7 +600,17 @@ public class HdfsConfiguration { } public boolean isKerberosAuthentication() { - return isNotEmpty(namedNodes) && isNotEmpty(kerberosConfigFileLocation) && isNotEmpty(kerberosUsername) && isNotEmpty(kerberosKeytabLocation); + return isNotEmpty(kerberosConfigFileLocation) && isNotEmpty(kerberosUsername) && isNotEmpty(kerberosKeytabLocation); + } + + /** + * Get the label of the hdfs file system like: HOST_NAME:PORT/PATH + * + * @param path + * @return HOST_NAME:PORT/PATH + */ + String getFileSystemLabel(String path) { + return String.format("%s:%s/%s", getHostName(), getPort(), path); } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java index 9971f74..52b1249 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsConsumer.java @@ -38,12 +38,10 @@ import org.apache.hadoop.fs.PathFilter; public final class HdfsConsumer extends ScheduledPollConsumer { - public static final long DEFAULT_CONSUMER_INITIAL_DELAY = 10 * 1000L; - private final HdfsConfiguration config; private final StringBuilder hdfsPath; private final Processor processor; - private final ReadWriteLock rwlock = new ReentrantReadWriteLock(); + private final ReadWriteLock rwLock = new ReentrantReadWriteLock(); public HdfsConsumer(HdfsEndpoint endpoint, Processor processor, HdfsConfiguration config) { super(endpoint, processor); @@ -69,24 +67,21 @@ public final class HdfsConsumer extends ScheduledPollConsumer { } private HdfsInfo setupHdfs(boolean onStartup) throws IOException { + String hdfsFsDescription = config.getFileSystemLabel(hdfsPath.toString()); // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log if (onStartup) { - log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), hdfsPath); + log.info("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription); } else { - if (log.isDebugEnabled()) { - log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), hdfsPath); - } + log.debug("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription); } // hadoop will cache the connection by default so its faster to get in the poll method HdfsInfo answer = HdfsInfoFactory.newHdfsInfo(this.hdfsPath.toString(), config); if (onStartup) { - log.info("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), hdfsPath); + log.info("Connected to hdfs file-system {}", hdfsFsDescription); } else { - if (log.isDebugEnabled()) { - log.debug("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), hdfsPath); - } + log.debug("Connected to hdfs file-system {}", hdfsFsDescription); } return answer; } @@ -199,11 +194,11 @@ public final class HdfsConsumer extends ScheduledPollConsumer { private HdfsInputStream createInputStream(FileStatus fileStatus) { try { - this.rwlock.writeLock().lock(); + this.rwLock.writeLock().lock(); return 
HdfsInputStream.createInputStream(fileStatus.getPath().toString(), this.config); } finally { - this.rwlock.writeLock().unlock(); + this.rwLock.writeLock().unlock(); } } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java index bd9f245..ff8035e 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfo.java @@ -16,29 +16,23 @@ */ package org.apache.camel.component.hdfs; -import java.io.IOException; -import java.net.URI; -import java.util.List; - -import org.apache.camel.component.hdfs.kerberos.KerberosAuthentication; -import org.apache.camel.component.hdfs.kerberos.KerberosConfigurationBuilder; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -public final class HdfsInfo { +final class HdfsInfo { - private Configuration configuration; - private FileSystem fileSystem; - private Path path; + private final Configuration configuration; + private final FileSystem fileSystem; + private final Path path; - HdfsInfo(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException { - this.configuration = newConfiguration(endpointConfig); - this.fileSystem = newFileSystem(this.configuration, hdfsPath, endpointConfig); - this.path = new Path(hdfsPath); + HdfsInfo(Configuration configuration, FileSystem fileSystem, Path hdfsPath) { + this.configuration = configuration; + this.fileSystem = fileSystem; + this.path = hdfsPath; } - public Configuration getConf() { + public Configuration getConfiguration() { return configuration; } @@ -50,36 +44,4 @@ public final class HdfsInfo { return path; } - private static Configuration newConfiguration(HdfsConfiguration endpointConfig) { - Configuration configuration = new Configuration(); - - if (endpointConfig.isKerberosAuthentication()) { - String kerberosConfigFileLocation = endpointConfig.getKerberosConfigFileLocation(); - KerberosConfigurationBuilder.withKerberosConfiguration(configuration, kerberosConfigFileLocation); - - } - - List<String> namedNodes = endpointConfig.getNamedNodeList(); - if (!namedNodes.isEmpty()) { - HaConfigurationBuilder.withClusterConfiguration(configuration, endpointConfig.getNamedNodeList(), endpointConfig.getReplication()); - - } - - return configuration; - } - - /** - * this will connect to the hadoop hdfs file system, and in case of no connection - * then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes - */ - private static FileSystem newFileSystem(Configuration configuration, String hdfsPath, HdfsConfiguration endpointConfig) throws IOException { - if (endpointConfig.isKerberosAuthentication()) { - String userName = endpointConfig.getKerberosUsername(); - String keytabLocation = endpointConfig.getKerberosKeytabLocation(); - new KerberosAuthentication(configuration, userName, keytabLocation).loginWithKeytab(); - } - - return FileSystem.get(URI.create(hdfsPath), configuration); - } - } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java index 4862a70..6bb47da 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java @@ -17,22 +17,77 
@@ package org.apache.camel.component.hdfs; import java.io.IOException; +import java.net.URI; -import javax.security.auth.login.Configuration; +import org.apache.camel.component.hdfs.kerberos.KerberosAuthentication; +import org.apache.camel.component.hdfs.kerberos.KerberosConfigurationBuilder; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; public final class HdfsInfoFactory { private HdfsInfoFactory() { + // hidden } - public static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration configuration) throws IOException { + static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException { // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards - Configuration auth = HdfsComponent.getJAASConfiguration(); + javax.security.auth.login.Configuration auth = HdfsComponent.getJAASConfiguration(); try { - return new HdfsInfo(hdfsPath, configuration); + return newHdfsInfoWithoutAuth(hdfsPath, endpointConfig); } finally { HdfsComponent.setJAASConfiguration(auth); } } + static HdfsInfo newHdfsInfoWithoutAuth(String hdfsPath, HdfsConfiguration endpointConfig) throws IOException { + Configuration configuration = newConfiguration(endpointConfig); + + authenticate(configuration, endpointConfig); + + FileSystem fileSystem = newFileSystem(configuration, hdfsPath, endpointConfig); + Path path = new Path(hdfsPath); + + return new HdfsInfo(configuration, fileSystem, path); + } + + static Configuration newConfiguration(HdfsConfiguration endpointConfig) { + Configuration configuration = new Configuration(); + + if (endpointConfig.isKerberosAuthentication()) { + KerberosConfigurationBuilder.withKerberosConfiguration(configuration, endpointConfig); + } + + if (endpointConfig.hasClusterConfiguration()) { + HaConfigurationBuilder.withClusterConfiguration(configuration, endpointConfig); + } + + return configuration; + } + + static void authenticate(Configuration configuration, HdfsConfiguration endpointConfig) throws IOException { + if (endpointConfig.isKerberosAuthentication()) { + String userName = endpointConfig.getKerberosUsername(); + String keytabLocation = endpointConfig.getKerberosKeytabLocation(); + new KerberosAuthentication(configuration, userName, keytabLocation).loginWithKeytab(); + } + } + + /** + * this will connect to the hadoop hdfs file system, and in case of no connection + * then the hardcoded timeout in hadoop is 45 x 20 sec = 15 minutes + */ + static FileSystem newFileSystem(Configuration configuration, String hdfsPath, HdfsConfiguration endpointConfig) throws IOException { + FileSystem fileSystem; + if (endpointConfig.hasClusterConfiguration()) { + // using the default FS that was set during the cluster configuration (@see org.apache.camel.component.hdfs.HaConfigurationBuilder) + fileSystem = FileSystem.get(configuration); + } else { + fileSystem = FileSystem.get(URI.create(hdfsPath), configuration); + } + + return fileSystem; + } + } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java index 40212e8..49e3ddc 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInputStream.java @@ -18,7 +18,6 @@ package org.apache.camel.component.hdfs; import java.io.Closeable; import
java.io.IOException; -import java.util.Optional; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.Path; @@ -53,29 +52,27 @@ public class HdfsInputStream implements Closeable { * @throws IOException */ public static HdfsInputStream createInputStream(String hdfsPath, HdfsConfiguration configuration) { - HdfsInputStream ret = new HdfsInputStream(); - ret.fileType = configuration.getFileType(); - ret.actualPath = hdfsPath; - ret.suffixedPath = ret.actualPath + '.' + configuration.getOpenedSuffix(); - ret.suffixedReadPath = ret.actualPath + '.' + configuration.getReadSuffix(); - ret.chunkSize = configuration.getChunkSize(); + HdfsInputStream iStream = new HdfsInputStream(); + iStream.fileType = configuration.getFileType(); + iStream.actualPath = hdfsPath; + iStream.suffixedPath = iStream.actualPath + '.' + configuration.getOpenedSuffix(); + iStream.suffixedReadPath = iStream.actualPath + '.' + configuration.getReadSuffix(); + iStream.chunkSize = configuration.getChunkSize(); try { - HdfsInfo info = HdfsInfoFactory.newHdfsInfo(ret.actualPath, configuration); - if (info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath))) { - ret.in = ret.fileType.createInputStream(ret.suffixedPath, configuration); - ret.opened = true; - ret.config = configuration; + HdfsInfo info = HdfsInfoFactory.newHdfsInfo(iStream.actualPath, configuration); + if (info.getFileSystem().rename(new Path(iStream.actualPath), new Path(iStream.suffixedPath))) { + iStream.in = iStream.fileType.createInputStream(iStream.suffixedPath, configuration); + iStream.opened = true; + iStream.config = configuration; } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath); - } - ret = null; + LOG.debug("Failed to open file [{}] because it doesn't exist", hdfsPath); + iStream = null; } } catch (IOException e) { throw new RuntimeException(e); } - return ret; + return iStream; } @Override diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java index 01868b7..fbd3e04 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsMapFileType.java @@ -72,7 +72,7 @@ class HdfsMapFileType extends DefaultHdfsFileType { HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); Class<? extends WritableComparable> keyWritableClass = configuration.getKeyType().getWritableClass(); Class<? 
extends WritableComparable> valueWritableClass = configuration.getValueType().getWritableClass(); - rout = new MapFile.Writer(hdfsInfo.getConf(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass), + rout = new MapFile.Writer(hdfsInfo.getConfiguration(), new Path(hdfsPath), MapFile.Writer.keyClass(keyWritableClass), MapFile.Writer.valueClass(valueWritableClass), MapFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()), MapFile.Writer.progressable(() -> { })); @@ -87,7 +87,7 @@ class HdfsMapFileType extends DefaultHdfsFileType { try { Closeable rin; HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConf()); + rin = new MapFile.Reader(new Path(hdfsPath), hdfsInfo.getConfiguration()); return rin; } catch (IOException ex) { throw new RuntimeCamelException(ex); diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java index 9d9a5cc..51efaf7 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOsgiHelper.java @@ -43,10 +43,10 @@ public class HdfsOsgiHelper { Configuration conf = new Configuration(); // set that as the hdfs configuration's classloader conf.setClassLoader(cl); - for (String key : fileSystems.keySet()) { - URI uri = URI.create(key); - conf.setClass(String.format("fs.%s.impl", uri.getScheme()), cl.loadClass(fileSystems.get(key)), FileSystem.class); - LOG.debug("Successfully loaded class: {}", fileSystems.get(key)); + for (Map.Entry<String, String> fsEntry : fileSystems.entrySet()) { + URI uri = URI.create(fsEntry.getKey()); + conf.setClass(String.format("fs.%s.impl", uri.getScheme()), cl.loadClass(fsEntry.getValue()), FileSystem.class); + LOG.debug("Successfully loaded class: {}", fsEntry.getValue()); FileSystem.get(uri, conf); LOG.debug("Successfully got uri: {} from FileSystem Object", uri); } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java index 6e623e3..49a3a00 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsOutputStream.java @@ -43,35 +43,35 @@ public class HdfsOutputStream implements Closeable { } public static HdfsOutputStream createOutputStream(String hdfsPath, HdfsConfiguration configuration) throws IOException { - HdfsOutputStream ret = new HdfsOutputStream(); - ret.fileType = configuration.getFileType(); - ret.actualPath = hdfsPath; - ret.info = new HdfsInfo(ret.actualPath, configuration); + HdfsOutputStream oStream = new HdfsOutputStream(); + oStream.fileType = configuration.getFileType(); + oStream.actualPath = hdfsPath; + oStream.info = HdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.actualPath, configuration); + + oStream.suffixedPath = oStream.actualPath + '.' + configuration.getOpenedSuffix(); + + Path actualPath = new Path(oStream.actualPath); + boolean actualPathExists = oStream.info.getFileSystem().exists(actualPath); - ret.suffixedPath = ret.actualPath + '.' 
+ configuration.getOpenedSuffix(); if (configuration.isWantAppend() || configuration.isAppend()) { - if (!ret.info.getFileSystem().exists(new Path(ret.actualPath))) { - configuration.setAppend(false); - } else { + if (actualPathExists) { configuration.setAppend(true); - ret.info = new HdfsInfo(ret.suffixedPath, configuration); - ret.info.getFileSystem().rename(new Path(ret.actualPath), new Path(ret.suffixedPath)); + oStream.info = HdfsInfoFactory.newHdfsInfoWithoutAuth(oStream.suffixedPath, configuration); + oStream.info.getFileSystem().rename(actualPath, new Path(oStream.suffixedPath)); + } else { + configuration.setAppend(false); } - } else { - if (ret.info.getFileSystem().exists(new Path(ret.actualPath))) { - //only check if not directory - if (!ret.info.getFileSystem().isDirectory(new Path(ret.actualPath))) { - if (configuration.isOverwrite()) { - ret.info.getFileSystem().delete(new Path(ret.actualPath), true); - } else { - throw new RuntimeCamelException("The file already exists"); - } - } + } else if (actualPathExists && !oStream.info.getFileSystem().isDirectory(actualPath)) { // only check if not directory + if (configuration.isOverwrite()) { + oStream.info.getFileSystem().delete(actualPath, true); + } else { + throw new RuntimeCamelException("The file already exists"); } } - ret.out = ret.fileType.createOutputStream(ret.suffixedPath, configuration); - ret.opened = true; - return ret; + + oStream.out = oStream.fileType.createOutputStream(oStream.suffixedPath, configuration); + oStream.opened = true; + return oStream; } @Override diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java index 568ee71..048a120 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsProducer.java @@ -30,18 +30,14 @@ import org.apache.camel.Expression; import org.apache.camel.support.DefaultProducer; import org.apache.camel.util.IOHelper; import org.apache.camel.util.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class HdfsProducer extends DefaultProducer { - private static final Logger LOG = LoggerFactory.getLogger(HdfsProducer.class); - private final HdfsConfiguration config; private final StringBuilder hdfsPath; private final AtomicBoolean idle = new AtomicBoolean(false); private volatile ScheduledExecutorService scheduler; - private volatile HdfsOutputStream ostream; + private volatile HdfsOutputStream oStream; public static final class SplitStrategy { private SplitStrategyType type; @@ -106,7 +102,7 @@ public class HdfsProducer extends DefaultProducer { // setup hdfs if configured to do on startup if (getEndpoint().getConfig().isConnectOnStartup()) { - ostream = setupHdfs(true); + oStream = setupHdfs(true); } Optional<SplitStrategy> idleStrategy = tryFindIdleStrategy(config.getSplitStrategies()); @@ -116,8 +112,8 @@ public class HdfsProducer extends DefaultProducer { scheduler.scheduleAtFixedRate(new IdleCheck(idleStrategy.get()), config.getCheckIdleInterval(), config.getCheckIdleInterval(), TimeUnit.MILLISECONDS); } } catch (Exception e) { - LOG.warn("Failed to start the HDFS producer. Caused by: [{}]", e.getMessage()); - LOG.debug("", e); + log.warn("Failed to start the HDFS producer. 
Caused by: [{}]", e.getMessage()); + log.debug("", e); throw new RuntimeException(e); } finally { HdfsComponent.setJAASConfiguration(auth); @@ -125,8 +121,8 @@ public class HdfsProducer extends DefaultProducer { } private synchronized HdfsOutputStream setupHdfs(boolean onStartup) throws IOException { - if (ostream != null) { - return ostream; + if (oStream != null) { + return oStream; } StringBuilder actualPath = new StringBuilder(hdfsPath); @@ -134,23 +130,21 @@ public class HdfsProducer extends DefaultProducer { actualPath = newFileName(); } + String hdfsFsDescription = config.getFileSystemLabel(actualPath.toString()); + // if we are starting up then log at info level, and if runtime then log at debug level to not flood the log if (onStartup) { - log.info("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), actualPath); + log.info("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription); } else { - if (log.isDebugEnabled()) { - log.debug("Connecting to hdfs file-system {}:{}/{} (may take a while if connection is not available)", config.getHostName(), config.getPort(), actualPath); - } + log.debug("Connecting to hdfs file-system {} (may take a while if connection is not available)", hdfsFsDescription); } HdfsOutputStream answer = HdfsOutputStream.createOutputStream(actualPath.toString(), config); if (onStartup) { - log.info("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), actualPath); + log.info("Connected to hdfs file-system {}", hdfsFsDescription); } else { - if (log.isDebugEnabled()) { - log.debug("Connected to hdfs file-system {}:{}/{}", config.getHostName(), config.getPort(), actualPath); - } + log.debug("Connected to hdfs file-system {}", hdfsFsDescription); } return answer; @@ -172,9 +166,9 @@ public class HdfsProducer extends DefaultProducer { getEndpoint().getCamelContext().getExecutorServiceManager().shutdown(scheduler); scheduler = null; } - if (ostream != null) { - IOHelper.close(ostream, "output stream", log); - ostream = null; + if (oStream != null) { + IOHelper.close(oStream, "output stream", log); + oStream = null; } } @@ -195,27 +189,27 @@ public class HdfsProducer extends DefaultProducer { // if an explicit filename is specified, close any existing stream and append the filename to the hdfsPath if (exchange.getIn().getHeader(Exchange.FILE_NAME) != null) { - if (ostream != null) { - IOHelper.close(ostream, "output stream", log); + if (oStream != null) { + IOHelper.close(oStream, "output stream", log); } StringBuilder actualPath = getHdfsPathUsingFileNameHeader(exchange); - ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config); - } else if (ostream == null) { - // must have ostream - ostream = setupHdfs(false); + oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), config); + } else if (oStream == null) { + // must have oStream + oStream = setupHdfs(false); } if (isSplitRequired(config.getSplitStrategies())) { - if (ostream != null) { - IOHelper.close(ostream, "output stream", log); + if (oStream != null) { + IOHelper.close(oStream, "output stream", log); } StringBuilder actualPath = newFileName(); - ostream = HdfsOutputStream.createOutputStream(actualPath.toString(), config); + oStream = HdfsOutputStream.createOutputStream(actualPath.toString(), config); } - String path = ostream.getActualPath(); + String path = oStream.getActualPath(); log.trace("Writing body to hdfs-file 
{}", path); - ostream.append(key, body, exchange.getContext().getTypeConverter()); + oStream.append(key, body, exchange.getContext().getTypeConverter()); idle.set(false); @@ -231,8 +225,8 @@ public class HdfsProducer extends DefaultProducer { if (close) { try { HdfsProducer.this.log.trace("Closing stream"); - ostream.close(); - ostream = null; + oStream.close(); + oStream = null; } catch (IOException e) { // ignore } @@ -243,6 +237,7 @@ public class HdfsProducer extends DefaultProducer { /** * helper method to construct the hdfsPath from the CamelFileName String or Expression + * * @param exchange * @return */ @@ -253,7 +248,7 @@ public class HdfsProducer extends DefaultProducer { if (value instanceof String) { fileName = exchange.getContext().getTypeConverter().convertTo(String.class, exchange, value); } else if (value instanceof Expression) { - fileName = ((Expression) value).evaluate(exchange, String.class); + fileName = ((Expression) value).evaluate(exchange, String.class); } return actualPath.append(fileName); } @@ -261,7 +256,7 @@ public class HdfsProducer extends DefaultProducer { private boolean isSplitRequired(List<SplitStrategy> strategies) { boolean split = false; for (SplitStrategy splitStrategy : strategies) { - split |= splitStrategy.getType().split(ostream, splitStrategy.value, this); + split |= splitStrategy.getType().split(oStream, splitStrategy.value, this); } return split; } @@ -285,18 +280,18 @@ public class HdfsProducer extends DefaultProducer { @Override public void run() { - // only run if ostream has been created - if (ostream == null) { + // only run if oStream has been created + if (oStream == null) { return; } HdfsProducer.this.log.trace("IdleCheck running"); - if (System.currentTimeMillis() - ostream.getLastAccess() > strategy.value && !idle.get() && !ostream.isBusy().get()) { + if (System.currentTimeMillis() - oStream.getLastAccess() > strategy.value && !idle.get() && !oStream.isBusy().get()) { idle.set(true); try { HdfsProducer.this.log.trace("Closing stream as idle"); - ostream.close(); + oStream.close(); } catch (IOException e) { // ignore } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java index 0bfde2e..159183a 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsSequenceFileType.java @@ -71,7 +71,7 @@ class HdfsSequenceFileType extends DefaultHdfsFileType { HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); Class<?> keyWritableClass = configuration.getKeyType().getWritableClass(); Class<?> valueWritableClass = configuration.getValueType().getWritableClass(); - rout = SequenceFile.createWriter(hdfsInfo.getConf(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass), + rout = SequenceFile.createWriter(hdfsInfo.getConfiguration(), SequenceFile.Writer.file(hdfsInfo.getPath()), SequenceFile.Writer.keyClass(keyWritableClass), SequenceFile.Writer.valueClass(valueWritableClass), SequenceFile.Writer.bufferSize(configuration.getBufferSize()), SequenceFile.Writer.replication(configuration.getReplication()), SequenceFile.Writer.blockSize(configuration.getBlockSize()), SequenceFile.Writer.compression(configuration.getCompressionType(), configuration.getCompressionCodec().getCodec()), @@ -88,7 +88,7 @@ class 
HdfsSequenceFileType extends DefaultHdfsFileType { try { Closeable rin; HdfsInfo hdfsInfo = HdfsInfoFactory.newHdfsInfo(hdfsPath, configuration); - rin = new SequenceFile.Reader(hdfsInfo.getConf(), SequenceFile.Reader.file(hdfsInfo.getPath())); + rin = new SequenceFile.Reader(hdfsInfo.getConfiguration(), SequenceFile.Reader.file(hdfsInfo.getPath())); return rin; } catch (IOException ex) { throw new RuntimeCamelException(ex); diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java index b5025cc..c753c46 100644 --- a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java +++ b/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilder.java @@ -18,6 +18,7 @@ package org.apache.camel.component.hdfs.kerberos; import java.io.File; +import org.apache.camel.component.hdfs.HdfsConfiguration; import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,10 +37,10 @@ public final class KerberosConfigurationBuilder { /** * Add all the kerberos specific settings needed for this authentication mode * - * @param kerberosConfigFileLocation - The location of the kerberos config file (on the server) + * @param endpointConfig - configuration with the Kerberos settings configured on the endpoint */ - public static void withKerberosConfiguration(Configuration configuration, String kerberosConfigFileLocation) { - setKerberosConfigFile(kerberosConfigFileLocation); + public static void withKerberosConfiguration(Configuration configuration, HdfsConfiguration endpointConfig) { + setKerberosConfigFile(endpointConfig.getKerberosConfigFileLocation()); configuration.set(AUTHENTICATION_MODE, "kerberos"); } diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java index a134bce..fdf8d99 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/FromFileToHdfsTest.java @@ -37,7 +37,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport { @Override @Before public void setUp() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } deleteDirectory("target/inbox"); @@ -48,7 +48,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport { @Override @After public void tearDown() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -61,7 +61,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport { @Test public void testFileToHdfs() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -80,7 +80,7 @@ public class FromFileToHdfsTest extends HdfsTestSupport { @Test public void testTwoFilesToHdfs() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java index 3cf20fc..91dffe7 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java +++
b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HaConfigurationBuilderTest.java @@ -33,21 +33,48 @@ public class HaConfigurationBuilderTest { public void withClusterConfiguration() { // given Configuration configuration = new Configuration(); + String haClusterName = "haCluster"; List<String> namedNodes = Arrays.asList("kerb_node_01.example.com:8021", "kerb_node_02.example.com:8022"); int replicationFactor = 3; // when - HaConfigurationBuilder.withClusterConfiguration(configuration, namedNodes, replicationFactor); + HaConfigurationBuilder.withClusterConfiguration(configuration, haClusterName, namedNodes, replicationFactor); // then assertThat(configuration, notNullValue()); assertThat(configuration.get(DFSConfigKeys.DFS_REPLICATION_KEY), is("3")); - assertThat(configuration.get(DFSConfigKeys.DFS_NAMESERVICES), is("hfdsNamedService")); - assertThat(configuration.get("dfs.ha.namenodes.hfdsNamedService"), is("kerb_node_01_example_com,kerb_node_02_example_com")); - assertThat(configuration.get("dfs.namenode.rpc-address.hfdsNamedService.kerb_node_01_example_com"), is("kerb_node_01.example.com:8021")); - assertThat(configuration.get("dfs.namenode.rpc-address.hfdsNamedService.kerb_node_02_example_com"), is("kerb_node_02.example.com:8022")); - assertThat(configuration.get("dfs.client.failover.proxy.provider.hfdsNamedService"), is("org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")); - assertThat(configuration.get("fs.defaultFS"), is("hdfs://hfdsNamedService")); + assertThat(configuration.get(DFSConfigKeys.DFS_NAMESERVICES), is("haCluster")); + assertThat(configuration.get("dfs.ha.namenodes.haCluster"), is("kerb_node_01_example_com,kerb_node_02_example_com")); + assertThat(configuration.get("dfs.namenode.rpc-address.haCluster.kerb_node_01_example_com"), is("kerb_node_01.example.com:8021")); + assertThat(configuration.get("dfs.namenode.rpc-address.haCluster.kerb_node_02_example_com"), is("kerb_node_02.example.com:8022")); + assertThat(configuration.get("dfs.client.failover.proxy.provider.haCluster"), is("org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider")); + assertThat(configuration.get("fs.defaultFS"), is("hdfs://haCluster")); + } + + @Test + public void getSanitizedClusterNameWithNull() { + // given + String haClusterName = null; + + // when + String actual = HaConfigurationBuilder.getSanitizedClusterName(haClusterName); + + // then + assertThat(actual, notNullValue()); + assertThat(actual, is("hfdsNamedService")); + } + + @Test + public void getSanitizedClusterNameWithHostName() { + // given + String haClusterName = "this.is.a.cluster.host"; + + // when + String actual = HaConfigurationBuilder.getSanitizedClusterName(haClusterName); + + // then + assertThat(actual, notNullValue()); + assertThat(actual, is("this_is_a_cluster_host")); } } diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java index 0845fbe..f0ec9de 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsConsumerTest.java @@ -67,7 +67,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Override @Before public void setUp() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -84,7 +84,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void 
testSimpleConsumer() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -113,11 +113,11 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testConcurrentConsumers() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } - final File rootdir = new File("."); + final File rootdir = CWD; final File dir = new File("target/test/multiple-consumers"); dir.mkdirs(); for (int i = 1; i <= ITERATIONS; i++) { @@ -157,7 +157,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testSimpleConsumerWithEmptyFile() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -186,7 +186,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testSimpleConsumerFileWithSizeEqualToNChunks() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -217,7 +217,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testSimpleConsumerWithEmptySequenceFile() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -242,7 +242,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadWithReadSuffix() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -287,7 +287,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadBoolean() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -315,7 +315,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadByte() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -346,7 +346,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadFloat() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -376,7 +376,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadDouble() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -406,7 +406,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadInt() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -436,7 +436,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadLong() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -466,7 +466,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadBytes() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -496,7 +496,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadString() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -526,7 +526,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Test public void testReadStringArrayFile() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -560,7 +560,7 @@ public class HdfsConsumerTest extends HdfsTestSupport { @Override @After public void tearDown() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } diff --git a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java similarity index 52% copy from components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java copy to components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java index 4862a70..eff5fa9 100644 --- 
a/components/camel-hdfs/src/main/java/org/apache/camel/component/hdfs/HdfsInfoFactory.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsInfoTest.java @@ -18,21 +18,29 @@ package org.apache.camel.component.hdfs; import java.io.IOException; -import javax.security.auth.login.Configuration; +import org.junit.Test; -public final class HdfsInfoFactory { +import static org.hamcrest.CoreMatchers.notNullValue; +import static org.junit.Assert.*; +import static org.mockito.Mockito.mock; - private HdfsInfoFactory() { - } +public class HdfsInfoTest { - public static HdfsInfo newHdfsInfo(String hdfsPath, HdfsConfiguration configuration) throws IOException { - // need to remember auth as Hadoop will override that, which otherwise means the Auth is broken afterwards - Configuration auth = HdfsComponent.getJAASConfiguration(); - try { - return new HdfsInfo(hdfsPath, configuration); - } finally { - HdfsComponent.setJAASConfiguration(auth); - } - } + private HdfsInfo underTest; + + @Test + public void createHdfsInfo() throws IOException { + // given + String hdfsPath = "hdfs://localhost/target/test/multiple-consumers"; + HdfsConfiguration endpointConfig = mock(HdfsConfiguration.class); -} + // when + underTest = HdfsInfoFactory.newHdfsInfoWithoutAuth(hdfsPath, endpointConfig); + + // then + assertThat(underTest, notNullValue()); + assertThat(underTest.getConfiguration(), notNullValue()); + assertThat(underTest.getFileSystem(), notNullValue()); + assertThat(underTest.getPath(), notNullValue()); + } +} \ No newline at end of file diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java index 0f71bdd..7e98793 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerConsumerTest.java @@ -34,7 +34,7 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport { @Override @Before public void setUp() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } super.setUp(); @@ -47,7 +47,7 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport { @Test public void testSimpleSplitWriteRead() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -81,7 +81,7 @@ public class HdfsProducerConsumerTest extends HdfsTestSupport { @Override @After public void tearDown() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java index b3c2e85..60976f5 100644 --- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java +++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerSplitTest.java @@ -35,7 +35,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport { @Override @Before public void setUp() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } super.setUp(); @@ -53,7 +53,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport { @Test public void testSimpleWriteFileWithIdleSplit() throws Exception { - if (!canTest()) { + if (skipTest()) { return; } @@ -86,7 +86,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport { } private void doTest(int 
routeNr) throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -108,7 +108,7 @@ public class HdfsProducerSplitTest extends HdfsTestSupport {
     @Override
     @After
     public void tearDown() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
index f4cdfc0..a97f29b 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsProducerTest.java
@@ -53,7 +53,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
     @Override
     @Before
     public void setUp() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         super.setUp();
@@ -61,7 +61,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testProducer() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         template.sendBody("direct:start1", "PAPPO");
@@ -79,7 +79,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testProducerClose() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         for (int i = 0; i < 10; ++i) {
@@ -105,7 +105,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteBoolean() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         Boolean aBoolean = true;
@@ -125,7 +125,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteByte() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         byte aByte = 8;
@@ -145,7 +145,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteInt() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         int anInt = 1234;
@@ -165,7 +165,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteFloat() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         float aFloat = 12.34f;
@@ -185,7 +185,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteDouble() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         Double aDouble = 12.34D;
@@ -205,7 +205,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteLong() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         long aLong = 1234567890;
@@ -225,7 +225,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteText() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txt = "CIAO MONDO !";
@@ -245,7 +245,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteTextWithKey() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txtKey = "THEKEY";
@@ -266,7 +266,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testMapWriteTextWithKey() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txtKey = "THEKEY";
@@ -286,7 +286,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testArrayWriteText() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txtValue = "CIAO MONDO !";
@@ -305,7 +305,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testBloomMapWriteText() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         String txtKey = "THEKEY";
@@ -325,7 +325,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteTextWithDynamicFilename() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -348,7 +348,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
 
     @Test
     public void testWriteTextWithDynamicFilenameExpression() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
 
@@ -372,7 +372,7 @@ public class HdfsProducerTest extends HdfsTestSupport {
     @Override
     @After
     public void tearDown() throws Exception {
-        if (!canTest()) {
+        if (skipTest()) {
             return;
         }
         super.tearDown();
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java
index 6399fc0..e406014 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/HdfsTestSupport.java
@@ -16,25 +16,55 @@
  */
 package org.apache.camel.component.hdfs;
 
+import java.io.File;
+import java.util.Objects;
+
 import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.util.Shell;
 
 public abstract class HdfsTestSupport extends CamelTestSupport {
 
-    public boolean canTest() {
+    public static final File CWD = new File(".");
+
+    private static Boolean skipTests;
+
+    public boolean skipTest() {
+        if (Objects.isNull(skipTests)) {
+            skipTests = notConfiguredToRunTests();
+        }
+
+        return skipTests;
+    }
+
+    private boolean notConfiguredToRunTests() {
+        return isJavaFromIbm() || missingLocalHadoopConfiguration() || missingAuthenticationConfiguration();
+    }
+
+    private static boolean isJavaFromIbm() {
         // Hadoop doesn't run on IBM JDK
-        if (System.getProperty("java.vendor").contains("IBM")) {
-            return false;
+        return System.getProperty("java.vendor").contains("IBM");
+    }
+
+    private static boolean missingLocalHadoopConfiguration() {
+        boolean hasLocalHadoop;
+        try {
+            String hadoopHome = Shell.getHadoopHome();
+            hasLocalHadoop = StringUtils.isNotEmpty(hadoopHome);
+        } catch (Throwable e) {
+            hasLocalHadoop = false;
         }
+        return !hasLocalHadoop;
+    }
 
-        // must be able to get security configuration
+    private boolean missingAuthenticationConfiguration() {
         try {
             javax.security.auth.login.Configuration.getConfiguration();
+            return false;
         } catch (Exception e) {
             log.debug("Cannot run test due security exception", e);
-            return false;
+            return true;
         }
-
-        return true;
     }
 }
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java
index ca94682..383a45d 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosAuthenticationTest.java
@@ -16,13 +16,14 @@
  */
 package org.apache.camel.component.hdfs.kerberos;
 
-import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
+import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD;
+
 public class KerberosAuthenticationTest {
 
     private KerberosAuthentication underTest;
@@ -33,7 +34,7 @@ public class KerberosAuthenticationTest {
         Configuration configuration = new Configuration();
         String username = "test_user";
-        String keyTabFileLocation = pwd() + "/src/test/resources/kerberos/test-keytab.bin";
+        String keyTabFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/test-keytab.bin";
 
         underTest = new KerberosAuthentication(configuration, username, keyTabFileLocation);
 
@@ -50,7 +51,7 @@ public class KerberosAuthenticationTest {
         Configuration configuration = new Configuration();
         String username = "test_user";
-        String keyTabFileLocation = pwd() + "/src/test/resources/kerberos/missing.bin";
+        String keyTabFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/missing.bin";
 
         underTest = new KerberosAuthentication(configuration, username, keyTabFileLocation);
 
@@ -61,8 +62,4 @@ public class KerberosAuthenticationTest {
             /* exception was thrown */
         }
     }
-
-    private String pwd() {
-        return new File(".").getAbsolutePath();
-    }
-
 }
diff --git a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java
index aa00a82..af75002 100644
--- a/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java
+++ b/components/camel-hdfs/src/test/java/org/apache/camel/component/hdfs/kerberos/KerberosConfigurationBuilderTest.java
@@ -16,13 +16,9 @@
  */
 package org.apache.camel.component.hdfs.kerberos;
 
-import java.io.File;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
 import org.junit.Test;
 
+import static org.apache.camel.component.hdfs.HdfsTestSupport.CWD;
 import static org.junit.Assert.*;
 
 public class KerberosConfigurationBuilderTest {
@@ -30,7 +26,7 @@
     @Test
     public void withKerberosConfiguration() {
         // given
-        String kerberosConfigFileLocation = pwd() + "/src/test/resources/kerberos/test-kerb5.conf";
+        String kerberosConfigFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/test-kerb5.conf";
 
         // when
         KerberosConfigurationBuilder.setKerberosConfigFile(kerberosConfigFileLocation);
@@ -43,7 +39,7 @@
     public void setKerberosConfigFileWithRealFile() {
         // given
         String kerb5FileName = "test-kerb5.conf";
-        String kerberosConfigFileLocation = pwd() + "/src/test/resources/kerberos/" + kerb5FileName;
+        String kerberosConfigFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/" + kerb5FileName;
 
         // when
         KerberosConfigurationBuilder.setKerberosConfigFile(kerberosConfigFileLocation);
@@ -58,7 +54,7 @@
     public void setKerberosConfigFileWithMissingFile() {
         // given
         String kerb5FileName = "missing-kerb5.conf";
-        String kerberosConfigFileLocation = pwd() + "/src/test/resources/kerberos/" + kerb5FileName;
+        String kerberosConfigFileLocation = CWD.getAbsolutePath() + "/src/test/resources/kerberos/" + kerb5FileName;
 
         // when
         KerberosConfigurationBuilder.setKerberosConfigFile(kerberosConfigFileLocation);
@@ -68,8 +64,4 @@
         assertNull(actual);
     }
 
-    private String pwd() {
-        return new File(".").getAbsolutePath();
-    }
-
 }