#YARN WIP
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/858d2a3f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/858d2a3f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/858d2a3f Branch: refs/heads/master Commit: 858d2a3f757fea2b88ffcb907e0f221699e32420 Parents: 12d9c02 Author: nikolay tikhonov <ntikho...@gridgain.com> Authored: Mon Jun 8 20:03:31 2015 +0300 Committer: nikolay tikhonov <ntikho...@gridgain.com> Committed: Mon Jun 8 20:03:31 2015 +0300 ---------------------------------------------------------------------- modules/yarn/README.txt | 8 +- .../apache/ignite/yarn/ApplicationMaster.java | 187 +++++++++++++------ .../apache/ignite/yarn/ClusterProperties.java | 145 ++++---------- .../org/apache/ignite/yarn/IgniteContainer.java | 33 +++- .../org/apache/ignite/yarn/IgniteProvider.java | 4 +- .../apache/ignite/yarn/IgniteYarnClient.java | 13 +- .../org/apache/ignite/yarn/package-info.java | 2 +- .../ignite/yarn/utils/IgniteYarnUtils.java | 4 +- 8 files changed, 207 insertions(+), 189 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/README.txt ---------------------------------------------------------------------- diff --git a/modules/yarn/README.txt b/modules/yarn/README.txt index 75a62f8..5cdd4a2 100644 --- a/modules/yarn/README.txt +++ b/modules/yarn/README.txt @@ -1,9 +1,9 @@ -Apache Ignite Mesos Module +Apache Ignite Yarn Module ------------------------ -Apache Ignite Mesos module provides integration Apache Ignite with Apache Mesos. +Apache Ignite Yarn module provides integration Apache Ignite with Apache Hadoop Yarn. -Importing Apache Ignite Mesos Module In Maven Project +Importing Apache Ignite Yarn Module In Maven Project ------------------------------------- If you are using Maven to manage dependencies of your project, you can add Cloud module @@ -19,7 +19,7 @@ interested in): ... <dependency> <groupId>org.apache.ignite</groupId> - <artifactId>ignite-mesos</artifactId> + <artifactId>ignite-yarn</artifactId> <version>${ignite.version}</version> </dependency> ... http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java index fe065a3..3bf0521 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ApplicationMaster.java @@ -19,7 +19,7 @@ package org.apache.ignite.yarn; import org.apache.commons.io.*; import org.apache.hadoop.fs.*; -import org.apache.hadoop.service.Service; +import org.apache.hadoop.service.*; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.client.api.*; import org.apache.hadoop.yarn.client.api.async.*; @@ -30,11 +30,16 @@ import org.apache.ignite.yarn.utils.*; import java.io.*; import java.util.*; import java.util.concurrent.*; +import java.util.logging.Level; +import java.util.logging.Logger; /** - * TODO + * Application master request containers from Yarn and decides how many resources will be occupied. */ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { + /** */ + public static final Logger log = Logger.getLogger(ApplicationMaster.class.getSimpleName()); + /** Default port range. */ public static final String DEFAULT_PORT = ":47500..47510"; @@ -51,6 +56,9 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { private NMClient nmClient; /** */ + AMRMClientAsync<AMRMClient.ContainerRequest> rmClient; + + /** */ private Path ignitePath; /** */ @@ -60,7 +68,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { private FileSystem fs; /** */ - private Map<String, IgniteContainer> containers = new HashMap<>(); + private Map<ContainerId, IgniteContainer> containers = new ConcurrentHashMap<>(); /** * Constructor. @@ -79,47 +87,81 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { /** {@inheritDoc} */ public synchronized void onContainersAllocated(List<Container> conts) { - for (Container container : conts) { - try { - ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); + for (Container c : conts) { + if (checkContainer(c)) { + try { + ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class); - Map<String, String> env = new HashMap<>(System.getenv()); + Map<String, String> env = new HashMap<>(System.getenv()); - env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(container.getNodeId().getHost())); + //env.put("IGNITE_TCP_DISCOVERY_ADDRESSES", getAddress(c.getNodeId().getHost())); - ctx.setEnvironment(env); + ctx.setEnvironment(env); - Map<String, LocalResource> resources = new HashMap<>(); + Map<String, LocalResource> resources = new HashMap<>(); - resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE)); - resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE)); + resources.put("ignite", IgniteYarnUtils.setupFile(ignitePath, fs, LocalResourceType.ARCHIVE)); + resources.put("ignite-config.xml", IgniteYarnUtils.setupFile(cfgPath, fs, LocalResourceType.FILE)); - ctx.setLocalResources(resources); + ctx.setLocalResources(resources); - ctx.setCommands( - Collections.singletonList( - "./ignite/*/bin/ignite.sh " - + "./ignite-config.xml" - + " -J-Xmx" + container.getResource().getMemory() + "m" - + " -J-Xms" + container.getResource().getMemory() + "m" - + IgniteYarnUtils.YARN_LOG_OUT - )); + ctx.setCommands( + Collections.singletonList( + "./ignite/*/bin/ignite.sh " + + "./ignite-config.xml" + + " -J-Xmx" + c.getResource().getMemory() + "m" + + " -J-Xms" + c.getResource().getMemory() + "m" + + IgniteYarnUtils.YARN_LOG_OUT + )); - System.out.println("[AM] Launching container " + container.getId()); + log.log(Level.INFO, "Launching container: {0}.", c.getId()); - nmClient.startContainer(container, ctx); + nmClient.startContainer(c, ctx); - containers.put(container.getNodeId().getHost(), - new IgniteContainer(container.getNodeId().getHost(), container.getResource().getVirtualCores(), - container.getResource().getMemory())); - } - catch (Exception ex) { - System.err.println("[AM] Error launching container " + container.getId() + " " + ex); + containers.put(c.getId(), + new IgniteContainer( + c.getId(), + c.getNodeId(), + c.getResource().getVirtualCores(), + c.getResource().getMemory())); + } + catch (Exception ex) { + System.err.println("[AM] Error launching container " + c.getId() + " " + ex); + } } + else + rmClient.releaseAssignedContainer(c.getId()); } } /** + * Checks that container + * + * @param cont Container. + * @return {@code True} if + */ + private boolean checkContainer(Container cont) { + // Check limit on running nodes. + if (props.instances() <= containers.size()) + return false; + + // Check host name + if (props.hostnameConstraint() != null + && props.hostnameConstraint().matcher(cont.getNodeId().getHost()).matches()) + return false; + + // Check that slave satisfies min requirements. + if (cont.getResource().getVirtualCores() < props.cpusPerNode() + || cont.getResource().getMemory() < props.memoryPerNode()) { + //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList()); + + return false; + } + + return true; + } + + /** * @return Address running nodes. */ private String getAddress(String address) { @@ -133,7 +175,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { StringBuilder sb = new StringBuilder(); for (IgniteContainer cont : containers.values()) - sb.append(cont.host()).append(DEFAULT_PORT).append(DELIM); + sb.append(cont.nodeId.getHost()).append(DEFAULT_PORT).append(DELIM); return sb.substring(0, sb.length() - 1); } @@ -141,13 +183,30 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { /** {@inheritDoc} */ public synchronized void onContainersCompleted(List<ContainerStatus> statuses) { for (ContainerStatus status : statuses) { - synchronized (this) { - } + containers.remove(status.getContainerId()); + + //log.log(Level.FINE, "Offer not sufficient for slave request: {0}", offer.getResourcesList()); } } /** {@inheritDoc} */ - public void onNodesUpdated(List<NodeReport> updated) { + public synchronized void onNodesUpdated(List<NodeReport> updated) { + for (NodeReport node : updated) { + // If node unusable. + if (node.getNodeState().isUnusable()) { + for (IgniteContainer cont : containers.values()) { + if (cont.nodeId().equals(node.getNodeId())) { + containers.remove(cont.id()); + + log.log(Level.WARNING, "Node is unusable. Node: {0}, state: {1}.", + new Object[]{node.getNodeId().getHost(), node.getNodeState()}); + } + } + + log.log(Level.WARNING, "Node is unusable. Node: {0}, state: {1}.", + new Object[]{node.getNodeId().getHost(), node.getNodeState()}); + } + } } /** {@inheritDoc} */ @@ -169,7 +228,7 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { * @throws Exception If failed. */ public static void main(String[] args) throws Exception { - ClusterProperties props = ClusterProperties.from(null); + ClusterProperties props = ClusterProperties.from(); ApplicationMaster master = new ApplicationMaster(args[0], props); @@ -184,60 +243,72 @@ public class ApplicationMaster implements AMRMClientAsync.CallbackHandler { * @throws Exception If failed. */ public void run() throws Exception { - // Create asyn application master. - AMRMClientAsync<AMRMClient.ContainerRequest> rmClient = AMRMClientAsync.createAMRMClientAsync(300, this); - rmClient.init(conf); rmClient.start(); // Register with ResourceManager rmClient.registerApplicationMaster("", 0, ""); - System.out.println("[AM] registerApplicationMaster 1"); + log.log(Level.INFO, "Application master registered."); // Priority for worker containers - priorities are intra-application Priority priority = Records.newRecord(Priority.class); priority.setPriority(0); - // Check ignite cluster. - while (!nmClient.isInState(Service.STATE.STOPPED)) { - Resource availableRes = rmClient.getAvailableResources(); + try { + // Check ignite cluster. + while (!nmClient.isInState(Service.STATE.STOPPED)) { + int runningCnt = containers.size(); - if (containers.size() < props.instances() || availableRes.getMemory() >= props.cpusPerNode() - || availableRes.getVirtualCores() >= props.cpus()) { - // Resource requirements for worker containers - Resource capability = Records.newRecord(Resource.class); - capability.setMemory(1024); - capability.setVirtualCores(2); + if (runningCnt < props.instances() && checkAvailableResource(rmClient.getAvailableResources())) { + // Resource requirements for worker containers. + Resource capability = Records.newRecord(Resource.class); - for (int i = 0; i < 1; ++i) { - // Make container requests to ResourceManager - AMRMClient.ContainerRequest containerAsk = + capability.setMemory((int)props.memoryPerNode()); + capability.setVirtualCores((int)props.cpusPerNode()); + + for (int i = 0; i < props.instances() - runningCnt; ++i) { + // Make container requests to ResourceManager + AMRMClient.ContainerRequest containerAsk = new AMRMClient.ContainerRequest(capability, null, null, priority); - System.out.println("[AM] Making res-req " + i); + rmClient.addContainerRequest(containerAsk); - rmClient.addContainerRequest(containerAsk); + log.log(Level.INFO, "Making request. Memory: {0}, cpu {1}.", + new Object[]{props.memoryPerNode(), props.cpusPerNode()}); + } } - } - TimeUnit.SECONDS.sleep(5); + TimeUnit.SECONDS.sleep(5); + } } + catch (Exception e) { + // Un-register with ResourceManager + rmClient.unregisterApplicationMaster(FinalApplicationStatus.FAILED, "", ""); - System.out.println("[AM] waiting for containers to finish"); - - System.out.println("[AM] unregisterApplicationMaster 0"); + System.exit(1); + } // Un-register with ResourceManager rmClient.unregisterApplicationMaster(FinalApplicationStatus.KILLED, "", ""); + } - System.out.println("[AM] unregisterApplicationMaster 1"); + /** + * @param availableRes Available resources. + * @return {@code True} if cluster contains available resources. + */ + private boolean checkAvailableResource(Resource availableRes) { + return availableRes == null || availableRes.getMemory() >= props.memoryPerNode() + && availableRes.getVirtualCores() >= props.cpusPerNode(); } /** * @throws IOException */ public void init() throws IOException { + // Create async application master. + rmClient = AMRMClientAsync.createAMRMClientAsync(300, this); + if (props.igniteConfigUrl() == null || props.igniteConfigUrl().isEmpty()) { InputStream input = Thread.currentThread().getContextClassLoader() .getResourceAsStream(IgniteYarnUtils.DEFAULT_IGNITE_CONFIG); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java index adddd51..f9fdb59 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/ClusterProperties.java @@ -41,28 +41,16 @@ public class ClusterProperties { /** */ public static final String DEFAULT_CLUSTER_NAME = "ignite-cluster"; - /** Mesos master url. */ + /** Cluster name. */ private String clusterName = DEFAULT_CLUSTER_NAME; /** */ - public static final String IGNITE_TOTAL_CPU = "IGNITE_TOTAL_CPU"; - - /** CPU limit. */ - private double cpu = UNLIMITED; - - /** */ public static final String IGNITE_RUN_CPU_PER_NODE = "IGNITE_RUN_CPU_PER_NODE"; /** CPU limit. */ private double cpuPerNode = UNLIMITED; /** */ - public static final String IGNITE_TOTAL_MEMORY = "IGNITE_TOTAL_MEMORY"; - - /** Memory limit. */ - private double mem = UNLIMITED; - - /** */ public static final String IGNITE_MEMORY_PER_NODE = "IGNITE_MEMORY_PER_NODE"; /** Memory limit. */ @@ -72,25 +60,7 @@ public class ClusterProperties { public static final String IGNITE_NODE_COUNT = "IGNITE_NODE_COUNT"; /** Node count limit. */ - private double nodeCnt = UNLIMITED; - - /** */ - public static final String IGNITE_MIN_CPU_PER_NODE = "IGNITE_MIN_CPU_PER_NODE"; - - /** */ - public static final double DEFAULT_RESOURCE_MIN_CPU = 1; - - /** Min memory per node. */ - private double minCpu = DEFAULT_RESOURCE_MIN_CPU; - - /** */ - public static final String IGNITE_MIN_MEMORY_PER_NODE = "IGNITE_MIN_MEMORY_PER_NODE"; - - /** */ - public static final double DEFAULT_RESOURCE_MIN_MEM = 256; - - /** Min memory per node. */ - private double minMemory = DEFAULT_RESOURCE_MIN_MEM; + private double nodeCnt = 3; /** */ public static final String IGNITE_VERSION = "IGNITE_VERSION"; @@ -170,19 +140,6 @@ public class ClusterProperties { return clusterName; } - /** - * @return CPU count limit. - */ - public double cpus() { - return cpu; - } - - /** - * Sets CPU count limit. - */ - public void cpus(double cpu) { - this.cpu = cpu; - } /** * @return CPU count limit. @@ -201,22 +158,6 @@ public class ClusterProperties { /** * @return mem limit. */ - public double memory() { - return mem; - } - - /** - * Sets mem limit. - * - * @param mem Memory. - */ - public void memory(double mem) { - this.mem = mem; - } - - /** - * @return mem limit. - */ public double memoryPerNode() { return memPerNode; } @@ -238,22 +179,6 @@ public class ClusterProperties { } /** - * @return min memory per node. - */ - public double minMemoryPerNode() { - return minMemory; - } - - /** - * Sets min memory. - * - * @param minMemory Min memory. - */ - public void minMemoryPerNode(double minMemory) { - this.minMemory = minMemory; - } - - /** * Sets hostname constraint. * * @param pattern Hostname pattern. @@ -263,22 +188,6 @@ public class ClusterProperties { } /** - * @return min cpu count per node. - */ - public double minCpuPerNode() { - return minCpu; - } - - /** - * Sets min cpu count per node. - * - * @param minCpu min cpu count per node. - */ - public void minCpuPerNode(double minCpu) { - this.minCpu = minCpu; - } - - /** * @return Ignite version. */ public String igniteVer() { @@ -362,13 +271,9 @@ public class ClusterProperties { prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, props, null); prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, props, null); - prop.cpu = getDoubleProperty(IGNITE_TOTAL_CPU, props, UNLIMITED); - prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, UNLIMITED); - prop.mem = getDoubleProperty(IGNITE_TOTAL_MEMORY, props, UNLIMITED); - prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, UNLIMITED); - prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, UNLIMITED); - prop.minCpu = getDoubleProperty(IGNITE_MIN_CPU_PER_NODE, props, DEFAULT_RESOURCE_MIN_CPU); - prop.minMemory = getDoubleProperty(IGNITE_MIN_MEMORY_PER_NODE, props, DEFAULT_RESOURCE_MIN_MEM); + prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, props, 1.0); + prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, props, 2048.0); + prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, props, 2.0); prop.igniteVer = getStringProperty(IGNITE_VERSION, props, DEFAULT_IGNITE_VERSION); prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, props, DEFAULT_IGNITE_WORK_DIR); @@ -394,6 +299,40 @@ public class ClusterProperties { } /** + * @return Cluster configuration. + */ + public static ClusterProperties from() { + ClusterProperties prop = new ClusterProperties(); + + prop.clusterName = getStringProperty(IGNITE_CLUSTER_NAME, null, DEFAULT_CLUSTER_NAME); + + prop.userLibsUrl = getStringProperty(IGNITE_USERS_LIBS_URL, null, null); + prop.igniteCfgUrl = getStringProperty(IGNITE_CONFIG_XML_URL, null, null); + + prop.cpuPerNode = getDoubleProperty(IGNITE_RUN_CPU_PER_NODE, null, 1.0); + prop.memPerNode = getDoubleProperty(IGNITE_MEMORY_PER_NODE, null, 2048.0); + prop.nodeCnt = getDoubleProperty(IGNITE_NODE_COUNT, null, 2.0); + + prop.igniteVer = getStringProperty(IGNITE_VERSION, null, DEFAULT_IGNITE_VERSION); + prop.igniteWorkDir = getStringProperty(IGNITE_WORKING_DIR, null, DEFAULT_IGNITE_WORK_DIR); + prop.igniteCfg = getStringProperty(IGNITE_CONFIG_XML, null, null); + prop.userLibs = getStringProperty(IGNITE_USERS_LIBS, null, null); + + String pattern = getStringProperty(IGNITE_HOSTNAME_CONSTRAINT, null, null); + + if (pattern != null) { + try { + prop.hostnameConstraint = Pattern.compile(pattern); + } + catch (PatternSyntaxException e) { + log.log(Level.WARNING, "IGNITE_HOSTNAME_CONSTRAINT has invalid pattern. It will be ignore.", e); + } + } + + return prop; + } + + /** * Convert to properties to map. * * @return Key-value map. @@ -406,13 +345,9 @@ public class ClusterProperties { envs.put(IGNITE_USERS_LIBS_URL, toEnvVal(userLibsUrl)); envs.put(IGNITE_CONFIG_XML_URL, toEnvVal(igniteCfgUrl)); - envs.put(IGNITE_TOTAL_CPU, toEnvVal(cpu)); envs.put(IGNITE_RUN_CPU_PER_NODE, toEnvVal(cpuPerNode)); - envs.put(IGNITE_TOTAL_MEMORY, toEnvVal(mem)); envs.put(IGNITE_MEMORY_PER_NODE, toEnvVal(memPerNode)); envs.put(IGNITE_NODE_COUNT, toEnvVal(nodeCnt)); - envs.put(IGNITE_MIN_CPU_PER_NODE, toEnvVal(minCpu)); - envs.put(IGNITE_MIN_MEMORY_PER_NODE, toEnvVal(minMemory)); envs.put(IGNITE_VERSION, toEnvVal(igniteVer)); envs.put(IGNITE_WORKING_DIR, toEnvVal(igniteWorkDir)); @@ -461,7 +396,7 @@ public class ClusterProperties { /** * @param val Value. - * @return If val is null {@link EMPTY_STRING} else to string. + * @return If val is null {@code EMPTY_STRING} else to string. */ private String toEnvVal(Object val) { return val == null ? EMPTY_STRING : val.toString(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java index 4e3c285..a8b0342 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteContainer.java @@ -17,12 +17,17 @@ package org.apache.ignite.yarn; +import org.apache.hadoop.yarn.api.records.*; + /** * Information about launched task. */ public class IgniteContainer { /** */ - public final String host; + public final ContainerId id; + + /** */ + public final NodeId nodeId; /** */ public final double cpuCores; @@ -33,21 +38,29 @@ public class IgniteContainer { /** * Ignite launched task. * - * @param host Host. + * @param nodeId Node id. * @param cpuCores Cpu cores count. * @param mem Memory */ - public IgniteContainer(String host, double cpuCores, double mem) { - this.host = host; + public IgniteContainer(ContainerId id, NodeId nodeId, double cpuCores, double mem) { + this.id = id; + this.nodeId = nodeId; this.cpuCores = cpuCores; this.mem = mem; } /** + * @return Id. + */ + public ContainerId id() { + return id; + } + + /** * @return Host. */ - public String host() { - return host; + public NodeId nodeId() { + return nodeId; } /** @@ -64,10 +77,12 @@ public class IgniteContainer { return mem; } - @Override - public String toString() { + /** + * {@inheritDoc} + */ + @Override public String toString() { return "IgniteTask " + - "host: [" + host + ']' + + "host: [" + nodeId.getHost() + ']' + ", cpuCores: [" + cpuCores + "]" + ", mem: [" + mem + "]"; } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java index c6e07cb..1ac2974 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteProvider.java @@ -18,7 +18,7 @@ package org.apache.ignite.yarn; import org.apache.hadoop.fs.*; -import org.apache.ignite.yarn.utils.IgniteYarnUtils; +import org.apache.ignite.yarn.utils.*; import java.io.*; import java.net.*; @@ -26,7 +26,7 @@ import java.nio.channels.*; import java.util.*; /** - * Class downloads and stores Ignite. + * Downloads and stores Ignite. */ public class IgniteProvider { /** */ http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java index 0ab9e91..f74890d 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/IgniteYarnClient.java @@ -26,6 +26,7 @@ import org.apache.ignite.yarn.utils.*; import java.io.*; import java.util.*; +import java.util.concurrent.*; import java.util.logging.*; import static org.apache.hadoop.yarn.api.ApplicationConstants.*; @@ -69,9 +70,6 @@ public class IgniteYarnClient { // Set up the container launch context for the application master ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); - System.out.println(Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName() - + IgniteYarnUtils.SPACE + ignite.toUri()); - amContainer.setCommands( Collections.singletonList( Environment.JAVA_HOME.$() + "/bin/java -Xmx512m " + ApplicationMaster.class.getName() @@ -106,16 +104,18 @@ public class IgniteYarnClient { // Submit application ApplicationId appId = appContext.getApplicationId(); - System.out.println("Submitting application " + appId); + yarnClient.submitApplication(appContext); + log.log(Level.INFO, "Submitted application. Application id: [{0}]", appId); + ApplicationReport appReport = yarnClient.getApplicationReport(appId); YarnApplicationState appState = appReport.getYarnApplicationState(); while (appState != YarnApplicationState.FINISHED && appState != YarnApplicationState.KILLED && appState != YarnApplicationState.FAILED) { - Thread.sleep(100); + TimeUnit.SECONDS.sleep(1L); appReport = yarnClient.getApplicationReport(appId); @@ -124,8 +124,7 @@ public class IgniteYarnClient { yarnClient.killApplication(appId); - System.out.println("Application " + appId + " finished with state " + appState + " at " - + appReport.getFinishTime()); + log.log(Level.INFO, "Application [{0}] finished with state [{1}]", new Object[]{appId, appState}); } /** http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java index c47f1e8..6734307 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/package-info.java @@ -17,6 +17,6 @@ /** * <!-- Package description. --> - * Contains classes to support integration with Apache Mesos. + * Contains classes to support integration with Apache Hadoop Yarn. */ package org.apache.ignite.yarn; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/858d2a3f/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java ---------------------------------------------------------------------- diff --git a/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java index 1e6c414..3b62411 100644 --- a/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java +++ b/modules/yarn/src/main/java/org/apache/ignite/yarn/utils/IgniteYarnUtils.java @@ -21,12 +21,10 @@ import org.apache.hadoop.fs.*; import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.util.*; -import java.io.IOException; - import static org.apache.hadoop.yarn.api.ApplicationConstants.*; /** - * + * Utils. */ public class IgniteYarnUtils { /** */