http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java new file mode 100644 index 0000000..04a96de --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java @@ -0,0 +1,960 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.jobtracker.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.apache.ignite.lang.*; +import org.apache.ignite.spi.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.locks.*; + +import static org.apache.ignite.internal.processors.hadoop.taskexecutor.HadoopTaskState.*; + +/** + * External process registry. Handles external process lifecycle. + */ +public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { + /** Hadoop context. */ + private HadoopContext ctx; + + /** */ + private String javaCmd; + + /** Logger. */ + private IgniteLogger log; + + /** Node process descriptor. */ + private HadoopProcessDescriptor nodeDesc; + + /** Output base. */ + private File outputBase; + + /** Path separator. */ + private String pathSep; + + /** Hadoop external communication. */ + private HadoopExternalCommunication comm; + + /** Running processes mapped by process ID. */ + private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>(); + + /** Running processes mapped by job ID. */ + private final ConcurrentMap<HadoopJobId, HadoopProcess> runningProcsByJobId = new ConcurrentHashMap8<>(); + + /** Busy lock. 
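Acquired in read mode for message processing and task submission, in write mode on executor stop.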
*/ + private final GridSpinReadWriteLock busyLock = new GridSpinReadWriteLock(); + + /** Job tracker. */ + private HadoopJobTracker jobTracker; + + /** {@inheritDoc} */ + @Override public void start(HadoopContext ctx) throws IgniteCheckedException { + this.ctx = ctx; + + log = ctx.kernalContext().log(HadoopExternalTaskExecutor.class); + + outputBase = U.resolveWorkDirectory("hadoop", false); + + pathSep = System.getProperty("path.separator", U.isWindows() ? ";" : ":"); + + initJavaCommand(); + + comm = new HadoopExternalCommunication( + ctx.localNodeId(), + UUID.randomUUID(), + ctx.kernalContext().config().getMarshaller(), + log, + ctx.kernalContext().getSystemExecutorService(), + ctx.kernalContext().gridName()); + + comm.setListener(new MessageListener()); + + comm.start(); + + nodeDesc = comm.localProcessDescriptor(); + + ctx.kernalContext().ports().registerPort(nodeDesc.tcpPort(), IgnitePortProtocol.TCP, + HadoopExternalTaskExecutor.class); + + if (nodeDesc.sharedMemoryPort() != -1) + ctx.kernalContext().ports().registerPort(nodeDesc.sharedMemoryPort(), IgnitePortProtocol.TCP, + HadoopExternalTaskExecutor.class); + + jobTracker = ctx.jobTracker(); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + busyLock.writeLock(); + + try { + comm.stop(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to gracefully stop external hadoop communication server (will shutdown anyway)", e); + } + } + + /** {@inheritDoc} */ + @Override public void onJobStateChanged(final HadoopJobMetadata meta) { + final HadoopProcess proc = runningProcsByJobId.get(meta.jobId()); + + // If we have a local process for this job. + if (proc != null) { + if (log.isDebugEnabled()) + log.debug("Updating job information for remote task process [proc=" + proc + ", meta=" + meta + ']'); + + if (meta.phase() == HadoopJobPhase.PHASE_COMPLETE) { + if (log.isDebugEnabled()) + log.debug("Completed job execution, will terminate child process [jobId=" + meta.jobId() + + ", proc=" + proc + ']'); + + runningProcsByJobId.remove(meta.jobId()); + runningProcsByProcId.remove(proc.descriptor().processId()); + + proc.terminate(); + + return; + } + + if (proc.initFut.isDone()) { + if (!proc.initFut.isFailed()) + sendJobInfoUpdate(proc, meta); + else if (log.isDebugEnabled()) + log.debug("Failed to initialize child process (will skip job state notification) " + + "[jobId=" + meta.jobId() + ", meta=" + meta + ']'); + } + else { + proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { + try { + f.get(); + + sendJobInfoUpdate(proc, meta); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to initialize child process (will skip job state notification) " + + "[jobId=" + meta.jobId() + ", meta=" + meta + ", err=" + e + ']'); + } + + } + }); + } + } + else if (ctx.isParticipating(meta)) { + HadoopJob job; + + try { + job = jobTracker.job(meta.jobId(), meta.jobInfo()); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to get job: " + meta.jobId(), e); + + return; + } + + startProcess(job, meta.mapReducePlan()); + } + } + + /** {@inheritDoc} */ + @SuppressWarnings("ConstantConditions") + @Override public void run(final HadoopJob job, final Collection<HadoopTaskInfo> tasks) throws IgniteCheckedException { + if (!busyLock.tryReadLock()) { + if (log.isDebugEnabled()) + log.debug("Failed to start hadoop tasks 
(grid is stopping, will ignore)."); + + return; + } + + try { + HadoopProcess proc = runningProcsByJobId.get(job.id()); + + HadoopTaskType taskType = F.first(tasks).type(); + + if (taskType == HadoopTaskType.SETUP || taskType == HadoopTaskType.ABORT || + taskType == HadoopTaskType.COMMIT) { + if (proc == null || proc.terminated()) { + runningProcsByJobId.remove(job.id(), proc); + + // Start new process for ABORT task since previous processes were killed. + proc = startProcess(job, jobTracker.plan(job.id())); + + if (log.isDebugEnabled()) + log.debug("Starting new process for maintenance task [jobId=" + job.id() + + ", proc=" + proc + ", taskType=" + taskType + ']'); + } + } + else + assert proc != null : "Missing started process for task execution request: " + job.id() + + ", tasks=" + tasks; + + final HadoopProcess proc0 = proc; + + proc.initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + @Override public void apply( + IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { + if (!busyLock.tryReadLock()) + return; + + try { + f.get(); + + proc0.addTasks(tasks); + + if (log.isDebugEnabled()) + log.debug("Sending task execution request to child process [jobId=" + job.id() + + ", proc=" + proc0 + ", tasks=" + tasks + ']'); + + sendExecutionRequest(proc0, job, tasks); + } + catch (IgniteCheckedException e) { + notifyTasksFailed(tasks, FAILED, e); + } + finally { + busyLock.readUnlock(); + } + } + }); + } + finally { + busyLock.readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void cancelTasks(HadoopJobId jobId) { + HadoopProcess proc = runningProcsByJobId.get(jobId); + + if (proc != null) + proc.terminate(); + } + + /** + * Sends execution request to remote node. + * + * @param proc Process to send request to. + * @param job Job instance. + * @param tasks Collection of tasks to execute in started process. + */ + private void sendExecutionRequest(HadoopProcess proc, HadoopJob job, Collection<HadoopTaskInfo> tasks) + throws IgniteCheckedException { + // Must synchronize since concurrent process crash may happen and will receive onConnectionLost(). + proc.lock(); + + try { + if (proc.terminated()) { + notifyTasksFailed(tasks, CRASHED, null); + + return; + } + + HadoopTaskExecutionRequest req = new HadoopTaskExecutionRequest(); + + req.jobId(job.id()); + req.jobInfo(job.info()); + req.tasks(tasks); + + comm.sendMessage(proc.descriptor(), req); + } + finally { + proc.unlock(); + } + } + + /** + * @return External task metadata. + */ + private HadoopExternalTaskMetadata buildTaskMeta() { + HadoopExternalTaskMetadata meta = new HadoopExternalTaskMetadata(); + + meta.classpath(Arrays.asList(System.getProperty("java.class.path").split(File.pathSeparator))); + meta.jvmOptions(Arrays.asList("-Xmx1g", "-ea", "-XX:+UseConcMarkSweepGC", "-XX:+CMSClassUnloadingEnabled", + "-DIGNITE_HOME=" + U.getIgniteHome())); + + return meta; + } + + /** + * @param tasks Tasks to notify about. + * @param state Fail state. + * @param e Optional error. + */ + private void notifyTasksFailed(Iterable<HadoopTaskInfo> tasks, HadoopTaskState state, Throwable e) { + HadoopTaskStatus fail = new HadoopTaskStatus(state, e); + + for (HadoopTaskInfo task : tasks) + jobTracker.onTaskFinished(task, fail); + } + + /** + * Starts process template that will be ready to execute Hadoop tasks. + * + * @param job Job instance. + * @param plan Map reduce plan. 
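+ * @return Started process.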
+ */ + private HadoopProcess startProcess(final HadoopJob job, final HadoopMapReducePlan plan) { + final UUID childProcId = UUID.randomUUID(); + + HadoopJobId jobId = job.id(); + + final HadoopProcessFuture fut = new HadoopProcessFuture(childProcId, jobId, ctx.kernalContext()); + + final HadoopProcess proc = new HadoopProcess(jobId, fut, plan.reducers(ctx.localNodeId())); + + HadoopProcess old = runningProcsByJobId.put(jobId, proc); + + assert old == null; + + old = runningProcsByProcId.put(childProcId, proc); + + assert old == null; + + ctx.kernalContext().closure().runLocalSafe(new Runnable() { + @Override public void run() { + if (!busyLock.tryReadLock()) { + fut.onDone(new IgniteCheckedException("Failed to start external process (grid is stopping).")); + + return; + } + + try { + HadoopExternalTaskMetadata startMeta = buildTaskMeta(); + + if (log.isDebugEnabled()) + log.debug("Created hadoop child process metadata for job [job=" + job + + ", childProcId=" + childProcId + ", taskMeta=" + startMeta + ']'); + + Process proc = startJavaProcess(childProcId, startMeta, job); + + BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream())); + + String line; + + // Read up all the process output. + while ((line = rdr.readLine()) != null) { + if (log.isDebugEnabled()) + log.debug("Tracing process output: " + line); + + if ("Started".equals(line)) { + // Process started successfully, it should not write anything more to the output stream. + if (log.isDebugEnabled()) + log.debug("Successfully started child process [childProcId=" + childProcId + + ", meta=" + job + ']'); + + fut.onProcessStarted(proc); + + break; + } + else if ("Failed".equals(line)) { + StringBuilder sb = new StringBuilder("Failed to start child process: " + job + "\n"); + + while ((line = rdr.readLine()) != null) + sb.append(" ").append(line).append("\n"); + + // Cut last character. + sb.setLength(sb.length() - 1); + + log.warning(sb.toString()); + + fut.onDone(new IgniteCheckedException(sb.toString())); + + break; + } + } + } + catch (Throwable e) { + fut.onDone(new IgniteCheckedException("Failed to initialize child process: " + job, e)); + } + finally { + busyLock.readUnlock(); + } + } + }, true); + + fut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + @Override public void apply(IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { + try { + // Make sure there were no exceptions. + f.get(); + + prepareForJob(proc, job, plan); + } + catch (IgniteCheckedException ignore) { + // Exception is printed in future's onDone() method. + } + } + }); + + return proc; + } + + /** + * Checks that java local command is available. + * + * @throws IgniteCheckedException If initialization failed. + */ + private void initJavaCommand() throws IgniteCheckedException { + String javaHome = System.getProperty("java.home"); + + if (javaHome == null) + javaHome = System.getenv("JAVA_HOME"); + + if (javaHome == null) + throw new IgniteCheckedException("Failed to locate JAVA_HOME."); + + javaCmd = javaHome + File.separator + "bin" + File.separator + (U.isWindows() ? 
"java.exe" : "java"); + + try { + Process proc = new ProcessBuilder(javaCmd, "-version").redirectErrorStream(true).start(); + + Collection<String> out = readProcessOutput(proc); + + int res = proc.waitFor(); + + if (res != 0) + throw new IgniteCheckedException("Failed to execute 'java -version' command (process finished with nonzero " + + "code) [exitCode=" + res + ", javaCmd='" + javaCmd + "', msg=" + F.first(out) + ']'); + + if (log.isInfoEnabled()) { + log.info("Will use java for external task execution: "); + + for (String s : out) + log.info(" " + s); + } + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to check java for external task execution.", e); + } + catch (InterruptedException e) { + Thread.currentThread().interrupt(); + + throw new IgniteCheckedException("Failed to wait for process completion (thread got interrupted).", e); + } + } + + /** + * Reads process output line-by-line. + * + * @param proc Process to read output. + * @return Read lines. + * @throws IOException If read failed. + */ + private Collection<String> readProcessOutput(Process proc) throws IOException { + BufferedReader rdr = new BufferedReader(new InputStreamReader(proc.getInputStream())); + + Collection<String> res = new ArrayList<>(); + + String s; + + while ((s = rdr.readLine()) != null) + res.add(s); + + return res; + } + + /** + * Builds process from metadata. + * + * @param childProcId Child process ID. + * @param startMeta Metadata. + * @param job Job. + * @return Started process. + */ + private Process startJavaProcess(UUID childProcId, HadoopExternalTaskMetadata startMeta, + HadoopJob job) throws Exception { + String outFldr = jobWorkFolder(job.id()) + File.separator + childProcId; + + if (log.isDebugEnabled()) + log.debug("Will write process log output to: " + outFldr); + + List<String> cmd = new ArrayList<>(); + + File workDir = U.resolveWorkDirectory("", false); + + cmd.add(javaCmd); + cmd.addAll(startMeta.jvmOptions()); + cmd.add("-cp"); + cmd.add(buildClasspath(startMeta.classpath())); + cmd.add(HadoopExternalProcessStarter.class.getName()); + cmd.add("-cpid"); + cmd.add(String.valueOf(childProcId)); + cmd.add("-ppid"); + cmd.add(String.valueOf(nodeDesc.processId())); + cmd.add("-nid"); + cmd.add(String.valueOf(nodeDesc.parentNodeId())); + cmd.add("-addr"); + cmd.add(nodeDesc.address()); + cmd.add("-tport"); + cmd.add(String.valueOf(nodeDesc.tcpPort())); + cmd.add("-sport"); + cmd.add(String.valueOf(nodeDesc.sharedMemoryPort())); + cmd.add("-out"); + cmd.add(outFldr); + cmd.add("-wd"); + cmd.add(workDir.getAbsolutePath()); + + return new ProcessBuilder(cmd) + .redirectErrorStream(true) + .directory(workDir) + .start(); + } + + /** + * Gets job work folder. + * + * @param jobId Job ID. + * @return Job work folder. + */ + private String jobWorkFolder(HadoopJobId jobId) { + return outputBase + File.separator + "Job_" + jobId; + } + + /** + * @param cp Classpath collection. + * @return Classpath string. + */ + private String buildClasspath(Collection<String> cp) { + assert !cp.isEmpty(); + + StringBuilder sb = new StringBuilder(); + + for (String s : cp) + sb.append(s).append(pathSep); + + sb.setLength(sb.length() - 1); + + return sb.toString(); + } + + /** + * Sends job info update request to remote process. + * + * @param proc Process to send request to. + * @param meta Job metadata. 
+ */ + private void sendJobInfoUpdate(HadoopProcess proc, HadoopJobMetadata meta) { + Map<Integer, HadoopProcessDescriptor> rdcAddrs = meta.reducersAddresses(); + + int rdcNum = meta.mapReducePlan().reducers(); + + HadoopProcessDescriptor[] addrs = null; + + if (rdcAddrs != null && rdcAddrs.size() == rdcNum) { + addrs = new HadoopProcessDescriptor[rdcNum]; + + for (int i = 0; i < rdcNum; i++) { + HadoopProcessDescriptor desc = rdcAddrs.get(i); + + assert desc != null : "Missing reducing address [meta=" + meta + ", rdc=" + i + ']'; + + addrs[i] = desc; + } + } + + try { + comm.sendMessage(proc.descriptor(), new HadoopJobInfoUpdateRequest(proc.jobId, meta.phase(), addrs)); + } + catch (IgniteCheckedException e) { + if (!proc.terminated()) { + log.error("Failed to send job state update message to remote child process (will kill the process) " + + "[jobId=" + proc.jobId + ", meta=" + meta + ']', e); + + proc.terminate(); + } + } + } + + /** + * Sends prepare request to remote process. + * + * @param proc Process to send request to. + * @param job Job. + * @param plan Map reduce plan. + */ + private void prepareForJob(HadoopProcess proc, HadoopJob job, HadoopMapReducePlan plan) { + try { + comm.sendMessage(proc.descriptor(), new HadoopPrepareForJobRequest(job.id(), job.info(), + plan.reducers(), plan.reducers(ctx.localNodeId()))); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send job prepare request to remote process [proc=" + proc + ", job=" + job + + ", plan=" + plan + ']', e); + + proc.terminate(); + } + } + + /** + * Processes task finished message. + * + * @param desc Remote process descriptor. + * @param taskMsg Task finished message. + */ + private void processTaskFinishedMessage(HadoopProcessDescriptor desc, HadoopTaskFinishedMessage taskMsg) { + HadoopProcess proc = runningProcsByProcId.get(desc.processId()); + + if (proc != null) + proc.removeTask(taskMsg.taskInfo()); + + jobTracker.onTaskFinished(taskMsg.taskInfo(), taskMsg.status()); + } + + /** + * + */ + private class MessageListener implements HadoopMessageListener { + /** {@inheritDoc} */ + @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { + if (!busyLock.tryReadLock()) + return; + + try { + if (msg instanceof HadoopProcessStartedAck) { + HadoopProcess proc = runningProcsByProcId.get(desc.processId()); + + assert proc != null : "Missing child process for processId: " + desc; + + HadoopProcessFuture fut = proc.initFut; + + if (fut != null) + fut.onReplyReceived(desc); + // Safety. + else + log.warning("Failed to find process start future (will ignore): " + desc); + } + else if (msg instanceof HadoopTaskFinishedMessage) { + HadoopTaskFinishedMessage taskMsg = (HadoopTaskFinishedMessage)msg; + + processTaskFinishedMessage(desc, taskMsg); + } + else + log.warning("Unexpected message received by node [desc=" + desc + ", msg=" + msg + ']'); + } + finally { + busyLock.readUnlock(); + } + } + + /** {@inheritDoc} */ + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { + if (!busyLock.tryReadLock()) + return; + + try { + if (desc == null) { + U.warn(log, "Handshake failed."); + + return; + } + + // Notify job tracker about failed tasks. 
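+ // If the process is still registered, its outstanding tasks are reported as CRASHED so the job tracker can fail them over.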
+ HadoopProcess proc = runningProcsByProcId.get(desc.processId()); + + if (proc != null) { + Collection<HadoopTaskInfo> tasks = proc.tasks(); + + if (!F.isEmpty(tasks)) { + log.warning("Lost connection with alive process (will terminate): " + desc); + + HadoopTaskStatus status = new HadoopTaskStatus(CRASHED, + new IgniteCheckedException("Failed to run tasks (external process finished unexpectedly): " + desc)); + + for (HadoopTaskInfo info : tasks) + jobTracker.onTaskFinished(info, status); + + runningProcsByJobId.remove(proc.jobId(), proc); + } + + // Safety. + proc.terminate(); + } + } + finally { + busyLock.readUnlock(); + } + } + } + + /** + * Hadoop process. + */ + private static class HadoopProcess extends ReentrantLock { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + private final HadoopJobId jobId; + + /** Process. */ + private Process proc; + + /** Init future. Completes when process is ready to receive messages. */ + private final HadoopProcessFuture initFut; + + /** Process descriptor. */ + private HadoopProcessDescriptor procDesc; + + /** Reducers planned for this process. */ + private Collection<Integer> reducers; + + /** Tasks. */ + private final Collection<HadoopTaskInfo> tasks = new ConcurrentLinkedDeque8<>(); + + /** Terminated flag. */ + private volatile boolean terminated; + + /** + * @param jobId Job ID. + * @param initFut Init future. + */ + private HadoopProcess(HadoopJobId jobId, HadoopProcessFuture initFut, + int[] reducers) { + this.jobId = jobId; + this.initFut = initFut; + + if (!F.isEmpty(reducers)) { + this.reducers = new ArrayList<>(reducers.length); + + for (int r : reducers) + this.reducers.add(r); + } + } + + /** + * @return Communication process descriptor. + */ + private HadoopProcessDescriptor descriptor() { + return procDesc; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * Initialized callback. + * + * @param proc Java process representation. + * @param procDesc Process descriptor. + */ + private void onInitialized(Process proc, HadoopProcessDescriptor procDesc) { + this.proc = proc; + this.procDesc = procDesc; + } + + /** + * Terminates process (kills it). + */ + private void terminate() { + // Guard against concurrent message sending. + lock(); + + try { + terminated = true; + + if (!initFut.isDone()) + initFut.listenAsync(new CI1<IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>>>() { + @Override public void apply( + IgniteInternalFuture<IgniteBiTuple<Process, HadoopProcessDescriptor>> f) { + proc.destroy(); + } + }); + else + proc.destroy(); + } + finally { + unlock(); + } + } + + /** + * @return Terminated flag. + */ + private boolean terminated() { + return terminated; + } + + /** + * Sets process tasks. + * + * @param tasks Tasks to set. + */ + private void addTasks(Collection<HadoopTaskInfo> tasks) { + this.tasks.addAll(tasks); + } + + /** + * Removes task when it was completed. + * + * @param task Task to remove. + */ + private void removeTask(HadoopTaskInfo task) { + if (tasks != null) + tasks.remove(task); + } + + /** + * @return Collection of tasks. + */ + private Collection<HadoopTaskInfo> tasks() { + return tasks; + } + + /** + * @return Planned reducers. 
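+ * May be {@code null} if no reducers were planned for the local node.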
+ */ + private Collection<Integer> reducers() { + return reducers; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProcess.class, this); + } + } + + /** + * + */ + private class HadoopProcessFuture extends GridFutureAdapter<IgniteBiTuple<Process, HadoopProcessDescriptor>> { + /** */ + private static final long serialVersionUID = 0L; + + /** Child process ID. */ + private UUID childProcId; + + /** Job ID. */ + private HadoopJobId jobId; + + /** Process descriptor. */ + private HadoopProcessDescriptor desc; + + /** Running process. */ + private Process proc; + + /** Process started flag. */ + private volatile boolean procStarted; + + /** Reply received flag. */ + private volatile boolean replyReceived; + + /** Logger. */ + private final IgniteLogger log = HadoopExternalTaskExecutor.this.log; + + /** + * Empty constructor. + */ + public HadoopProcessFuture() { + // No-op. + } + + /** + * @param ctx Kernal context. + */ + private HadoopProcessFuture(UUID childProcId, HadoopJobId jobId, GridKernalContext ctx) { + super(ctx); + + this.childProcId = childProcId; + this.jobId = jobId; + } + + /** + * Process started callback. + */ + public void onProcessStarted(Process proc) { + this.proc = proc; + + procStarted = true; + + if (procStarted && replyReceived) + onDone(F.t(proc, desc)); + } + + /** + * Reply received callback. + */ + public void onReplyReceived(HadoopProcessDescriptor desc) { + assert childProcId.equals(desc.processId()); + + this.desc = desc; + + replyReceived = true; + + if (procStarted && replyReceived) + onDone(F.t(proc, desc)); + } + + /** {@inheritDoc} */ + @Override public boolean onDone(@Nullable IgniteBiTuple<Process, HadoopProcessDescriptor> res, + @Nullable Throwable err) { + if (err == null) { + HadoopProcess proc = runningProcsByProcId.get(childProcId); + + assert proc != null; + + assert proc.initFut == this; + + proc.onInitialized(res.get1(), res.get2()); + + if (!F.isEmpty(proc.reducers())) + jobTracker.onExternalMappersInitialized(jobId, proc.reducers(), desc); + } + else { + // Clean up since init failed. + runningProcsByJobId.remove(jobId); + runningProcsByProcId.remove(childProcId); + } + + if (super.onDone(res, err)) { + if (err == null) { + if (log.isDebugEnabled()) + log.debug("Initialized child process for external task execution [jobId=" + jobId + + ", desc=" + desc + ", initTime=" + duration() + ']'); + } + else + U.error(log, "Failed to initialize child process for external task execution [jobId=" + jobId + + ", desc=" + desc + ']', err); + + return true; + } + + return false; + } + } +}
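Note on the startup handshake above: HadoopProcessFuture completes only after two independent events, namely the parent reading the "Started" line from the child's stdout (onProcessStarted) and receiving a HadoopProcessStartedAck over the external communication channel (onReplyReceived). A minimal standalone sketch of that two-flag rendezvous; the class and member names below are illustrative, not part of the patch:

    import java.util.concurrent.CountDownLatch;

    /** Toy rendezvous: done only after both independent startup events arrive. */
    public class StartupRendezvous {
        private volatile boolean procStarted;   // "Started" seen on child stdout.
        private volatile boolean replyReceived; // Ack received over communication.

        private final CountDownLatch done = new CountDownLatch(1);

        public void onProcessStarted() { procStarted = true; tryComplete(); }

        public void onReplyReceived() { replyReceived = true; tryComplete(); }

        private void tryComplete() {
            // Whichever event fires second sees both volatile flags set.
            if (procStarted && replyReceived)
                done.countDown(); // Counting down twice is harmless, like onDone().
        }

        public void awaitStarted() throws InterruptedException { done.await(); }

        public static void main(String[] args) throws InterruptedException {
            StartupRendezvous r = new StartupRendezvous();

            new Thread(r::onProcessStarted).start();
            new Thread(r::onReplyReceived).start();

            r.awaitStarted();

            System.out.println("Child fully initialized.");
        }
    }

Either event alone is insufficient: a child that printed "Started" but never connected back, or one that connected before its stdout was drained, would leave the future incomplete, which is exactly the behavior the patch relies on.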
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java new file mode 100644 index 0000000..f0acc9f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskMetadata.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; + +/** + * External task metadata (classpath, JVM options) needed to start external process execution. + */ +public class HadoopExternalTaskMetadata { + /** Process classpath. */ + private Collection<String> classpath; + + /** JVM options. */ + @GridToStringInclude + private Collection<String> jvmOpts; + + /** + * @return JVM Options. + */ + public Collection<String> jvmOptions() { + return jvmOpts; + } + + /** + * @param jvmOpts JVM options. + */ + public void jvmOptions(Collection<String> jvmOpts) { + this.jvmOpts = jvmOpts; + } + + /** + * @return Classpath. + */ + public Collection<String> classpath() { + return classpath; + } + + /** + * @param classpath Classpath. + */ + public void classpath(Collection<String> classpath) { + this.classpath = classpath; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopExternalTaskMetadata.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java new file mode 100644 index 0000000..25c9408 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopJobInfoUpdateRequest.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Job info update request. + */ +public class HadoopJobInfoUpdateRequest implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + @GridToStringInclude + private HadoopJobId jobId; + + /** Job phase. */ + @GridToStringInclude + private HadoopJobPhase jobPhase; + + /** Reducers addresses. */ + @GridToStringInclude + private HadoopProcessDescriptor[] reducersAddrs; + + /** + * Constructor required by {@link Externalizable}. + */ + public HadoopJobInfoUpdateRequest() { + // No-op. + } + + /** + * @param jobId Job ID. + * @param jobPhase Job phase. + * @param reducersAddrs Reducers addresses. + */ + public HadoopJobInfoUpdateRequest(HadoopJobId jobId, HadoopJobPhase jobPhase, + HadoopProcessDescriptor[] reducersAddrs) { + assert jobId != null; + + this.jobId = jobId; + this.jobPhase = jobPhase; + this.reducersAddrs = reducersAddrs; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Job phase. + */ + public HadoopJobPhase jobPhase() { + return jobPhase; + } + + /** + * @return Reducers addresses. 
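+ * May be {@code null} while reducer processes are still registering their addresses.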
+ */ + public HadoopProcessDescriptor[] reducersAddresses() { + return reducersAddrs; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + + out.writeObject(jobPhase); + U.writeArray(out, reducersAddrs); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + jobId.readExternal(in); + + jobPhase = (HadoopJobPhase)in.readObject(); + reducersAddrs = (HadoopProcessDescriptor[])U.readArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopJobInfoUpdateRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java new file mode 100644 index 0000000..df44dd7 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopPrepareForJobRequest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Child process initialization request. + */ +public class HadoopPrepareForJobRequest implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + @GridToStringInclude + private HadoopJobId jobId; + + /** Job info. */ + @GridToStringInclude + private HadoopJobInfo jobInfo; + + /** Total amount of reducers in the job. */ + @GridToStringInclude + private int totalReducersCnt; + + /** Reducers to be executed on current node. */ + @GridToStringInclude + private int[] locReducers; + + /** + * Constructor required by {@link Externalizable}. + */ + public HadoopPrepareForJobRequest() { + // No-op. + } + + /** + * @param jobId Job ID. + * @param jobInfo Job info. + * @param totalReducersCnt Number of reducers in the job. + * @param locReducers Reducers to be executed on current node. 
+ */ + public HadoopPrepareForJobRequest(HadoopJobId jobId, HadoopJobInfo jobInfo, int totalReducersCnt, + int[] locReducers) { + assert jobId != null; + + this.jobId = jobId; + this.jobInfo = jobInfo; + this.totalReducersCnt = totalReducersCnt; + this.locReducers = locReducers; + } + + /** + * @return Job info. + */ + public HadoopJobInfo jobInfo() { + return jobInfo; + } + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @return Reducers to be executed on current node. + */ + public int[] localReducers() { + return locReducers; + } + + /** + * @return Number of reducers in job. + */ + public int totalReducerCount() { + return totalReducersCnt; + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + + out.writeObject(jobInfo); + out.writeInt(totalReducersCnt); + + U.writeIntArray(out, locReducers); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + jobId.readExternal(in); + + jobInfo = (HadoopJobInfo)in.readObject(); + totalReducersCnt = in.readInt(); + + locReducers = U.readIntArray(in); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopPrepareForJobRequest.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java new file mode 100644 index 0000000..dea73c3 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessDescriptor.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Process descriptor used to identify process for which task is running. + */ +public class HadoopProcessDescriptor implements Serializable { + /** */ + private static final long serialVersionUID = 0L; + + /** Parent node ID. */ + private UUID parentNodeId; + + /** Process ID. */ + private UUID procId; + + /** Address. */ + private String addr; + + /** TCP port. */ + private int tcpPort; + + /** Shared memory port. 
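Set to {@code -1} when the shared memory transport is unavailable.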
*/ + private int shmemPort; + + /** + * @param parentNodeId Parent node ID. + * @param procId Process ID. + */ + public HadoopProcessDescriptor(UUID parentNodeId, UUID procId) { + this.parentNodeId = parentNodeId; + this.procId = procId; + } + + /** + * Gets process ID. + * + * @return Process ID. + */ + public UUID processId() { + return procId; + } + + /** + * Gets parent node ID. + * + * @return Parent node ID. + */ + public UUID parentNodeId() { + return parentNodeId; + } + + /** + * Gets host address. + * + * @return Host address. + */ + public String address() { + return addr; + } + + /** + * Sets host address. + * + * @param addr Host address. + */ + public void address(String addr) { + this.addr = addr; + } + + /** + * @return Shared memory port. + */ + public int sharedMemoryPort() { + return shmemPort; + } + + /** + * Sets shared memory port. + * + * @param shmemPort Shared memory port. + */ + public void sharedMemoryPort(int shmemPort) { + this.shmemPort = shmemPort; + } + + /** + * @return TCP port. + */ + public int tcpPort() { + return tcpPort; + } + + /** + * Sets TCP port. + * + * @param tcpPort TCP port. + */ + public void tcpPort(int tcpPort) { + this.tcpPort = tcpPort; + } + + /** {@inheritDoc} */ + @Override public boolean equals(Object o) { + if (this == o) + return true; + + if (!(o instanceof HadoopProcessDescriptor)) + return false; + + HadoopProcessDescriptor that = (HadoopProcessDescriptor)o; + + return parentNodeId.equals(that.parentNodeId) && procId.equals(that.procId); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + int result = parentNodeId.hashCode(); + + result = 31 * result + procId.hashCode(); + + return result; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProcessDescriptor.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java new file mode 100644 index 0000000..49ff4bf --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopProcessStartedAck.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Process started message. + */ +public class HadoopProcessStartedAck implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopProcessStartedAck.class, this); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java new file mode 100644 index 0000000..05e12ef --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskExecutionRequest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.util.tostring.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; +import java.util.*; + +/** + * Message sent from node to child process to start task(s) execution. + */ +public class HadoopTaskExecutionRequest implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Job ID. */ + @GridToStringInclude + private HadoopJobId jobId; + + /** Job info. */ + @GridToStringInclude + private HadoopJobInfo jobInfo; + + /** Tasks to execute. */ + @GridToStringInclude + private Collection<HadoopTaskInfo> tasks; + + /** + * @return Job ID. + */ + public HadoopJobId jobId() { + return jobId; + } + + /** + * @param jobId Job ID. + */ + public void jobId(HadoopJobId jobId) { + this.jobId = jobId; + } + + /** + * @return Job info. + */ + public HadoopJobInfo jobInfo() { + return jobInfo; + } + + /** + * @param jobInfo Job info. 
+ */ + public void jobInfo(HadoopJobInfo jobInfo) { + this.jobInfo = jobInfo; + } + + /** + * @return Tasks. + */ + public Collection<HadoopTaskInfo> tasks() { + return tasks; + } + + /** + * @param tasks Tasks. + */ + public void tasks(Collection<HadoopTaskInfo> tasks) { + this.tasks = tasks; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTaskExecutionRequest.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + jobId.writeExternal(out); + + out.writeObject(jobInfo); + U.writeCollection(out, tasks); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + jobId = new HadoopJobId(); + jobId.readExternal(in); + + jobInfo = (HadoopJobInfo)in.readObject(); + tasks = U.readCollection(in); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java new file mode 100644 index 0000000..d3639c7 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopTaskFinishedMessage.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external; + +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Task finished message. Sent when local task finishes execution. + */ +public class HadoopTaskFinishedMessage implements HadoopMessage { + /** */ + private static final long serialVersionUID = 0L; + + /** Finished task info. */ + private HadoopTaskInfo taskInfo; + + /** Task finish status. */ + private HadoopTaskStatus status; + + /** + * Constructor required by {@link Externalizable}. + */ + public HadoopTaskFinishedMessage() { + // No-op. + } + + /** + * @param taskInfo Finished task info. + * @param status Task finish status. 
+ */ + public HadoopTaskFinishedMessage(HadoopTaskInfo taskInfo, HadoopTaskStatus status) { + assert taskInfo != null; + assert status != null; + + this.taskInfo = taskInfo; + this.status = status; + } + + /** + * @return Finished task info. + */ + public HadoopTaskInfo taskInfo() { + return taskInfo; + } + + /** + * @return Task finish status. + */ + public HadoopTaskStatus status() { + return status; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(HadoopTaskFinishedMessage.class, this); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + taskInfo.writeExternal(out); + status.writeExternal(out); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + taskInfo = new HadoopTaskInfo(); + taskInfo.readExternal(in); + + status = new HadoopTaskStatus(); + status.readExternal(in); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java deleted file mode 100644 index 2d00222..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; - -/** - * Hadoop process base. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public class GridHadoopChildProcessRunner { - /** Node process descriptor. */ - private GridHadoopProcessDescriptor nodeDesc; - - /** Message processing executor service. */ - private ExecutorService msgExecSvc; - - /** Task executor service. */ - private GridHadoopExecutorService execSvc; - - /** */ - protected GridUnsafeMemory mem = new GridUnsafeMemory(0); - - /** External communication. */ - private GridHadoopExternalCommunication comm; - - /** Logger. */ - private IgniteLogger log; - - /** Init guard. */ - private final AtomicBoolean initGuard = new AtomicBoolean(); - - /** Start time. */ - private long startTime; - - /** Init future. */ - private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>(); - - /** Job instance. */ - private GridHadoopJob job; - - /** Number of uncompleted tasks. */ - private final AtomicInteger pendingTasks = new AtomicInteger(); - - /** Shuffle job. */ - private GridHadoopShuffleJob<GridHadoopProcessDescriptor> shuffleJob; - - /** Concurrent mappers. */ - private int concMappers; - - /** Concurrent reducers. */ - private int concReducers; - - /** - * Starts child process runner. - */ - public void start(GridHadoopExternalCommunication comm, GridHadoopProcessDescriptor nodeDesc, - ExecutorService msgExecSvc, IgniteLogger parentLog) - throws IgniteCheckedException { - this.comm = comm; - this.nodeDesc = nodeDesc; - this.msgExecSvc = msgExecSvc; - - comm.setListener(new MessageListener()); - log = parentLog.getLogger(GridHadoopChildProcessRunner.class); - - startTime = U.currentTimeMillis(); - - // At this point node knows that this process has started. - comm.sendMessage(this.nodeDesc, new GridHadoopProcessStartedAck()); - } - - /** - * Initializes process for task execution. - * - * @param req Initialization request. 
- */ - private void prepareProcess(GridHadoopPrepareForJobRequest req) { - if (initGuard.compareAndSet(false, true)) { - try { - if (log.isDebugEnabled()) - log.debug("Initializing external hadoop task: " + req); - - assert job == null; - - job = req.jobInfo().createJob(req.jobId(), log); - - job.initialize(true, nodeDesc.processId()); - - shuffleJob = new GridHadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, - req.totalReducerCount(), req.localReducers()); - - initializeExecutors(req); - - if (log.isDebugEnabled()) - log.debug("External process initialized [initWaitTime=" + - (U.currentTimeMillis() - startTime) + ']'); - - initFut.onDone(null, null); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize process: " + req, e); - - initFut.onDone(e); - } - } - else - log.warning("Duplicate initialize process request received (will ignore): " + req); - } - - /** - * @param req Task execution request. - */ - private void runTasks(final GridHadoopTaskExecutionRequest req) { - if (!initFut.isDone() && log.isDebugEnabled()) - log.debug("Will wait for process initialization future completion: " + req); - - initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - // Make sure init was successful. - f.get(); - - boolean set = pendingTasks.compareAndSet(0, req.tasks().size()); - - assert set; - - GridHadoopTaskInfo info = F.first(req.tasks()); - - assert info != null; - - int size = info.type() == MAP ? concMappers : concReducers; - -// execSvc.setCorePoolSize(size); -// execSvc.setMaximumPoolSize(size); - - if (log.isDebugEnabled()) - log.debug("Set executor service size for task type [type=" + info.type() + - ", size=" + size + ']'); - - for (GridHadoopTaskInfo taskInfo : req.tasks()) { - if (log.isDebugEnabled()) - log.debug("Submitted task for external execution: " + taskInfo); - - execSvc.submit(new GridHadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) { - @Override protected void onTaskFinished(GridHadoopTaskStatus status) { - onTaskFinished0(this, status); - } - - @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) - throws IgniteCheckedException { - return shuffleJob.input(ctx); - } - - @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) - throws IgniteCheckedException { - return shuffleJob.output(ctx); - } - }); - } - } - catch (IgniteCheckedException e) { - for (GridHadoopTaskInfo info : req.tasks()) - notifyTaskFinished(info, new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false); - } - } - }); - } - - /** - * Creates executor services. - * - * @param req Init child process request. - */ - private void initializeExecutors(GridHadoopPrepareForJobRequest req) { - int cpus = Runtime.getRuntime().availableProcessors(); -// -// concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus); -// concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus); - - execSvc = new GridHadoopExecutorService(log, "", cpus * 2, 1024); - } - - /** - * Updates external process map so that shuffle can proceed with sending messages to reducers. - * - * @param req Update request. 
-     */
-    private void updateTasks(final GridHadoopJobInfoUpdateRequest req) {
-        initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
-            @Override public void apply(IgniteInternalFuture<?> gridFut) {
-                assert initGuard.get();
-
-                assert req.jobId().equals(job.id());
-
-                if (req.reducersAddresses() != null) {
-                    if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) {
-                        shuffleJob.startSending("external",
-                            new IgniteInClosure2X<GridHadoopProcessDescriptor, GridHadoopShuffleMessage>() {
-                                @Override public void applyx(GridHadoopProcessDescriptor dest,
-                                    GridHadoopShuffleMessage msg) throws IgniteCheckedException {
-                                    comm.sendMessage(dest, msg);
-                                }
-                            });
-                    }
-                }
-            }
-        });
-    }
-
-    /**
-     * Stops all executors and running tasks.
-     */
-    private void shutdown() {
-        if (execSvc != null)
-            execSvc.shutdown(5000);
-
-        if (msgExecSvc != null)
-            msgExecSvc.shutdownNow();
-
-        try {
-            job.dispose(true);
-        }
-        catch (IgniteCheckedException e) {
-            U.error(log, "Failed to dispose job.", e);
-        }
-    }
-
-    /**
-     * Notifies node about task finish.
-     *
-     * @param run Finished task runnable.
-     * @param status Task status.
-     */
-    private void onTaskFinished0(GridHadoopRunnableTask run, GridHadoopTaskStatus status) {
-        GridHadoopTaskInfo info = run.taskInfo();
-
-        int pendingTasks0 = pendingTasks.decrementAndGet();
-
-        if (log.isDebugEnabled())
-            log.debug("Hadoop task execution finished [info=" + info
-                + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() +
-                ", pendingTasks=" + pendingTasks0 +
-                ", err=" + status.failCause() + ']');
-
-        assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported.";
-
-        boolean flush = pendingTasks0 == 0 && info.type() == MAP;
-
-        notifyTaskFinished(info, status, flush);
-    }
-
-    /**
-     * @param taskInfo Finished task info.
-     * @param status Task status.
-     */
-    private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final GridHadoopTaskStatus status,
-        boolean flush) {
-
-        final GridHadoopTaskState state = status.state();
-        final Throwable err = status.failCause();
-
-        if (!flush) {
-            try {
-                if (log.isDebugEnabled())
-                    log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state +
-                        ", err=" + err + ']');
-
-                comm.sendMessage(nodeDesc, new GridHadoopTaskFinishedMessage(taskInfo, status));
-            }
-            catch (IgniteCheckedException e) {
-                log.error("Failed to send message to parent node (will terminate child process).", e);
-
-                shutdown();
-
-                terminate();
-            }
-        }
-        else {
-            if (log.isDebugEnabled())
-                log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" +
-                    taskInfo + ", state=" + state + ", err=" + err + ']');
-
-            final long start = U.currentTimeMillis();
-
-            try {
-                shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> f) {
-                        long end = U.currentTimeMillis();
-
-                        if (log.isDebugEnabled())
-                            log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo +
-                                ", flushTime=" + (end - start) + ']');
-
-                        try {
-                            // Check for errors on shuffle.
-                            f.get();
-
-                            notifyTaskFinished(taskInfo, status, false);
-                        }
-                        catch (IgniteCheckedException e) {
-                            log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
-                                ", state=" + state + ", err=" + err + ']', e);
-
-                            notifyTaskFinished(taskInfo,
-                                new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false);
-                        }
-                    }
-                });
-            }
-            catch (IgniteCheckedException e) {
-                log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo +
-                    ", state=" + state + ", err=" + err + ']', e);
-
-                notifyTaskFinished(taskInfo, new GridHadoopTaskStatus(GridHadoopTaskState.FAILED, e), false);
-            }
-        }
-    }
-
-    /**
-     * Checks if message was received from parent node and prints warning if not.
-     *
-     * @param desc Sender process ID.
-     * @param msg Received message.
-     * @return {@code True} if received from parent node.
-     */
-    private boolean validateNodeMessage(GridHadoopProcessDescriptor desc, GridHadoopMessage msg) {
-        if (!nodeDesc.processId().equals(desc.processId())) {
-            log.warning("Received process control request from unknown process (will ignore) [desc=" + desc +
-                ", msg=" + msg + ']');
-
-            return false;
-        }
-
-        return true;
-    }
-
-    /**
-     * Stops execution of this process.
-     */
-    private void terminate() {
-        System.exit(1);
-    }
-
-    /**
-     * Message listener.
-     */
-    private class MessageListener implements GridHadoopMessageListener {
-        /** {@inheritDoc} */
-        @Override public void onMessageReceived(final GridHadoopProcessDescriptor desc, final GridHadoopMessage msg) {
-            if (msg instanceof GridHadoopTaskExecutionRequest) {
-                if (validateNodeMessage(desc, msg))
-                    runTasks((GridHadoopTaskExecutionRequest)msg);
-            }
-            else if (msg instanceof GridHadoopJobInfoUpdateRequest) {
-                if (validateNodeMessage(desc, msg))
-                    updateTasks((GridHadoopJobInfoUpdateRequest)msg);
-            }
-            else if (msg instanceof GridHadoopPrepareForJobRequest) {
-                if (validateNodeMessage(desc, msg))
-                    prepareProcess((GridHadoopPrepareForJobRequest)msg);
-            }
-            else if (msg instanceof GridHadoopShuffleMessage) {
-                if (log.isTraceEnabled())
-                    log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']');
-
-                initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() {
-                    @Override public void apply(IgniteInternalFuture<?> f) {
-                        try {
-                            GridHadoopShuffleMessage m = (GridHadoopShuffleMessage)msg;
-
-                            shuffleJob.onShuffleMessage(m);
-
-                            comm.sendMessage(desc, new GridHadoopShuffleAck(m.id(), m.jobId()));
-                        }
-                        catch (IgniteCheckedException e) {
-                            U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e);
-                        }
-                    }
-                });
-            }
-            else if (msg instanceof GridHadoopShuffleAck) {
-                if (log.isTraceEnabled())
-                    log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']');
-
-                shuffleJob.onShuffleAck((GridHadoopShuffleAck)msg);
-            }
-            else
-                log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']');
-        }
-
-        /** {@inheritDoc} */
-        @Override public void onConnectionLost(GridHadoopProcessDescriptor desc) {
-            if (log.isDebugEnabled())
-                log.debug("Lost connection with remote process: " + desc);
-
-            if (desc == null)
-                U.warn(log, "Handshake failed.");
-            else if (desc.processId().equals(nodeDesc.processId())) {
-                log.warning("Child process lost connection with parent node (will terminate child process).");
-
-                shutdown();
-
-                terminate();
-            }
-        }
-    }
-}
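[Editor's note, not part of the diff: the subtlest piece of the runner above is onTaskFinished0(), which defers the final task-finished notification until shuffle output is flushed, and decides to flush only when the last pending task in the batch completes and that task is a mapper. A minimal self-contained sketch of that decision logic follows; the class and method names are illustrative stand-ins, not APIs from this commit.]

    import java.util.concurrent.atomic.AtomicInteger;

    public class FlushDecisionSketch {
        enum TaskType { MAP, REDUCE }

        /** Mirrors the pendingTasks counter in GridHadoopChildProcessRunner. */
        private final AtomicInteger pendingTasks = new AtomicInteger();

        /** Mirrors runTasks(): a new batch may only start once the previous one is drained. */
        void submitBatch(int size) {
            boolean set = pendingTasks.compareAndSet(0, size);
            assert set : "Previous batch must be fully drained before a new one starts.";
        }

        /** Mirrors onTaskFinished0(): flush shuffle output only after the last mapper completes. */
        boolean onTaskFinished(TaskType type) {
            int pending = pendingTasks.decrementAndGet();
            return pending == 0 && type == TaskType.MAP;
        }

        public static void main(String[] args) {
            FlushDecisionSketch sketch = new FlushDecisionSketch();
            sketch.submitBatch(2);
            System.out.println(sketch.onTaskFinished(TaskType.MAP)); // false: one mapper still pending
            System.out.println(sketch.onTaskFinished(TaskType.MAP)); // true: last mapper done, flush shuffle
        }
    }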
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/6423cf02/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
deleted file mode 100644
index 5aeeeee..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*;
-import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.logger.log4j.*;
-import org.apache.ignite.marshaller.optimized.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * Hadoop external process base class.
- */
-public class GridHadoopExternalProcessStarter {
-    /** Path to Log4j configuration file. */
-    public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml";
-
-    /** Arguments. */
-    private Args args;
-
-    /** System out. */
-    private OutputStream out;
-
-    /** System err. */
-    private OutputStream err;
-
-    /**
-     * @param args Parsed arguments.
-     */
-    public GridHadoopExternalProcessStarter(Args args) {
-        this.args = args;
-    }
-
-    /**
-     * @param cmdArgs Process arguments.
-     */
-    public static void main(String[] cmdArgs) {
-        try {
-            Args args = arguments(cmdArgs);
-
-            new GridHadoopExternalProcessStarter(args).run();
-        }
-        catch (Exception e) {
-            System.err.println("Failed");
-
-            System.err.println(e.getMessage());
-
-            e.printStackTrace(System.err);
-        }
-    }
-
-    /**
-     * Runs the child process starter.
-     *
-     * @throws Exception If failed.
-     */
-    public void run() throws Exception {
-        U.setWorkDirectory(args.workDir, U.getIgniteHome());
-
-        File outputDir = outputDirectory();
-
-        initializeStreams(outputDir);
-
-        ExecutorService msgExecSvc = Executors.newFixedThreadPool(
-            Integer.getInteger("MSG_THREAD_POOL_SIZE", Runtime.getRuntime().availableProcessors() * 2));
-
-        IgniteLogger log = logger(outputDir);
-
-        GridHadoopExternalCommunication comm = new GridHadoopExternalCommunication(
-            args.nodeId,
-            args.childProcId,
-            new OptimizedMarshaller(),
-            log,
-            msgExecSvc,
-            "external"
-        );
-
-        comm.start();
-
-        GridHadoopProcessDescriptor nodeDesc = new GridHadoopProcessDescriptor(args.nodeId, args.parentProcId);
-        nodeDesc.address(args.addr);
-        nodeDesc.tcpPort(args.tcpPort);
-        nodeDesc.sharedMemoryPort(args.shmemPort);
-
-        GridHadoopChildProcessRunner runner = new GridHadoopChildProcessRunner();
-
-        runner.start(comm, nodeDesc, msgExecSvc, log);
-
-        System.err.println("Started");
-        System.err.flush();
-
-        System.setOut(new PrintStream(out));
-        System.setErr(new PrintStream(err));
-    }
-
-    /**
-     * @param outputDir Directory for process output.
-     * @throws Exception If failed.
-     */
-    private void initializeStreams(File outputDir) throws Exception {
-        out = new FileOutputStream(new File(outputDir, args.childProcId + ".out"));
-        err = new FileOutputStream(new File(outputDir, args.childProcId + ".err"));
-    }
-
-    /**
-     * @return Path to output directory.
-     * @throws IOException If failed.
-     */
-    private File outputDirectory() throws IOException {
-        File f = new File(args.out);
-
-        if (!f.exists()) {
-            if (!f.mkdirs())
-                throw new IOException("Failed to create output directory: " + args.out);
-        }
-        else {
-            if (f.isFile())
-                throw new IOException("Output directory is a file: " + args.out);
-        }
-
-        return f;
-    }
-
-    /**
-     * @param outputDir Directory for process output.
-     * @return Logger.
-     */
-    private IgniteLogger logger(final File outputDir) {
-        final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG);
-
-        Log4JLogger logger;
-
-        try {
-            logger = url != null ? new Log4JLogger(url) : new Log4JLogger(true);
-        }
-        catch (IgniteCheckedException e) {
-            System.err.println("Failed to create URL-based logger. Will use default one.");
-
-            e.printStackTrace();
-
-            logger = new Log4JLogger(true);
-        }
-
-        logger.updateFilePath(new IgniteClosure<String, String>() {
-            @Override public String apply(String s) {
-                return new File(outputDir, args.childProcId + ".log").getAbsolutePath();
-            }
-        });
-
-        return logger;
-    }
-
-    /**
-     * @param processArgs Process arguments.
-     * @return Parsed arguments.
-     */
-    private static Args arguments(String[] processArgs) throws Exception {
-        Args args = new Args();
-
-        for (int i = 0; i < processArgs.length; i++) {
-            String arg = processArgs[i];
-
-            switch (arg) {
-                case "-cpid": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing process ID for '-cpid' parameter");
-
-                    String procIdStr = processArgs[++i];
-
-                    args.childProcId = UUID.fromString(procIdStr);
-
-                    break;
-                }
-
-                case "-ppid": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing process ID for '-ppid' parameter");
-
-                    String procIdStr = processArgs[++i];
-
-                    args.parentProcId = UUID.fromString(procIdStr);
-
-                    break;
-                }
-
-                case "-nid": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing node ID for '-nid' parameter");
-
-                    String nodeIdStr = processArgs[++i];
-
-                    args.nodeId = UUID.fromString(nodeIdStr);
-
-                    break;
-                }
-
-                case "-addr": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing node address for '-addr' parameter");
-
-                    args.addr = processArgs[++i];
-
-                    break;
-                }
-
-                case "-tport": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing tcp port for '-tport' parameter");
-
-                    args.tcpPort = Integer.parseInt(processArgs[++i]);
-
-                    break;
-                }
-
-                case "-sport": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing shared memory port for '-sport' parameter");
-
-                    args.shmemPort = Integer.parseInt(processArgs[++i]);
-
-                    break;
-                }
-
-                case "-out": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing output folder name for '-out' parameter");
-
-                    args.out = processArgs[++i];
-
-                    break;
-                }
-
-                case "-wd": {
-                    if (i == processArgs.length - 1)
-                        throw new Exception("Missing work folder name for '-wd' parameter");
-
-                    args.workDir = processArgs[++i];
-
-                    break;
-                }
-            }
-        }
-
-        return args;
-    }
-
-    /**
-     * Execution arguments.
-     */
-    private static class Args {
-        /** Child process ID. */
-        private UUID childProcId;
-
-        /** Parent process ID. */
-        private UUID parentProcId;
-
-        /** Node ID. */
-        private UUID nodeId;
-
-        /** Node address. */
-        private String addr;
-
-        /** TCP port. */
-        private int tcpPort;
-
-        /** Shmem port. */
-        private int shmemPort = -1;
-
-        /** Output folder. */
-        private String out;
-
-        /** Work directory. */
-        private String workDir;
-    }
-}
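[Editor's note, not part of the diff: the starter above is launched by the parent node with the flags that arguments() parses, and the parent watches the child's stderr for the "Started" line printed in run(). A hypothetical sketch of assembling such a command line follows; the classpath, port, and directory values are illustrative only and are not taken from this commit.]

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    public class ChildCommandSketch {
        public static void main(String[] args) {
            // Flags mirror those handled in GridHadoopExternalProcessStarter.arguments().
            List<String> cmd = new ArrayList<>();

            cmd.add("java");
            cmd.add("-cp");
            cmd.add("ignite-hadoop.jar"); // Illustrative classpath only.
            cmd.add("org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child.GridHadoopExternalProcessStarter");

            cmd.add("-cpid");  cmd.add(UUID.randomUUID().toString()); // Child process ID.
            cmd.add("-ppid");  cmd.add(UUID.randomUUID().toString()); // Parent process ID.
            cmd.add("-nid");   cmd.add(UUID.randomUUID().toString()); // Parent node ID.
            cmd.add("-addr");  cmd.add("127.0.0.1");                  // Parent node address (illustrative).
            cmd.add("-tport"); cmd.add("47100");                      // Parent TCP port (illustrative).
            cmd.add("-sport"); cmd.add("-1");                         // Shared memory port; -1 disables shmem.
            cmd.add("-out");   cmd.add("/tmp/hadoop/out");            // Output directory (illustrative).
            cmd.add("-wd");    cmd.add("/tmp/hadoop/work");           // Work directory (illustrative).

            // A parent would hand this list to ProcessBuilder and wait for "Started" on stderr.
            System.out.println(String.join(" ", cmd));
        }
    }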