# IGNITE-386: WIP on internal namings (4).

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/17c8d0d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/17c8d0d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/17c8d0d9

Branch: refs/heads/ignite-386
Commit: 17c8d0d90fe092a7454084126b7054f9078e8933
Parents: 1c4b00d
Author: vozerov-gridgain <voze...@gridgain.com>
Authored: Tue Mar 3 16:05:27 2015 +0300
Committer: vozerov-gridgain <voze...@gridgain.com>
Committed: Tue Mar 3 16:05:27 2015 +0300

----------------------------------------------------------------------
 .../fs/IgniteHadoopSecondaryFileSystem.java     |   10 +-
 .../hadoop/fs/v1/IgniteHadoopFileSystem.java    |   24 +-
 .../hadoop/fs/v2/IgniteHadoopFileSystem.java    |   20 +-
 .../ignite/internal/igfs/hadoop/HadoopIgfs.java |  198 ++
 .../HadoopIgfsCommunicationException.java       |   57 +
 .../igfs/hadoop/HadoopIgfsEndpoint.java         |  210 ++
 .../internal/igfs/hadoop/HadoopIgfsEx.java      |   88 +
 .../internal/igfs/hadoop/HadoopIgfsFuture.java  |   94 +
 .../internal/igfs/hadoop/HadoopIgfsInProc.java  |  409 ++++
 .../internal/igfs/hadoop/HadoopIgfsIo.java      |   76 +
 .../igfs/hadoop/HadoopIgfsIpcIoListener.java    |   36 +
 .../igfs/hadoop/HadoopIgfsJclLogger.java        |  112 +
 .../igfs/hadoop/HadoopIgfsProperties.java       |   88 +
 .../igfs/hadoop/HadoopIgfsProxyInputStream.java |  335 +++
 .../hadoop/HadoopIgfsProxyOutputStream.java     |  165 ++
 .../internal/igfs/hadoop/HadoopIgfsReader.java  |  104 +
 .../igfs/hadoop/HadoopIgfsStreamDelegate.java   |   96 +
 .../hadoop/HadoopIgfsStreamEventListener.java   |   39 +
 .../internal/igfs/hadoop/HadoopIgfsUtils.java   |  131 ++
 .../internal/igfs/hadoop/HadoopIgfsWrapper.java |  511 +++++
 .../igfs/hadoop/HadoopInputIgfsStream.java      |  626 ++++++
 .../internal/igfs/hadoop/HadoopIpcIgfsIo.java   |  599 ++++++
 .../internal/igfs/hadoop/HadoopOutProcIgfs.java |  466 +++++
 .../igfs/hadoop/HadoopOutputIgfsStream.java     |  201 ++
 .../ignite/internal/igfs/hadoop/IgfsHadoop.java |  198 --
 .../IgfsHadoopCommunicationException.java       |   57 -
 .../igfs/hadoop/IgfsHadoopEndpoint.java         |  210 --
 .../internal/igfs/hadoop/IgfsHadoopEx.java      |   88 -
 .../igfs/hadoop/IgfsHadoopFSProperties.java     |   88 -
 .../internal/igfs/hadoop/IgfsHadoopFuture.java  |   94 -
 .../internal/igfs/hadoop/IgfsHadoopInProc.java  |  409 ----
 .../igfs/hadoop/IgfsHadoopInputStream.java      |  626 ------
 .../internal/igfs/hadoop/IgfsHadoopIo.java      |   76 -
 .../internal/igfs/hadoop/IgfsHadoopIpcIo.java   |  599 ------
 .../igfs/hadoop/IgfsHadoopIpcIoListener.java    |   36 -
 .../igfs/hadoop/IgfsHadoopJclLogger.java        |  112 -
 .../internal/igfs/hadoop/IgfsHadoopOutProc.java |  466 -----
 .../igfs/hadoop/IgfsHadoopOutputStream.java     |  201 --
 .../igfs/hadoop/IgfsHadoopProxyInputStream.java |  335 ---
 .../hadoop/IgfsHadoopProxyOutputStream.java     |  165 --
 .../internal/igfs/hadoop/IgfsHadoopReader.java  |  104 -
 .../igfs/hadoop/IgfsHadoopStreamDelegate.java   |   96 -
 .../hadoop/IgfsHadoopStreamEventListener.java   |   39 -
 .../internal/igfs/hadoop/IgfsHadoopUtils.java   |  131 --
 .../internal/igfs/hadoop/IgfsHadoopWrapper.java |  511 -----
 .../planner/HadoopDefaultMapReducePlanner.java  |    2 +-
 .../HadoopIgfs20FileSystemAbstractSelfTest.java | 1967 ++++++++++++++++++
 ...Igfs20FileSystemLoopbackPrimarySelfTest.java |   74 +
 ...oopIgfs20FileSystemShmemPrimarySelfTest.java |   74 +
 .../igfs/HadoopIgfsDualAbstractSelfTest.java    |  304 +++
 .../igfs/HadoopIgfsDualAsyncSelfTest.java       |   32 +
 .../ignite/igfs/HadoopIgfsDualSyncSelfTest.java |   32 +
 .../IgfsHadoop20FileSystemAbstractSelfTest.java | 1967 ------------------
 ...doop20FileSystemLoopbackPrimarySelfTest.java |   74 -
 ...sHadoop20FileSystemShmemPrimarySelfTest.java |   74 -
 .../igfs/IgfsHadoopDualAbstractSelfTest.java    |  304 ---
 .../igfs/IgfsHadoopDualAsyncSelfTest.java       |   32 -
 .../ignite/igfs/IgfsHadoopDualSyncSelfTest.java |   32 -
 .../IgniteHadoopFileSystemAbstractSelfTest.java |   18 +-
 .../IgniteHadoopFileSystemClientSelfTest.java   |    6 +-
 ...IgniteHadoopFileSystemHandshakeSelfTest.java |    2 +-
 .../IgniteHadoopFileSystemIpcCacheSelfTest.java |   12 +-
 .../testsuites/IgniteHadoopTestSuite.java       |    6 +-
 .../IgniteIgfsLinuxAndMacOSTestSuite.java       |    2 +-
 64 files changed, 7175 insertions(+), 7175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
