# IGNITE-226: WIP.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a79055cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a79055cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a79055cd Branch: refs/heads/ignite-143 Commit: a79055cdaeb02dfefaa9dc9aabc3cff7ac8cd489 Parents: 8d6347d Author: vozerov-gridgain <voze...@gridgain.com> Authored: Fri Feb 13 16:53:25 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Fri Feb 13 16:53:25 2015 +0300 ---------------------------------------------------------------------- docs/core-site.ignite.xml | 4 +- examples/config/filesystem/core-site.xml | 4 +- .../visor/node/VisorGridConfiguration.java | 4 +- .../visor/node/VisorIgfsConfiguration.java | 70 ++--- .../visor/node/VisorNodeDataCollectorJob.java | 6 +- .../node/VisorNodeDataCollectorJobResult.java | 16 +- .../node/VisorNodeDataCollectorTaskResult.java | 18 +- .../ignite/loadtests/ggfs/IgfsNodeStartup.java | 49 ---- .../ggfs/IgfsPerformanceBenchmark.java | 274 ------------------- .../ignite/loadtests/igfs/IgfsNodeStartup.java | 49 ++++ .../igfs/IgfsPerformanceBenchmark.java | 274 +++++++++++++++++++ 11 files changed, 384 insertions(+), 384 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/docs/core-site.ignite.xml ---------------------------------------------------------------------- diff --git a/docs/core-site.ignite.xml b/docs/core-site.ignite.xml index 7f218d2..9aab773 100644 --- a/docs/core-site.ignite.xml +++ b/docs/core-site.ignite.xml @@ -48,7 +48,7 @@ --> <property> <name>fs.igfs.impl</name> - <value>org.apache.ignite.igfs.hadoop.v1.GridGgfsHadoopFileSystem</value> + <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value> </property> <!-- @@ -56,7 +56,7 @@ --> <property> <name>fs.AbstractFileSystem.igfs.impl</name> - <value>org.apache.ignite.igfs.hadoop.v2.GridGgfsHadoopFileSystem</value> + <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value> </property> <!-- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/examples/config/filesystem/core-site.xml ---------------------------------------------------------------------- diff --git a/examples/config/filesystem/core-site.xml b/examples/config/filesystem/core-site.xml index 98e34a2..7c6cfaa 100644 --- a/examples/config/filesystem/core-site.xml +++ b/examples/config/filesystem/core-site.xml @@ -31,12 +31,12 @@ <property> <!-- FS driver class for the 'igfs://' URIs. --> <name>fs.igfs.impl</name> - <value>org.apache.ignite.igfs.hadoop.v1.GridGgfsHadoopFileSystem</value> + <value>org.apache.ignite.igfs.hadoop.v1.IgfsHadoopFileSystem</value> </property> <property> <!-- FS driver class for the 'igfs://' URIs in Hadoop2.x --> <name>fs.AbstractFileSystem.igfs.impl</name> - <value>org.apache.ignite.igfs.hadoop.v2.GridGgfsHadoopFileSystem</value> + <value>org.apache.ignite.igfs.hadoop.v2.IgfsHadoopFileSystem</value> </property> </configuration> http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java index 6c22a65..2f05a37 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorGridConfiguration.java @@ -108,7 +108,7 @@ public class VisorGridConfiguration implements Serializable { rest(VisorRestConfiguration.from(c)); userAttributes(c.getUserAttributes()); caches(VisorCacheConfiguration.list(ignite, c.getCacheConfiguration())); - ggfss(VisorIgfsConfiguration.list(c.getIgfsConfiguration())); + igfss(VisorIgfsConfiguration.list(c.getIgfsConfiguration())); streamers(VisorStreamerConfiguration.list(c.getStreamerConfiguration())); env(new HashMap<>(getenv())); systemProperties(getProperties()); @@ -308,7 +308,7 @@ public class VisorGridConfiguration implements Serializable { /** * @param igfss New igfss. */ - public void ggfss(Iterable<VisorIgfsConfiguration> igfss) { + public void igfss(Iterable<VisorIgfsConfiguration> igfss) { this.igfss = igfss; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java index a34834a..0cf84bd 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorIgfsConfiguration.java @@ -122,22 +122,22 @@ public class VisorIgfsConfiguration implements Serializable { private long trashPurgeTimeout; /** - * @param ggfs IGFS configuration. + * @param igfs IGFS configuration. * @return Data transfer object for IGFS configuration properties. */ - public static VisorIgfsConfiguration from(IgfsConfiguration ggfs) { + public static VisorIgfsConfiguration from(IgfsConfiguration igfs) { VisorIgfsConfiguration cfg = new VisorIgfsConfiguration(); - cfg.name(ggfs.getName()); - cfg.metaCacheName(ggfs.getMetaCacheName()); - cfg.dataCacheName(ggfs.getDataCacheName()); - cfg.blockSize(ggfs.getBlockSize()); - cfg.prefetchBlocks(ggfs.getPrefetchBlocks()); - cfg.streamBufferSize(ggfs.getStreamBufferSize()); - cfg.perNodeBatchSize(ggfs.getPerNodeBatchSize()); - cfg.perNodeParallelBatchCount(ggfs.getPerNodeParallelBatchCount()); + cfg.name(igfs.getName()); + cfg.metaCacheName(igfs.getMetaCacheName()); + cfg.dataCacheName(igfs.getDataCacheName()); + cfg.blockSize(igfs.getBlockSize()); + cfg.prefetchBlocks(igfs.getPrefetchBlocks()); + cfg.streamBufferSize(igfs.getStreamBufferSize()); + cfg.perNodeBatchSize(igfs.getPerNodeBatchSize()); + cfg.perNodeParallelBatchCount(igfs.getPerNodeParallelBatchCount()); - Igfs secFs = ggfs.getSecondaryFileSystem(); + Igfs secFs = igfs.getSecondaryFileSystem(); if (secFs != null) { Map<String, String> props = secFs.properties(); @@ -146,26 +146,26 @@ public class VisorIgfsConfiguration implements Serializable { cfg.secondaryHadoopFileSystemConfigPath(props.get(SECONDARY_FS_CONFIG_PATH)); } - cfg.defaultMode(ggfs.getDefaultMode()); - cfg.pathModes(ggfs.getPathModes()); - cfg.dualModePutExecutorService(compactClass(ggfs.getDualModePutExecutorService())); - cfg.dualModePutExecutorServiceShutdown(ggfs.getDualModePutExecutorServiceShutdown()); - cfg.dualModeMaxPendingPutsSize(ggfs.getDualModeMaxPendingPutsSize()); - cfg.maxTaskRangeLength(ggfs.getMaximumTaskRangeLength()); - cfg.fragmentizerConcurrentFiles(ggfs.getFragmentizerConcurrentFiles()); - cfg.fragmentizerLocalWritesRatio(ggfs.getFragmentizerLocalWritesRatio()); - cfg.fragmentizerEnabled(ggfs.isFragmentizerEnabled()); - cfg.fragmentizerThrottlingBlockLength(ggfs.getFragmentizerThrottlingBlockLength()); - cfg.fragmentizerThrottlingDelay(ggfs.getFragmentizerThrottlingDelay()); - - Map<String, String> endpointCfg = ggfs.getIpcEndpointConfiguration(); + cfg.defaultMode(igfs.getDefaultMode()); + cfg.pathModes(igfs.getPathModes()); + cfg.dualModePutExecutorService(compactClass(igfs.getDualModePutExecutorService())); + cfg.dualModePutExecutorServiceShutdown(igfs.getDualModePutExecutorServiceShutdown()); + cfg.dualModeMaxPendingPutsSize(igfs.getDualModeMaxPendingPutsSize()); + cfg.maxTaskRangeLength(igfs.getMaximumTaskRangeLength()); + cfg.fragmentizerConcurrentFiles(igfs.getFragmentizerConcurrentFiles()); + cfg.fragmentizerLocalWritesRatio(igfs.getFragmentizerLocalWritesRatio()); + cfg.fragmentizerEnabled(igfs.isFragmentizerEnabled()); + cfg.fragmentizerThrottlingBlockLength(igfs.getFragmentizerThrottlingBlockLength()); + cfg.fragmentizerThrottlingDelay(igfs.getFragmentizerThrottlingDelay()); + + Map<String, String> endpointCfg = igfs.getIpcEndpointConfiguration(); cfg.ipcEndpointConfiguration(endpointCfg != null ? endpointCfg.toString() : null); - cfg.ipcEndpointEnabled(ggfs.isIpcEndpointEnabled()); - cfg.maxSpace(ggfs.getMaxSpaceSize()); - cfg.managementPort(ggfs.getManagementPort()); - cfg.sequenceReadsBeforePrefetch(ggfs.getSequentialReadsBeforePrefetch()); - cfg.trashPurgeTimeout(ggfs.getTrashPurgeTimeout()); + cfg.ipcEndpointEnabled(igfs.isIpcEndpointEnabled()); + cfg.maxSpace(igfs.getMaxSpaceSize()); + cfg.managementPort(igfs.getManagementPort()); + cfg.sequenceReadsBeforePrefetch(igfs.getSequentialReadsBeforePrefetch()); + cfg.trashPurgeTimeout(igfs.getTrashPurgeTimeout()); return cfg; } @@ -173,17 +173,17 @@ public class VisorIgfsConfiguration implements Serializable { /** * Construct data transfer object for igfs configurations properties. * - * @param ggfss igfs configurations. + * @param igfss Igfs configurations. * @return igfs configurations properties. */ - public static Iterable<VisorIgfsConfiguration> list(IgfsConfiguration[] ggfss) { - if (ggfss == null) + public static Iterable<VisorIgfsConfiguration> list(IgfsConfiguration[] igfss) { + if (igfss == null) return Collections.emptyList(); - final Collection<VisorIgfsConfiguration> cfgs = new ArrayList<>(ggfss.length); + final Collection<VisorIgfsConfiguration> cfgs = new ArrayList<>(igfss.length); - for (IgfsConfiguration ggfs : ggfss) - cfgs.add(from(ggfs)); + for (IgfsConfiguration igfs : igfss) + cfgs.add(from(igfs)); return cfgs; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java index f1102f7..2bedf10 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJob.java @@ -123,13 +123,13 @@ public class VisorNodeDataCollectorJob extends VisorJob<VisorNodeDataCollectorTa /** Collect IGFS. */ private void igfs(VisorNodeDataCollectorJobResult res) { try { - IgfsProcessorAdapter ggfsProc = ((IgniteKernal)ignite).context().igfs(); + IgfsProcessorAdapter igfsProc = ((IgniteKernal)ignite).context().igfs(); - for (IgniteFs igfs : ggfsProc.igfss()) { + for (IgniteFs igfs : igfsProc.igfss()) { long start0 = U.currentTimeMillis(); try { - Collection<IpcServerEndpoint> endPoints = ggfsProc.endpoints(igfs.name()); + Collection<IpcServerEndpoint> endPoints = igfsProc.endpoints(igfs.name()); if (endPoints != null) { for (IpcServerEndpoint ep : endPoints) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java index 3147726..7d4d665 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorJobResult.java @@ -54,13 +54,13 @@ public class VisorNodeDataCollectorJobResult implements Serializable { private Throwable cachesEx; /** Node IGFSs. */ - private final Collection<VisorIgfs> ggfss = new ArrayList<>(); + private final Collection<VisorIgfs> igfss = new ArrayList<>(); /** All IGFS endpoints collected from nodes. */ - private final Collection<VisorIgfsEndpoint> ggfsEndpoints = new ArrayList<>(); + private final Collection<VisorIgfsEndpoint> igfsEndpoints = new ArrayList<>(); /** Exception while collecting node IGFSs. */ - private Throwable ggfssEx; + private Throwable igfssEx; /** Node streamers. */ private final Collection<VisorStreamer> streamers = new ArrayList<>(); @@ -129,19 +129,19 @@ public class VisorNodeDataCollectorJobResult implements Serializable { } public Collection<VisorIgfs> igfss() { - return ggfss; + return igfss; } public Collection<VisorIgfsEndpoint> igfsEndpoints() { - return ggfsEndpoints; + return igfsEndpoints; } public Throwable igfssEx() { - return ggfssEx; + return igfssEx; } - public void igfssEx(Throwable ggfssEx) { - this.ggfssEx = ggfssEx; + public void igfssEx(Throwable igfssEx) { + this.igfssEx = igfssEx; } public Collection<VisorStreamer> streamers() { http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java index 755e358..0b3a95c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/visor/node/VisorNodeDataCollectorTaskResult.java @@ -57,13 +57,13 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { private final Map<UUID, Throwable> cachesEx = new HashMap<>(); /** All IGFS collected from nodes. */ - private final Map<UUID, Collection<VisorIgfs>> ggfss = new HashMap<>(); + private final Map<UUID, Collection<VisorIgfs>> igfss = new HashMap<>(); /** All IGFS endpoints collected from nodes. */ - private final Map<UUID, Collection<VisorIgfsEndpoint>> ggfsEndpoints = new HashMap<>(); + private final Map<UUID, Collection<VisorIgfsEndpoint>> igfsEndpoints = new HashMap<>(); /** Exceptions caught during collecting IGFS from nodes. */ - private final Map<UUID, Throwable> ggfssEx = new HashMap<>(); + private final Map<UUID, Throwable> igfssEx = new HashMap<>(); /** All streamers collected from nodes. */ private final Map<UUID, Collection<VisorStreamer>> streamers = new HashMap<>(); @@ -84,9 +84,9 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { eventsEx.isEmpty() && caches.isEmpty() && cachesEx.isEmpty() && - ggfss.isEmpty() && - ggfsEndpoints.isEmpty() && - ggfssEx.isEmpty() && + igfss.isEmpty() && + igfsEndpoints.isEmpty() && + igfssEx.isEmpty() && streamers.isEmpty() && streamersEx.isEmpty(); } @@ -151,21 +151,21 @@ public class VisorNodeDataCollectorTaskResult implements Serializable { * @return All IGFS collected from nodes. */ public Map<UUID, Collection<VisorIgfs>> igfss() { - return ggfss; + return igfss; } /** * @return All IGFS endpoints collected from nodes. */ public Map<UUID, Collection<VisorIgfsEndpoint>> igfsEndpoints() { - return ggfsEndpoints; + return igfsEndpoints; } /** * @return Exceptions caught during collecting IGFS from nodes. */ public Map<UUID, Throwable> igfssEx() { - return ggfssEx; + return igfssEx; } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsNodeStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsNodeStartup.java deleted file mode 100644 index 2858603..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsNodeStartup.java +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.ggfs; - -import org.apache.ignite.*; -import org.apache.ignite.internal.util.typedef.*; - -import javax.swing.*; - -/** - * Node startup for IGFS performance benchmark. - */ -public class IgfsNodeStartup { - /** - * Start up an empty node with specified cache configuration. - * - * @param args Command line arguments, none required. - * @throws IgniteCheckedException If example execution failed. - */ - public static void main(String[] args) throws IgniteCheckedException { - try (Ignite ignored = G.start("config/hadoop/default-config.xml")) { - // Wait until Ok is pressed. - JOptionPane.showMessageDialog( - null, - new JComponent[] { - new JLabel("Ignite started."), - new JLabel("Press OK to stop Ignite.") - }, - "Ignite", - JOptionPane.INFORMATION_MESSAGE - ); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsPerformanceBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsPerformanceBenchmark.java b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsPerformanceBenchmark.java deleted file mode 100644 index 108d38a..0000000 --- a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/ggfs/IgfsPerformanceBenchmark.java +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.loadtests.ggfs; - -import org.apache.hadoop.conf.*; -import org.apache.hadoop.fs.*; -import org.apache.hadoop.fs.FileSystem; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.testframework.*; -import org.jetbrains.annotations.*; - -import java.io.*; -import java.util.*; -import java.util.concurrent.atomic.*; - -/** - * Tests write throughput. - */ -public class IgfsPerformanceBenchmark { - /** Path to test hadoop configuration. */ - private static final String HADOOP_FS_CFG = "modules/core/src/test/config/hadoop/core-site.xml"; - - /** FS prefix. */ - private static final String FS_PREFIX = "igfs:///"; - - /** Test writes. */ - private static final int OP_WRITE = 0; - - /** Test reads. */ - private static final int OP_READ = 1; - - /** - * Starts benchmark. - * - * @param args Program arguments. - * [0] - number of threads, default 1. - * [1] - file length, default is 1GB. - * [2] - stream buffer size, default is 1M. - * [3] - fs config path. - * @throws Exception If failed. - */ - public static void main(String[] args) throws Exception { - final int threadNum = intArgument(args, 0, 1); - final int op = intArgument(args, 1, OP_WRITE); - final long fileLen = longArgument(args, 2, 256 * 1024 * 1024); - final int bufSize = intArgument(args, 3, 128 * 1024); - final String cfgPath = argument(args, 4, HADOOP_FS_CFG); - final String fsPrefix = argument(args, 5, FS_PREFIX); - final short replication = (short)intArgument(args, 6, 3); - - final Path ggfsHome = new Path(fsPrefix); - - final FileSystem fs = ggfs(ggfsHome, cfgPath); - - final AtomicLong progress = new AtomicLong(); - - final AtomicInteger idx = new AtomicInteger(); - - System.out.println("Warming up..."); - -// warmUp(fs, ggfsHome, op, fileLen); - - System.out.println("Finished warm up."); - - if (op == OP_READ) { - for (int i = 0; i < threadNum; i++) - benchmarkWrite(fs, new Path(ggfsHome, "in-" + i), fileLen, bufSize, replication, null); - } - - long total = 0; - - long start = System.currentTimeMillis(); - - IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { - @Override public void run() { - String fileIdx = op == OP_READ ? String.valueOf(idx.getAndIncrement()) : UUID.randomUUID().toString(); - - try { - for (int i = 0; i < 200; i++) { - if (op == OP_WRITE) - benchmarkWrite(fs, new Path(ggfsHome, "out-" + fileIdx), fileLen, bufSize, replication, - progress); - else - benchmarkRead(fs, new Path(ggfsHome, "in-" + fileIdx), bufSize, progress); - } - - System.out.println("Finished " + (op == OP_WRITE ? "writing" : "reading") + " data."); - } - catch (Exception e) { - System.out.println("Failed to process stream: " + e); - - e.printStackTrace(); - } - } - }, threadNum, "test-runner"); - - while (!fut.isDone()) { - U.sleep(1000); - - long written = progress.getAndSet(0); - - total += written; - - int mbytesPerSec = (int)(written / (1024 * 1024)); - - System.out.println((op == OP_WRITE ? "Write" : "Read") + " rate [threads=" + threadNum + - ", bufSize=" + bufSize + ", MBytes/s=" + mbytesPerSec + ']'); - } - - long now = System.currentTimeMillis(); - - System.out.println((op == OP_WRITE ? "Written" : "Read") + " " + total + " bytes in " + (now - start) + - "ms, avg write rate is " + (total * 1000 / ((now - start) * 1024 * 1024)) + "MBytes/s"); - - fs.close(); - } - - /** - * Warms up server side. - * - * @param fs File system. - * @param ggfsHome IGFS home. - * @throws Exception If failed. - */ - private static void warmUp(FileSystem fs, Path ggfsHome, int op, long fileLen) throws Exception { - Path file = new Path(ggfsHome, "out-0"); - - benchmarkWrite(fs, file, fileLen, 1024 * 1024, (short)1, null); - - for (int i = 0; i < 5; i++) { - if (op == OP_WRITE) - benchmarkWrite(fs, file, fileLen, 1024 * 1024, (short)1, null); - else - benchmarkRead(fs, file, 1024 * 1024, null); - } - - fs.delete(file, true); - } - - /** - * @param args Arguments. - * @param idx Index. - * @param dflt Default value. - * @return Argument value. - */ - private static int intArgument(String[] args, int idx, int dflt) { - if (args.length <= idx) - return dflt; - - try { - return Integer.parseInt(args[idx]); - } - catch (NumberFormatException ignored) { - return dflt; - } - } - - /** - * @param args Arguments. - * @param idx Index. - * @param dflt Default value. - * @return Argument value. - */ - private static long longArgument(String[] args, int idx, long dflt) { - if (args.length <= idx) - return dflt; - - try { - return Long.parseLong(args[idx]); - } - catch (NumberFormatException ignored) { - return dflt; - } - } - - /** - * @param args Arguments. - * @param idx Index. - * @param dflt Default value. - * @return Argument value. - */ - private static String argument(String[] args, int idx, String dflt) { - if (args.length <= idx) - return dflt; - - return args[idx]; - } - - /** {@inheritDoc} */ - private static FileSystem ggfs(Path home, String cfgPath) throws IOException { - Configuration cfg = new Configuration(); - - cfg.addResource(U.resolveIgniteUrl(cfgPath)); - - return FileSystem.get(home.toUri(), cfg); - } - - /** - * Tests stream write to specified file. - * - * @param file File to write to. - * @param len Length to write. - * @param bufSize Buffer size. - * @param replication Replication factor. - * @param progress Progress that will be incremented on each written chunk. - */ - private static void benchmarkWrite(FileSystem fs, Path file, long len, int bufSize, short replication, - @Nullable AtomicLong progress) throws Exception { - - try (FSDataOutputStream out = fs.create(file, true, bufSize, replication, fs.getDefaultBlockSize())) { - long written = 0; - - byte[] data = new byte[bufSize]; - - while (written < len) { - int chunk = (int) Math.min(len - written, bufSize); - - out.write(data, 0, chunk); - - written += chunk; - - if (progress != null) - progress.addAndGet(chunk); - } - - out.flush(); - } - catch (Exception e) { - e.printStackTrace(); - throw e; - } - } - - /** - * Tests stream read from specified file. - * - * @param file File to write from - * @param bufSize Buffer size. - * @param progress Progress that will be incremented on each written chunk. - */ - private static void benchmarkRead(FileSystem fs, Path file, int bufSize, @Nullable AtomicLong progress) - throws Exception { - - try (FSDataInputStream in = fs.open(file, bufSize)) { - byte[] data = new byte[32 * bufSize]; - - while (true) { - int read = in.read(data); - - if (read < 0) - return; - - if (progress != null) - progress.addAndGet(read); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/modules/hadoop/src/test/java/org/apache/ignite/loadtests/igfs/IgfsNodeStartup.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/igfs/IgfsNodeStartup.java b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/igfs/IgfsNodeStartup.java new file mode 100644 index 0000000..8f67bcf --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/igfs/IgfsNodeStartup.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.igfs; + +import org.apache.ignite.*; +import org.apache.ignite.internal.util.typedef.*; + +import javax.swing.*; + +/** + * Node startup for IGFS performance benchmark. + */ +public class IgfsNodeStartup { + /** + * Start up an empty node with specified cache configuration. + * + * @param args Command line arguments, none required. + * @throws IgniteCheckedException If example execution failed. + */ + public static void main(String[] args) throws IgniteCheckedException { + try (Ignite ignored = G.start("config/hadoop/default-config.xml")) { + // Wait until Ok is pressed. + JOptionPane.showMessageDialog( + null, + new JComponent[] { + new JLabel("Ignite started."), + new JLabel("Press OK to stop Ignite.") + }, + "Ignite", + JOptionPane.INFORMATION_MESSAGE + ); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a79055cd/modules/hadoop/src/test/java/org/apache/ignite/loadtests/igfs/IgfsPerformanceBenchmark.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/loadtests/igfs/IgfsPerformanceBenchmark.java b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/igfs/IgfsPerformanceBenchmark.java new file mode 100644 index 0000000..d564484 --- /dev/null +++ b/modules/hadoop/src/test/java/org/apache/ignite/loadtests/igfs/IgfsPerformanceBenchmark.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.loadtests.igfs; + +import org.apache.hadoop.conf.*; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.atomic.*; + +/** + * Tests write throughput. + */ +public class IgfsPerformanceBenchmark { + /** Path to test hadoop configuration. */ + private static final String HADOOP_FS_CFG = "modules/core/src/test/config/hadoop/core-site.xml"; + + /** FS prefix. */ + private static final String FS_PREFIX = "igfs:///"; + + /** Test writes. */ + private static final int OP_WRITE = 0; + + /** Test reads. */ + private static final int OP_READ = 1; + + /** + * Starts benchmark. + * + * @param args Program arguments. + * [0] - number of threads, default 1. + * [1] - file length, default is 1GB. + * [2] - stream buffer size, default is 1M. + * [3] - fs config path. + * @throws Exception If failed. + */ + public static void main(String[] args) throws Exception { + final int threadNum = intArgument(args, 0, 1); + final int op = intArgument(args, 1, OP_WRITE); + final long fileLen = longArgument(args, 2, 256 * 1024 * 1024); + final int bufSize = intArgument(args, 3, 128 * 1024); + final String cfgPath = argument(args, 4, HADOOP_FS_CFG); + final String fsPrefix = argument(args, 5, FS_PREFIX); + final short replication = (short)intArgument(args, 6, 3); + + final Path ggfsHome = new Path(fsPrefix); + + final FileSystem fs = ggfs(ggfsHome, cfgPath); + + final AtomicLong progress = new AtomicLong(); + + final AtomicInteger idx = new AtomicInteger(); + + System.out.println("Warming up..."); + +// warmUp(fs, ggfsHome, op, fileLen); + + System.out.println("Finished warm up."); + + if (op == OP_READ) { + for (int i = 0; i < threadNum; i++) + benchmarkWrite(fs, new Path(ggfsHome, "in-" + i), fileLen, bufSize, replication, null); + } + + long total = 0; + + long start = System.currentTimeMillis(); + + IgniteInternalFuture<Long> fut = GridTestUtils.runMultiThreadedAsync(new Runnable() { + @Override public void run() { + String fileIdx = op == OP_READ ? String.valueOf(idx.getAndIncrement()) : UUID.randomUUID().toString(); + + try { + for (int i = 0; i < 200; i++) { + if (op == OP_WRITE) + benchmarkWrite(fs, new Path(ggfsHome, "out-" + fileIdx), fileLen, bufSize, replication, + progress); + else + benchmarkRead(fs, new Path(ggfsHome, "in-" + fileIdx), bufSize, progress); + } + + System.out.println("Finished " + (op == OP_WRITE ? "writing" : "reading") + " data."); + } + catch (Exception e) { + System.out.println("Failed to process stream: " + e); + + e.printStackTrace(); + } + } + }, threadNum, "test-runner"); + + while (!fut.isDone()) { + U.sleep(1000); + + long written = progress.getAndSet(0); + + total += written; + + int mbytesPerSec = (int)(written / (1024 * 1024)); + + System.out.println((op == OP_WRITE ? "Write" : "Read") + " rate [threads=" + threadNum + + ", bufSize=" + bufSize + ", MBytes/s=" + mbytesPerSec + ']'); + } + + long now = System.currentTimeMillis(); + + System.out.println((op == OP_WRITE ? "Written" : "Read") + " " + total + " bytes in " + (now - start) + + "ms, avg write rate is " + (total * 1000 / ((now - start) * 1024 * 1024)) + "MBytes/s"); + + fs.close(); + } + + /** + * Warms up server side. + * + * @param fs File system. + * @param ggfsHome IGFS home. + * @throws Exception If failed. + */ + private static void warmUp(FileSystem fs, Path ggfsHome, int op, long fileLen) throws Exception { + Path file = new Path(ggfsHome, "out-0"); + + benchmarkWrite(fs, file, fileLen, 1024 * 1024, (short)1, null); + + for (int i = 0; i < 5; i++) { + if (op == OP_WRITE) + benchmarkWrite(fs, file, fileLen, 1024 * 1024, (short)1, null); + else + benchmarkRead(fs, file, 1024 * 1024, null); + } + + fs.delete(file, true); + } + + /** + * @param args Arguments. + * @param idx Index. + * @param dflt Default value. + * @return Argument value. + */ + private static int intArgument(String[] args, int idx, int dflt) { + if (args.length <= idx) + return dflt; + + try { + return Integer.parseInt(args[idx]); + } + catch (NumberFormatException ignored) { + return dflt; + } + } + + /** + * @param args Arguments. + * @param idx Index. + * @param dflt Default value. + * @return Argument value. + */ + private static long longArgument(String[] args, int idx, long dflt) { + if (args.length <= idx) + return dflt; + + try { + return Long.parseLong(args[idx]); + } + catch (NumberFormatException ignored) { + return dflt; + } + } + + /** + * @param args Arguments. + * @param idx Index. + * @param dflt Default value. + * @return Argument value. + */ + private static String argument(String[] args, int idx, String dflt) { + if (args.length <= idx) + return dflt; + + return args[idx]; + } + + /** {@inheritDoc} */ + private static FileSystem ggfs(Path home, String cfgPath) throws IOException { + Configuration cfg = new Configuration(); + + cfg.addResource(U.resolveIgniteUrl(cfgPath)); + + return FileSystem.get(home.toUri(), cfg); + } + + /** + * Tests stream write to specified file. + * + * @param file File to write to. + * @param len Length to write. + * @param bufSize Buffer size. + * @param replication Replication factor. + * @param progress Progress that will be incremented on each written chunk. + */ + private static void benchmarkWrite(FileSystem fs, Path file, long len, int bufSize, short replication, + @Nullable AtomicLong progress) throws Exception { + + try (FSDataOutputStream out = fs.create(file, true, bufSize, replication, fs.getDefaultBlockSize())) { + long written = 0; + + byte[] data = new byte[bufSize]; + + while (written < len) { + int chunk = (int) Math.min(len - written, bufSize); + + out.write(data, 0, chunk); + + written += chunk; + + if (progress != null) + progress.addAndGet(chunk); + } + + out.flush(); + } + catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + /** + * Tests stream read from specified file. + * + * @param file File to write from + * @param bufSize Buffer size. + * @param progress Progress that will be incremented on each written chunk. + */ + private static void benchmarkRead(FileSystem fs, Path file, int bufSize, @Nullable AtomicLong progress) + throws Exception { + + try (FSDataInputStream in = fs.open(file, bufSize)) { + byte[] data = new byte[32 * bufSize]; + + while (true) { + int read = in.read(data); + + if (read < 0) + return; + + if (progress != null) + progress.addAndGet(read); + } + } + } +}