#[IGNITE-218]: Tests for the MapReduce part.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bc87c4df Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bc87c4df Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bc87c4df Branch: refs/heads/ignite-218 Commit: bc87c4df4ddbc85197c7b812341437987bfe7010 Parents: 04653a0 Author: iveselovskiy <iveselovs...@gridgain.com> Authored: Mon Jun 1 20:22:25 2015 +0300 Committer: iveselovskiy <iveselovs...@gridgain.com> Committed: Mon Jun 1 20:22:25 2015 +0300 ---------------------------------------------------------------------- config/hadoop/default-config.xml | 2 +- .../hadoop/fs/v1/IgniteHadoopFileSystem.java | 45 ++-- .../hadoop/HadoopAbstractSelfTest.java | 18 +- .../processors/hadoop/HadoopMapReduceTest.java | 231 ++++++++++++------- .../hadoop/HadoopTaskExecutionSelfTest.java | 2 +- 5 files changed, 197 insertions(+), 101 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/config/hadoop/default-config.xml ---------------------------------------------------------------------- diff --git a/config/hadoop/default-config.xml b/config/hadoop/default-config.xml index c1e4855..54073fc 100644 --- a/config/hadoop/default-config.xml +++ b/config/hadoop/default-config.xml @@ -90,7 +90,7 @@ Configuration of Ignite node. 
--> <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> - <!-- Temporary workaround for tests: --> + <!-- TODO: Temporary workaround for tests: --> <property name="localHost" value="127.0.0.1"/> <!-- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java index 328120b..ec4a26f 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/fs/v1/IgniteHadoopFileSystem.java @@ -353,10 +353,21 @@ public class IgniteHadoopFileSystem extends FileSystem { /** {@inheritDoc} */ @Override public void close() throws IOException { - if (cacheEnabled && get(getUri(), getConf()) == this) - return; + if (closeGuard.compareAndSet(false, true)) { + if (cacheEnabled) { + FileSystem cached = get(getUri(), getConf()); - close0(); + if (cached == this) + return; + else { + X.println("### Cache enabled, but this file system is not found in the cache: " + + " this = " + this + ", user =" + this.user + + " cached = " + cached + ", user =" + ((IgniteHadoopFileSystem)cached).user); + } + } + + close0(); + } } /** @@ -365,27 +376,25 @@ public class IgniteHadoopFileSystem extends FileSystem { * @throws IOException If failed. 
*/ private void close0() throws IOException { - if (closeGuard.compareAndSet(false, true)) { - if (LOG.isDebugEnabled()) - LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']'); + if (LOG.isDebugEnabled()) + LOG.debug("File system closed [uri=" + uri + ", endpoint=" + uriAuthority + ']'); - if (rmtClient == null) - return; + if (rmtClient == null) + return; - super.close(); + super.close(); - rmtClient.close(false); + rmtClient.close(false); - if (clientLog.isLogEnabled()) - clientLog.close(); + if (clientLog.isLogEnabled()) + clientLog.close(); - if (secondaryFs != null) - U.closeQuiet(secondaryFs); + if (secondaryFs != null) + U.closeQuiet(secondaryFs); - // Reset initialized resources. - uri = null; - rmtClient = null; - } + // Reset initialized resources. + uri = null; + rmtClient = null; } /** {@inheritDoc} */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java index f41eb17..7aa2bde 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopAbstractSelfTest.java @@ -61,6 +61,22 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** Initial REST port. */ private int restPort = REST_PORT; + /** Secondary file system REST endpoint configuration map. 
*/ + protected static final IgfsIpcEndpointConfiguration SECONDARY_REST_CFG; + + static { +// PRIMARY_REST_CFG = new IgfsIpcEndpointConfiguration(); +// +// PRIMARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); +// PRIMARY_REST_CFG.setPort(10500); + + SECONDARY_REST_CFG = new IgfsIpcEndpointConfiguration(); + + SECONDARY_REST_CFG.setType(IgfsIpcEndpointType.TCP); + SECONDARY_REST_CFG.setPort(11500); + } + + /** Initial classpath. */ private static String initCp; @@ -132,7 +148,7 @@ public abstract class HadoopAbstractSelfTest extends GridCommonAbstractTest { /** * @return IGFS configuration. */ - public FileSystemConfiguration igfsConfiguration() { + public FileSystemConfiguration igfsConfiguration() throws Exception { FileSystemConfiguration cfg = new FileSystemConfiguration(); cfg.setName(igfsName); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java index 7d09433..b0a098d 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java +++ b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopMapReduceTest.java @@ -24,32 +24,102 @@ import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.ignite.*; +import org.apache.ignite.cache.*; import org.apache.ignite.configuration.*; import org.apache.ignite.hadoop.fs.*; import org.apache.ignite.igfs.*; +import org.apache.ignite.igfs.secondary.*; import org.apache.ignite.internal.*; import 
org.apache.ignite.internal.processors.hadoop.counter.*; import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters; import org.apache.ignite.internal.processors.hadoop.examples.*; +import org.apache.ignite.internal.processors.igfs.*; import org.apache.ignite.internal.util.lang.*; import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.discovery.tcp.*; +import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; import org.apache.ignite.testframework.*; +import org.jetbrains.annotations.*; import java.io.*; import java.util.*; +import static org.apache.ignite.cache.CacheAtomicityMode.*; +import static org.apache.ignite.cache.CacheMode.*; +import static org.apache.ignite.igfs.IgfsMode.*; import static org.apache.ignite.internal.processors.hadoop.HadoopUtils.*; /** * Test of whole cycle of map-reduce processing via Job tracker. */ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { + /** IGFS block size. */ + protected static final int IGFS_BLOCK_SIZE = 512 * 1024; + + /** Amount of blocks to prefetch. */ + protected static final int PREFETCH_BLOCKS = 1; + + /** Amount of sequential block reads before prefetch is triggered. */ + protected static final int SEQ_READS_BEFORE_PREFETCH = 2; + + /** Secondary file system URI. */ + protected static final String SECONDARY_URI = "igfs://igfs-secondary:grid-secondary@127.0.0.1:11500/"; + + /** Secondary file system configuration path. 
*/ + protected static final String SECONDARY_CFG = "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml"; + + protected static final String USER = "vasya"; + + protected static final String SECONDARY_IGFS_NAME = "igfs-secondary"; + + /** */ + protected Ignite igniteSecondary; + + /** */ + protected IgfsSecondaryFileSystem secondaryFs; + /** {@inheritDoc} */ @Override protected int gridCount() { return 3; } /** + * + * @param p + * @return + */ + private static String getOwner(IgfsEx i, IgfsPath p) { + return i.info(p).property(IgfsEx.PROP_USER_NAME); + } + + /** + * + * @param secFs + * @param p + * @return + */ + private static String getOwnerSecondary(final IgfsSecondaryFileSystem secFs, final IgfsPath p) { + return IgfsUserContext.doAs(USER, new IgniteOutClosure<String>() { + @Override public String apply() { + return secFs.info(p).property(IgfsEx.PROP_USER_NAME); + } + }); + } + + /** + * + * @param p + */ + private void checkOwner(IgfsPath p) { + String ownerPrim = getOwner(igfs, p); + assertEquals(USER, ownerPrim); + + String ownerSec = getOwnerSecondary(secondaryFs, p); + assertEquals(USER, ownerSec); + } + + /** * Tests whole job execution with all phases in all combination of new and old versions of API. * @throws Exception If fails. */ @@ -71,8 +141,11 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { JobConf jobConf = new JobConf(); + //jobConf.setBoolean("fs.igfs.impl.disable.cache", true); // avoid hangup in shutdown hook + // when fs.close() causes the fs to be created again. // This does not work, why????? 
+ jobConf.set(JOB_COUNTER_WRITER_PROPERTY, IgniteHadoopFileSystemCounterWriter.class.getName()); - jobConf.setUser("yyy"); + jobConf.setUser(USER); jobConf.set(IgniteHadoopFileSystemCounterWriter.COUNTER_WRITER_DIR_PROPERTY, "/xxx/${USER}/zzz"); //To split into about 40 items for v2 @@ -108,6 +181,10 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { final String outFile = PATH_OUTPUT + "/" + (useNewReducer ? "part-r-" : "part-") + "00000"; + checkOwner(new IgfsPath(PATH_OUTPUT + "/" + "_SUCCESS")); + + checkOwner(new IgfsPath(outFile)); + assertEquals("Use new mapper: " + useNewMapper + ", new combiner: " + useNewCombiner + ", new reducer: " + useNewReducer, "blue\t200000\n" + @@ -185,7 +262,7 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { } } - final IgfsPath statPath = new IgfsPath("/xxx/yyy/zzz/" + jobId + "/performance"); + final IgfsPath statPath = new IgfsPath("/xxx/" + USER + "/zzz/" + jobId + "/performance"); assert GridTestUtils.waitForCondition(new GridAbsPredicate() { @Override public boolean apply() { @@ -216,95 +293,89 @@ public class HadoopMapReduceTest extends HadoopAbstractWordCountTest { } } -// /** -// * Startup secondary file system. -// * -// * @throws Exception If failed. 
-// */ -// private void startUpSecondary() throws Exception { -// FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); -// -// igfsCfg.setDataCacheName("partitioned"); -// igfsCfg.setMetaCacheName("replicated"); -// igfsCfg.setName("igfs-secondary"); -// igfsCfg.setBlockSize(512 * 1024); -// igfsCfg.setDefaultMode(PRIMARY); -// -// IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); -// -// endpointCfg.setType(IgfsIpcEndpointType.TCP); -// endpointCfg.setPort(11500); -// -// igfsCfg.setIpcEndpointConfiguration(endpointCfg); -// -// CacheConfiguration cacheCfg = defaultCacheConfiguration(); -// -// cacheCfg.setName("partitioned"); -// cacheCfg.setCacheMode(PARTITIONED); -// cacheCfg.setNearConfiguration(null); -// cacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); -// cacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(128)); -// cacheCfg.setBackups(0); -// cacheCfg.setAtomicityMode(TRANSACTIONAL); -// -// CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); -// -// metaCacheCfg.setName("replicated1"); -// metaCacheCfg.setCacheMode(REPLICATED); -// metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); -// metaCacheCfg.setAtomicityMode(TRANSACTIONAL); -// -// IgniteConfiguration cfg = new IgniteConfiguration(); -// -// cfg.setGridName("igfs-grid-secondary"); -// -// TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); -// -// discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); -// -// cfg.setDiscoverySpi(discoSpi); -// cfg.setCacheConfiguration(metaCacheCfg, cacheCfg); -// cfg.setFileSystemConfiguration(igfsCfg); -// -// cfg.setLocalHost("127.0.0.1"); -// -// G.start(cfg); -// } - /** {@inheritDoc} */ @Override protected void beforeTest() throws Exception { -// startUpSecondary(); + igniteSecondary = startGridWithIgfs("grid-secondary", SECONDARY_IGFS_NAME, PRIMARY, null, SECONDARY_REST_CFG); super.beforeTest(); } /** - * @return IGFS configuration. 
+ * Start grid with IGFS. + * + * @param gridName Grid name. + * @param igfsName IGFS name + * @param mode IGFS mode. + * @param secondaryFs Secondary file system (optional). + * @param restCfg Rest configuration string (optional). + * @return Started grid instance. + * @throws Exception If failed. */ - @Override public FileSystemConfiguration igfsConfiguration() { + protected Ignite startGridWithIgfs(String gridName, String igfsName, IgfsMode mode, + @Nullable IgfsSecondaryFileSystem secondaryFs, @Nullable IgfsIpcEndpointConfiguration restCfg) throws Exception { + FileSystemConfiguration igfsCfg = new FileSystemConfiguration(); + + igfsCfg.setDataCacheName("dataCache"); + igfsCfg.setMetaCacheName("metaCache"); + igfsCfg.setName(igfsName); + igfsCfg.setBlockSize(IGFS_BLOCK_SIZE); + igfsCfg.setDefaultMode(mode); + igfsCfg.setIpcEndpointConfiguration(restCfg); + igfsCfg.setSecondaryFileSystem(secondaryFs); + igfsCfg.setPrefetchBlocks(PREFETCH_BLOCKS); + igfsCfg.setSequentialReadsBeforePrefetch(SEQ_READS_BEFORE_PREFETCH); + + CacheConfiguration dataCacheCfg = defaultCacheConfiguration(); + + dataCacheCfg.setName("dataCache"); + dataCacheCfg.setCacheMode(PARTITIONED); + dataCacheCfg.setNearConfiguration(null); + dataCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + dataCacheCfg.setAffinityMapper(new IgfsGroupDataBlocksKeyMapper(2)); + dataCacheCfg.setBackups(0); + dataCacheCfg.setAtomicityMode(TRANSACTIONAL); + dataCacheCfg.setOffHeapMaxMemory(0); + + CacheConfiguration metaCacheCfg = defaultCacheConfiguration(); + + metaCacheCfg.setName("metaCache"); + metaCacheCfg.setCacheMode(REPLICATED); + metaCacheCfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC); + metaCacheCfg.setAtomicityMode(TRANSACTIONAL); + + IgniteConfiguration cfg = new IgniteConfiguration(); + + cfg.setGridName(gridName); + TcpDiscoverySpi discoSpi = new TcpDiscoverySpi(); + + discoSpi.setIpFinder(new TcpDiscoveryVmIpFinder(true)); + + 
cfg.setDiscoverySpi(discoSpi); + cfg.setCacheConfiguration(dataCacheCfg, metaCacheCfg); + cfg.setFileSystemConfiguration(igfsCfg); + + cfg.setLocalHost("127.0.0.1"); + cfg.setConnectorConfiguration(null); + + return G.start(cfg); + } + + /** + * @return IGFS configuration. + */ + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { FileSystemConfiguration fsCfg = super.igfsConfiguration(); -// -// fsCfg.setName("igfs-secondary"); -// fsCfg.setDefaultMode(PRIMARY); -// -// IgfsIpcEndpointConfiguration endpointCfg = new IgfsIpcEndpointConfiguration(); -// -// endpointCfg.setType(IgfsIpcEndpointType.TCP); -// endpointCfg.setPort(11500); -// -// fsCfg.setIpcEndpointConfiguration(endpointCfg); -// -// try { -// -// fsCfg.setSecondaryFileSystem(new IgniteHadoopIgfsSecondaryFileSystem( -// "igfs://igfs-secondary:igfs-grid-secondary@127.0.0.1:11500/", -// "modules/core/src/test/config/hadoop/core-site-loopback-secondary.xml")); -// } -// catch (Exception e) { -// throw new IgniteException(e); -// } + + secondaryFs = new IgniteHadoopIgfsSecondaryFileSystem(SECONDARY_URI, SECONDARY_CFG); + + fsCfg.setSecondaryFileSystem(secondaryFs); return fsCfg; } + +// /** {@inheritDoc} */ +// @Override protected long getTestTimeout() { +// return 30 * 60 * 1000; // TODO: for testing +// } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc87c4df/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java index 8dc9830..eee5c8b 100644 --- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java +++ 
b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/HadoopTaskExecutionSelfTest.java @@ -72,7 +72,7 @@ public class HadoopTaskExecutionSelfTest extends HadoopAbstractSelfTest { /** {@inheritDoc} */ - @Override public FileSystemConfiguration igfsConfiguration() { + @Override public FileSystemConfiguration igfsConfiguration() throws Exception { FileSystemConfiguration cfg = super.igfsConfiguration(); cfg.setFragmentizerEnabled(false);