http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java deleted file mode 100644 index 922232e..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/GridGgfsNearOnlyMultiNodeSelfTest.java +++ /dev/null @@ -1,212 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.fs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.cache.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.spi.discovery.tcp.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.*; -import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; -import org.apache.ignite.testframework.junits.common.*; - -import java.io.*; -import java.net.*; -import java.util.*; - -import static org.apache.ignite.cache.CacheAtomicityMode.*; -import static org.apache.ignite.cache.CacheDistributionMode.*; -import static org.apache.ignite.cache.CacheMode.*; -import static org.apache.ignite.events.IgniteEventType.*; - -/** - * Test hadoop file system implementation. - */ -public class GridGgfsNearOnlyMultiNodeSelfTest extends GridCommonAbstractTest { - /** Path to the default hadoop configuration. */ - public static final String HADOOP_FS_CFG = "examples/config/filesystem/core-site.xml"; - - /** Group size. */ - public static final int GRP_SIZE = 128; - - /** IP finder. */ - private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true); - - /** Node count. */ - private int cnt; - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - startGrids(nodeCount()); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - G.stopAll(true); - } - - /** {@inheritDoc} */ - @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { - IgniteConfiguration cfg = super.getConfiguration(gridName); - - TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); - discoSpi.setIpFinder(IP_FINDER); - - cfg.setDiscoverySpi(discoSpi); - - IgniteFsConfiguration ggfsCfg = new IgniteFsConfiguration(); - - ggfsCfg.setDataCacheName("partitioned"); - ggfsCfg.setMetaCacheName("partitioned"); - ggfsCfg.setName("ggfs"); - - ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "shmem"); - put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + cnt)); - }}); - - ggfsCfg.setBlockSize(512 * 1024); // Together with group blocks mapper will yield 64M per node groups. - - cfg.setGgfsConfiguration(ggfsCfg); - - cfg.setCacheConfiguration(cacheConfiguration(gridName)); - - cfg.setIncludeEventTypes(EVT_TASK_FAILED, EVT_TASK_FINISHED, EVT_JOB_MAPPED); - - cnt++; - - return cfg; - } - - /** @return Node count for test. */ - protected int nodeCount() { - return 4; - } - - /** - * Gets cache configuration. - * - * @param gridName Grid name. - * @return Cache configuration. - */ - protected CacheConfiguration cacheConfiguration(String gridName) { - CacheConfiguration cacheCfg = defaultCacheConfiguration(); - - cacheCfg.setName("partitioned"); - cacheCfg.setCacheMode(PARTITIONED); - cacheCfg.setDistributionMode(cnt == 0 ? NEAR_ONLY : PARTITIONED_ONLY); - cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); - cacheCfg.setAffinityMapper(new IgniteFsGroupDataBlocksKeyMapper(GRP_SIZE)); - cacheCfg.setBackups(0); - cacheCfg.setQueryIndexEnabled(false); - cacheCfg.setAtomicityMode(TRANSACTIONAL); - - return cacheCfg; - } - - /** - * Gets config of concrete File System. - * - * @return Config of concrete File System. - */ - protected Configuration getFileSystemConfig() { - Configuration cfg = new Configuration(); - - cfg.addResource(U.resolveGridGainUrl(HADOOP_FS_CFG)); - - return cfg; - } - - /** - * Gets File System name. - * - * @param grid Grid index. - * @return File System name. - */ - protected URI getFileSystemURI(int grid) { - try { - return new URI("ggfs://127.0.0.1:" + (IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + grid)); - } - catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - - /** @throws Exception If failed. */ - public void testContentsConsistency() throws Exception { - try (FileSystem fs = FileSystem.get(getFileSystemURI(0), getFileSystemConfig())) { - Collection<IgniteBiTuple<String, Long>> files = F.asList( - F.t("/dir1/dir2/file1", 1024L), - F.t("/dir1/dir2/file2", 8 * 1024L), - F.t("/dir1/file1", 1024 * 1024L), - F.t("/dir1/file2", 5 * 1024 * 1024L), - F.t("/file1", 64 * 1024L + 13), - F.t("/file2", 13L), - F.t("/file3", 123764L) - ); - - for (IgniteBiTuple<String, Long> file : files) { - - info("Writing file: " + file.get1()); - - try (OutputStream os = fs.create(new Path(file.get1()), (short)3)) { - byte[] data = new byte[file.get2().intValue()]; - - data[0] = 25; - data[data.length - 1] = 26; - - os.write(data); - } - - info("Finished writing file: " + file.get1()); - } - - for (int i = 1; i < nodeCount(); i++) { - - try (FileSystem ignored = FileSystem.get(getFileSystemURI(i), getFileSystemConfig())) { - for (IgniteBiTuple<String, Long> file : files) { - Path path = new Path(file.get1()); - - FileStatus fileStatus = fs.getFileStatus(path); - - assertEquals(file.get2(), (Long)fileStatus.getLen()); - - byte[] read = new byte[file.get2().intValue()]; - - info("Reading file: " + path); - - try (FSDataInputStream in = fs.open(path)) { - in.readFully(read); - - assert read[0] == 25; - assert read[read.length - 1] == 26; - } - - info("Finished reading file: " + path); - } - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java deleted file mode 100644 index 46d4494..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/fs/IgniteFsEventsTestSuite.java +++ /dev/null @@ -1,267 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.fs; - -import junit.framework.*; -import org.apache.ignite.*; -import org.apache.ignite.configuration.*; -import org.apache.ignite.internal.fs.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.util.ipc.shmem.*; -import org.apache.ignite.internal.util.typedef.*; -import org.jetbrains.annotations.*; - -import java.util.*; - -import static org.apache.ignite.fs.IgniteFsMode.*; - -/** - * Test suite for GGFS event tests. - */ -@SuppressWarnings("PublicInnerClass") -public class IgniteFsEventsTestSuite extends TestSuite { - /** - * @return Test suite. - * @throws Exception Thrown in case of the failure. - */ - public static TestSuite suite() throws Exception { - GridHadoopClassLoader ldr = new GridHadoopClassLoader(null); - - TestSuite suite = new TestSuite("Ignite FS Events Test Suite"); - - suite.addTest(new TestSuite(ldr.loadClass(ShmemPrivate.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(ShmemDualSync.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(ShmemDualAsync.class.getName()))); - - suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName()))); - - return suite; - } - - /** - * @return Test suite with only tests that are supported on all platforms. - * @throws Exception Thrown in case of the failure. - */ - public static TestSuite suiteNoarchOnly() throws Exception { - GridHadoopClassLoader ldr = new GridHadoopClassLoader(null); - - TestSuite suite = new TestSuite("Gridgain GGFS Events Test Suite Noarch Only"); - - suite.addTest(new TestSuite(ldr.loadClass(LoopbackPrivate.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualSync.class.getName()))); - suite.addTest(new TestSuite(ldr.loadClass(LoopbackDualAsync.class.getName()))); - - return suite; - } - - /** - * Shared memory IPC in PRIVATE mode. - */ - public static class ShmemPrivate extends GridGgfsEventsAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration(); - - ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "shmem"); - put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1)); - }}); - - return ggfsCfg; - } - } - - /** - * Loopback socket IPS in PRIVATE mode. - */ - public static class LoopbackPrivate extends GridGgfsEventsAbstractSelfTest { - /** {@inheritDoc} */ - @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration(); - - ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", String.valueOf(IpcSharedMemoryServerEndpoint.DFLT_IPC_PORT + 1)); - }}); - - return ggfsCfg; - } - } - - /** - * Base class for all GGFS tests with primary and secondary file system. - */ - public abstract static class PrimarySecondaryTest extends GridGgfsEventsAbstractSelfTest { - /** Secondary file system. */ - private static IgniteFs ggfsSec; - - /** {@inheritDoc} */ - @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration(); - - ggfsCfg.setSecondaryFileSystem(new GridGgfsHadoopFileSystemWrapper( - "ggfs://ggfs-secondary:grid-secondary@127.0.0.1:11500/", - "modules/core/src/test/config/hadoop/core-site-secondary.xml")); - - return ggfsCfg; - } - - /** - * @return GGFS configuration for secondary file system. - */ - protected IgniteFsConfiguration getSecondaryGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration(); - - ggfsCfg.setName("ggfs-secondary"); - ggfsCfg.setDefaultMode(PRIMARY); - ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>(){{ - put("type", "tcp"); - put("port", "11500"); - }}); - - return ggfsCfg; - } - - /** {@inheritDoc} */ - @Override protected void beforeTestsStarted() throws Exception { - ggfsSec = startSecondary(); - - super.beforeTestsStarted(); - } - - /** {@inheritDoc} */ - @Override protected void afterTestsStopped() throws Exception { - super.afterTestsStopped(); - - G.stopAll(true); - } - - /** {@inheritDoc} */ - @Override protected void afterTest() throws Exception { - super.afterTest(); - - // Clean up secondary file system. - ggfsSec.format(); - } - - /** - * Start a grid with the secondary file system. - * - * @return Secondary file system handle. - * @throws Exception If failed. - */ - @Nullable private IgniteFs startSecondary() throws Exception { - IgniteConfiguration cfg = getConfiguration("grid-secondary", getSecondaryGgfsConfiguration()); - - cfg.setLocalHost("127.0.0.1"); - cfg.setPeerClassLoadingEnabled(false); - - Ignite secG = G.start(cfg); - - return secG.fileSystem("ggfs-secondary"); - } - } - - /** - * Shared memory IPC in DUAL_SYNC mode. - */ - public static class ShmemDualSync extends PrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration(); - - ggfsCfg.setDefaultMode(DUAL_SYNC); - - return ggfsCfg; - } - } - - /** - * Shared memory IPC in DUAL_SYNC mode. - */ - public static class ShmemDualAsync extends PrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration(); - - ggfsCfg.setDefaultMode(DUAL_ASYNC); - - return ggfsCfg; - } - } - - /** - * Loopback socket IPC with secondary file system. - */ - public abstract static class LoopbackPrimarySecondaryTest extends PrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration(); - - ggfsCfg.setSecondaryFileSystem(new GridGgfsHadoopFileSystemWrapper( - "ggfs://ggfs-secondary:grid-secondary@127.0.0.1:11500/", - "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml")); - - return ggfsCfg; - } - - /** {@inheritDoc} */ - @Override protected IgniteFsConfiguration getSecondaryGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getSecondaryGgfsConfiguration(); - - ggfsCfg.setName("ggfs-secondary"); - ggfsCfg.setDefaultMode(PRIMARY); - ggfsCfg.setIpcEndpointConfiguration(new HashMap<String, String>() {{ - put("type", "tcp"); - put("port", "11500"); - }}); - - return ggfsCfg; - } - } - - /** - * Loopback IPC in DUAL_SYNC mode. - */ - public static class LoopbackDualSync extends LoopbackPrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration(); - - ggfsCfg.setDefaultMode(DUAL_SYNC); - - return ggfsCfg; - } - } - - /** - * Loopback socket IPC in DUAL_ASYNC mode. - */ - public static class LoopbackDualAsync extends LoopbackPrimarySecondaryTest { - /** {@inheritDoc} */ - @Override protected IgniteFsConfiguration getGgfsConfiguration() throws IgniteCheckedException { - IgniteFsConfiguration ggfsCfg = super.getGgfsConfiguration(); - - ggfsCfg.setDefaultMode(DUAL_ASYNC); - - return ggfsCfg; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/afa29526/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridFileSystemLoad.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridFileSystemLoad.java b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridFileSystemLoad.java new file mode 100644 index 0000000..6798710 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/ignitefs/GridFileSystemLoad.java @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.ignitefs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.permission.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; + +/** + * Hadoop file system load application. + * <p> + * Command line arguments: + * <ul> + * <li>-u {url} file system URL</li> + * <li>-hadoopCfg {cfg} Hadoop configuration</li> + * <li>-f {num} files number</li> + * <li>-r {num} reads number</li> + * <li>-w {num} writes number</li> + * <li>-d {num} deletes number</li> + * <li>-delay {delay} delay between operations in milliseconds</li> + * <li>-t {num} threads number</li> + * <li>-minSize {min size} min file size in bytes</li> + * <li>-maxSize {max size} max file size in bytes</li> + * <li>-startNode {true|false} if 'true' then starts node before execution</li> + * <li>-nodeCfg {cfg} configuration for started node</li> + * <li>-primaryOnly {true|false} if 'true' then creates files only in directory named 'primary' </li> + * </ul> + * Note: GGFS logging is disabled by default, to enable logging it is necessary to set flag + * 'fs.ggfs.<name>.log.enabled' in Hadoop configuration file. By default log files are created in the + * directory 'work/ggfs/log', this path can be changed in Hadoop configuration file using property + * 'fs.ggfs.<name>.log.dir'. + */ +public class GridFileSystemLoad { + /** */ + private static final String DFLT_URL = "ggfs:///"; + + /** */ + private static final int DFLT_MIN_FILE_SIZE = 100 * 1024; + + /** */ + private static final int DFLT_MAX_FILE_SIZE = 1024 * 1024; + + /** */ + private static final int DFLT_FILES_NUMBER = 1000; + + /** */ + private static final int DFLT_READS_NUMBER = 2000; + + /** */ + private static final int DFLT_WRITES_NUMBER = 2000; + + /** */ + private static final int DFLT_DELETES_NUMBER = 100; + + /** */ + private static final int DFLT_THREADS_NUMBER = 2; + + /** */ + private static final boolean DFLT_START_NODE = true; + + /** */ + private static final boolean DFLT_PRIMARY_ONLY = false; + + /** */ + private static final String DFLT_NODE_CFG = "config/hadoop/default-config.xml"; + + /** */ + private static final long DFLT_DELAY = 5; + + /** */ + private static final String DFLT_HADOOP_CFG = "examples/config/filesystem/core-site.xml"; + + /** */ + private static final int CREATE_BUF_SIZE = 100 * 1024; + + /** */ + private static final String DIR_PRIMARY_MODE = "primary"; + + /** */ + private static final String DIR_PROXY_MODE = "proxy"; + + /** */ + private static final String DIR_DUAL_SYNC_MODE = "dual_sync"; + + /** */ + private static final String DIR_DUAL_ASYNC_MODE = "dual_async"; + + /** + * Main method. + * + * @param args Command line arguments. + * @throws Exception If error occurs. + */ + public static void main(String[] args) throws Exception { + String url = DFLT_URL; + + int filesNum = DFLT_FILES_NUMBER; + + int minFileSize = DFLT_MIN_FILE_SIZE; + + int maxFileSize = DFLT_MAX_FILE_SIZE; + + int readsNum = DFLT_READS_NUMBER; + + int writesNum = DFLT_WRITES_NUMBER; + + int deletesNum = DFLT_DELETES_NUMBER; + + int threadsNum = DFLT_THREADS_NUMBER; + + long delay = DFLT_DELAY; + + String nodeCfg = DFLT_NODE_CFG; + + String hadoopCfg = DFLT_HADOOP_CFG; + + boolean startNode = DFLT_START_NODE; + + boolean primaryOnly = DFLT_PRIMARY_ONLY; + + for (int i = 0; i < args.length; i++) { + String arg = args[i]; + + switch (arg) { + case "-u": + url = args[++i]; + + break; + + case "-hadoopCfg": + hadoopCfg= args[++i]; + + break; + + case "-f": + filesNum = Integer.parseInt(args[++i]); + + break; + + case "-r": + readsNum = Integer.parseInt(args[++i]); + + break; + + case "-w": + writesNum = Integer.parseInt(args[++i]); + + break; + + case "-minSize": + minFileSize = Integer.parseInt(args[++i]); + + break; + + case "-maxSize": + maxFileSize = Integer.parseInt(args[++i]); + + break; + + case "-d": + deletesNum = Integer.parseInt(args[++i]); + + break; + + case "-t": + threadsNum = Integer.parseInt(args[++i]); + + break; + + case "-delay": + delay = Long.parseLong(args[++i]); + + break; + + case "-startNode": + startNode = Boolean.parseBoolean(args[++i]); + + break; + + case "-nodeCfg": + nodeCfg= args[++i]; + + break; + + case "-primaryOnly": + primaryOnly = Boolean.parseBoolean(args[++i]); + + break; + } + } + + X.println("File system URL: " + url); + X.println("Hadoop configuration: " + hadoopCfg); + X.println("Primary mode only: " + primaryOnly); + X.println("Files number: " + filesNum); + X.println("Reads number: " + readsNum); + X.println("Writes number: " + writesNum); + X.println("Deletes number: " + deletesNum); + X.println("Min file size: " + minFileSize); + X.println("Max file size: " + maxFileSize); + X.println("Threads number: " + threadsNum); + X.println("Delay: " + delay); + + if (minFileSize > maxFileSize) + throw new IllegalArgumentException(); + + Ignite ignite = null; + + if (startNode) { + X.println("Starting node using configuration: " + nodeCfg); + + ignite = G.start(U.resolveGridGainUrl(nodeCfg)); + } + + try { + new GridFileSystemLoad().runLoad(url, hadoopCfg, primaryOnly, threadsNum, filesNum, readsNum, writesNum, + deletesNum, minFileSize, maxFileSize, delay); + } + finally { + if (ignite != null) + G.stop(true); + } + } + + /** + * Executes read/write/delete operations. + * + * @param url File system url. + * @param hadoopCfg Hadoop configuration. + * @param primaryOnly If {@code true} then creates files only on directory named 'primary'. + * @param threads Threads number. + * @param files Files number. + * @param reads Reads number. + * @param writes Writes number. + * @param deletes Deletes number. + * @param minSize Min file size. + * @param maxSize Max file size. + * @param delay Delay between operations. + * @throws Exception If some file system operation failed. + */ + @SuppressWarnings("IfMayBeConditional") + public void runLoad(String url, String hadoopCfg, final boolean primaryOnly, int threads, int files, + final int reads, final int writes, final int deletes, final int minSize, final int maxSize, final long delay) + throws Exception { + Path fsPath = new Path(url); + + Configuration cfg = new Configuration(true); + + cfg.addResource(U.resolveGridGainUrl(hadoopCfg)); + + final FileSystem fs = FileSystem.get(fsPath.toUri(), cfg); + + Path workDir = new Path(fsPath, "/fsload"); + + fs.delete(workDir, true); + + fs.mkdirs(workDir, FsPermission.getDefault()); + + final Path[] dirs; + + if (primaryOnly) + dirs = new Path[]{mkdir(fs, workDir, DIR_PRIMARY_MODE)}; + else + dirs = new Path[]{mkdir(fs, workDir, DIR_PRIMARY_MODE), mkdir(fs, workDir, DIR_PROXY_MODE), + mkdir(fs, workDir, DIR_DUAL_SYNC_MODE), mkdir(fs, workDir, DIR_DUAL_ASYNC_MODE)}; + + try { + ExecutorService exec = Executors.newFixedThreadPool(threads); + + Collection<Future<?>> futs = new ArrayList<>(threads); + + for (int i = 0; i < threads; i++) { + final int filesPerThread; + + if (i == 0 && files % threads != 0) + filesPerThread = files / threads + files % threads; + else + filesPerThread = files / threads; + + futs.add(exec.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + runLoad(fs, dirs, filesPerThread, reads, writes, deletes, minSize, maxSize, delay); + + return null; + } + })); + } + + exec.shutdown(); + + for (Future<?> fut : futs) { + try { + fut.get(); + } + catch (ExecutionException e) { + X.error("Error during execution: " + e); + + e.getCause().printStackTrace(); + } + } + } + finally { + try { + fs.delete(workDir, true); + } + catch (IOException ignored) { + // Ignore. + } + } + } + + /** + * Executes read/write/delete operations. + * + * @param fs File system. + * @param dirs Directories where files should be created. + * @param filesNum Files number. + * @param reads Reads number. + * @param writes Writes number. + * @param deletes Deletes number. + * @param minSize Min file size. + * @param maxSize Max file size. + * @param delay Delay between operations. + * @throws Exception If some file system operation failed. + */ + private void runLoad(FileSystem fs, Path[] dirs, int filesNum, int reads, int writes, int deletes, + int minSize, int maxSize, long delay) throws Exception { + Random random = random(); + + List<T2<Path, Integer>> files = new ArrayList<>(filesNum); + + for (int i = 0; i < filesNum; i++) { + int size = maxSize == minSize ? minSize : minSize + random.nextInt(maxSize - minSize); + + Path file = new Path(dirs[i % dirs.length], "file-" + UUID.randomUUID()); + + createFile(fs, file, size, CREATE_BUF_SIZE); + + files.add(new T2<>(file, size)); + } + + List<Path> toDel = new ArrayList<>(deletes); + + for (int i = 0; i < deletes; i++) { + int size = maxSize == minSize ? minSize : minSize + random.nextInt(maxSize - minSize); + + Path file = new Path(dirs[i % dirs.length], "file-to-delete-" + UUID.randomUUID()); + + createFile(fs, file, size, CREATE_BUF_SIZE); + + toDel.add(file); + } + + while (reads > 0 || writes > 0 || deletes > 0) { + if (reads > 0) { + reads--; + + T2<Path, Integer> file = files.get(reads % files.size()); + + readFull(fs, file.get1(), CREATE_BUF_SIZE); + + int fileSize = file.get2(); + + readRandom(fs, file.get1(), fileSize, random.nextInt(fileSize) + 1); + } + + if (writes > 0) { + writes--; + + T2<Path, Integer> file = files.get(writes % files.size()); + + overwriteFile(fs, file.get1(), file.get2(), CREATE_BUF_SIZE); + + appendToFile(fs, file.get1(), random.nextInt(CREATE_BUF_SIZE) + 1); + } + + if (deletes > 0) { + deletes--; + + deleteFile(fs, toDel.get(deletes)); + } + + U.sleep(delay); + } + } + + /** + * Creates file. + * + * @param fs File system. + * @param file File path. + * @param fileSize File size. + * @param bufSize Write buffer size. + * @throws IOException If operation failed. + */ + private static void createFile(FileSystem fs, Path file, int fileSize, int bufSize) throws IOException { + create(fs, file, fileSize, bufSize, false); + } + + /** + * Overwrites file. + * + * @param fs File system. + * @param file File path. + * @param fileSize File size. + * @param bufSize Write buffer size. + * @throws IOException If operation failed. + */ + private static void overwriteFile(FileSystem fs, Path file, int fileSize, int bufSize) throws IOException { + create(fs, file, fileSize, bufSize, true); + } + + /** + * Appends to file. + * + * @param fs File system. + * @param file File path. + * @param appendSize Append size. + * @throws IOException If operation failed. + */ + private static void appendToFile(FileSystem fs, Path file, int appendSize) throws IOException { + try (FSDataOutputStream out = fs.append(file)) { + out.write(new byte[appendSize]); + } + } + + /** + * Reads whole file. + * + * @param fs File system. + * @param file File path. + * @param bufSize Read buffer size. + * @throws IOException If operation failed. + */ + @SuppressWarnings("StatementWithEmptyBody") + private static void readFull(FileSystem fs, Path file, int bufSize) throws IOException { + try (FSDataInputStream in = fs.open(file)) { + byte[] readBuf = new byte[bufSize]; + + while (in.read(readBuf) > 0) { + // No-op. + } + } + } + + /** + * Deletes file. + * + * @param fs File system. + * @param path File path. + * @throws IOException If operation failed. + */ + private static void deleteFile(FileSystem fs, Path path) throws IOException { + fs.delete(path, false); + } + + /** + * Reads from random position. + * + * @param fs File system. + * @param path File path. + * @param fileSize File size. + * @param readSize Read size. + * @throws IOException If operation failed. + */ + private static void readRandom(FileSystem fs, Path path, int fileSize, int readSize) throws IOException { + byte[] readBuf = new byte[readSize]; + + try (FSDataInputStream in = fs.open(path)) { + in.seek(random().nextInt(fileSize)); + + in.read(readBuf); + } + } + + /** + * Creates file. + * + * @param fs File system. + * @param file File path. + * @param fileSize File size. + * @param bufSize Buffer size. + * @param overwrite Overwrite flag. + * @throws IOException If operation failed. + */ + private static void create(FileSystem fs, Path file, int fileSize, int bufSize, boolean overwrite) + throws IOException { + try (FSDataOutputStream out = fs.create(file, overwrite)) { + int size = 0; + + byte[] buf = new byte[bufSize]; + + while (size < fileSize) { + int len = Math.min(fileSize - size, bufSize); + + out.write(buf, 0, len); + + size += len; + } + } + } + + /** + * Creates directory in the given parent directory. + * + * @param fs File system. + * @param parentDir Parent directory. + * @param dirName Directory name. + * @return Path for created directory. + * @throws IOException If operation failed. + */ + private static Path mkdir(FileSystem fs, Path parentDir, String dirName) throws IOException { + Path path = new Path(parentDir, dirName); + + fs.mkdirs(path, FsPermission.getDefault()); + + return path; + } + + /** + * @return Thread local random. + */ + private static Random random() { + return ThreadLocalRandom.current(); + } +}