Repository: incubator-ignite Updated Branches: refs/heads/ignite-901 3518ba859 -> f44924a48
#IGNITE-YARN Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b5691911 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b5691911 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b5691911 Branch: refs/heads/ignite-901 Commit: b5691911dc545db3264afcf23153e2f9ec914724 Parents: 50cfa27 Author: Tikhonov Nikolay <tikhonovnico...@gmail.com> Authored: Tue Jun 2 21:17:35 2015 +0300 Committer: Tikhonov Nikolay <tikhonovnico...@gmail.com> Committed: Tue Jun 2 21:17:35 2015 +0300 ---------------------------------------------------------------------- .../apache/ignite/yarn/ApplicationMaster.java | 120 ++++++++++--------- .../apache/ignite/yarn/IgniteYarnClient.java | 116 +++++++++++++++++- .../ignite/yarn/IgniteSchedulerSelfTest.java | 2 + 3 files changed, 179 insertions(+), 59 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java index f52a1de..9ab70d4 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java @@ -17,6 +17,7 @@ package org.apache.ignite.yarn; +import com.google.common.collect.Lists; import org.apache.hadoop.conf.*; import org.apache.hadoop.yarn.api.*; import org.apache.hadoop.yarn.api.protocolrecords.*; @@ -32,56 +33,87 @@ import java.util.*; * TODO */ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { - /** {@inheritDoc} */ - @Override public void onContainersCompleted(List<ContainerStatus> statuses) { - + Configuration configuration; + NMClient nmClient; + int numContainersToWaitFor = 5; + + public ApplicationMaster() { + configuration = new YarnConfiguration(); + nmClient = NMClient.createNMClient(); + nmClient.init(configuration); + nmClient.start(); } - /** {@inheritDoc} */ - @Override public void onContainersAllocated(List<Container> containers) { + public void onContainersAllocated(List<Container> containers) { + for (Container container : containers) { + try { + // Launch container by create ContainerLaunchContext + // bin/hadoop fs -rm /user/ntikhonov/*.jar && bin/hadoop fs -copyFromLocal ./ignite-yarn.jar /user/ntikhonov + ContainerLaunchContext ctx = + Records.newRecord(ContainerLaunchContext.class); + ctx.setCommands( + Lists.newArrayList( + "ls " + + " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + + " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" + )); + System.out.println("[AM] Launching container " + container.getId()); + nmClient.startContainer(container, ctx); + } catch (Exception ex) { + System.err.println("[AM] Error launching container " + container.getId() + " " + ex); + } + } + } + public void onContainersCompleted(List<ContainerStatus> statuses) { + for (ContainerStatus status : statuses) { + System.out.println("[AM] Completed container " + status.getContainerId()); + synchronized (this) { + numContainersToWaitFor--; + } + } } - /** {@inheritDoc} */ - @Override public void onShutdownRequest() { + public void onNodesUpdated(List<NodeReport> updated) { + } + public void onReboot() { } - /** {@inheritDoc} */ - @Override public void onNodesUpdated(List<NodeReport> updatedNodes) { + public void onShutdownRequest() { + } + public void onError(Throwable t) { } - /** {@inheritDoc} */ - @Override public float getProgress() { + public float getProgress() { return 0; } - /** {@inheritDoc} */ - @Override public void onError(Throwable e) { + public boolean doneWithContainers() { + return numContainersToWaitFor == 0; + } + public Configuration getConfiguration() { + return configuration; } - /** - * @param args Arguments. - */ public static void main(String[] args) throws Exception { - final String command = args[0]; - final int n = Integer.valueOf(args[1]); + ApplicationMaster master = new ApplicationMaster(); + master.runMainLoop(); - // Initialize clients to ResourceManager and NodeManagers - Configuration conf = new YarnConfiguration(); + } - AMRMClient<AMRMClient.ContainerRequest> rmClient = AMRMClient.createAMRMClient(); - rmClient.init(conf); - rmClient.start(); + public void runMainLoop() throws Exception { - NMClient nmClient = NMClient.createNMClient(); - nmClient.init(conf); - nmClient.start(); + AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(100, this); + rmClient.init(getConfiguration()); + rmClient.start(); // Register with ResourceManager + System.out.println("[AM] registerApplicationMaster 0"); rmClient.registerApplicationMaster("", 0, ""); + System.out.println("[AM] registerApplicationMaster 1"); // Priority for worker containers - priorities are intra-application Priority priority = Records.newRecord(Priority.class); @@ -93,41 +125,21 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { capability.setVirtualCores(1); // Make container requests to ResourceManager - for (int i = 0; i < n; ++i) { - AMRMClient.ContainerRequest containerAsk = - new AMRMClient.ContainerRequest(capability, null, null, priority); - + for (int i = 0; i < numContainersToWaitFor; ++i) { + AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(capability, null, null, priority); + System.out.println("[AM] Making res-req " + i); rmClient.addContainerRequest(containerAsk); } - // Obtain allocated containers, launch and check for responses - int responseId = 0; - int completedContainers = 0; - while (completedContainers < n) { - AllocateResponse response = rmClient.allocate(responseId++); - for (Container container : response.getAllocatedContainers()) { - // Launch container by create ContainerLaunchContext - ContainerLaunchContext ctx = - Records.newRecord(ContainerLaunchContext.class); - - ctx.setCommands( - Collections.singletonList( - command + - " 1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout" + - " 2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr" - )); - - nmClient.startContainer(container, ctx); - } - for (ContainerStatus status : response.getCompletedContainersStatuses()) { - ++completedContainers; - System.out.println("Completed container " + status.getContainerId()); - } + System.out.println("[AM] waiting for containers to finish"); + while (!doneWithContainers()) { Thread.sleep(100); } + System.out.println("[AM] unregisterApplicationMaster 0"); // Un-register with ResourceManager rmClient.unregisterApplicationMaster( - FinalApplicationStatus.SUCCEEDED, "", ""); + FinalApplicationStatus.SUCCEEDED, "", ""); + System.out.println("[AM] unregisterApplicationMaster 1"); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java index 7cef50d..e020ef4 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java @@ -17,11 +17,24 @@ package org.apache.ignite.yarn; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.*; import org.apache.hadoop.yarn.conf.*; +import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.hadoop.yarn.util.Records; +import java.io.File; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import java.util.logging.*; +import static org.apache.hadoop.yarn.api.ApplicationConstants.*; + /** * Ignite yarn client. */ @@ -35,16 +48,109 @@ public class IgniteYarnClient { * @param args Args. */ public static void main(String[] args) throws Exception { - ClusterProperties clusterProps = ClusterProperties.from(args.length >= 1 ? args[0] : null); - - // Create yarnClient YarnConfiguration conf = new YarnConfiguration(); - YarnClient yarnClient = YarnClient.createYarnClient(); - yarnClient.init(conf); yarnClient.start(); + // Create application via yarnClient YarnClientApplication app = yarnClient.createApplication(); + + // Set up the container launch context for the application master + ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + + amContainer.setCommands( + Collections.singletonList( + " $JAVA_HOME/bin/java -Xmx256M org.apache.ignite.yarn.ApplicationMaster" + + " 1>" + LOG_DIR_EXPANSION_VAR + "/stdout" + + " 2>" + LOG_DIR_EXPANSION_VAR + "/stderr" + ) + ); + + // Setup jar for ApplicationMaster + final LocalResource appMasterJar = Records.newRecord(LocalResource.class); + setupAppMasterJar(new Path("/user/ntikhonov/ignite-yarn.jar"), appMasterJar, conf); + + final LocalResource igniteZip = Records.newRecord(LocalResource.class); + setupAppMasterJar(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6.zip"), igniteZip, conf); + + FileSystem fileSystem = FileSystem.get(conf); + + Path path = fileSystem.makeQualified(new Path("/user/ntikhonov/gridgain-community-fabric-1.0.6/bin/ignite.sh")); + + System.out.println("Path: " + path); + System.out.println("Path URI: " + path.toUri().toString()); + + amContainer.setLocalResources(new HashMap<String, LocalResource>(){{ + put("ignite-yarn.jar", appMasterJar); + put("ignite", igniteZip); + }}); + + // Setup CLASSPATH for ApplicationMaster + Map<String, String> appMasterEnv = new HashMap<String, String>(); + setupAppMasterEnv(appMasterEnv, conf); + amContainer.setEnvironment(appMasterEnv); + + // Set up resource type requirements for ApplicationMaster + Resource capability = Records.newRecord(Resource.class); + capability.setMemory(256); + capability.setVirtualCores(1); + + // Finally, set-up ApplicationSubmissionContext for the application + ApplicationSubmissionContext appContext = + app.getApplicationSubmissionContext(); + appContext.setApplicationName("simple-yarn-app"); // application name + appContext.setAMContainerSpec(amContainer); + appContext.setResource(capability); + appContext.setQueue("default"); // queue + + // Submit application + ApplicationId appId = appContext.getApplicationId(); + System.out.println("Submitting application " + appId); + yarnClient.submitApplication(appContext); + + ApplicationReport appReport = yarnClient.getApplicationReport(appId); + YarnApplicationState appState = appReport.getYarnApplicationState(); + while (appState != YarnApplicationState.FINISHED && + appState != YarnApplicationState.KILLED && + appState != YarnApplicationState.FAILED) { + Thread.sleep(100); + appReport = yarnClient.getApplicationReport(appId); + appState = appReport.getYarnApplicationState(); + } + + System.out.println( + "Application " + appId + " finished with" + + " state " + appState + + " at " + appReport.getFinishTime()); + } + + private static void setupAppMasterJar(Path jarPath, LocalResource appMasterJar, YarnConfiguration conf) + throws Exception { + FileSystem fileSystem = FileSystem.get(conf); + jarPath = fileSystem.makeQualified(jarPath); + + FileStatus jarStat = fileSystem.getFileStatus(jarPath); + + appMasterJar.setResource(ConverterUtils.getYarnUrlFromPath(jarPath)); + appMasterJar.setSize(jarStat.getLen()); + appMasterJar.setTimestamp(jarStat.getModificationTime()); + appMasterJar.setType(LocalResourceType.ARCHIVE); + appMasterJar.setVisibility(LocalResourceVisibility.APPLICATION); + + System.out.println("Path :" + jarPath); + } + + private static void setupAppMasterEnv(Map<String, String> appMasterEnv, YarnConfiguration conf) { + for (String c : conf.getStrings( + YarnConfiguration.YARN_APPLICATION_CLASSPATH, + YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) + Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), + c.trim(), File.pathSeparator); + + Apps.addToEnvironment(appMasterEnv, + Environment.CLASSPATH.name(), + Environment.PWD.$() + File.separator + "*", + File.pathSeparator); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b5691911/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java index 1a03743..04d3492 100644 --- a/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java +++ b/modules/yarn/src/test/java/org/apache/ignite/yarn/IgniteSchedulerSelfTest.java @@ -23,5 +23,7 @@ import junit.framework.*; * Scheduler tests. */ public class IgniteSchedulerSelfTest extends TestCase { + public void testName() throws Exception { + } }