http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java
deleted file mode 100644
index 922232e..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.fs;
-
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-import static org.apache.ignite.cache.CacheAtomicityMode.*;
-import static org.apache.ignite.cache.CacheDistributionMode.*;
-import static org.apache.ignite.cache.CacheMode.*;
-import static org.apache.ignite.events.IgniteEventType.*;
-
-/**
- * Tests the Hadoop file system implementation.
- */
-public class GridGgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest {
-    /** Path to the default hadoop configuration. */
-    public static final String HADOOP_FS_CFG = "examples/config/filesystem/core-site.xml";
-
-    /** Group size. */
-    public static final int GRP_SIZE = 128;
-
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Node count. */
-    private int cnt;
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        startGrids(nodeCount());
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        G.stopAll(true);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration(gridName);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration();
-
-        ggfsCfg.setDataCacheName("partitioned");
-        ggfsCfg.setMetaCacheName("partitioned");
-        ggfsCfg.setName("ggfs");
-
-        ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{
-            put("type", "shmem");
-            put("port", 
String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt));
-        }});
-
-        ggfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups.
-
-        cfg.setGgfsConfiguration(ggfsCfg);
-
-        cfg.setCacheConfiguration(cacheConfiguration(gridName));
-
-        cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED);
-
-        cnt++;
-
-        return cfg;
-    }
-
-    /** @return Node count for test. */
-    protected int nodeCount() {
-        return 4;
-    }
-
-    /**
-     * Gets cache configuration.
-     *
-     * @param gridName Grid name.
-     * @return Cache configuration.
-     */
-    protected CacheConfiguration cacheConfiguration(String gridName) {
-        CacheConfiguration cacheCfg = defaultCacheConfiguration();
-
-        cacheCfg.setName("partitioned");
-        cacheCfg.setCacheMode(PARTITIONED);
-        cacheCfg.setDistributionMode(cnt == 0 ? NEAR_ONLY : PARTITIONED_ONLY);
-        cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);
-        cacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(GRP_SIZE));
-        cacheCfg.setBackups(0);
-        cacheCfg.setQueryIndexEnabled(false);
-        cacheCfg.setAtomicityMode(TRANSACTIONAL);
-
-        return cacheCfg;
-    }
-
-    /**
-     * Gets configuration of the concrete file system.
-     *
-     * @return Configuration of the concrete file system.
-     */
-    protected Configuration getFileSystemConfig() {
-        Configuration cfg = new Configuration();
-
-        cfg.addResource(U.resolveGridGainUrl(HADOOP_FS_CFG));
-
-        return cfg;
-    }
-
-    /**
-     * Gets file system URI for the given grid index.
-     *
-     * @param grid Grid index.
-     * @return File system URI.
-     */
-    protected URI getFileSystemURI(int grid) {
-        try {
-            return new URI("ggfs://127.0.0.1:" + 
(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + grid));
-        }
-        catch (URISyntaxException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /** @throws Exception If failed. */
-    public void testContentsConsistency() throws Exception {
-        try (FileSystem fs = FileSystem.get(getFileSystemURI(0), getFileSystemConfig())) {
-            Collection<IgniteBiTuple<String, Long>> files = F.asList(
-                F.t("/dir1/dir2/file1", 1024L),
-                F.t("/dir1/dir2/file2", 8 * 1024L),
-                F.t("/dir1/file1", 1024 * 1024L),
-                F.t("/dir1/file2", 5 * 1024 * 1024L),
-                F.t("/file1", 64 * 1024L + 13),
-                F.t("/file2", 13L),
-                F.t("/file3", 123764L)
-            );
-
-            for (IgniteBiTuple<String, Long> file : files) {
-
-                info("Writing file: " + file.get1());
-
-                try (OutputStream os = fs.create(new Path(file.get1()), (short)3)) {
-                    byte[] data = new byte[file.get2().intValue()];
-
-                    data[0] = 25;
-                    data[data.length - 1] = 26;
-
-                    os.write(data);
-                }
-
-                info("Finished writing file: " + file.get1());
-            }
-
-            for (int i = 1; i < nodeCount(); i++) {
-
-                try (FileSystem ignored = FileSystem.get(getFileSystemURI(i), getFileSystemConfig())) {
-                    for (IgniteBiTuple<String, Long> file : files) {
-                        Path path = new Path(file.get1());
-
-                        FileStatus fileStatus = fs.getFileStatus(path);
-
-                        assertEquals(file.get2(), (Long)fileStatus.getLen());
-
-                        byte[] read = new byte[file.get2().intValue()];
-
-                        info("Reading file: " + path);
-
-                        try (FSDataInputStream in = fs.open(path)) {
-                            in.readFully(read);
-
-                            assert read[0] == 25;
-                            assert read[read.length - 1] == 26;
-                        }
-
-                        info("Finished reading file: " + path);
-                    }
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java
deleted file mode 100644
index 46d4494..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java
+++ /dev/null
@@ -1,267 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.fs;
-
-import junit.framework.*;
-import org.apache.ignite.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.fs.hadoop.*;
-import org.apache.ignite.internal.processors.hadoop.*;
-import org.apache.ignite.internal.util.ipc.shmem.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.jetbrains.annotations.*;
-
-import java.util.*;
-
-import static org.apache.ignite.fs.IgniteFsMode.*;
-
-/**
- * Test suite for GGFS event tests.
- */
-@SuppressWarnings("PublicInnerClass")
-public class IgniteFsEventsTestSuite extends TestSuite {
-    /**
-     * @return Test suite.
-     * @throws Exception Thrown in case of the failure.
-     */
-    public static TestSuite suite() throws Exception {
-        GridHadoopClassLoader ldr = new GridHadoopClassLoader(null);
-
-        TestSuite suite = new TestSuite("Ignite FS Events Test Suite");
-
-        suite.addTest(new TestSuite(ldr.loadClass(ShmemPrivate.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(ShmemDualSync.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(ShmemDualAsync.class.getName())));
-
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName())));
-
-        return suite;
-    }
-
-    /**
-     * @return Test suite with only tests that are supported on all platforms.
-     * @throws Exception Thrown in case of the failure.
-     */
-    public static TestSuite suiteNoarchOnly() throws Exception {
-        GridHadoopClassLoader ldr = new GridHadoopClassLoader(null);
-
-        TestSuite suite = new TestSuite("Gridgain GGFS Events Test Suite 
Noarch Only");
-
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName())));
-        suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName())));
-
-        return suite;
-    }
-
-    /**
-     * Shared memory IPC in PRIVATE mode.
-     */
-    public static class ShmemPrivate extends GridGgfsEventsAbstractSelfTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() 
{{
-                put("type", "shmem");
-                put("port", 
String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1));
-            }});
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Loopback socket IPC in PRIVATE mode.
-     */
-    public static class LoopbackPrivate extends GridGgfsEventsAbstractSelfTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() 
{{
-                put("type", "tcp");
-                put("port", 
String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1));
-            }});
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Base class for all GGFS tests with primary and secondary file system.
-     */
-    public abstract static class PrimarySecondaryTest extends GridGgfsEventsAbstractSelfTest {
-        /** Secondary file system. */
-        private static IgniteFs ggfsSec;
-
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setSecondaryFileSystem(new GridGgfsHadoopFileSystemWrapper(
-                "ggfs://ggfs-secondary:grid-secondary@127.0.0.1:11500/",
-                "modules/core/src/test/config/hadoop/core-site-secondary.xml"));
-
-            return ggfsCfg;
-        }
-
-        /**
-         * @return GGFS configuration for secondary file system.
-         */
-        protected IgniteFsConfiguration getSecondaryGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setName("ggfs-secondary");
-            ggfsCfg.setDefaultMode(PRIMARY);
-            ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>(){{
-                put("type", "tcp");
-                put("port", "11500");
-            }});
-
-            return ggfsCfg;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void beforeTestsStarted() throws Exception {
-            ggfsSec = startSecondary();
-
-            super.beforeTestsStarted();
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void afterTestsStopped() throws Exception {
-            super.afterTestsStopped();
-
-            G.stopAll(true);
-        }
-
-        /** {@inheritDoc} */
-        @Override protected void afterTest() throws Exception {
-            super.afterTest();
-
-            // Clean up secondary file system.
-            ggfsSec.format();
-        }
-
-        /**
-         * Start a grid with the secondary file system.
-         *
-         * @return Secondary file system handle.
-         * @throws Exception If failed.
-         */
-        @Nullable private IgniteFs startSecondary() throws Exception {
-            IgniteConfiguration cfg = getConfiguration("grid-secondary", 
getSecondaryGgfsConfiguration());
-
-            cfg.setLocalHost("127.0.0.1");
-            cfg.setPeerClassLoadingEnabled(false);
-
-            Ignite secG = G.start(cfg);
-
-            return secG.fileSystem("ggfs-secondary");
-        }
-    }
-
-    /**
-     * Shared memory IPC in DUAL_SYNC mode.
-     */
-    public static class ShmemDualSync extends PrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setDefaultMode(DUAL_SYNC);
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Shared memory IPC in DUAL_ASYNC mode.
-     */
-    public static class ShmemDualAsync extends PrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setDefaultMode(DUAL_ASYNC);
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Loopback socket IPC with secondary file system.
-     */
-    public abstract static class LoopbackPrimarySecondaryTest extends PrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setSecondaryFileSystem(new GridGgfsHadoopFileSystemWrapper(
-                "ggfs://ggfs-secondary:grid-secondary@127.0.0.1:11500/",
-                "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"));
-
-            return ggfsCfg;
-        }
-
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getSecondaryGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getSecondaryGgfsConfiguration();
-
-            ggfsCfg.setName("ggfs-secondary");
-            ggfsCfg.setDefaultMode(PRIMARY);
-            ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() 
{{
-                put("type", "tcp");
-                put("port", "11500");
-            }});
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Loopback IPC in DUAL_SYNC mode.
-     */
-    public static class LoopbackDualSync extends LoopbackPrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setDefaultMode(DUAL_SYNC);
-
-            return ggfsCfg;
-        }
-    }
-
-    /**
-     * Loopback socket IPC in DUAL_ASYNC mode.
-     */
-    public static class LoopbackDualAsync extends LoopbackPrimarySecondaryTest {
-        /** {@inheritDoc} */
-        @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException {
-            IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration();
-
-            ggfsCfg.setDefaultMode(DUAL_ASYNC);
-
-            return ggfsCfg;
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridFileSystemLoad.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridFileSystemLoad.java b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridFileSystemLoad.java
new file mode 100644
index 0000000..6798710
--- /dev/null
+++ b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridFileSystemLoad.java
@@ -0,0 +1,549 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.ignitefs;
+
+import org.apache.hadoop.conf.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.*;
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+/**
+ * Hadoop file system load application.
+ * <p>
+ * Command line arguments:
+ * <ul>
+ * <li>-u {url} file system URL</li>
+ * <li>-hadoopCfg {cfg} Hadoop configuration</li>
+ * <li>-f {num} files number</li>
+ * <li>-r {num} reads number</li>
+ * <li>-w {num} writes number</li>
+ * <li>-d {num} deletes number</li>
+ * <li>-delay {delay} delay between operations in milliseconds</li>
+ * <li>-t {num} threads number</li>
+ * <li>-minSize {min size} min file size in bytes</li>
+ * <li>-maxSize {max size} max file size in bytes</li>
+ * <li>-startNode {true|false} if 'true' then starts node before execution</li>
+ * <li>-nodeCfg {cfg} configuration for started node</li>
+ * <li>-primaryOnly {true|false} if 'true' then creates files only in 
directory named 'primary' </li>
+ * </ul>
+ * Note: GGFS logging is disabled by default, to enable logging it is 
necessary to set flag
+ * 'fs.ggfs.<name>.log.enabled' in Hadoop configuration file. By default log 
files are created in the
+ * directory 'work/ggfs/log', this path can be changed in Hadoop configuration 
file using property
+ * 'fs.ggfs.<name>.log.dir'.
+ */
+public class GridFileSystemLoad {
+    /** */
+    private static final String DFLT_URL = "ggfs:///";
+
+    /** */
+    private static final int DFLT_MIN_FILE_SIZE = 100 * 1024;
+
+    /** */
+    private static final int DFLT_MAX_FILE_SIZE = 1024 * 1024;
+
+    /** */
+    private static final int DFLT_FILES_NUMBER = 1000;
+
+    /** */
+    private static final int DFLT_READS_NUMBER = 2000;
+
+    /** */
+    private static final int DFLT_WRITES_NUMBER = 2000;
+
+    /** */
+    private static final int DFLT_DELETES_NUMBER = 100;
+
+    /** */
+    private static final int DFLT_THREADS_NUMBER = 2;
+
+    /** */
+    private static final boolean DFLT_START_NODE = true;
+
+    /** */
+    private static final boolean DFLT_PRIMARY_ONLY = false;
+
+    /** */
+    private static final String DFLT_NODE_CFG = "config/hadoop/default-config.xml";
+
+    /** */
+    private static final long DFLT_DELAY = 5;
+
+    /** */
+    private static final String DFLT_HADOOP_CFG = "examples/config/filesystem/core-site.xml";
+
+    /** */
+    private static final int CREATE_BUF_SIZE = 100 * 1024;
+
+    /** */
+    private static final String DIR_PRIMARY_MODE = "primary";
+
+    /** */
+    private static final String DIR_PROXY_MODE = "proxy";
+
+    /** */
+    private static final String DIR_DUAL_SYNC_MODE = "dual_sync";
+
+    /** */
+    private static final String DIR_DUAL_ASYNC_MODE = "dual_async";
+
+    /**
+     * Main method.
+     *
+     * @param args Command line arguments.
+     * @throws Exception If error occurs.
+     */
+    public static void main(String[] args) throws Exception {
+        String url = DFLT_URL;
+
+        int filesNum = DFLT_FILES_NUMBER;
+
+        int minFileSize = DFLT_MIN_FILE_SIZE;
+
+        int maxFileSize = DFLT_MAX_FILE_SIZE;
+
+        int readsNum = DFLT_READS_NUMBER;
+
+        int writesNum = DFLT_WRITES_NUMBER;
+
+        int deletesNum = DFLT_DELETES_NUMBER;
+
+        int threadsNum = DFLT_THREADS_NUMBER;
+
+        long delay = DFLT_DELAY;
+
+        String nodeCfg = DFLT_NODE_CFG;
+
+        String hadoopCfg = DFLT_HADOOP_CFG;
+
+        boolean startNode = DFLT_START_NODE;
+
+        boolean primaryOnly = DFLT_PRIMARY_ONLY;
+
+        for (int i = 0; i < args.length; i++) {
+            String arg = args[i];
+
+            switch (arg) {
+                case "-u":
+                    url = args[++i];
+
+                    break;
+
+                case "-hadoopCfg":
+                    hadoopCfg = args[++i];
+
+                    break;
+
+                case "-f":
+                    filesNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-r":
+                    readsNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-w":
+                    writesNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-minSize":
+                    minFileSize = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-maxSize":
+                    maxFileSize = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-d":
+                    deletesNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-t":
+                    threadsNum = Integer.parseInt(args[++i]);
+
+                    break;
+
+                case "-delay":
+                    delay = Long.parseLong(args[++i]);
+
+                    break;
+
+                case "-startNode":
+                    startNode = Boolean.parseBoolean(args[++i]);
+
+                    break;
+
+                case "-nodeCfg":
+                    nodeCfg = args[++i];
+
+                    break;
+
+                case "-primaryOnly":
+                    primaryOnly = Boolean.parseBoolean(args[++i]);
+
+                    break;
+            }
+        }
+
+        X.println("File system URL: " + url);
+        X.println("Hadoop configuration: " + hadoopCfg);
+        X.println("Primary mode only: " + primaryOnly);
+        X.println("Files number: " + filesNum);
+        X.println("Reads number: " + readsNum);
+        X.println("Writes number: " + writesNum);
+        X.println("Deletes number: " + deletesNum);
+        X.println("Min file size: " + minFileSize);
+        X.println("Max file size: " + maxFileSize);
+        X.println("Threads number: " + threadsNum);
+        X.println("Delay: " + delay);
+
+        if (minFileSize > maxFileSize)
+            throw new IllegalArgumentException();
+
+        Ignite ignite = null;
+
+        if (startNode) {
+            X.println("Starting node using configuration: " + nodeCfg);
+
+            ignite = G.start(U.resolveGridGainUrl(nodeCfg));
+        }
+
+        try {
+            new GridFileSystemLoad().runLoad(url, hadoopCfg, primaryOnly, threadsNum, filesNum, readsNum, writesNum,
+                deletesNum, minFileSize, maxFileSize, delay);
+        }
+        finally {
+            if (ignite != null)
+                G.stop(true);
+        }
+    }
+
+    /**
+     * Executes read/write/delete operations.
+     *
+     * @param url File system url.
+     * @param hadoopCfg Hadoop configuration.
+     * @param primaryOnly If {@code true} then creates files only in directory named 'primary'.
+     * @param threads Threads number.
+     * @param files Files number.
+     * @param reads Reads number.
+     * @param writes Writes number.
+     * @param deletes Deletes number.
+     * @param minSize Min file size.
+     * @param maxSize Max file size.
+     * @param delay Delay between operations.
+     * @throws Exception If some file system operation failed.
+     */
+    @SuppressWarnings("IfMayBeConditional")
+    public void runLoad(String url, String hadoopCfg, final boolean primaryOnly, int threads, int files,
+        final int reads, final int writes, final int deletes, final int minSize, final int maxSize, final long delay)
+        throws Exception {
+        Path fsPath = new Path(url);
+
+        Configuration cfg = new Configuration(true);
+
+        cfg.addResource(U.resolveGridGainUrl(hadoopCfg));
+
+        final FileSystem fs = FileSystem.get(fsPath.toUri(), cfg);
+
+        Path workDir = new Path(fsPath, "/fsload");
+
+        fs.delete(workDir, true);
+
+        fs.mkdirs(workDir, FsPermission.getDefault());
+
+        final Path[] dirs;
+
+        if (primaryOnly)
+            dirs = new Path[]{mkdir(fs, workDir, DIR_PRIMARY_MODE)};
+        else
+            dirs = new Path[]{mkdir(fs, workDir, DIR_PRIMARY_MODE), mkdir(fs, workDir, DIR_PROXY_MODE),
+                mkdir(fs, workDir, DIR_DUAL_SYNC_MODE), mkdir(fs, workDir, DIR_DUAL_ASYNC_MODE)};
+
+        try {
+            ExecutorService exec = Executors.newFixedThreadPool(threads);
+
+            Collection<Future<?>> futs = new ArrayList<>(threads);
+
+            for (int i = 0; i < threads; i++) {
+                final int filesPerThread;
+
+                if (i == 0 && files % threads != 0)
+                    filesPerThread = files / threads + files % threads;
+                else
+                    filesPerThread = files / threads;
+
+                futs.add(exec.submit(new Callable<Void>() {
+                    @Override public Void call() throws Exception {
+                        runLoad(fs, dirs, filesPerThread, reads, writes, deletes, minSize, maxSize, delay);
+
+                        return null;
+                    }
+                }));
+            }
+
+            exec.shutdown();
+
+            for (Future<?> fut : futs) {
+                try {
+                    fut.get();
+                }
+                catch (ExecutionException e) {
+                    X.error("Error during execution: " + e);
+
+                    e.getCause().printStackTrace();
+                }
+            }
+        }
+        finally {
+            try {
+                fs.delete(workDir, true);
+            }
+            catch (IOException ignored) {
+                // Ignore.
+            }
+        }
+    }
+
+    /**
+     * Executes read/write/delete operations.
+     *
+     * @param fs File system.
+     * @param dirs Directories where files should be created.
+     * @param filesNum Files number.
+     * @param reads Reads number.
+     * @param writes Writes number.
+     * @param deletes Deletes number.
+     * @param minSize Min file size.
+     * @param maxSize Max file size.
+     * @param delay Delay between operations.
+     * @throws Exception If some file system operation failed.
+     */
+    private void runLoad(FileSystem fs, Path[] dirs, int filesNum, int reads, int writes, int deletes,
+        int minSize, int maxSize, long delay) throws Exception {
+        Random random = random();
+
+        List<T2<Path, Integer>> files = new ArrayList<>(filesNum);
+
+        for (int i = 0; i < filesNum; i++) {
+            int size = maxSize == minSize ? minSize : minSize + random.nextInt(maxSize - minSize);
+
+            Path file = new Path(dirs[i % dirs.length], "file-" + 
UUID.randomUUID());
+
+            createFile(fs, file, size, CREATE_BUF_SIZE);
+
+            files.add(new T2<>(file, size));
+        }
+
+        List<Path> toDel = new ArrayList<>(deletes);
+
+        for (int i = 0; i < deletes; i++) {
+            int size = maxSize == minSize ? minSize : minSize + random.nextInt(maxSize - minSize);
+
+            Path file = new Path(dirs[i % dirs.length], "file-to-delete-" + 
UUID.randomUUID());
+
+            createFile(fs, file, size, CREATE_BUF_SIZE);
+
+            toDel.add(file);
+        }
+
+        while (reads > 0 || writes > 0 || deletes > 0) {
+            if (reads > 0) {
+                reads--;
+
+                T2<Path, Integer> file = files.get(reads % files.size());
+
+                readFull(fs, file.get1(), CREATE_BUF_SIZE);
+
+                int fileSize = file.get2();
+
+                readRandom(fs, file.get1(), fileSize, random.nextInt(fileSize) + 1);
+            }
+
+            if (writes > 0) {
+                writes--;
+
+                T2<Path, Integer> file = files.get(writes % files.size());
+
+                overwriteFile(fs, file.get1(), file.get2(), CREATE_BUF_SIZE);
+
+                appendToFile(fs, file.get1(), random.nextInt(CREATE_BUF_SIZE) + 1);
+            }
+
+            if (deletes > 0) {
+                deletes--;
+
+                deleteFile(fs, toDel.get(deletes));
+            }
+
+            U.sleep(delay);
+        }
+    }
+
+    /**
+     * Creates file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param fileSize File size.
+     * @param bufSize Write buffer size.
+     * @throws IOException If operation failed.
+     */
+    private static void createFile(FileSystem fs, Path file, int fileSize, int bufSize) throws IOException {
+        create(fs, file, fileSize, bufSize, false);
+    }
+
+    /**
+     * Overwrites file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param fileSize File size.
+     * @param bufSize Write buffer size.
+     * @throws IOException If operation failed.
+     */
+    private static void overwriteFile(FileSystem fs, Path file, int fileSize, int bufSize) throws IOException {
+        create(fs, file, fileSize, bufSize, true);
+    }
+
+    /**
+     * Appends to file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param appendSize Append size.
+     * @throws IOException If operation failed.
+     */
+    private static void appendToFile(FileSystem fs, Path file, int appendSize) throws IOException {
+        try (FSDataOutputStream out = fs.append(file)) {
+            out.write(new byte[appendSize]);
+        }
+    }
+
+    /**
+     * Reads whole file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param bufSize Read buffer size.
+     * @throws IOException If operation failed.
+     */
+    @SuppressWarnings("StatementWithEmptyBody")
+    private static void readFull(FileSystem fs, Path file, int bufSize) throws IOException {
+        try (FSDataInputStream in = fs.open(file)) {
+            byte[] readBuf = new byte[bufSize];
+
+            while (in.read(readBuf) > 0) {
+                // No-op.
+            }
+        }
+    }
+
+    /**
+     * Deletes file.
+     *
+     * @param fs File system.
+     * @param path File path.
+     * @throws IOException If operation failed.
+     */
+    private static void deleteFile(FileSystem fs, Path path) throws IOException {
+        fs.delete(path, false);
+    }
+
+    /**
+     * Reads from random position.
+     *
+     * @param fs File system.
+     * @param path File path.
+     * @param fileSize File size.
+     * @param readSize Read size.
+     * @throws IOException If operation failed.
+     */
+    private static void readRandom(FileSystem fs, Path path, int fileSize, int readSize) throws IOException {
+        byte[] readBuf = new byte[readSize];
+
+        try (FSDataInputStream in = fs.open(path)) {
+            in.seek(random().nextInt(fileSize));
+
+            in.read(readBuf);
+        }
+    }
+
+    /**
+     * Creates file.
+     *
+     * @param fs File system.
+     * @param file File path.
+     * @param fileSize File size.
+     * @param bufSize Buffer size.
+     * @param overwrite Overwrite flag.
+     * @throws IOException If operation failed.
+     */
+    private static void create(FileSystem fs, Path file, int fileSize, int bufSize, boolean overwrite)
+        throws IOException {
+        try (FSDataOutputStream out = fs.create(file, overwrite)) {
+            int size = 0;
+
+            byte[] buf = new byte[bufSize];
+
+            while (size < fileSize) {
+                int len = Math.min(fileSize - size, bufSize);
+
+                out.write(buf, 0, len);
+
+                size += len;
+            }
+        }
+    }
+
+    /**
+     * Creates directory in the given parent directory.
+     *
+     * @param fs File system.
+     * @param parentDir Parent directory.
+     * @param dirName Directory name.
+     * @return Path for created directory.
+     * @throws IOException If operation failed.
+     */
+    private static Path mkdir(FileSystem fs, Path parentDir, String dirName) throws IOException {
+        Path path = new Path(parentDir, dirName);
+
+        fs.mkdirs(path, FsPermission.getDefault());
+
+        return path;
+    }
+
+    /**
+     * @return Thread local random.
+     */
+    private static Random random() {
+        return ThreadLocalRandom.current();
+    }
+}
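
For anyone wanting to exercise the new load application: a hypothetical invocation is sketched below, spelling out the documented defaults explicitly. The classpath placeholder is an assumption (assembling the Ignite and Hadoop jars is outside this patch); every flag name and value mirrors the DFLT_* constants and the command-line Javadoc in GridFileSystemLoad above.

    java -cp <ignite-and-hadoop-classpath> org.apache.ignite.ignitefs.GridFileSystemLoad \
        -u ggfs:/// -hadoopCfg examples/config/filesystem/core-site.xml \
        -f 1000 -r 2000 -w 2000 -d 100 -t 2 -delay 5 \
        -minSize 102400 -maxSize 1048576 \
        -startNode true -nodeCfg config/hadoop/default-config.xml -primaryOnly false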
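
Likewise, per the Javadoc note, GGFS logging stays disabled unless 'fs.ggfs.<name>.log.enabled' is set. For a file system named 'ggfs', the entry in the Hadoop configuration file would presumably look like the sketch below; only the property-name pattern comes from the Javadoc, the rest is standard Hadoop property syntax:

    <property>
        <name>fs.ggfs.ggfs.log.enabled</name>
        <value>true</value>
    </property>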
