#YARN Code cleanup. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7e072dc4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7e072dc4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7e072dc4 Branch: refs/heads/ignite-901 Commit: 7e072dc44eacd4ad088901ea7888aa8ccaf7d44a Parents: 960e19d Author: nikolay tikhonov <ntikho...@gridgain.com> Authored: Tue Jun 9 19:02:56 2015 +0300 Committer: nikolay tikhonov <ntikho...@gridgain.com> Committed: Tue Jun 9 19:02:56 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/yarn/ApplicationMaster.java | 31 ++- .../yarn/IgniteApplicationMasterSelfTest.java | 226 ++++++++++++++++++- 2 files changed, 243 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e072dc4/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java index c552ea0..0ef1362 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java @@ -91,7 +91,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { Map<String, String> env = new HashMap<>(System.getenv()); - //env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost())); + env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost())); ctx.setEnvironment(env); @@ -284,15 +284,18 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { TimeUnit.MILLISECONDS.sleep(schedulerTimeout); } } + catch (InterruptedException e) { + // Un-register with ResourceManager + rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", ""); + + log.log(Level.WARNING, "Application master killed."); + } catch (Exception e) { // Un-register with ResourceManager rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "", ""); - System.exit(1); + log.log(Level.SEVERE, "Application master failed.", e); } - - // Un-register with ResourceManager - rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", ""); } /** @@ -364,4 +367,22 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { public void setSchedulerTimeout(long schedulerTimeout) { this.schedulerTimeout = schedulerTimeout; } + + /** + * Sets file system. + * @param fs File system. + */ + public void setFs(FileSystem fs) { + this.fs = fs; + } + + /** + * JUST FOR TESTING!!! + * + * @return Running containers. + */ + @Deprecated + public Map<ContainerId, IgniteContainer> getContainers() { + return containers; + } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7e072dc4/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java index d865659..abac58e 100644 --- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java +++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java @@ -18,8 +18,9 @@ package org.apache.ignite.yarn; import junit.framework.*; -import org.apache.curator.utils.ThreadUtils; -import org.apache.hadoop.util.ThreadUtil; +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.permission.*; +import org.apache.hadoop.util.*; import org.apache.hadoop.yarn.api.protocolrecords.*; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.*; @@ -27,8 +28,11 @@ import org.apache.hadoop.yarn.client.api.async.*; import org.apache.hadoop.yarn.exceptions.*; import java.io.*; +import java.net.*; import java.nio.*; import java.util.*; +import java.util.concurrent.*; +import java.util.regex.*; /** * Application master tests. @@ -52,7 +56,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase { props = new ClusterProperties(); appMaster = new ApplicationMaster("test", props); - appMaster.setSchedulerTimeout(100000); + appMaster.setSchedulerTimeout(10000); rmMock.clear(); } @@ -82,7 +86,6 @@ public class IgniteApplicationMasterSelfTest extends TestCase { } } - /** * @throws Exception If failed. */ @@ -98,14 +101,125 @@ public class IgniteApplicationMasterSelfTest extends TestCase { Thread thread = runAppMaster(appMaster); - interruptedThread(thread); - List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000); + interruptedThread(thread); + assertEquals(0, contRequests.size()); } /** + * @throws Exception If failed. + */ + public void testClusterAllocatedResource() throws Exception { + rmMock.availableRes(new MockResource(1024, 2)); + + appMaster.setRmClient(rmMock); + appMaster.setNmClient(new NMMock()); + + appMaster.setFs(new MockFileSystem()); + + props.cpusPerNode(8); + props.memoryPerNode(5000); + props.instances(3); + + // Check that container resources + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 2000))); + assertEquals(0, appMaster.getContainers().size()); + + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 10, 2000))); + assertEquals(0, appMaster.getContainers().size()); + + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 1, 7000))); + assertEquals(0, appMaster.getContainers().size()); + + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000))); + assertEquals(1, appMaster.getContainers().size()); + + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 10, 7000))); + assertEquals(2, appMaster.getContainers().size()); + } + + /** + * @throws Exception If failed. + */ + public void testStartReleaseContainer() throws Exception { + rmMock.availableRes(new MockResource(1024, 2)); + + NMMock nmClient = new NMMock(); + + appMaster.setRmClient(rmMock); + appMaster.setNmClient(nmClient); + + appMaster.setFs(new MockFileSystem()); + + props.cpusPerNode(8); + props.memoryPerNode(5000); + props.instances(3); + + // Check that container resources + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 2000))); + assertEquals(1, rmMock.releasedResources().size()); + + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 5, 7000))); + assertEquals(2, rmMock.releasedResources().size()); + + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 9, 2000))); + assertEquals(3, rmMock.releasedResources().size()); + + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000))); + assertEquals(3, rmMock.releasedResources().size()); + assertEquals(1, nmClient.startedContainer().size()); + } + + + /** + * @throws Exception If failed. + */ + public void testHostnameConstraint() throws Exception { + rmMock.availableRes(new MockResource(1024, 2)); + + NMMock nmClient = new NMMock(); + + appMaster.setRmClient(rmMock); + appMaster.setNmClient(nmClient); + + appMaster.setFs(new MockFileSystem()); + + props.cpusPerNode(8); + props.memoryPerNode(5000); + props.instances(3); + props.hostnameConstraint(Pattern.compile("ignoreHost")); + + // Check that container resources + appMaster.onContainersAllocated(Collections.singletonList(createContainer("simple", 8, 5000))); + assertEquals(0, rmMock.releasedResources().size()); + assertEquals(1, nmClient.startedContainer().size()); + + appMaster.onContainersAllocated(Collections.singletonList(createContainer("ignoreHost", 8, 5000))); + assertEquals(1, rmMock.releasedResources().size()); + assertEquals(1, nmClient.startedContainer().size()); + } + + /** + * @param host Host. + * @param cpu Cpu count. + * @param mem Memory. + * @return Container. + */ + private Container createContainer(String host, int cpu, int mem) { + return Container.newInstance( + ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(0l, 0), 0), + ThreadLocalRandom.current().nextLong()), + NodeId.newInstance(host, 0), + "example.com", + new MockResource(mem, cpu), + Priority.newInstance(0), + null + ); + } + + /** * @param rmMock RM mock. * @param expectedCnt Expected cnt. * @param timeOut Timeout. @@ -147,7 +261,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase { } /** - * Interrupt thread and wait. + * Interrupt thread and join. * * @param thread Thread. */ @@ -165,6 +279,9 @@ public class IgniteApplicationMasterSelfTest extends TestCase { private List<AMRMClient.ContainerRequest> contRequests = new ArrayList<>(); /** */ + private List<ContainerId> releasedConts = new ArrayList<>(); + + /** */ private Resource availableRes; /** */ @@ -180,6 +297,13 @@ public class IgniteApplicationMasterSelfTest extends TestCase { } /** + * @return Released resources. + */ + public List<ContainerId> releasedResources() { + return releasedConts; + } + + /** * Sets resource. * * @param availableRes Available resource. @@ -193,6 +317,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase { */ public void clear() { contRequests.clear(); + releasedConts.clear(); availableRes = null; } @@ -226,7 +351,7 @@ public class IgniteApplicationMasterSelfTest extends TestCase { /** {@inheritDoc} */ @Override public void releaseAssignedContainer(ContainerId containerId) { - // No-op. + releasedConts.add(containerId); } /** {@inheritDoc} */ @@ -250,13 +375,26 @@ public class IgniteApplicationMasterSelfTest extends TestCase { */ public static class NMMock extends NMClient { /** */ - protected NMMock() { + private List<ContainerLaunchContext> startedContainer = new ArrayList<>(); + + /** */ + public NMMock() { super("name"); } + /** + * @return Started containers. + */ + public List<ContainerLaunchContext> startedContainer() { + return startedContainer; + } + /** {@inheritDoc} */ @Override public Map<String, ByteBuffer> startContainer(Container container, ContainerLaunchContext containerLaunchContext) throws YarnException, IOException { + + startedContainer.add(containerLaunchContext); + return null; } @@ -321,4 +459,74 @@ public class IgniteApplicationMasterSelfTest extends TestCase { return 0; } } + + /** + * Mock file system. + */ + public static class MockFileSystem extends FileSystem { + /** */ + public MockFileSystem() { + } + + /** {@inheritDoc} */ + @Override public Path makeQualified(Path path) { + return new Path("/test/path"); + } + + /** {@inheritDoc} */ + @Override public FileStatus getFileStatus(Path f) throws IOException { + return new FileStatus(); + } + + /** {@inheritDoc} */ + @Override public boolean mkdirs(Path f, FsPermission permission) throws IOException { + return false; + } + + /** {@inheritDoc} */ + @Override public Path getWorkingDirectory() { + return null; + } + + /** {@inheritDoc} */ + @Override public void setWorkingDirectory(Path new_dir) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException { + return new FileStatus[0]; + } + + /** {@inheritDoc} */ + @Override public boolean delete(Path f, boolean recursive) throws IOException { + return false; + } + + /** {@inheritDoc} */ + @Override public boolean rename(Path src, Path dst) throws IOException { + return false; + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException { + return null; + } + + /** {@inheritDoc} */ + @Override public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize, + short replication, long blockSize, Progressable progress) throws IOException { + return null; + } + + /** {@inheritDoc} */ + @Override public FSDataInputStream open(Path f, int bufferSize) throws IOException { + return null; + } + + /** {@inheritDoc} */ + @Override public URI getUri() { + return null; + } + } }