# IGNITE-218: Review.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fce09646 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fce09646 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fce09646 Branch: refs/heads/ignite-218 Commit: fce0964622a408e4f62f6e858c625c5fbf058630 Parents: e529748 Author: vozerov-gridgain <voze...@gridgain.com> Authored: Fri Apr 17 12:38:32 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Fri Apr 17 12:38:32 2015 +0300 ---------------------------------------------------------------------- .../org/apache/ignite/igfs/IgfsUserContext.java | 9 +++++++-- .../igfs/common/IgfsPathControlRequest.java | 2 ++ .../internal/processors/igfs/IgfsImpl.java | 20 +++++++++----------- .../fs/IgniteHadoopFileSystemCounterWriter.java | 1 + .../fs/IgniteHadoopIgfsSecondaryFileSystem.java | 4 +++- .../ignite/hadoop/fs/LazyConcurrentMap.java | 8 ++++++-- .../internal/processors/hadoop/HadoopUtils.java | 1 + .../hadoop/igfs/HadoopIgfsWrapper.java | 1 + .../hadoop/taskexecutor/HadoopRunnableTask.java | 6 ++++-- .../hadoop/v2/HadoopV2JobResourceManager.java | 7 ++++--- .../processors/hadoop/HadoopStartup.java | 1 + 11 files changed, 39 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java index 2878ab4..926e84d 100644 --- a/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java +++ b/modules/core/src/main/java/org/apache/ignite/igfs/IgfsUserContext.java @@ -20,6 +20,7 @@ package org.apache.ignite.igfs; import org.apache.ignite.*; import org.apache.ignite.configuration.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import java.util.concurrent.*; @@ -46,15 +47,17 @@ public abstract class IgfsUserContext { * must return exactly the Exception thrown from the callable. */ public static <T> T doAs(String user, final Callable<T> cllbl) { + // TODO: Use A.ensure(); if (F.isEmpty(user)) throw new IllegalArgumentException("Failed to use null or empty user name."); + // TODO: Remove. user = user.intern(); final String ctxUser = userStackThreadLocal.get(); try { - //noinspection StringEquality + // TODO: Equals: F.eq if (ctxUser == user) return cllbl.call(); // correct context is already there @@ -66,7 +69,8 @@ public abstract class IgfsUserContext { finally { userStackThreadLocal.set(ctxUser); } - } catch (Exception e) { + } + catch (Exception e) { throw new IgniteException(e); } } @@ -89,6 +93,7 @@ public abstract class IgfsUserContext { * @param user a user name to be fixed. * @return non-null interned user name. */ + // TODO: Move to IgfsUtils. public static String fixUserName(@Nullable String user) { if (F.isEmpty(user)) user = FileSystemConfiguration.DFLT_USER_NAME; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java index 55495d9..cfc8f16 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/igfs/common/IgfsPathControlRequest.java @@ -63,6 +63,7 @@ public class IgfsPathControlRequest extends IgfsMessage { /** Last modification time. */ private long modificationTime; + // TODO: COmments. private String userName; /** @@ -238,6 +239,7 @@ public class IgfsPathControlRequest extends IgfsMessage { return S.toString(IgfsPathControlRequest.class, this, "cmd", command()); } + // TODO: COmments. public final String userName() { assert userName != null; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java index a0f1c3f..34636d2 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsImpl.java @@ -56,9 +56,8 @@ import static org.apache.ignite.internal.processors.igfs.IgfsFileInfo.*; /** * Cache-based IGFS implementation. - * This is a singleton: only 1 IgfsImpl exists in Ignite node. */ -public class IgfsImpl implements IgfsEx { +public final class IgfsImpl implements IgfsEx { /** Default permissions for file system entry. */ private static final String PERMISSION_DFLT_VAL = "0777"; @@ -66,7 +65,7 @@ public class IgfsImpl implements IgfsEx { private static final Map<String, String> DFLT_DIR_META = F.asMap(PROP_PERMISSION, PERMISSION_DFLT_VAL); /** Handshake message. */ - private IgfsPaths secondaryPaths; + private final IgfsPaths secondaryPaths; /** Cache based structure (meta data) manager. */ private IgfsMetaManager meta; @@ -75,7 +74,7 @@ public class IgfsImpl implements IgfsEx { private IgfsDataManager data; /** FS configuration. */ - private final FileSystemConfiguration cfg; + private FileSystemConfiguration cfg; /** IGFS context. */ private IgfsContext igfsCtx; @@ -90,7 +89,7 @@ public class IgfsImpl implements IgfsEx { private IgniteLogger log; /** Mode resolver. */ - private IgfsModeResolver modeRslvr; + private final IgfsModeResolver modeRslvr; /** Connection to the secondary file system. */ private IgfsSecondaryFileSystem secondaryFs; @@ -123,7 +122,7 @@ public class IgfsImpl implements IgfsEx { private IgfsPerBlockLruEvictionPolicy evictPlc; /** Pool for threads working in DUAL mode. */ - private IgniteThreadPoolExecutor dualPool; + private final IgniteThreadPoolExecutor dualPool; /** * Creates IGFS instance with given context. @@ -142,11 +141,10 @@ public class IgfsImpl implements IgfsEx { evts = igfsCtx.kernalContext().event(); meta = igfsCtx.meta(); data = igfsCtx.data(); - secondaryFs = cfg.getSecondaryFileSystem(); /* Default IGFS mode. */ - final IgfsMode dfltMode; + IgfsMode dfltMode; if (secondaryFs == null) { if (cfg.getDefaultMode() == PROXY) @@ -202,9 +200,6 @@ public class IgfsImpl implements IgfsEx { secondaryPaths = new IgfsPaths(secondaryFs == null ? null : secondaryFs.properties(), dfltMode, modeRslvr.modesOrdered()); - dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L, - new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null; - // Check whether IGFS LRU eviction policy is set on data cache. String dataCacheName = igfsCtx.configuration().getDataCacheName(); @@ -223,6 +218,9 @@ public class IgfsImpl implements IgfsEx { igfsCtx.kernalContext().io().addMessageListener(topic, delMsgLsnr); igfsCtx.kernalContext().event().addLocalEventListener(delDiscoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); + + dualPool = secondaryFs != null ? new IgniteThreadPoolExecutor(4, Integer.MAX_VALUE, 5000L, + new LinkedBlockingQueue<Runnable>(), new IgfsThreadFactory(cfg.getName()), null) : null; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java index a0927e2..3b8c28e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopFileSystemCounterWriter.java @@ -79,6 +79,7 @@ public class IgniteHadoopFileSystemCounterWriter implements HadoopCounterWriter try (FileSystem fs = HadoopV2JobResourceManager.fileSystemForMrUser(jobStatPath.toUri(), hadoopCfg)) { fs.mkdirs(jobStatPath); + // TODO: OUt-of-bound try (PrintStream out = new PrintStream(fs.create(new Path(jobStatPath, PERFORMANCE_COUNTER_FILE_NAME)))) { for (T2<String, Long> evt : perfCntr.evts()) { out.print(evt.get1()); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java index 64b54e0..a6db645 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopIgfsSecondaryFileSystem.java @@ -27,6 +27,7 @@ import org.apache.ignite.internal.processors.hadoop.*; import org.apache.ignite.internal.processors.hadoop.igfs.*; import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; import org.jetbrains.annotations.*; import org.apache.ignite.hadoop.fs.LazyConcurrentMap.*; @@ -57,7 +58,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys private final String dfltUserName; /** Lazy per-user cache for the file systems. It is cleared and nulled in #close() method. */ - private volatile LazyConcurrentMap<String, FileSystem> fileSysLazyMap = new LazyConcurrentMap<>( + private final LazyConcurrentMap<String, FileSystem> fileSysLazyMap = new LazyConcurrentMap<>( new ValueFactory<String, FileSystem>() { @Override public FileSystem createValue(String key) { try { @@ -350,6 +351,7 @@ public class IgniteHadoopIgfsSecondaryFileSystem implements IgfsSecondaryFileSys new HadoopIgfsProperties(props != null ? props : Collections.<String, String>emptyMap()); try { + // TODO: Out of bounds. return fileSysForUser().create(convert(path), props0.permission(), overwrite, bufSize, (short)replication, blockSize, null); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java index 444ff48..4c592af 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/LazyConcurrentMap.java @@ -28,6 +28,8 @@ import java.util.concurrent.*; * Maps values by keys. * Values are created lazily using {@link ValueFactory}. */ +// TODO: Remove from public. +// TODO: Consistent naming (Hadoop prefix if in Hadoop module). public class LazyConcurrentMap<K, V> { /** The map storing the actual values. */ private final ConcurrentMap<K, ValueWrapper> map = new ConcurrentHashMap8<>(); @@ -87,7 +89,8 @@ public class LazyConcurrentMap<K, V> { try { return w.getValue(); - } catch (InterruptedException ie) { + } + catch (InterruptedException ie) { throw new IgniteException(ie); } } @@ -121,7 +124,7 @@ public class LazyConcurrentMap<K, V> { private final K key; /** the value */ - private volatile V v; + private V v; /** * Creates new wrapper. @@ -150,6 +153,7 @@ public class LazyConcurrentMap<K, V> { * @throws InterruptedException */ @Nullable V getValue() throws InterruptedException { + // TODO: Use U.await(vlueCrtLatch) instead. vlueCrtLatch.await(); return v; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 75c8a49..04c5ec2 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -126,6 +126,7 @@ public class HadoopUtils { break; case PHASE_REDUCE: + // TODO: Create ticket: why PHASE_REDUCE could have 0 reducers. setupProgress = 1; mapProgress = 1; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java index eaf7392..b650318 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/igfs/HadoopIgfsWrapper.java @@ -66,6 +66,7 @@ public class HadoopIgfsWrapper implements HadoopIgfs { * @param conf Configuration. * @param log Current logger. */ + // TODO: Out of bounds. public HadoopIgfsWrapper(String authority, String logDir, Configuration conf, Log log, String user) throws IOException { try { this.authority = authority; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java index fe350b2..b47bedd 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -115,7 +115,8 @@ public abstract class HadoopRunnableTask implements Callable<Void> { UserGroupInformation currUser = UserGroupInformation.getCurrentUser(); ugiUser = currUser.getShortUserName(); - } catch (IOException ioe) { + } + catch (IOException ioe) { throw new IgniteCheckedException(ioe); } @@ -131,7 +132,8 @@ public abstract class HadoopRunnableTask implements Callable<Void> { Configuration conf = ((HadoopV2Job)job).jobConf(); ticketCachePath = conf.get(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); - } else + } + else ticketCachePath = job.info().property(CommonConfigurationKeys.KERBEROS_TICKET_CACHE_PATH); UserGroupInformation ugi = UserGroupInformation.getBestUGI(ticketCachePath, user); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java index 0b2f7df..f75425e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/v2/HadoopV2JobResourceManager.java @@ -105,8 +105,6 @@ public class HadoopV2JobResourceManager { if (user == null) user = IgniteHadoopFileSystem.getFsHadoopUser(cfg); - user = user.intern(); - return user; } @@ -128,9 +126,11 @@ public class HadoopV2JobResourceManager { uri = FileSystem.getDefaultUri(cfg); final FileSystem fs; + try { fs = FileSystem.get(uri, cfg, user); - } catch (InterruptedException ie) { + } + catch (InterruptedException ie) { throw new IOException(ie); } @@ -163,6 +163,7 @@ public class HadoopV2JobResourceManager { stagingDir = new Path(new URI(mrDir)); if (download) { + // TODO: Out of bounds. try (FileSystem fs = fileSystemForMrUser(stagingDir.toUri(), cfg)) { if (!fs.exists(stagingDir)) throw new IgniteCheckedException("Failed to find map-reduce submission directory (does not exist): " + http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fce09646/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java index e545ca9..1d398b5 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopStartup.java @@ -39,6 +39,7 @@ public class HadoopStartup { public static Configuration configuration() { Configuration cfg = new Configuration(); + // TODO: Remove. cfg.set("fs.defaultFS", "igfs://igfs@localhost:10500"); cfg.set("fs.igfs.impl", org.apache.ignite.hadoop.fs.v1.IgniteHadoopFileSystem.class.getName());