index 5f06a65..9547e9f 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/IgniteHadoopSecondaryFileSystem.java
@@ -157,7 +157,7 @@ public class IgniteHadoopSecondaryFileSystem implements 
Igfs, AutoCloseable {
 
     /** {@inheritDoc} */
     @Nullable @Override public IgfsFile update(IgfsPath path, Map<String, 
String> props) {
-        IgfsHadoopFSProperties props0 = new IgfsHadoopFSProperties(props);
+        HadoopIgfsProperties props0 = new HadoopIgfsProperties(props);
 
         try {
             if (props0.userName() != null || props0.groupName() != null)
@@ -211,7 +211,7 @@ public class IgniteHadoopSecondaryFileSystem implements 
Igfs, AutoCloseable {
     /** {@inheritDoc} */
     @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> 
props) {
         try {
-            if (!fileSys.mkdirs(convert(path), new 
IgfsHadoopFSProperties(props).permission()))
+            if (!fileSys.mkdirs(convert(path), new 
HadoopIgfsProperties(props).permission()))
                 throw new IgniteException("Failed to make directories [path=" 
+ path + ", props=" + props + "]");
         }
         catch (IOException e) {
@@ -272,7 +272,7 @@ public class IgniteHadoopSecondaryFileSystem implements 
Igfs, AutoCloseable {
 
     /** {@inheritDoc} */
     @Override public IgfsReader open(IgfsPath path, int bufSize) {
-        return new IgfsHadoopReader(fileSys, convert(path), bufSize);
+        return new HadoopIgfsReader(fileSys, convert(path), bufSize);
     }
 
     /** {@inheritDoc} */
@@ -288,8 +288,8 @@ public class IgniteHadoopSecondaryFileSystem implements 
Igfs, AutoCloseable {
     /** {@inheritDoc} */
     @Override public OutputStream create(IgfsPath path, int bufSize, boolean 
overwrite, int replication,
         long blockSize, @Nullable Map<String, String> props) {
-        IgfsHadoopFSProperties props0 =
-            new IgfsHadoopFSProperties(props != null ? props : 
Collections.<String, String>emptyMap());
+        HadoopIgfsProperties props0 =
+            new HadoopIgfsProperties(props != null ? props : 
Collections.<String, String>emptyMap());
 
         try {
             return fileSys.create(convert(path), props0.permission(), 
overwrite, bufSize, (short)replication, blockSize,

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
index f25b29f..9c95437 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java
@@ -43,7 +43,7 @@ import static org.apache.ignite.IgniteFs.*;
 import static org.apache.ignite.configuration.IgfsConfiguration.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
 import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
-import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
+import static org.apache.ignite.internal.igfs.hadoop.HadoopIgfsUtils.*;
 
 /**
  * {@code IGFS} Hadoop 1.x file system driver over file system API. To use
@@ -95,7 +95,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
     private final AtomicBoolean closeGuard = new AtomicBoolean();
 
     /** Grid remote client. */
-    private IgfsHadoopWrapper rmtClient;
+    private HadoopIgfsWrapper rmtClient;
 
     /** User name for each thread. */
     private final ThreadLocal<String> userName = new ThreadLocal<String>(){
@@ -243,7 +243,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() 
: null;
 
-            rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -507,13 +507,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
                     clientLog.logOpen(logId, path, PROXY, bufSize, size);
 
-                    return new FSDataInputStream(new 
IgfsHadoopProxyInputStream(is, clientLog, logId));
+                    return new FSDataInputStream(new 
HadoopIgfsProxyInputStream(is, clientLog, logId));
                 }
                 else
                     return is;
             }
             else {
-                IgfsHadoopStreamDelegate stream = 
seqReadsBeforePrefetchOverride ?
+                HadoopIgfsStreamDelegate stream = 
seqReadsBeforePrefetchOverride ?
                     rmtClient.open(path, seqReadsBeforePrefetch) : 
rmtClient.open(path);
 
                 long logId = -1;
@@ -528,7 +528,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
                     LOG.debug("Opening input stream [thread=" + 
Thread.currentThread().getName() + ", path=" + path +
                         ", bufSize=" + bufSize + ']');
 
-                IgfsHadoopInputStream igfsIn = new 
IgfsHadoopInputStream(stream, stream.length(),
+                HadoopInputIgfsStream igfsIn = new 
HadoopInputIgfsStream(stream, stream.length(),
                     bufSize, LOG, clientLog, logId);
 
                 if (LOG.isDebugEnabled())
@@ -575,14 +575,14 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
                     clientLog.logCreate(logId, path, PROXY, overwrite, 
bufSize, replication, blockSize);
 
-                    return new FSDataOutputStream(new 
IgfsHadoopProxyOutputStream(os, clientLog, logId));
+                    return new FSDataOutputStream(new 
HadoopIgfsProxyOutputStream(os, clientLog, logId));
                 }
                 else
                     return os;
             }
             else {
                 // Create stream and close it in the 'finally' section if any 
sequential operation failed.
-                IgfsHadoopStreamDelegate stream = rmtClient.create(path, 
overwrite, colocateFileWrites,
+                HadoopIgfsStreamDelegate stream = rmtClient.create(path, 
overwrite, colocateFileWrites,
                     replication, blockSize, F.asMap(PROP_PERMISSION, 
toString(perm),
                     PROP_PREFER_LOCAL_WRITES, 
Boolean.toString(preferLocFileWrites)));
 
@@ -599,7 +599,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
                 if (LOG.isDebugEnabled())
                     LOG.debug("Opened output stream in create [path=" + path + 
", delegate=" + stream + ']');
 
-                IgfsHadoopOutputStream igfsOut = new 
IgfsHadoopOutputStream(stream, LOG, clientLog,
+                HadoopOutputIgfsStream igfsOut = new 
HadoopOutputIgfsStream(stream, LOG, clientLog,
                     logId);
 
                 bufSize = Math.max(64 * 1024, bufSize);
@@ -652,13 +652,13 @@ public class IgniteHadoopFileSystem extends FileSystem {
 
                     clientLog.logAppend(logId, path, PROXY, bufSize); // Don't 
have stream ID.
 
-                    return new FSDataOutputStream(new 
IgfsHadoopProxyOutputStream(os, clientLog, logId));
+                    return new FSDataOutputStream(new 
HadoopIgfsProxyOutputStream(os, clientLog, logId));
                 }
                 else
                     return os;
             }
             else {
-                IgfsHadoopStreamDelegate stream = rmtClient.append(path, 
false, null);
+                HadoopIgfsStreamDelegate stream = rmtClient.append(path, 
false, null);
 
                 assert stream != null;
 
@@ -673,7 +673,7 @@ public class IgniteHadoopFileSystem extends FileSystem {
                 if (LOG.isDebugEnabled())
                     LOG.debug("Opened output stream in append [path=" + path + 
", delegate=" + stream + ']');
 
-                IgfsHadoopOutputStream igfsOut = new 
IgfsHadoopOutputStream(stream, LOG, clientLog,
+                HadoopOutputIgfsStream igfsOut = new 
HadoopOutputIgfsStream(stream, LOG, clientLog,
                     logId);
 
                 bufSize = Math.max(64 * 1024, bufSize);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
index 280af82..1c9165c 100644
--- 
a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v2/IgniteHadoopFileSystem.java
@@ -43,7 +43,7 @@ import static org.apache.ignite.IgniteFs.*;
 import static org.apache.ignite.configuration.IgfsConfiguration.*;
 import static org.apache.ignite.igfs.IgfsMode.*;
 import static 
org.apache.ignite.internal.processors.hadoop.fs.HadoopParameters.*;
-import static org.apache.ignite.internal.igfs.hadoop.IgfsHadoopUtils.*;
+import static org.apache.ignite.internal.igfs.hadoop.HadoopIgfsUtils.*;
 
 /**
  * {@code IGFS} Hadoop 2.x file system driver over file system API. To use
@@ -89,7 +89,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
     private final AtomicBoolean closeGuard = new AtomicBoolean();
 
     /** Grid remote client. */
-    private IgfsHadoopWrapper rmtClient;
+    private HadoopIgfsWrapper rmtClient;
 
     /** Working directory. */
     private IgfsPath workingDir;
@@ -137,7 +137,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
      * @throws IOException If initialization failed.
      */
     public IgniteHadoopFileSystem(URI name, Configuration cfg) throws 
URISyntaxException, IOException {
-        super(IgfsHadoopEndpoint.normalize(name), IGFS_SCHEME, false, -1);
+        super(HadoopIgfsEndpoint.normalize(name), IGFS_SCHEME, false, -1);
 
         uri = name;
 
@@ -239,7 +239,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
 
             String logDir = logDirFile != null ? logDirFile.getAbsolutePath() 
: null;
 
-            rmtClient = new IgfsHadoopWrapper(uriAuthority, logDir, cfg, LOG);
+            rmtClient = new HadoopIgfsWrapper(uriAuthority, logDir, cfg, LOG);
 
             // Handshake.
             IgfsHandshakeResponse handshake = rmtClient.handshake(logDir);
@@ -446,13 +446,13 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
 
                     clientLog.logOpen(logId, path, PROXY, bufSize, size);
 
-                    return new FSDataInputStream(new 
IgfsHadoopProxyInputStream(is, clientLog, logId));
+                    return new FSDataInputStream(new 
HadoopIgfsProxyInputStream(is, clientLog, logId));
                 }
                 else
                     return is;
             }
             else {
-                IgfsHadoopStreamDelegate stream = 
seqReadsBeforePrefetchOverride ?
+                HadoopIgfsStreamDelegate stream = 
seqReadsBeforePrefetchOverride ?
                     rmtClient.open(path, seqReadsBeforePrefetch) : 
rmtClient.open(path);
 
                 long logId = -1;
@@ -467,7 +467,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
                     LOG.debug("Opening input stream [thread=" + 
Thread.currentThread().getName() + ", path=" + path +
                         ", bufSize=" + bufSize + ']');
 
-                IgfsHadoopInputStream igfsIn = new 
IgfsHadoopInputStream(stream, stream.length(),
+                HadoopInputIgfsStream igfsIn = new 
HadoopInputIgfsStream(stream, stream.length(),
                     bufSize, LOG, clientLog, logId);
 
                 if (LOG.isDebugEnabled())
@@ -524,7 +524,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
                     else
                         clientLog.logCreate(logId, path, PROXY, overwrite, 
bufSize, replication, blockSize);
 
-                    return new FSDataOutputStream(new 
IgfsHadoopProxyOutputStream(os, clientLog, logId));
+                    return new FSDataOutputStream(new 
HadoopIgfsProxyOutputStream(os, clientLog, logId));
                 }
                 else
                     return os;
@@ -534,7 +534,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
                     PROP_PREFER_LOCAL_WRITES, 
Boolean.toString(preferLocFileWrites));
 
                 // Create stream and close it in the 'finally' section if any 
sequential operation failed.
-                IgfsHadoopStreamDelegate stream;
+                HadoopIgfsStreamDelegate stream;
 
                 long logId = -1;
 
@@ -566,7 +566,7 @@ public class IgniteHadoopFileSystem extends 
AbstractFileSystem implements Closea
 
                 assert stream != null;
 
-                IgfsHadoopOutputStream igfsOut = new 
IgfsHadoopOutputStream(stream, LOG,
+                HadoopOutputIgfsStream igfsOut = new 
HadoopOutputIgfsStream(stream, LOG,
                     clientLog, logId);
 
                 bufSize = Math.max(64 * 1024, bufSize);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java
new file mode 100644
index 0000000..6ee593e
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfs.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Facade for communication with grid; all operations but {@code close} may throw {@code IOException}.
+ */
+public interface HadoopIgfs {
+    /**
+     * Perform handshake.
+     *
+     * @param logDir Log directory.
+     * @return Handshake response.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsHandshakeResponse handshake(String logDir) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Close connection.
+     *
+     * @param force Force flag.
+     */
+    public void close(boolean force);
+
+    /**
+     * Command to retrieve file info for some IGFS path.
+     *
+     * @param path Path to get file info for.
+     * @return File info for the given path.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsFile info(IgfsPath path) throws IgniteCheckedException, 
IOException;
+
+    /**
+     * Command to update file properties.
+     *
+     * @param path IGFS path to update properties.
+     * @param props Properties to update.
+     * @return {@link IgfsFile} result of the update operation.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsFile update(IgfsPath path, Map<String, String> props) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Sets last access and modification times for a file; returns a Boolean (presumably success; TODO confirm).
+     *
+     * @param path Path to update times.
+     * @param accessTime Last access time to set.
+     * @param modificationTime Last modification time to set.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Boolean setTimes(IgfsPath path, long accessTime, long 
modificationTime) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Command to rename given path.
+     *
+     * @param src Source path.
+     * @param dest Destination path.
+     * @return Boolean result of rename operation (presumably a success flag; TODO confirm).
+     * @throws IgniteCheckedException If failed.
+     */
+    public Boolean rename(IgfsPath src, IgfsPath dest) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to delete given path.
+     *
+     * @param path Path to delete.
+     * @param recursive {@code True} if deletion is recursive.
+     * @return Boolean result of delete operation (presumably a success flag; TODO confirm).
+     * @throws IgniteCheckedException If failed.
+     */
+    public Boolean delete(IgfsPath path, boolean recursive) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to get affinity for given path, offset and length.
+     *
+     * @param path Path to get affinity for.
+     * @param start Start position (offset).
+     * @param len Data length.
+     * @return Block locations for the requested range.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, 
long len) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Gets path summary.
+     *
+     * @param path Path to get summary for.
+     * @return Path summary.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsPathSummary contentSummary(IgfsPath path) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to create directories; {@code props} is presumably the properties to apply (TODO confirm).
+     *
+     * @param path Path to create.
+     * @return Boolean result of mkdirs operation (presumably a success flag; TODO confirm).
+     * @throws IgniteCheckedException If failed.
+     */
+    public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to get list of files in directory.
+     *
+     * @param path Path to list.
+     * @return Files in the given directory.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Collection<IgfsFile> listFiles(IgfsPath path) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to get directory listing.
+     *
+     * @param path Path to list.
+     * @return Paths in the given directory.
+     * @throws IgniteCheckedException If failed.
+     */
+    public Collection<IgfsPath> listPaths(IgfsPath path) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Performs status request.
+     *
+     * @return Status response.
+     * @throws IgniteCheckedException If failed.
+     */
+    public IgfsStatus fsStatus() throws IgniteCheckedException, IOException;
+
+    /**
+     * Command to open file for reading.
+     *
+     * @param path File path to open.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopIgfsStreamDelegate open(IgfsPath path) throws 
IgniteCheckedException, IOException;
+
+    /**
+     * Command to open file for reading with an explicit {@code seqReadsBeforePrefetch} value.
+     *
+     * @param path File path to open.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopIgfsStreamDelegate open(IgfsPath path, int 
seqReadsBeforePrefetch) throws IgniteCheckedException,
+        IOException;
+
+    /**
+     * Command to create file and open it for output.
+     *
+     * @param path Path to file.
+     * @param overwrite If {@code true} then old file contents will be lost.
+     * @param colocate If {@code true} and called on data node, file will be 
written on that node.
+     * @param replication Replication factor.
+     * @param props File properties for creation.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, 
boolean colocate,
+        int replication, long blockSize, @Nullable Map<String, String> props) 
throws IgniteCheckedException, IOException;
+
+    /**
+     * Open file for output appending data to the end of a file.
+     *
+     * @param path Path to file.
+     * @param create If {@code true}, file will be created if does not exist.
+     * @param props File properties.
+     * @return Stream descriptor.
+     * @throws IgniteCheckedException If failed.
+     */
+    public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
+        @Nullable Map<String, String> props) throws IgniteCheckedException, 
IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java
new file mode 100644
index 0000000..ecaa61f
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsCommunicationException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.*;
+
+/**
+ * Communication exception indicating a problem between file system and IGFS 
instance.
+ */
+public class HadoopIgfsCommunicationException extends IgniteCheckedException {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Creates new exception with given throwable as a nested cause and
+     * source of error message.
+     *
+     * @param cause Non-null throwable cause.
+     */
+    public HadoopIgfsCommunicationException(Exception cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates a new exception with given error message only; this overload 
takes no nested cause.
+     *
+     * @param msg Error message.
+     */
+    public HadoopIgfsCommunicationException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates a new exception with given error message and optional nested 
cause exception.
+     *
+     * @param msg Error message.
+     * @param cause Cause.
+     */
+    public HadoopIgfsCommunicationException(String msg, Exception cause) {
+        super(msg, cause);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java
new file mode 100644
index 0000000..dc8fcb8
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEndpoint.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+
+import static org.apache.ignite.configuration.IgfsConfiguration.*;
+
+/**
+ * IGFS endpoint abstraction.
+ */
+public class HadoopIgfsEndpoint {
+    /** Localhost. */
+    public static final String LOCALHOST = "127.0.0.1";
+
+    /** IGFS name. */
+    private final String igfsName;
+
+    /** Grid name. */
+    private final String gridName;
+
+    /** Host. */
+    private final String host;
+
+    /** Port. */
+    private final int port;
+
+    /**
+     * Normalizes IGFS URI: parses its authority as an endpoint and rebuilds the URI with explicit host and port.
+     * <p>
+     * IGFS and grid names (if any) are placed into the user-info part as {@code igfs[:grid]}, while path, query
+     * and fragment are copied from the original URI as is.
+     *
+     * @param uri URI.
+     * @return Normalized URI.
+     * @throws IOException If URI scheme is not {@link IgniteFs#IGFS_SCHEME} or URI cannot be parsed or rebuilt.
+     */
+    public static URI normalize(URI uri) throws IOException {
+        try {
+            if (!F.eq(IgniteFs.IGFS_SCHEME, uri.getScheme()))
+                throw new IOException("Failed to normalize URI because it has non IGFS scheme: " + uri);
+
+            HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(uri.getAuthority());
+
+            // User-info part of the resulting URI: "igfs", ":grid" or "igfs:grid".
+            StringBuilder sb = new StringBuilder();
+
+            if (endpoint.igfs() != null)
+                sb.append(endpoint.igfs());
+
+            if (endpoint.grid() != null)
+                sb.append(":").append(endpoint.grid());
+
+            // Empty user-info must be passed as null, otherwise it would be rendered into the URI.
+            return new URI(uri.getScheme(), sb.length() != 0 ? sb.toString() : null, endpoint.host(), endpoint.port(),
+                uri.getPath(), uri.getQuery(), uri.getFragment());
+        }
+        catch (URISyntaxException | IgniteCheckedException e) {
+            throw new IOException("Failed to normalize URI: " + uri, e);
+        }
+    }
+
+    /**
+     * Parses connection string of the form {@code [igfs[:grid]@][host][:port]}.
+     * <p>
+     * {@code null} is treated as an empty string. Empty IGFS or grid name is stored as {@code null};
+     * host and port are resolved by {@code hostPort(String, String)}.
+     *
+     * @param connStr Connection string.
+     * @throws IgniteCheckedException If failed to parse connection string.
+     */
+    public HadoopIgfsEndpoint(@Nullable String connStr) throws 
IgniteCheckedException {
+        if (connStr == null)
+            connStr = "";
+
+        // Negative limit keeps trailing empty tokens, so "a@" yields two tokens rather than one.
+        String[] tokens = connStr.split("@", -1);
+
+        IgniteBiTuple<String, Integer> hostPort;
+
+        // No '@': the whole string is the host/port part.
+        if (tokens.length == 1) {
+            igfsName = null;
+            gridName = null;
+
+            hostPort = hostPort(connStr, connStr);
+        }
+        // Single '@': left part holds the names, right part holds host/port.
+        else if (tokens.length == 2) {
+            String authStr = tokens[0];
+
+            if (authStr.isEmpty()) {
+                gridName = null;
+                igfsName = null;
+            }
+            else {
+                String[] authTokens = authStr.split(":", -1);
+
+                igfsName = F.isEmpty(authTokens[0]) ? null : authTokens[0];
+
+                if (authTokens.length == 1)
+                    gridName = null;
+                else if (authTokens.length == 2)
+                    gridName = F.isEmpty(authTokens[1]) ? null : authTokens[1];
+                else
+                    throw new IgniteCheckedException("Invalid connection 
string format: " + connStr);
+            }
+
+            hostPort = hostPort(connStr, tokens[1]);
+        }
+        else
+            throw new IgniteCheckedException("Invalid connection string 
format: " + connStr);
+
+        host = hostPort.get1();
+
+        assert hostPort.get2() != null;
+
+        port = hostPort.get2();
+    }
+
+    /**
+     * Parses host and port from a {@code [host][:port]} string.
+     * <p>
+     * Empty host is replaced with {@link #LOCALHOST}, missing port is replaced with {@code DFLT_IPC_PORT}.
+     *
+     * @param connStr Full connection string (used for error messages only).
+     * @param hostPortStr Host/port connection string part.
+     * @return Tuple with host and port.
+     * @throws IgniteCheckedException If failed to parse connection string.
+     */
+    private IgniteBiTuple<String, Integer> hostPort(String connStr, String hostPortStr) throws IgniteCheckedException {
+        String[] tokens = hostPortStr.split(":", -1);
+
+        String host = tokens[0];
+
+        if (F.isEmpty(host))
+            host = LOCALHOST;
+
+        int port;
+
+        if (tokens.length == 1)
+            port = DFLT_IPC_PORT;
+        else if (tokens.length == 2) {
+            String portStr = tokens[1];
+
+            try {
+                port = Integer.parseInt(portStr);
+            }
+            catch (NumberFormatException e) {
+                // Keep the parse failure as a cause so that the offending token is visible in the stack trace.
+                throw new IgniteCheckedException("Invalid port number: " + connStr, e);
+            }
+
+            if (port < 0 || port > 65535)
+                throw new IgniteCheckedException("Invalid port number: " + connStr);
+        }
+        else
+            throw new IgniteCheckedException("Invalid connection string format: " + connStr);
+
+        return F.t(host, port);
+    }
+
+    /**
+     * @return IGFS name.
+     */
+    @Nullable public String igfs() {
+        return igfsName;
+    }
+
+    /**
+     * @return Grid name.
+     */
+    @Nullable public String grid() {
+        return gridName;
+    }
+
+    /**
+     * @return Host.
+     */
+    public String host() {
+        return host;
+    }
+
+    /**
+     * @return {@code True} if endpoint host equals to {@link #LOCALHOST}.
+     */
+    public boolean isLocal() {
+        return F.eq(LOCALHOST, host);
+    }
+
+    /**
+     * @return Port.
+     */
+    public int port() {
+        return port;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopIgfsEndpoint.class, this);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java
new file mode 100644
index 0000000..5321fa3
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsEx.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+
+/**
+ * Extended IGFS server interface.
+ */
+public interface HadoopIgfsEx extends HadoopIgfs {
+    /**
+     * Adds event listener that will be invoked when connection with server is 
lost or remote error has occurred.
+     * If connection is closed already, callback will be invoked synchronously 
inside this method.
+     *
+     * @param delegate Stream delegate.
+     * @param lsnr Event listener.
+     */
+    public void addEventListener(HadoopIgfsStreamDelegate delegate, 
HadoopIgfsStreamEventListener lsnr);
+
+    /**
+     * Removes event listener that will be invoked when connection with server 
is lost or remote error has occurred.
+     *
+     * @param delegate Stream delegate.
+     */
+    public void removeEventListener(HadoopIgfsStreamDelegate delegate);
+
+    /**
+     * Asynchronously reads specified amount of bytes from opened input stream.
+     *
+     * @param delegate Stream delegate.
+     * @param pos Position to read from.
+     * @param len Data length to read.
+     * @param outBuf Optional output buffer. If buffer length is less than {@code len}, all remaining
+     *     bytes will be read into new allocated buffer of length {@code len - outBuf.length} and this buffer will
+     *     be the result of read future.
+     * @param outOff Output offset.
+     * @param outLen Output length.
+     * @return Future with read data.
+     */
+    public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, 
long pos, int len,
+        @Nullable final byte[] outBuf, final int outOff, final int outLen);
+
+    /**
+     * Writes data to the stream with given streamId. This method does not 
return any future since
+     * no response to write request is sent.
+     *
+     * @param delegate Stream delegate.
+     * @param data Data to write.
+     * @param off Offset.
+     * @param len Length.
+     * @throws IOException If failed.
+     */
+    public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int 
off, int len) throws IOException;
+
+    /**
+     * Close server stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void closeStream(HadoopIgfsStreamDelegate delegate) throws 
IOException;
+
+    /**
+     * Flush output stream.
+     *
+     * @param delegate Stream delegate.
+     * @throws IOException If failed.
+     */
+    public void flush(HadoopIgfsStreamDelegate delegate) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java
new file mode 100644
index 0000000..9ae0161
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsFuture.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * IGFS client future that holds response parse closure.
+ */
+public class HadoopIgfsFuture<T> extends GridPlainFutureAdapter<T> {
+    /** Output buffer. */
+    private byte[] outBuf;
+
+    /** Output offset. */
+    private int outOff;
+
+    /** Output length. */
+    private int outLen;
+
+    /** Read future flag. */
+    private boolean read;
+
+    /**
+     * @return Output buffer.
+     */
+    public byte[] outputBuffer() {
+        return outBuf;
+    }
+
+    /**
+     * @param outBuf Output buffer.
+     */
+    public void outputBuffer(@Nullable byte[] outBuf) {
+        this.outBuf = outBuf;
+    }
+
+    /**
+     * @return Offset in output buffer to write from.
+     */
+    public int outputOffset() {
+        return outOff;
+    }
+
+    /**
+     * @param outOff Offset in output buffer to write from.
+     */
+    public void outputOffset(int outOff) {
+        this.outOff = outOff;
+    }
+
+    /**
+     * @return Length to write to output buffer.
+     */
+    public int outputLength() {
+        return outLen;
+    }
+
+    /**
+     * @param outLen Length to write to output buffer.
+     */
+    public void outputLength(int outLen) {
+        this.outLen = outLen;
+    }
+
+    /**
+     * @param read {@code True} if this is a read future.
+     */
+    public void read(boolean read) {
+        this.read = read;
+    }
+
+    /**
+     * @return {@code True} if this is a read future.
+     */
+    public boolean read() {
+        return read;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java
new file mode 100644
index 0000000..b0da50d
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsInProc.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.processors.igfs.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Communication with grid in the same process.
+ */
+public class HadoopIgfsInProc implements HadoopIgfsEx {
+    /** Target IGFS. */
+    private final IgfsEx igfs;
+
+    /** Buffer size. */
+    private final int bufSize;
+
+    /** Event listeners. */
+    private final Map<HadoopIgfsStreamDelegate, HadoopIgfsStreamEventListener> 
lsnrs =
+        new ConcurrentHashMap<>();
+
+    /** Logger. */
+    private final Log log;
+
+    /**
+     * Constructor.
+     *
+     * @param igfs Target IGFS.
+     * @param log Log.
+     */
+    public HadoopIgfsInProc(IgfsEx igfs, Log log) {
+        this.igfs = igfs;
+        this.log = log;
+
+        bufSize = igfs.configuration().getBlockSize() * 2;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsHandshakeResponse handshake(String logDir) {
+        igfs.clientLogDirectory(logDir);
+
+        return new IgfsHandshakeResponse(igfs.name(), igfs.proxyPaths(), 
igfs.groupBlockSize(),
+            igfs.globalSampling());
+    }
+
+    /** {@inheritDoc} */
+    @Override public void close(boolean force) {
+        // Notify every registered stream listener about the close; 'force' is not consulted by this implementation.
+        for (HadoopIgfsStreamEventListener lsnr : lsnrs.values()) {
+            try {
+                lsnr.onClose();
+            }
+            catch (IgniteCheckedException e) {
+                // Best effort: a failed listener is only logged so that the remaining listeners are still notified.
+                if (log.isDebugEnabled())
+                    log.debug("Failed to notify stream event listener", e);
+            }
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException {
+        try {
+            return igfs.info(path);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to get file info because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+        try {
+            return igfs.update(path, props);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to update file because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime)
+        throws IgniteCheckedException {
+        try {
+            igfs.setTimes(path, accessTime, modificationTime);
+
+            return true;
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to set path times because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException {
+        try {
+            igfs.rename(src, dest);
+
+            return true;
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to rename path because Grid is stopping: " +
+                src, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException {
+        try {
+            return igfs.delete(path, recursive);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to delete path because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsStatus fsStatus() throws IgniteCheckedException {
+        try {
+            return igfs.globalSpace();
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to get file system status because Grid is " +
+                "stopping.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException {
+        try {
+            return igfs.listPaths(path);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to list paths because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException {
+        try {
+            return igfs.listFiles(path);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to list files because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException {
+        try {
+            igfs.mkdirs(path, props);
+
+            return true;
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to create directory because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException {
+        try {
+            return igfs.summary(path);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to get content summary because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+        throws IgniteCheckedException {
+        try {
+            return igfs.affinity(path, start, len);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to get affinity because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(IgfsPath path) throws IgniteCheckedException {
+        try {
+            IgfsInputStreamAdapter stream = igfs.open(path, bufSize);
+
+            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate open(IgfsPath path, int seqReadsBeforePrefetch)
+        throws IgniteCheckedException {
+        try {
+            IgfsInputStreamAdapter stream = igfs.open(path, bufSize, seqReadsBeforePrefetch);
+
+            return new HadoopIgfsStreamDelegate(this, stream, stream.fileInfo().length());
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to open file because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate,
+        int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException {
+        try {
+            // Affinity key is requested only when colocation is asked for.
+            IgfsOutputStream stream = igfs.create(path, bufSize, overwrite,
+                colocate ? igfs.nextAffinityKey() : null, replication, blockSize, props);
+
+            return new HadoopIgfsStreamDelegate(this, stream);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to create file because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public HadoopIgfsStreamDelegate append(IgfsPath path, boolean create,
+        @Nullable Map<String, String> props) throws IgniteCheckedException {
+        try {
+            IgfsOutputStream stream = igfs.append(path, bufSize, create, props);
+
+            return new HadoopIgfsStreamDelegate(this, stream);
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException(e);
+        }
+        catch (IllegalStateException e) {
+            // Keep the original exception as a cause so that its stack trace is not lost.
+            throw new HadoopIgfsCommunicationException("Failed to append file because Grid is stopping: " +
+                path, e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public GridPlainFuture<byte[]> readData(HadoopIgfsStreamDelegate delegate, long pos, int len,
+        @Nullable byte[] outBuf, int outOff, int outLen) {
+        IgfsInputStreamAdapter stream = delegate.target();
+
+        try {
+            // Stays null when everything fits into the caller's buffer.
+            byte[] res = null;
+
+            if (outBuf != null) {
+                // NOTE(review): capacity is derived from the buffer tail, 'outLen' is not consulted - confirm.
+                int outTailLen = outBuf.length - outOff;
+
+                if (len <= outTailLen)
+                    stream.readFully(pos, outBuf, outOff, len);
+                else {
+                    stream.readFully(pos, outBuf, outOff, outTailLen);
+
+                    int remainderLen = len - outTailLen;
+
+                    res = new byte[remainderLen];
+
+                    // Remainder starts right after the part already placed into 'outBuf'; reading from 'pos'
+                    // again would return the beginning of the range twice.
+                    stream.readFully(pos + outTailLen, res, 0, remainderLen);
+                }
+            } else {
+                res = new byte[len];
+
+                stream.readFully(pos, res, 0, len);
+            }
+
+            return new GridPlainFutureAdapter<>(res);
+        }
+        catch (IllegalStateException | IOException e) {
+            // Report the failure to the stream listener (if any) and return it through the future.
+            HadoopIgfsStreamEventListener lsnr = lsnrs.get(delegate);
+
+            if (lsnr != null)
+                lsnr.onError(e.getMessage());
+
+            return new GridPlainFutureAdapter<>(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeData(HadoopIgfsStreamDelegate delegate, byte[] data, int off, int len)
+        throws IOException {
+        try {
+            IgfsOutputStream out = delegate.target();
+
+            out.write(data, off, len);
+        }
+        catch (IllegalStateException | IOException e) {
+            // Let the stream listener (if any) know about the failure before propagating it.
+            HadoopIgfsStreamEventListener errLsnr = lsnrs.get(delegate);
+
+            if (errLsnr != null)
+                errLsnr.onError(e.getMessage());
+
+            // I/O errors are propagated as is.
+            if (!(e instanceof IllegalStateException))
+                throw e;
+
+            throw new IOException("Failed to write data to IGFS stream because Grid is stopping.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void flush(HadoopIgfsStreamDelegate delegate) throws IOException {
+        try {
+            IgfsOutputStream out = delegate.target();
+
+            out.flush();
+        }
+        catch (IllegalStateException | IOException e) {
+            // Let the stream listener (if any) know about the failure before propagating it.
+            HadoopIgfsStreamEventListener errLsnr = lsnrs.get(delegate);
+
+            if (errLsnr != null)
+                errLsnr.onError(e.getMessage());
+
+            // I/O errors are propagated as is.
+            if (!(e instanceof IllegalStateException))
+                throw e;
+
+            throw new IOException("Failed to flush data to IGFS stream because Grid is stopping.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void closeStream(HadoopIgfsStreamDelegate desc) throws 
IOException {
+        Closeable closeable = desc.target();
+
+        try {
+            closeable.close();
+        }
+        catch (IllegalStateException e) {
+            throw new IOException("Failed to close IGFS stream because Grid is 
stopping.", e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addEventListener(HadoopIgfsStreamDelegate delegate,
+        HadoopIgfsStreamEventListener lsnr) {
+        HadoopIgfsStreamEventListener lsnr0 = lsnrs.put(delegate, lsnr);
+
+        assert lsnr0 == null || lsnr0 == lsnr;
+
+        if (log.isDebugEnabled())
+            log.debug("Added stream event listener [delegate=" + delegate + 
']');
+    }
+
+    /** {@inheritDoc} */
+    @Override public void removeEventListener(HadoopIgfsStreamDelegate 
delegate) {
+        HadoopIgfsStreamEventListener lsnr0 = lsnrs.remove(delegate);
+
+        if (lsnr0 != null && log.isDebugEnabled())
+            log.debug("Removed stream event listener [delegate=" + delegate + 
']');
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java
new file mode 100644
index 0000000..775e7d0
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIo.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.igfs.common.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * IO abstraction layer for IGFS client. Two kinds of messages are expected to be sent: requests with response
+ * and requests without response.
+ */
+public interface HadoopIgfsIo {
+    /**
+     * Sends given IGFS client message and asynchronously awaits for response.
+     *
+     * @param msg Message to send.
+     * @return Future that will be completed.
+     * @throws IgniteCheckedException If a message cannot be sent (connection 
is broken or client was closed).
+     */
+    public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws 
IgniteCheckedException;
+
+    /**
+     * Sends given IGFS client message and asynchronously awaits for response. 
When IO detects response
+     * beginning for given message it stops reading data and passes input 
stream to closure which can read
+     * response in a specific way.
+     *
+     * @param msg Message to send.
+     * @param outBuf Output buffer. If {@code null}, the output buffer is not 
used.
+     * @param outOff Output buffer offset.
+     * @param outLen Output buffer length.
+     * @return Future that will be completed when response is returned from 
closure.
+     * @throws IgniteCheckedException If a message cannot be sent (connection 
is broken or client was closed).
+     */
+    public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] 
outBuf, int outOff, int outLen)
+        throws IgniteCheckedException;
+
+    /**
+     * Sends given message and does not wait for response.
+     *
+     * @param msg Message to send.
+     * @throws IgniteCheckedException If send failed.
+     */
+    public void sendPlain(IgfsMessage msg) throws IgniteCheckedException;
+
+    /**
+     * Adds event listener that will be invoked when connection with server is 
lost or remote error has occurred.
+     * If connection is closed already, callback will be invoked synchronously 
inside this method.
+     *
+     * @param lsnr Event listener.
+     */
+    public void addEventListener(HadoopIgfsIpcIoListener lsnr);
+
+    /**
+     * Removes event listener that will be invoked when connection with server 
is lost or remote error has occurred.
+     *
+     * @param lsnr Event listener.
+     */
+    public void removeEventListener(HadoopIgfsIpcIoListener lsnr);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java
new file mode 100644
index 0000000..10d764e
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsIpcIoListener.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+/**
+ * Listens to the events of {@link HadoopIpcIgfsIo}.
+ * <p>
+ * Two kinds of events are delivered: closing of the IO and a remote error.
+ */
+public interface HadoopIgfsIpcIoListener {
+    /**
+     * Callback invoked when the IO is being closed.
+     */
+    public void onClose();
+
+    /**
+     * Callback invoked when remote error occurs.
+     *
+     * @param streamId Stream ID.
+     * @param errMsg Error message.
+     */
+    public void onError(long streamId, String errMsg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java
new file mode 100644
index 0000000..1ba6e64
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsJclLogger.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.commons.logging.*;
+import org.apache.ignite.*;
+import org.jetbrains.annotations.*;
+
+/**
+ * JCL logger wrapper for Hadoop.
+ * <p>
+ * Adapts a commons-logging {@link Log} to the {@link IgniteLogger} interface by delegating every call
+ * to the wrapped implementation.
+ */
+public class HadoopIgfsJclLogger implements IgniteLogger {
+    /** JCL implementation proxy. */
+    private final Log impl;
+
+    /**
+     * Constructor.
+     *
+     * @param impl JCL implementation to use.
+     */
+    HadoopIgfsJclLogger(Log impl) {
+        assert impl != null;
+
+        this.impl = impl;
+    }
+
+    /**
+     * Creates a logger for the given category. For a {@link Class} the class name is used as the category,
+     * for any other object its string representation is used.
+     *
+     * @param ctgr Category.
+     * @return New logger wrapping the JCL log obtained for the category.
+     */
+    @Override public IgniteLogger getLogger(Object ctgr) {
+        return new HadoopIgfsJclLogger(LogFactory.getLog(
+            ctgr instanceof Class ? ((Class<?>)ctgr).getName() : String.valueOf(ctgr)));
+    }
+
+    /** {@inheritDoc} */
+    @Override public void trace(String msg) {
+        impl.trace(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void debug(String msg) {
+        impl.debug(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void info(String msg) {
+        impl.info(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void warning(String msg) {
+        impl.warn(msg);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void warning(String msg, @Nullable Throwable e) {
+        impl.warn(msg, e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public void error(String msg) {
+        impl.error(msg);
+    }
+
+    /**
+     * The logger is considered quiet when neither info nor debug level is enabled.
+     *
+     * @return {@code True} if both info and debug levels are disabled.
+     */
+    @Override public boolean isQuiet() {
+        return !isInfoEnabled() && !isDebugEnabled();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void error(String msg, @Nullable Throwable e) {
+        impl.error(msg, e);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isTraceEnabled() {
+        return impl.isTraceEnabled();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDebugEnabled() {
+        return impl.isDebugEnabled();
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isInfoEnabled() {
+        return impl.isInfoEnabled();
+    }
+
+    /**
+     * This wrapper does not expose a log file name.
+     *
+     * @return Always {@code null}.
+     */
+    @Nullable @Override public String fileName() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        // Use the current class name (the class was renamed from IgfsHadoopJclLogger).
+        return "HadoopIgfsJclLogger [impl=" + impl + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java
new file mode 100644
index 0000000..20c1d5d
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProperties.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.hadoop.fs.permission.*;
+import org.apache.ignite.*;
+
+import java.util.*;
+
+import static org.apache.ignite.IgniteFs.*;
+
+/**
+ * Hadoop file system properties.
+ * <p>
+ * Immutable holder for user name, group name and permission extracted from a string property map.
+ */
+public class HadoopIgfsProperties {
+    /** Username. */
+    private final String usrName;
+
+    /** Group name. */
+    private final String grpName;
+
+    /** Permissions. */
+    private final FsPermission perm;
+
+    /**
+     * Constructor.
+     *
+     * @param props Properties.
+     * @throws IgniteException If the permission property is present but is not a valid octal number.
+     */
+    public HadoopIgfsProperties(Map<String, String> props) throws IgniteException {
+        usrName = props.get(PROP_USER_NAME);
+        grpName = props.get(PROP_GROUP_NAME);
+        perm = parsePermission(props.get(PROP_PERMISSION));
+    }
+
+    /**
+     * Parses permission from its octal string representation.
+     *
+     * @param permStr Permission string, possibly {@code null}.
+     * @return Parsed permission or {@code null} if the string is {@code null}.
+     * @throws IgniteException If the string is not a valid octal number.
+     */
+    private static FsPermission parsePermission(String permStr) throws IgniteException {
+        if (permStr == null)
+            return null;
+
+        try {
+            return new FsPermission((short)Integer.parseInt(permStr, 8));
+        }
+        catch (NumberFormatException e) {
+            // Keep the original exception as the cause so that the parse failure is not lost.
+            throw new IgniteException("Permissions cannot be parsed: " + permStr, e);
+        }
+    }
+
+    /**
+     * Get user name.
+     *
+     * @return User name, {@code null} if the property was not set.
+     */
+    public String userName() {
+        return usrName;
+    }
+
+    /**
+     * Get group name.
+     *
+     * @return Group name, {@code null} if the property was not set.
+     */
+    public String groupName() {
+        return grpName;
+    }
+
+    /**
+     * Get permission.
+     *
+     * @return Permission, {@code null} if the property was not set.
+     */
+    public FsPermission permission() {
+        return perm;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java
new file mode 100644
index 0000000..cfc6949
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyInputStream.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.internal.igfs.common.*;
+
+import java.io.*;
+
+/**
+ * Secondary Hadoop file system input stream wrapper.
+ */
+public class HadoopIgfsProxyInputStream extends InputStream implements 
Seekable, PositionedReadable {
+    /** Actual input stream to the secondary file system. */
+    private final FSDataInputStream is;
+
+    /** Client logger. */
+    private final IgfsLogger clientLog;
+
+    /** Log stream ID. */
+    private final long logStreamId;
+
+    /** Read time. */
+    private long readTime;
+
+    /** User time. */
+    private long userTime;
+
+    /** Last timestamp. */
+    private long lastTs;
+
+    /** Amount of read bytes. */
+    private long total;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     *
+     * @param is Actual input stream to the secondary file system.
+     * @param clientLog Client log.
+     */
+    public HadoopIgfsProxyInputStream(FSDataInputStream is, IgfsLogger 
clientLog, long logStreamId) {
+        assert is != null;
+        assert clientLog != null;
+
+        this.is = is;
+        this.clientLog = clientLog;
+        this.logStreamId = logStreamId;
+
+        lastTs = System.nanoTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(byte[] b) throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = is.read(b);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total += res;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(byte[] b, int off, int len) throws 
IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = super.read(b, off, len);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total += res;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long skip(long n) throws IOException {
+        readStart();
+
+        long res;
+
+        try {
+            res =  is.skip(n);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSkip(logStreamId, res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int available() throws IOException {
+        readStart();
+
+        try {
+            return is.available();
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() throws IOException {
+        if (!closed) {
+            closed = true;
+
+            readStart();
+
+            try {
+                is.close();
+            }
+            finally {
+                readEnd();
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logCloseIn(logStreamId, userTime, readTime, total);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void mark(int readLimit) {
+        readStart();
+
+        try {
+            is.mark(readLimit);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logMark(logStreamId, readLimit);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void reset() throws IOException {
+        readStart();
+
+        try {
+            is.reset();
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logReset(logStreamId);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean markSupported() {
+        readStart();
+
+        try {
+            return is.markSupported();
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read() throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = is.read();
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total++;
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized int read(long pos, byte[] buf, int off, int 
len) throws IOException {
+        readStart();
+
+        int res;
+
+        try {
+            res = is.read(pos, buf, off, len);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (res != -1)
+            total += res;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logRandomRead(logStreamId, pos, res);
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void readFully(long pos, byte[] buf, int 
off, int len) throws IOException {
+        readStart();
+
+        try {
+            is.readFully(pos, buf, off, len);
+        }
+        finally {
+            readEnd();
+        }
+
+        total += len;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logRandomRead(logStreamId, pos, len);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void readFully(long pos, byte[] buf) throws 
IOException {
+        readStart();
+
+        try {
+            is.readFully(pos, buf);
+        }
+        finally {
+            readEnd();
+        }
+
+        total += buf.length;
+
+        if (clientLog.isLogEnabled())
+            clientLog.logRandomRead(logStreamId, pos, buf.length);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void seek(long pos) throws IOException {
+        readStart();
+
+        try {
+            is.seek(pos);
+        }
+        finally {
+            readEnd();
+        }
+
+        if (clientLog.isLogEnabled())
+            clientLog.logSeek(logStreamId, pos);
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized long getPos() throws IOException {
+        readStart();
+
+        try {
+            return is.getPos();
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized boolean seekToNewSource(long targetPos) 
throws IOException {
+        readStart();
+
+        try {
+            return is.seekToNewSource(targetPos);
+        }
+        finally {
+            readEnd();
+        }
+    }
+
+    /**
+     * Read start.
+     */
+    private void readStart() {
+        long now = System.nanoTime();
+
+        userTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * Read end.
+     */
+    private void readEnd() {
+        long now = System.nanoTime();
+
+        readTime += now - lastTs;
+
+        lastTs = now;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java
new file mode 100644
index 0000000..a7266c4
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsProxyOutputStream.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.ignite.internal.igfs.common.*;
+
+import java.io.*;
+
+/**
+ * Secondary Hadoop file system output stream wrapper.
+ */
+public class HadoopIgfsProxyOutputStream extends OutputStream {
+    /** Actual output stream. */
+    private FSDataOutputStream os;
+
+    /** Client logger. */
+    private final IgfsLogger clientLog;
+
+    /** Log stream ID. */
+    private final long logStreamId;
+
+    /** Read time. */
+    private long writeTime;
+
+    /** User time. */
+    private long userTime;
+
+    /** Last timestamp. */
+    private long lastTs;
+
+    /** Amount of written bytes. */
+    private long total;
+
+    /** Closed flag. */
+    private boolean closed;
+
+    /**
+     * Constructor.
+     *
+     * @param os Actual output stream.
+     * @param clientLog Client logger.
+     * @param logStreamId Log stream ID.
+     */
+    public HadoopIgfsProxyOutputStream(FSDataOutputStream os, IgfsLogger 
clientLog, long logStreamId) {
+        assert os != null;
+        assert clientLog != null;
+
+        this.os = os;
+        this.clientLog = clientLog;
+        this.logStreamId = logStreamId;
+
+        lastTs = System.nanoTime();
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(int b) throws IOException {
+        writeStart();
+
+        try {
+            os.write(b);
+        }
+        finally {
+            writeEnd();
+        }
+
+        total++;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(byte[] b) throws IOException {
+        writeStart();
+
+        try {
+            os.write(b);
+        }
+        finally {
+            writeEnd();
+        }
+
+        total += b.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void write(byte[] b, int off, int len) 
throws IOException {
+        writeStart();
+
+        try {
+            os.write(b, off, len);
+        }
+        finally {
+            writeEnd();
+        }
+
+        total += len;
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void flush() throws IOException {
+        writeStart();
+
+        try {
+            os.flush();
+        }
+        finally {
+            writeEnd();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public synchronized void close() throws IOException {
+        if (!closed) {
+            closed = true;
+
+            writeStart();
+
+            try {
+                os.close();
+            }
+            finally {
+                writeEnd();
+            }
+
+            if (clientLog.isLogEnabled())
+                clientLog.logCloseOut(logStreamId, userTime, writeTime, total);
+        }
+    }
+
+    /**
+     * Read start.
+     */
+    private void writeStart() {
+        long now = System.nanoTime();
+
+        userTime += now - lastTs;
+
+        lastTs = now;
+    }
+
+    /**
+     * Read end.
+     */
+    private void writeEnd() {
+        long now = System.nanoTime();
+
+        writeTime += now - lastTs;
+
+        lastTs = now;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/17c8d0d9/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java
----------------------------------------------------------------------
diff --git 
a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java
 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java
new file mode 100644
index 0000000..17755dc
--- /dev/null
+++ 
b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/HadoopIgfsReader.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.igfs.hadoop;
+
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.ignite.igfs.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+
+/**
+ * Lazy wrapper around a secondary file system input stream: the underlying stream is opened only when
+ * it is explicitly requested by the first read.
+ * <p>
+ * The class is expected to be used only from synchronized context and therefore is not thread-safe.
+ */
+public class HadoopIgfsReader implements IgfsReader {
+    /** Secondary file system. */
+    private final FileSystem fs;
+
+    /** Path to the file to open. */
+    private final Path path;
+
+    /** Buffer size. */
+    private final int bufSize;
+
+    /** Actual input stream. */
+    private FSDataInputStream in;
+
+    /** Cached error occurred during input stream open. */
+    private IOException err;
+
+    /** Flag indicating that an attempt to open the stream was already made. */
+    private boolean opened;
+
+    /**
+     * Constructor.
+     *
+     * @param fs Secondary file system.
+     * @param path Path to the file to open.
+     * @param bufSize Buffer size.
+     */
+    public HadoopIgfsReader(FileSystem fs, Path path, int bufSize) {
+        assert fs != null;
+        assert path != null;
+
+        this.fs = fs;
+        this.path = path;
+        this.bufSize = bufSize;
+    }
+
+    /**
+     * Gets input stream, opening it on the first call. If the open attempt failed, the same error is
+     * rethrown on every subsequent call and the open is not retried.
+     *
+     * @return Input stream.
+     * @throws IOException If the stream could not be opened.
+     */
+    private PositionedReadable in() throws IOException {
+        if (!opened) {
+            // Raise the flag before the attempt so that a failed open is never retried.
+            opened = true;
+
+            try {
+                in = fs.open(path, bufSize);
+
+                if (in == null)
+                    throw new IOException("Failed to open input stream (file system returned null): " + path);
+            }
+            catch (IOException e) {
+                err = e;
+
+                throw e;
+            }
+        }
+        else if (err != null)
+            throw err;
+
+        return in;
+    }
+
+    /**
+     * Close wrapped input stream in case it was previously opened.
+     */
+    @Override public void close() {
+        U.closeQuiet(in);
+    }
+
+    /** {@inheritDoc} */
+    @Override public int read(long pos, byte[] buf, int off, int len) throws IOException {
+        PositionedReadable stream = in();
+
+        return stream.read(pos, buf, off, len);
+    }
+}

Reply via email to