#YARN Code cleanup. Added tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/960e19dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/960e19dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/960e19dd Branch: refs/heads/ignite-gg-9615 Commit: 960e19dda15d58ddc403a8e6856d0eb19d7794c1 Parents: 858d2a3 Author: nikolay tikhonov <ntikho...@gridgain.com> Authored: Tue Jun 9 16:38:07 2015 +0300 Committer: nikolay tikhonov <ntikho...@gridgain.com> Committed: Tue Jun 9 16:38:07 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/yarn/ApplicationMaster.java | 89 +++-- .../apache/ignite/yarn/ClusterProperties.java | 53 +-- .../apache/ignite/yarn/IgniteYarnClient.java | 30 +- .../org/apache/ignite/IgniteMesosTestSuite.java | 38 --- .../org/apache/ignite/IgniteYarnTestSuite.java | 38 +++ .../yarn/IgniteApplicationMasterSelfTest.java | 324 +++++++++++++++++++ .../ignite/yarn/IgniteSchedulerSelfTest.java | 29 -- 7 files changed, 460 insertions(+), 141 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java index 3bf0521..c552ea0 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java @@ -47,27 +47,30 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { public static final String DELIM = ","; /** */ + private long schedulerTimeout = TimeUnit.SECONDS.toMillis(1); + + /** Yarn configuration. */ private YarnConfiguration conf; - /** */ + /** Cluster properties. */ private ClusterProperties props; - /** */ + /** Network manager. */ private NMClient nmClient; - /** */ - AMRMClientAsync<AMRMClient.ContainerRequest> rmClient; + /** Resource manager. */ + private AMRMClientAsync<AMRMClient.ContainerRequest> rmClient; - /** */ + /** Ignite path. */ private Path ignitePath; - /** */ + /** Config path. */ private Path cfgPath; - /** */ + /** Hadoop file system. */ private FileSystem fs; - /** */ + /** Running containers. */ private Map<ContainerId, IgniteContainer> containers = new ConcurrentHashMap<>(); /** @@ -76,13 +79,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { public ApplicationMaster(String ignitePath, ClusterProperties props) throws Exception { this.conf = new YarnConfiguration(); this.props = props; - this.fs = FileSystem.get(conf); this.ignitePath = new Path(ignitePath); - - nmClient = NMClient.createNMClient(); - - nmClient.init(conf); - nmClient.start(); } /** {@inheritDoc} */ @@ -103,11 +100,16 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE)); resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE)); + if (props.userLibs() != null) + resources.put("libs", IgniteYarnUtils.setupFile(new Path(props.userLibs()), fs, + LocalResourceType.FILE)); + ctx.setLocalResources(resources); ctx.setCommands( Collections.singletonList( - "./ignite/*/bin/ignite.sh " + "cp -r ./libs/* ./ignite/*/libs/ || true && " + + "./ignite/*/bin/ignite.sh " + "./ignite-config.xml" + " -J-Xmx" + c.getResource().getMemory() + "m" + " -J-Xms" + c.getResource().getMemory() + "m" @@ -153,7 +155,9 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { // Check that slave satisfies min requirements. if (cont.getResource().getVirtualCores() < props.cpusPerNode() || cont.getResource().getMemory() < props.memoryPerNode()) { - //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList()); + log.log(Level.FINE, "Container resources not sufficient requirements. Host: {0}, cpu: {1}, mem: {2}", + new Object[]{cont.getNodeId().getHost(), cont.getResource().getVirtualCores(), + cont.getResource().getMemory()}); return false; } @@ -185,7 +189,8 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { for (ContainerStatus status : statuses) { containers.remove(status.getContainerId()); - //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList()); + log.log(Level.INFO, "Container stopped. Container id: {0}. State: {1}.", + new Object[]{status.getContainerId(), status.getState()}); } } @@ -243,9 +248,6 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { * @throws Exception If failed. */ public void run() throws Exception { - rmClient.init(conf); - rmClient.start(); - // Register with ResourceManager rmClient.registerApplicationMaster("", 0, ""); @@ -260,7 +262,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { while (!nmClient.isInState(Service.STATE.STOPPED)) { int runningCnt = containers.size(); - if (runningCnt < props.instances() && checkAvailableResource(rmClient.getAvailableResources())) { + if (runningCnt < props.instances() && checkAvailableResource()) { // Resource requirements for worker containers. Resource capability = Records.newRecord(Resource.class); @@ -279,7 +281,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { } } - TimeUnit.SECONDS.sleep(5); + TimeUnit.MILLISECONDS.sleep(schedulerTimeout); } } catch (Exception e) { @@ -294,10 +296,11 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { } /** - * @param availableRes Available resources. * @return {@code True} if cluster contains available resources. */ - private boolean checkAvailableResource(Resource availableRes) { + private boolean checkAvailableResource() { + Resource availableRes = rmClient.getAvailableResources(); + return availableRes == null || availableRes.getMemory() >= props.memoryPerNode() && availableRes.getVirtualCores() >= props.cpusPerNode(); } @@ -306,10 +309,17 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { * @throws IOException */ public void init() throws IOException { + fs = FileSystem.get(conf); + + nmClient = NMClient.createNMClient(); + + nmClient.init(conf); + nmClient.start(); + // Create async application master. rmClient = AMRMClientAsync.createAMRMClientAsync(300, this); - if (props.igniteConfigUrl() == null || props.igniteConfigUrl().isEmpty()) { + if (props.igniteCfg() == null || props.igniteCfg().isEmpty()) { InputStream input = Thread.currentThread().getContextClassLoader() .getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG); @@ -325,6 +335,33 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { IOUtils.closeQuietly(outputStream); } else - cfgPath = new Path(props.igniteConfigUrl()); + cfgPath = new Path(props.igniteCfg()); + } + + /** + * Sets NMClient. + * + * @param nmClient NMClient. + */ + public void setNmClient(NMClient nmClient) { + this.nmClient = nmClient; + } + + /** + * Sets RMClient + * + * @param rmClient AMRMClientAsync. + */ + public void setRmClient(AMRMClientAsync<AMRMClient.ContainerRequest> rmClient) { + this.rmClient = rmClient; + } + + /** + * Sets scheduler timeout. + * + * @param schedulerTimeout Scheduler timeout. + */ + public void setSchedulerTimeout(long schedulerTimeout) { + this.schedulerTimeout = schedulerTimeout; } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java index f9fdb59..d021d45 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java @@ -59,8 +59,11 @@ public class ClusterProperties { /** */ public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT"; + /** */ + public static final int DEFAULT_IGNITE_NODE_COUNT = 3; + /** Node count limit. */ - private double nodeCnt = 3; + private double nodeCnt = DEFAULT_IGNITE_NODE_COUNT; /** */ public static final String IGNITE_VERSION = "IGNITE_VERSION"; @@ -105,24 +108,12 @@ public class ClusterProperties { private String userLibs = null; /** */ - public static final String IGNITE_USERS_LIBS_URL = "IGNITE_USERS_LIBS_URL"; - - /** URL to users libs. */ - private String userLibsUrl = null; - - /** */ public static final String IGNITE_CONFIG_XML = "IGNITE_XML_CONFIG"; /** Ignite config. */ private String igniteCfg = null; /** */ - public static final String IGNITE_CONFIG_XML_URL = "IGNITE_CONFIG_XML_URL"; - - /** Url to ignite config. */ - private String igniteCfgUrl = null; - - /** */ public static final String IGNITE_HOSTNAME_CONSTRAINT = "IGNITE_HOSTNAME_CONSTRAINT"; /** Url to ignite config. */ @@ -179,6 +170,13 @@ public class ClusterProperties { } /** + * Sets instance count limit. + */ + public void instances(int nodeCnt) { + this.nodeCnt = nodeCnt; + } + + /** * Sets hostname constraint. * * @param pattern Hostname pattern. @@ -230,20 +228,6 @@ public class ClusterProperties { } /** - * @return Url to ignite configuration. - */ - public String igniteConfigUrl() { - return igniteCfgUrl; - } - - /** - * @return Url to users libs configuration. - */ - public String usersLibsUrl() { - return userLibsUrl; - } - - /** * @return Host name constraint. */ public Pattern hostnameConstraint() { @@ -268,15 +252,14 @@ public class ClusterProperties { prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, props, DEFAULT_CLUSTER_NAME); - prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null); - prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null); - prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, 1.0); prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, 2048.0); prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, 2.0); prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION); prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR); + prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, props, DEFAULT_IGNITE_LOCAL_WORK_DIR); + prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, props, DEFAULT_IGNITE_RELEASES_DIR); prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, props, null); prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, props, null); @@ -306,15 +289,14 @@ public class ClusterProperties { prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, null, DEFAULT_CLUSTER_NAME); - prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, null, null); - prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, null, null); - prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, null, 1.0); prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, null, 2048.0); prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, null, 2.0); prop.igniteVer = getStringProperty(IGNITE_VERSION, null, DEFAULT_IGNITE_VERSION); prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, null, DEFAULT_IGNITE_WORK_DIR); + prop.igniteLocalWorkDir = getStringProperty(IGNITE_LOCAL_WORK_DIR, null, DEFAULT_IGNITE_LOCAL_WORK_DIR); + prop.igniteReleasesDir = getStringProperty(IGNITE_RELEASES_DIR, null, DEFAULT_IGNITE_RELEASES_DIR); prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, null, null); prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, null, null); @@ -342,15 +324,14 @@ public class ClusterProperties { envs.put(IGNITE_CLUSTER_NAME, toEnvVal(clusterName)); - envs.put(IGNITE_USERS_LIBS_URL, toEnvVal(userLibsUrl)); - envs.put(IGNITE_CONFIG_XML_URL, toEnvVal(igniteCfgUrl)); - envs.put(IGNITE_RUN_CPU_PER_NODE, toEnvVal(cpuPerNode)); envs.put(IGNITE_MEMORY_PER_NODE, toEnvVal(memPerNode)); envs.put(IGNITE_NODE_COUNT, toEnvVal(nodeCnt)); envs.put(IGNITE_VERSION, toEnvVal(igniteVer)); envs.put(IGNITE_WORKING_DIR, toEnvVal(igniteWorkDir)); + envs.put(IGNITE_LOCAL_WORK_DIR, toEnvVal(igniteLocalWorkDir)); + envs.put(IGNITE_RELEASES_DIR, toEnvVal(igniteReleasesDir)); envs.put(IGNITE_CONFIG_XML, toEnvVal(igniteCfg)); envs.put(IGNITE_USERS_LIBS, toEnvVal(userLibs)); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java index f74890d..764e717 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java @@ -39,9 +39,9 @@ public class IgniteYarnClient { public static final Logger log = Logger.getLogger(IgniteYarnClient.class.getSimpleName()); /** - * Main methods has only one optional parameter - path to properties file. + * Main methods has one mandatory parameter and one optional parameter. * - * @param args Args. + * @param args Path to jar mandatory parameter and property file is optional. */ public static void main(String[] args) throws Exception { checkArguments(args); @@ -107,24 +107,27 @@ public class IgniteYarnClient { yarnClient.submitApplication(appContext); - log.log(Level.INFO, "Submitted application. Application id: [{0}]", appId); + log.log(Level.INFO, "Submitted application. Application id: {0}", appId); ApplicationReport appReport = yarnClient.getApplicationReport(appId); YarnApplicationState appState = appReport.getYarnApplicationState(); - while (appState != YarnApplicationState.FINISHED && - appState != YarnApplicationState.KILLED && - appState != YarnApplicationState.FAILED) { + while (appState == YarnApplicationState.NEW || + appState == YarnApplicationState.NEW_SAVING || + appState == YarnApplicationState.SUBMITTED || + appState == YarnApplicationState.ACCEPTED) { TimeUnit.SECONDS.sleep(1L); appReport = yarnClient.getApplicationReport(appId); + if (appState != YarnApplicationState.ACCEPTED + && appReport.getYarnApplicationState() == YarnApplicationState.ACCEPTED) + log.log(Level.INFO, "Application {0} is ACCEPTED.", appId); + appState = appReport.getYarnApplicationState(); } - yarnClient.killApplication(appId); - - log.log(Level.INFO, "Application [{0}] finished with state [{1}]", new Object[]{appId, appState}); + log.log(Level.INFO, "Application {0} is {1}.", new Object[]{appId, appState}); } /** @@ -134,7 +137,7 @@ public class IgniteYarnClient { */ private static void checkArguments(String[] args) { if (args.length < 1) - throw new IllegalArgumentException(); + throw new IllegalArgumentException("Invalid arguments."); } /** @@ -146,11 +149,14 @@ public class IgniteYarnClient { private static Path getIgnite(ClusterProperties props, FileSystem fileSystem) throws Exception { IgniteProvider provider = new IgniteProvider(props, fileSystem); - return provider.getIgnite(); + if (props.igniteVer() == null + || props.igniteVer().equalsIgnoreCase(ClusterProperties.DEFAULT_IGNITE_VERSION)) + return provider.getIgnite(); + else + return provider.getIgnite(props.igniteVer()); } /** - * * @param envs Environment variables. * @param conf Yarn configuration. */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java b/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java deleted file mode 100644 index e6920b3..0000000 --- a/modules/yarn/src/test/java/org/apache/ignite/IgniteMesosTestSuite.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite; - -import junit.framework.*; -import org.apache.ignite.yarn.*; - -/** - * Apache Mesos integration tests. - */ -public class IgniteMesosTestSuite extends TestSuite { - /** - * @return Test suite. - * @throws Exception Thrown in case of the failure. - */ - public static TestSuite suite() throws Exception { - TestSuite suite = new TestSuite("Apache Mesos Integration Test Suite"); - - suite.addTest(new TestSuite(IgniteSchedulerSelfTest.class)); - - return suite; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java b/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java new file mode 100644 index 0000000..aa31774 --- /dev/null +++ b/modules/yarn/src/test/java/org/apache/ignite/IgniteYarnTestSuite.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite; + +import junit.framework.*; +import org.apache.ignite.yarn.*; + +/** + * Apache Hadoop Yarn integration tests. + */ +public class IgniteYarnTestSuite extends TestSuite { + /** + * @return Test suite. + * @throws Exception Thrown in case of the failure. + */ + public static TestSuite suite() throws Exception { + TestSuite suite = new TestSuite("Apache Yarn Integration Test Suite"); + + suite.addTest(new TestSuite(IgniteApplicationMasterSelfTest.class)); + + return suite; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java new file mode 100644 index 0000000..d865659 --- /dev/null +++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteApplicationMasterSelfTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.yarn; + +import junit.framework.*; +import org.apache.curator.utils.ThreadUtils; +import org.apache.hadoop.util.ThreadUtil; +import org.apache.hadoop.yarn.api.protocolrecords.*; +import org.apache.hadoop.yarn.api.records.*; +import org.apache.hadoop.yarn.client.api.*; +import org.apache.hadoop.yarn.client.api.async.*; +import org.apache.hadoop.yarn.exceptions.*; + +import java.io.*; +import java.nio.*; +import java.util.*; + +/** + * Application master tests. + */ +public class IgniteApplicationMasterSelfTest extends TestCase { + /** */ + private ApplicationMaster appMaster; + + /** */ + private ClusterProperties props; + + /** */ + private RMMock rmMock = new RMMock(); + + /** + * @throws Exception If failed. + */ + @Override protected void setUp() throws Exception { + super.setUp(); + + props = new ClusterProperties(); + appMaster = new ApplicationMaster("test", props); + + appMaster.setSchedulerTimeout(100000); + + rmMock.clear(); + } + + /** + * @throws Exception If failed. + */ + public void testContainerAllocate() throws Exception { + appMaster.setRmClient(rmMock); + appMaster.setNmClient(new NMMock()); + + props.cpusPerNode(2); + props.memoryPerNode(1024); + props.instances(3); + + Thread thread = runAppMaster(appMaster); + + List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 2, 1000); + + interruptedThread(thread); + + assertEquals(3, contRequests.size()); + + for (AMRMClient.ContainerRequest req : contRequests) { + assertEquals(2, req.getCapability().getVirtualCores()); + assertEquals(1024, req.getCapability().getMemory()); + } + } + + + /** + * @throws Exception If failed. + */ + public void testClusterResource() throws Exception { + rmMock.availableRes(new MockResource(1024, 2)); + + appMaster.setRmClient(rmMock); + appMaster.setNmClient(new NMMock()); + + props.cpusPerNode(8); + props.memoryPerNode(10240); + props.instances(3); + + Thread thread = runAppMaster(appMaster); + + interruptedThread(thread); + + List<AMRMClient.ContainerRequest> contRequests = collectRequests(rmMock, 1, 1000); + + assertEquals(0, contRequests.size()); + } + + /** + * @param rmMock RM mock. + * @param expectedCnt Expected cnt. + * @param timeOut Timeout. + * @return Requests. + */ + private List<AMRMClient.ContainerRequest> collectRequests(RMMock rmMock, int expectedCnt, int timeOut) { + long startTime = System.currentTimeMillis(); + + List<AMRMClient.ContainerRequest> requests = rmMock.requests(); + + while (requests.size() < expectedCnt + && (System.currentTimeMillis() - startTime) < timeOut) + requests = rmMock.requests(); + + return requests; + } + + /** + * Runs appMaster other thread. + * + * @param appMaster Application master. + * @return Thread. + */ + private static Thread runAppMaster(final ApplicationMaster appMaster) { + Thread thread = new Thread(new Runnable() { + @Override public void run() { + try { + appMaster.run(); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + thread.start(); + + return thread; + } + + /** + * Interrupt thread and wait. + * + * @param thread Thread. + */ + private static void interruptedThread(Thread thread) throws InterruptedException { + thread.interrupt(); + + thread.join(); + } + + /** + * Resource manager mock. + */ + private static class RMMock extends AMRMClientAsync { + /** */ + private List<AMRMClient.ContainerRequest> contRequests = new ArrayList<>(); + + /** */ + private Resource availableRes; + + /** */ + public RMMock() { + super(0, null); + } + + /** + * @return Requests. + */ + public List<AMRMClient.ContainerRequest> requests() { + return contRequests; + } + + /** + * Sets resource. + * + * @param availableRes Available resource. + */ + public void availableRes(Resource availableRes) { + this.availableRes = availableRes; + } + + /** + * Clear internal state. + */ + public void clear() { + contRequests.clear(); + availableRes = null; + } + + /** {@inheritDoc} */ + @Override public List<? extends Collection> getMatchingRequests(Priority priority, String resourceName, + Resource capability) { + return null; + } + + /** {@inheritDoc} */ + @Override public RegisterApplicationMasterResponse registerApplicationMaster(String appHostName, + int appHostPort, String appTrackingUrl) throws YarnException, IOException { + return null; + } + + /** {@inheritDoc} */ + @Override public void unregisterApplicationMaster(FinalApplicationStatus appStatus, String appMessage, + String appTrackingUrl) throws YarnException, IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void addContainerRequest(AMRMClient.ContainerRequest req) { + contRequests.add(req); + } + + /** {@inheritDoc} */ + @Override public void removeContainerRequest(AMRMClient.ContainerRequest req) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void releaseAssignedContainer(ContainerId containerId) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public Resource getAvailableResources() { + return availableRes; + } + + /** {@inheritDoc} */ + @Override public int getClusterNodeCount() { + return 0; + } + + /** {@inheritDoc} */ + @Override public void updateBlacklist(List blacklistAdditions, List blacklistRemovals) { + // No-op. + } + } + + /** + * Network manager mock. + */ + public static class NMMock extends NMClient { + /** */ + protected NMMock() { + super("name"); + } + + /** {@inheritDoc} */ + @Override public Map<String, ByteBuffer> startContainer(Container container, + ContainerLaunchContext containerLaunchContext) throws YarnException, IOException { + return null; + } + + /** {@inheritDoc} */ + @Override public void stopContainer(ContainerId containerId, NodeId nodeId) throws YarnException, IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId) + throws YarnException, IOException { + return null; + } + + /** {@inheritDoc} */ + @Override public void cleanupRunningContainersOnStop(boolean enabled) { + // No-op. + } + } + + /** + * Resource. + */ + public static class MockResource extends Resource { + /** Memory. */ + private int mem; + + /** CPU. */ + private int cpu; + + /** + * @param mem Memory. + * @param cpu CPU. + */ + public MockResource(int mem, int cpu) { + this.mem = mem; + this.cpu = cpu; + } + + /** {@inheritDoc} */ + @Override public int getMemory() { + return mem; + } + + /** {@inheritDoc} */ + @Override public void setMemory(int memory) { + this.mem = memory; + } + + /** {@inheritDoc} */ + @Override public int getVirtualCores() { + return cpu; + } + + /** {@inheritDoc} */ + @Override public void setVirtualCores(int vCores) { + this.cpu = vCores; + } + + /** {@inheritDoc} */ + @Override public int compareTo(Resource resource) { + return 0; + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/960e19dd/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java deleted file mode 100644 index 04d3492..0000000 --- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.yarn; - -import junit.framework.*; - -/** - * Scheduler tests. - */ -public class IgniteSchedulerSelfTest extends TestCase { - public void testName() throws Exception { - - } -}