Repository: incubator-ignite Updated Branches: refs/heads/ignite-980 f27987549 -> 484de67c1
# some refactoring + comments. Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/484de67c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/484de67c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/484de67c Branch: refs/heads/ignite-980 Commit: 484de67c11bce70f61db49f737d7aa0e981fae06 Parents: f279875 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Tue Jun 23 10:24:38 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Tue Jun 23 10:24:38 2015 +0300 ---------------------------------------------------------------------- .../hadoop/fs/HadoopFileSystemCache.java | 242 ------------------- .../hadoop/fs/HadoopFileSystemCacheUtil.java | 241 ++++++++++++++++++ .../processors/hadoop/v2/HadoopV2Job.java | 6 +- .../hadoop/v2/HadoopV2JobResourceManager.java | 23 +- .../hadoop/v2/HadoopV2TaskContext.java | 13 +- 5 files changed, 258 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java deleted file mode 100644 index 96b32db..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCache.java +++ /dev/null @@ -1,242 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.mapreduce.*; -import org.apache.ignite.*; -import org.apache.ignite.hadoop.fs.v1.*; -import org.apache.ignite.internal.util.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.net.*; - -/** - * Static caches of file systems used by Map-Reduce tasks and jobs. - * This class - */ -public class HadoopFileSystemCache { - /** - * Creates HadoopLazyConcurrentMap. - * @return a new HadoopLazyConcurrentMap. - */ - public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { - return new HadoopLazyConcurrentMap<>( - new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { - @Override public FileSystem createValue(FsCacheKey key) { - try { - assert key != null; - - // Explicitly disable FileSystem caching: - URI uri = key.uri(); - - String scheme = uri.getScheme(); - - // Copy the configuration to avoid altering the external object. 
- Configuration cfg = new Configuration(key.configuration()); - - String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme); - - cfg.setBoolean(prop, true); - - return FileSystem.get(uri, cfg, key.user()); - } - catch (IOException | InterruptedException ioe) { - throw new IgniteException(ioe); - } - } - } - ); - } - - /** - * Gets non-null user name as per the Hadoop viewpoint. - * @param cfg the Hadoop job configuration, may be null. - * @return the user name, never null. - */ - private static String getMrHadoopUser(Configuration cfg) throws IOException { - String user = cfg.get(MRJobConfig.USER_NAME); - - if (user == null) - user = IgniteHadoopFileSystem.getFsHadoopUser(); - - return user; - } - - /** - * Common method to get the V1 file system in MapRed engine. - * It gets the filesystem for the user specified in the - * configuration with {@link MRJobConfig#USER_NAME} property. - * The file systems are created and cached in the given map upon first request. - * - * @param uri The file system uri. - * @param cfg The configuration. - * @param map The caching map. - * @return The file system. - * @throws IOException On error. - */ - public static FileSystem fileSystemForMrUser(@Nullable URI uri, Configuration cfg, - HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map) - throws IOException { - assert map != null; - assert cfg != null; - - final String usr = getMrHadoopUser(cfg); - - assert usr != null; - - if (uri == null) - uri = FileSystem.getDefaultUri(cfg); - - final FileSystem fs; - - try { - final FsCacheKey key = new FsCacheKey(uri, usr, cfg); - - fs = map.getOrCreate(key); - } - catch (IgniteException ie) { - throw new IOException(ie); - } - - assert fs != null; - assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); - - return fs; - } - - /** - * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). - * @param uri0 The uri. - * @param cfg The cfg. - * @return Correct URI. 
- */ - private static URI fixUri(URI uri0, Configuration cfg) { - if (uri0 == null) - return FileSystem.getDefaultUri(cfg); - - String scheme = uri0.getScheme(); - String authority = uri0.getAuthority(); - - if (authority == null) { - URI dfltUri = FileSystem.getDefaultUri(cfg); - - if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) - return dfltUri; - } - - return uri0; - } - - /** - * Note that configuration is not a part of the key. - * It is used solely to initialize the first instance - * that is created for the key. - */ - public static final class FsCacheKey { - /** */ - private final URI uri; - - /** */ - private final String usr; - - /** */ - private final String equalityKey; - - /** */ - private final Configuration cfg; - - /** - * Constructor - */ - public FsCacheKey(URI uri, String usr, Configuration cfg) { - assert uri != null; - assert usr != null; - assert cfg != null; - - this.uri = fixUri(uri, cfg); - this.usr = usr; - this.cfg = cfg; - - this.equalityKey = createEqualityKey(); - } - - /** - * Creates String key used for equality and hashing. - */ - private String createEqualityKey() { - GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); - - if (uri.getScheme() != null) - sb.a(uri.getScheme().toLowerCase()); - - sb.a("://"); - - if (uri.getAuthority() != null) - sb.a(uri.getAuthority().toLowerCase()); - - return sb.toString(); - } - - /** - * The URI. - */ - public URI uri() { - return uri; - } - - /** - * The User. - */ - public String user() { - return usr; - } - - /** - * The Configuration. 
- */ - public Configuration configuration() { - return cfg; - } - - /** {@inheritDoc} */ - @SuppressWarnings("SimplifiableIfStatement") - @Override public boolean equals(Object obj) { - if (obj == this) - return true; - - if (obj == null || getClass() != obj.getClass()) - return false; - - return equalityKey.equals(((FsCacheKey)obj).equalityKey); - } - - /** {@inheritDoc} */ - @Override public int hashCode() { - return equalityKey.hashCode(); - } - - /** {@inheritDoc} */ - @Override public String toString() { - return equalityKey; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java new file mode 100644 index 0000000..397b13e --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/fs/HadoopFileSystemCacheUtil.java @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.fs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.mapreduce.*; +import org.apache.ignite.*; +import org.apache.ignite.hadoop.fs.v1.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.typedef.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.net.*; + +/** + * File system cache utility methods used by Map-Reduce tasks and jobs. + */ +public class HadoopFileSystemCacheUtil { + /** + * A common static factory method. Creates new HadoopLazyConcurrentMap. + * @return a new HadoopLazyConcurrentMap. + */ + public static HadoopLazyConcurrentMap<FsCacheKey, FileSystem> createHadoopLazyConcurrentMap() { + return new HadoopLazyConcurrentMap<>( + new HadoopLazyConcurrentMap.ValueFactory<FsCacheKey, FileSystem>() { + @Override public FileSystem createValue(FsCacheKey key) { + try { + assert key != null; + + // Explicitly disable FileSystem caching: + URI uri = key.uri(); + + String scheme = uri.getScheme(); + + // Copy the configuration to avoid altering the external object. + Configuration cfg = new Configuration(key.configuration()); + + String prop = HadoopFileSystemsUtils.disableFsCachePropertyName(scheme); + + cfg.setBoolean(prop, true); + + return FileSystem.get(uri, cfg, key.user()); + } + catch (IOException | InterruptedException ioe) { + throw new IgniteException(ioe); + } + } + } + ); + } + + /** + * Gets non-null user name as per the Hadoop viewpoint. + * @param cfg the Hadoop job configuration, may be null. + * @return the user name, never null. 
+ */ + private static String getMrHadoopUser(Configuration cfg) throws IOException { + String user = cfg.get(MRJobConfig.USER_NAME); + + if (user == null) + user = IgniteHadoopFileSystem.getFsHadoopUser(); + + return user; + } + + /** + * Common method to get the V1 file system in MapRed engine. + * It gets the filesystem for the user specified in the + * configuration with {@link MRJobConfig#USER_NAME} property. + * The file systems are created and cached in the given map upon first request. + * + * @param uri The file system uri. + * @param cfg The configuration. + * @param map The caching map. + * @return The file system. + * @throws IOException On error. + */ + public static FileSystem fileSystemForMrUserWithCaching(@Nullable URI uri, Configuration cfg, + HadoopLazyConcurrentMap<FsCacheKey, FileSystem> map) + throws IOException { + assert map != null; + assert cfg != null; + + final String usr = getMrHadoopUser(cfg); + + assert usr != null; + + if (uri == null) + uri = FileSystem.getDefaultUri(cfg); + + final FileSystem fs; + + try { + final FsCacheKey key = new FsCacheKey(uri, usr, cfg); + + fs = map.getOrCreate(key); + } + catch (IgniteException ie) { + throw new IOException(ie); + } + + assert fs != null; + assert !(fs instanceof IgniteHadoopFileSystem) || F.eq(usr, ((IgniteHadoopFileSystem)fs).user()); + + return fs; + } + + /** + * Takes Fs URI using logic similar to that used in FileSystem#get(1,2,3). + * @param uri0 The uri. + * @param cfg The cfg. + * @return Correct URI. + */ + private static URI fixUri(URI uri0, Configuration cfg) { + if (uri0 == null) + return FileSystem.getDefaultUri(cfg); + + String scheme = uri0.getScheme(); + String authority = uri0.getAuthority(); + + if (authority == null) { + URI dfltUri = FileSystem.getDefaultUri(cfg); + + if (scheme == null || (scheme.equals(dfltUri.getScheme()) && dfltUri.getAuthority() != null)) + return dfltUri; + } + + return uri0; + } + + /** + * Note that configuration is not a part of the key. 
+ * It is used solely to initialize the first instance + * that is created for the key. + */ + public static final class FsCacheKey { + /** */ + private final URI uri; + + /** */ + private final String usr; + + /** */ + private final String equalityKey; + + /** */ + private final Configuration cfg; + + /** + * Constructor + */ + public FsCacheKey(URI uri, String usr, Configuration cfg) { + assert uri != null; + assert usr != null; + assert cfg != null; + + this.uri = fixUri(uri, cfg); + this.usr = usr; + this.cfg = cfg; + + this.equalityKey = createEqualityKey(); + } + + /** + * Creates String key used for equality and hashing. + */ + private String createEqualityKey() { + GridStringBuilder sb = new GridStringBuilder("(").a(usr).a(")@"); + + if (uri.getScheme() != null) + sb.a(uri.getScheme().toLowerCase()); + + sb.a("://"); + + if (uri.getAuthority() != null) + sb.a(uri.getAuthority().toLowerCase()); + + return sb.toString(); + } + + /** + * The URI. + */ + public URI uri() { + return uri; + } + + /** + * The User. + */ + public String user() { + return usr; + } + + /** + * The Configuration. 
+ */ + public Configuration configuration() { + return cfg; + } + + /** {@inheritDoc} */ + @SuppressWarnings("SimplifiableIfStatement") + @Override public boolean equals(Object obj) { + if (obj == this) + return true; + + if (obj == null || getClass() != obj.getClass()) + return false; + + return equalityKey.equals(((FsCacheKey)obj).equalityKey); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return equalityKey.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return equalityKey; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java index 319640d..d1765a8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2Job.java @@ -43,7 +43,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtil.*; /** * Hadoop job implementation for v2 API. @@ -84,7 +84,7 @@ public class HadoopV2Job implements HadoopJob { private volatile byte[] jobConfData; /** File system cache map. */ - private final HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, FileSystem> fsMap + private final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap(); /** Disposal guard. 
*/ @@ -399,6 +399,6 @@ public class HadoopV2Job implements HadoopJob { * @throws IOException On error. */ public FileSystem fileSystem(@Nullable URI uri, Configuration cfg) throws IOException { - return HadoopFileSystemCache.fileSystemForMrUser(uri, cfg, fsMap); + return fileSystemForMrUserWithCaching(uri, cfg, fsMap); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 55a31c6..97ad179 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -61,8 +61,8 @@ class HadoopV2JobResourceManager { /** Staging directory to delivery job jar and config to the work nodes. */ private Path stagingDir; - /** TODO */ - private final HadoopV2Job fsProvider; + /** The job. */ + private final HadoopV2Job job; /** * Creates new instance. @@ -70,11 +70,11 @@ class HadoopV2JobResourceManager { * @param ctx Hadoop job context. * @param log Logger. 
*/ - public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job fsProvider) { + public HadoopV2JobResourceManager(HadoopJobId jobId, JobContextImpl ctx, IgniteLogger log, HadoopV2Job job) { this.jobId = jobId; this.ctx = ctx; this.log = log.getLogger(HadoopV2JobResourceManager.class); - this.fsProvider = fsProvider; + this.job = job; } /** @@ -119,10 +119,7 @@ class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { -// assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() -// .equals(HadoopClassLoader.nameForJob(locNodeId)); - - FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), cfg); + FileSystem fs = job.fileSystem(stagingDir.toUri(), cfg); if (!fs.exists(stagingDir)) throw new IgniteCheckedException("Failed to find map-reduce submission " + @@ -217,10 +214,7 @@ class HadoopV2JobResourceManager { FileSystem dstFs = FileSystem.getLocal(cfg); -// assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() -// .equals(HadoopClassLoader.nameForJob(locNodeId)); - - FileSystem srcFs = fsProvider.fileSystem(srcPath.toUri(), cfg); + FileSystem srcFs = job.fileSystem(srcPath.toUri(), cfg); if (extract) { File archivesPath = new File(jobLocDir.getAbsolutePath(), ".cached-archives"); @@ -303,10 +297,7 @@ class HadoopV2JobResourceManager { public void cleanupStagingDirectory() { try { if (stagingDir != null) { -// assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() -// .equals(HadoopClassLoader.nameForJob(locNodeId)); - - FileSystem fs = fsProvider.fileSystem(stagingDir.toUri(), ctx.getJobConf()); + FileSystem fs = job.fileSystem(stagingDir.toUri(), ctx.getJobConf()); fs.delete(stagingDir, true); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/484de67c/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java 
---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java index f007038..90b0e43 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2TaskContext.java @@ -45,7 +45,7 @@ import java.security.*; import java.util.*; import java.util.concurrent.*; -import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCache.*; +import static org.apache.ignite.internal.processors.hadoop.fs.HadoopFileSystemCacheUtil.*; import static org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; @@ -56,13 +56,15 @@ public class HadoopV2TaskContext extends HadoopTaskContext { /** */ private static final boolean COMBINE_KEY_GROUPING_SUPPORTED; - /** Lazy per-user file system cache used by Hadoop tasks. */ - private static final HadoopLazyConcurrentMap<HadoopFileSystemCache.FsCacheKey, FileSystem> fsMap + /** Lazy per-user file system cache used by the Hadoop task. */ + private static final HadoopLazyConcurrentMap<FsCacheKey, FileSystem> fsMap = createHadoopLazyConcurrentMap(); /** * This method is called with reflection upon Job finish with class loader of each task. * This will clean up all the Fs created for specific task. + * Each class loader uses its own instance of <code>fsMap</code> since the class loaders + * are different. * * @throws IgniteCheckedException On error. */ @@ -444,12 +446,11 @@ public class HadoopV2TaskContext extends HadoopTaskContext { try { // This assertion uses .startsWith() instead of .equals() because task class loaders may // be reused between tasks of the same job.
- assert ((HadoopClassLoader)HadoopFileSystemCache.class.getClassLoader()).name() + assert ((HadoopClassLoader)getClass().getClassLoader()).name() .startsWith(HadoopClassLoader.nameForTask(taskInfo(), true)); - // Task class loader. // We also cache Fs there, all them will be cleared explicitly upon the Job end. - fs = HadoopFileSystemCache.fileSystemForMrUser(jobDir.toUri(), jobConf(), fsMap); + fs = fileSystemForMrUserWithCaching(jobDir.toUri(), jobConf(), fsMap); } catch (IOException e) { throw new IgniteCheckedException(e);