# IGNITE-386: WIP on internal namings (3).
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/1c4b00d4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/1c4b00d4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/1c4b00d4 Branch: refs/heads/ignite-386 Commit: 1c4b00d4131fef7e05a661516b2b2f8fd480bb2e Parents: 288709a Author: vozerov-gridgain <voze...@gridgain.com> Authored: Tue Mar 3 16:01:21 2015 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Mar 3 16:01:21 2015 +0300 ---------------------------------------------------------------------- .../processors/hadoop/HadoopClassLoader.java | 4 +- .../processors/hadoop/HadoopCounters.java | 4 +- .../processors/hadoop/HadoopDefaultJobInfo.java | 2 +- .../internal/processors/hadoop/HadoopUtils.java | 6 +- .../taskexecutor/GridHadoopRunnableTask.java | 268 ---- .../HadoopEmbeddedTaskExecutor.java | 16 +- .../taskexecutor/HadoopExecutorService.java | 4 +- .../hadoop/taskexecutor/HadoopRunnableTask.java | 268 ++++ .../external/HadoopExternalTaskExecutor.java | 8 +- .../child/GridHadoopChildProcessRunner.java | 440 ------ .../child/GridHadoopExternalProcessStarter.java | 296 ---- .../child/HadoopChildProcessRunner.java | 440 ++++++ .../child/HadoopExternalProcessStarter.java | 296 ++++ .../GridHadoopAbstractCommunicationClient.java | 96 -- .../GridHadoopCommunicationClient.java | 72 - .../GridHadoopExternalCommunication.java | 1431 ------------------ .../GridHadoopHandshakeTimeoutException.java | 42 - .../GridHadoopIpcToNioAdapter.java | 239 --- .../GridHadoopMarshallerFilter.java | 84 - .../GridHadoopMessageListener.java | 39 - .../GridHadoopTcpNioCommunicationClient.java | 99 -- .../HadoopAbstractCommunicationClient.java | 96 ++ .../HadoopCommunicationClient.java | 72 + .../HadoopExternalCommunication.java | 1431 ++++++++++++++++++ .../HadoopHandshakeTimeoutException.java | 42 + .../communication/HadoopIpcToNioAdapter.java | 239 +++ .../communication/HadoopMarshallerFilter.java | 84 + .../communication/HadoopMessageListener.java | 39 + .../HadoopTcpNioCommunicationClient.java | 99 ++ .../hadoop/v1/GridHadoopV1CleanupTask.java | 62 - .../hadoop/v1/GridHadoopV1Counter.java | 105 -- .../hadoop/v1/GridHadoopV1MapTask.java | 111 -- .../hadoop/v1/GridHadoopV1OutputCollector.java | 130 -- .../hadoop/v1/GridHadoopV1Partitioner.java | 44 - .../hadoop/v1/GridHadoopV1ReduceTask.java | 92 -- .../hadoop/v1/GridHadoopV1Reporter.java | 79 - .../hadoop/v1/GridHadoopV1SetupTask.java | 56 - .../hadoop/v1/GridHadoopV1Splitter.java | 97 -- .../processors/hadoop/v1/GridHadoopV1Task.java | 95 -- .../hadoop/v1/HadoopV1CleanupTask.java | 62 + .../processors/hadoop/v1/HadoopV1Counter.java | 105 ++ .../processors/hadoop/v1/HadoopV1MapTask.java | 111 ++ .../hadoop/v1/HadoopV1OutputCollector.java | 130 ++ .../hadoop/v1/HadoopV1Partitioner.java | 44 + .../hadoop/v1/HadoopV1ReduceTask.java | 92 ++ .../processors/hadoop/v1/HadoopV1Reporter.java | 79 + .../processors/hadoop/v1/HadoopV1SetupTask.java | 56 + .../processors/hadoop/v1/HadoopV1Splitter.java | 97 ++ .../processors/hadoop/v1/HadoopV1Task.java | 95 ++ .../hadoop/v2/GridHadoopExternalSplit.java | 87 -- .../hadoop/v2/GridHadoopNativeCodeLoader.java | 74 - .../v2/GridHadoopSerializationWrapper.java | 133 -- .../v2/GridHadoopShutdownHookManager.java | 96 -- .../hadoop/v2/GridHadoopSplitWrapper.java | 118 -- .../hadoop/v2/GridHadoopV2CleanupTask.java | 73 - .../hadoop/v2/GridHadoopV2Context.java | 230 --- .../hadoop/v2/GridHadoopV2Counter.java | 87 -- .../processors/hadoop/v2/GridHadoopV2Job.java | 280 ---- .../v2/GridHadoopV2JobResourceManager.java | 305 ---- .../hadoop/v2/GridHadoopV2MapTask.java | 109 -- .../hadoop/v2/GridHadoopV2Partitioner.java | 44 - .../hadoop/v2/GridHadoopV2ReduceTask.java | 88 -- .../hadoop/v2/GridHadoopV2SetupTask.java | 66 - .../hadoop/v2/GridHadoopV2Splitter.java | 105 -- .../processors/hadoop/v2/GridHadoopV2Task.java | 181 --- .../hadoop/v2/GridHadoopV2TaskContext.java | 443 ------ .../v2/GridHadoopWritableSerialization.java | 74 - .../hadoop/v2/HadoopExternalSplit.java | 87 ++ .../hadoop/v2/HadoopNativeCodeLoader.java | 74 + .../hadoop/v2/HadoopSerializationWrapper.java | 133 ++ .../hadoop/v2/HadoopShutdownHookManager.java | 96 ++ .../hadoop/v2/HadoopSplitWrapper.java | 118 ++ .../hadoop/v2/HadoopV2CleanupTask.java | 73 + .../processors/hadoop/v2/HadoopV2Context.java | 230 +++ .../processors/hadoop/v2/HadoopV2Counter.java | 87 ++ .../processors/hadoop/v2/HadoopV2Job.java | 280 ++++ .../hadoop/v2/HadoopV2JobResourceManager.java | 305 ++++ .../processors/hadoop/v2/HadoopV2MapTask.java | 109 ++ .../hadoop/v2/HadoopV2Partitioner.java | 44 + .../hadoop/v2/HadoopV2ReduceTask.java | 88 ++ .../processors/hadoop/v2/HadoopV2SetupTask.java | 66 + .../processors/hadoop/v2/HadoopV2Splitter.java | 105 ++ .../processors/hadoop/v2/HadoopV2Task.java | 181 +++ .../hadoop/v2/HadoopV2TaskContext.java | 443 ++++++ .../hadoop/v2/HadoopWritableSerialization.java | 74 + .../GridHadoopSerializationWrapperSelfTest.java | 74 - .../hadoop/GridHadoopSplitWrapperSelfTest.java | 68 - .../hadoop/GridHadoopTasksAllVersionsTest.java | 30 +- .../hadoop/GridHadoopTasksV1Test.java | 4 +- .../hadoop/GridHadoopTasksV2Test.java | 4 +- .../hadoop/GridHadoopTestTaskContext.java | 219 --- .../hadoop/GridHadoopV2JobSelfTest.java | 88 -- .../HadoopSerializationWrapperSelfTest.java | 74 + .../hadoop/HadoopSplitWrapperSelfTest.java | 68 + .../hadoop/HadoopTestTaskContext.java | 219 +++ .../processors/hadoop/HadoopV2JobSelfTest.java | 88 ++ .../collections/GridHadoopAbstractMapTest.java | 4 +- ...GridHadoopExternalCommunicationSelfTest.java | 209 --- .../HadoopExternalCommunicationSelfTest.java | 209 +++ .../testsuites/IgniteHadoopTestSuite.java | 8 +- 100 files changed, 7275 insertions(+), 7275 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java index 6915d17..2f484d8 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopClassLoader.java @@ -93,9 +93,9 @@ public class HadoopClassLoader extends URLClassLoader { try { if (isHadoop(name)) { // Always load Hadoop classes explicitly, since Hadoop can be available in App classpath. if (name.endsWith(".util.ShutdownHookManager")) // Dirty hack to get rid of Hadoop shutdown hooks. - return loadFromBytes(name, GridHadoopShutdownHookManager.class.getName()); + return loadFromBytes(name, HadoopShutdownHookManager.class.getName()); else if (name.endsWith(".util.NativeCodeLoader")) - return loadFromBytes(name, GridHadoopNativeCodeLoader.class.getName()); + return loadFromBytes(name, HadoopNativeCodeLoader.class.getName()); return loadClassExplicitly(name, resolve); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java index ad699ec..3482640 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopCounters.java @@ -186,7 +186,7 @@ public class HadoopCounters extends Counters { for (HadoopLongCounter counter : cntrs.values()) { if (grpName.equals(counter.group())) - grpCounters.add(new GridHadoopV2Counter(counter)); + grpCounters.add(new HadoopV2Counter(counter)); } return grpCounters.iterator(); @@ -211,6 +211,6 @@ public class HadoopCounters extends Counters { cntrs.put(key, new HadoopLongCounter(grpName,cntrName)); } - return internalCntr == null ? null : new GridHadoopV2Counter(internalCntr); + return internalCntr == null ? null : new HadoopV2Counter(internalCntr); } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java index 370b82d..2f44778 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopDefaultJobInfo.java @@ -91,7 +91,7 @@ public class HadoopDefaultJobInfo implements GridHadoopJobInfo, Externalizable { if ((jobCls0 = jobCls) == null) { HadoopClassLoader ldr = new HadoopClassLoader(null); - jobCls = jobCls0 = ldr.loadClass(GridHadoopV2Job.class.getName()); + jobCls = jobCls0 = ldr.loadClass(HadoopV2Job.class.getName()); } } } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java index 46594ce..62b5a98 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java @@ -65,7 +65,7 @@ public class HadoopUtils { * @param hosts Hosts. * @throws IOException If failed. */ - public static GridHadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException { + public static HadoopSplitWrapper wrapSplit(int id, Object split, String[] hosts) throws IOException { ByteArrayOutputStream arr = new ByteArrayOutputStream(); ObjectOutput out = new ObjectOutputStream(arr); @@ -75,7 +75,7 @@ public class HadoopUtils { out.flush(); - return new GridHadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts); + return new HadoopSplitWrapper(id, split.getClass().getName(), arr.toByteArray(), hosts); } /** @@ -84,7 +84,7 @@ public class HadoopUtils { * @param o Wrapper. * @return Split. */ - public static Object unwrapSplit(GridHadoopSplitWrapper o) { + public static Object unwrapSplit(HadoopSplitWrapper o) { try { Writable w = (Writable)HadoopUtils.class.getClassLoader().loadClass(o.className()).newInstance(); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java deleted file mode 100644 index 1ce7d4a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/GridHadoopRunnableTask.java +++ /dev/null @@ -1,268 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.counter.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.*; -import java.util.concurrent.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; -import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; - -/** - * Runnable task. - */ -public abstract class GridHadoopRunnableTask implements Callable<Void> { - /** */ - private final GridUnsafeMemory mem; - - /** */ - private final IgniteLogger log; - - /** */ - private final GridHadoopJob job; - - /** Task to run. */ - private final GridHadoopTaskInfo info; - - /** Submit time. */ - private final long submitTs = U.currentTimeMillis(); - - /** Execution start timestamp. */ - private long execStartTs; - - /** Execution end timestamp. */ - private long execEndTs; - - /** */ - private HadoopMultimap combinerInput; - - /** */ - private volatile GridHadoopTaskContext ctx; - - /** Set if task is to cancelling. */ - private volatile boolean cancelled; - - /** Node id. */ - private UUID nodeId; - - /** - * @param log Log. - * @param job Job. - * @param mem Memory. - * @param info Task info. - * @param nodeId Node id. - */ - protected GridHadoopRunnableTask(IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info, - UUID nodeId) { - this.nodeId = nodeId; - this.log = log.getLogger(GridHadoopRunnableTask.class); - this.job = job; - this.mem = mem; - this.info = info; - } - - /** - * @return Wait time. - */ - public long waitTime() { - return execStartTs - submitTs; - } - - /** - * @return Execution time. - */ - public long executionTime() { - return execEndTs - execStartTs; - } - - /** {@inheritDoc} */ - @Override public Void call() throws IgniteCheckedException { - execStartTs = U.currentTimeMillis(); - - Throwable err = null; - - HadoopTaskState state = HadoopTaskState.COMPLETED; - - HadoopPerformanceCounter perfCntr = null; - - try { - ctx = job.getTaskContext(info); - - perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); - - perfCntr.onTaskSubmit(info, submitTs); - perfCntr.onTaskPrepare(info, execStartTs); - - ctx.prepareTaskEnvironment(); - - runTask(perfCntr); - - if (info.type() == MAP && job.info().hasCombiner()) { - ctx.taskInfo(new GridHadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null)); - - try { - runTask(perfCntr); - } - finally { - ctx.taskInfo(info); - } - } - } - catch (HadoopTaskCancelledException ignored) { - state = HadoopTaskState.CANCELED; - } - catch (Throwable e) { - state = HadoopTaskState.FAILED; - err = e; - - U.error(log, "Task execution failed.", e); - } - finally { - execEndTs = U.currentTimeMillis(); - - if (perfCntr != null) - perfCntr.onTaskFinish(info, execEndTs); - - onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters())); - - if (combinerInput != null) - combinerInput.close(); - - if (ctx != null) - ctx.cleanupTaskEnvironment(); - } - - return null; - } - - /** - * @param perfCntr Performance counter. - * @throws IgniteCheckedException If failed. - */ - private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException { - if (cancelled) - throw new HadoopTaskCancelledException("Task cancelled."); - - try (GridHadoopTaskOutput out = createOutputInternal(ctx); - GridHadoopTaskInput in = createInputInternal(ctx)) { - - ctx.input(in); - ctx.output(out); - - perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis()); - - ctx.run(); - } - } - - /** - * Cancel the executed task. - */ - public void cancel() { - cancelled = true; - - if (ctx != null) - ctx.cancel(); - } - - /** - * @param status Task status. - */ - protected abstract void onTaskFinished(HadoopTaskStatus status); - - /** - * @param ctx Task context. - * @return Task input. - * @throws IgniteCheckedException If failed. - */ - @SuppressWarnings("unchecked") - private GridHadoopTaskInput createInputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException { - switch (ctx.taskInfo().type()) { - case SETUP: - case MAP: - case COMMIT: - case ABORT: - return null; - - case COMBINE: - assert combinerInput != null; - - return combinerInput.input(ctx); - - default: - return createInput(ctx); - } - } - - /** - * @param ctx Task context. - * @return Input. - * @throws IgniteCheckedException If failed. - */ - protected abstract GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) throws IgniteCheckedException; - - /** - * @param ctx Task info. - * @return Output. - * @throws IgniteCheckedException If failed. - */ - protected abstract GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) throws IgniteCheckedException; - - /** - * @param ctx Task info. - * @return Task output. - * @throws IgniteCheckedException If failed. - */ - private GridHadoopTaskOutput createOutputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException { - switch (ctx.taskInfo().type()) { - case SETUP: - case REDUCE: - case COMMIT: - case ABORT: - return null; - - case MAP: - if (job.info().hasCombiner()) { - assert combinerInput == null; - - combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ? - new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)): - new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree - - return combinerInput.startAdding(ctx); - } - - default: - return createOutput(ctx); - } - } - - /** - * @return Task info. - */ - public GridHadoopTaskInfo taskInfo() { - return info; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java index 934ff35..e217c57 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopEmbeddedTaskExecutor.java @@ -35,7 +35,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { private HadoopJobTracker jobTracker; /** */ - private final ConcurrentMap<GridHadoopJobId, Collection<GridHadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); + private final ConcurrentMap<GridHadoopJobId, Collection<HadoopRunnableTask>> jobs = new ConcurrentHashMap<>(); /** Executor service to run tasks. */ private HadoopExecutorService exec; @@ -74,22 +74,22 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { log.debug("Submitting tasks for local execution [locNodeId=" + ctx.localNodeId() + ", tasksCnt=" + tasks.size() + ']'); - Collection<GridHadoopRunnableTask> executedTasks = jobs.get(job.id()); + Collection<HadoopRunnableTask> executedTasks = jobs.get(job.id()); if (executedTasks == null) { executedTasks = new GridConcurrentHashSet<>(); - Collection<GridHadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks); + Collection<HadoopRunnableTask> extractedCol = jobs.put(job.id(), executedTasks); assert extractedCol == null; } - final Collection<GridHadoopRunnableTask> finalExecutedTasks = executedTasks; + final Collection<HadoopRunnableTask> finalExecutedTasks = executedTasks; for (final GridHadoopTaskInfo info : tasks) { assert info != null; - GridHadoopRunnableTask task = new GridHadoopRunnableTask(log, job, ctx.shuffle().memory(), info, + HadoopRunnableTask task = new HadoopRunnableTask(log, job, ctx.shuffle().memory(), info, ctx.localNodeId()) { @Override protected void onTaskFinished(HadoopTaskStatus status) { if (log.isDebugEnabled()) @@ -127,10 +127,10 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { * @param jobId Job ID to cancel. */ @Override public void cancelTasks(GridHadoopJobId jobId) { - Collection<GridHadoopRunnableTask> executedTasks = jobs.get(jobId); + Collection<HadoopRunnableTask> executedTasks = jobs.get(jobId); if (executedTasks != null) { - for (GridHadoopRunnableTask task : executedTasks) + for (HadoopRunnableTask task : executedTasks) task.cancel(); } } @@ -138,7 +138,7 @@ public class HadoopEmbeddedTaskExecutor extends HadoopTaskExecutorAdapter { /** {@inheritDoc} */ @Override public void onJobStateChanged(HadoopJobMetadata meta) throws IgniteCheckedException { if (meta.phase() == GridHadoopJobPhase.PHASE_COMPLETE) { - Collection<GridHadoopRunnableTask> executedTasks = jobs.remove(meta.jobId()); + Collection<HadoopRunnableTask> executedTasks = jobs.remove(meta.jobId()); assert executedTasks == null || executedTasks.isEmpty(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java index 19f903f..d89d7d0 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopExecutorService.java @@ -169,8 +169,8 @@ public class HadoopExecutorService { private void startThread(final Callable<?> task) { String workerName; - if (task instanceof GridHadoopRunnableTask) { - final GridHadoopTaskInfo i = ((GridHadoopRunnableTask)task).taskInfo(); + if (task instanceof HadoopRunnableTask) { + final GridHadoopTaskInfo i = ((HadoopRunnableTask)task).taskInfo(); workerName = "Hadoop-task-" + i.jobId() + "-" + i.type() + "-" + i.taskNumber() + "-" + i.attempt(); } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java new file mode 100644 index 0000000..5b10d6f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/HadoopRunnableTask.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor; + +import org.apache.ignite.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.counter.*; +import org.apache.ignite.internal.processors.hadoop.shuffle.collections.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.*; +import java.util.concurrent.*; + +import static org.apache.ignite.internal.processors.hadoop.GridHadoopJobProperty.*; +import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; + +/** + * Runnable task. + */ +public abstract class HadoopRunnableTask implements Callable<Void> { + /** */ + private final GridUnsafeMemory mem; + + /** */ + private final IgniteLogger log; + + /** */ + private final GridHadoopJob job; + + /** Task to run. */ + private final GridHadoopTaskInfo info; + + /** Submit time. */ + private final long submitTs = U.currentTimeMillis(); + + /** Execution start timestamp. */ + private long execStartTs; + + /** Execution end timestamp. */ + private long execEndTs; + + /** */ + private HadoopMultimap combinerInput; + + /** */ + private volatile GridHadoopTaskContext ctx; + + /** Set if task is to cancelling. */ + private volatile boolean cancelled; + + /** Node id. */ + private UUID nodeId; + + /** + * @param log Log. + * @param job Job. + * @param mem Memory. + * @param info Task info. + * @param nodeId Node id. + */ + protected HadoopRunnableTask(IgniteLogger log, GridHadoopJob job, GridUnsafeMemory mem, GridHadoopTaskInfo info, + UUID nodeId) { + this.nodeId = nodeId; + this.log = log.getLogger(HadoopRunnableTask.class); + this.job = job; + this.mem = mem; + this.info = info; + } + + /** + * @return Wait time. + */ + public long waitTime() { + return execStartTs - submitTs; + } + + /** + * @return Execution time. + */ + public long executionTime() { + return execEndTs - execStartTs; + } + + /** {@inheritDoc} */ + @Override public Void call() throws IgniteCheckedException { + execStartTs = U.currentTimeMillis(); + + Throwable err = null; + + HadoopTaskState state = HadoopTaskState.COMPLETED; + + HadoopPerformanceCounter perfCntr = null; + + try { + ctx = job.getTaskContext(info); + + perfCntr = HadoopPerformanceCounter.getCounter(ctx.counters(), nodeId); + + perfCntr.onTaskSubmit(info, submitTs); + perfCntr.onTaskPrepare(info, execStartTs); + + ctx.prepareTaskEnvironment(); + + runTask(perfCntr); + + if (info.type() == MAP && job.info().hasCombiner()) { + ctx.taskInfo(new GridHadoopTaskInfo(COMBINE, info.jobId(), info.taskNumber(), info.attempt(), null)); + + try { + runTask(perfCntr); + } + finally { + ctx.taskInfo(info); + } + } + } + catch (HadoopTaskCancelledException ignored) { + state = HadoopTaskState.CANCELED; + } + catch (Throwable e) { + state = HadoopTaskState.FAILED; + err = e; + + U.error(log, "Task execution failed.", e); + } + finally { + execEndTs = U.currentTimeMillis(); + + if (perfCntr != null) + perfCntr.onTaskFinish(info, execEndTs); + + onTaskFinished(new HadoopTaskStatus(state, err, ctx==null ? null : ctx.counters())); + + if (combinerInput != null) + combinerInput.close(); + + if (ctx != null) + ctx.cleanupTaskEnvironment(); + } + + return null; + } + + /** + * @param perfCntr Performance counter. + * @throws IgniteCheckedException If failed. + */ + private void runTask(HadoopPerformanceCounter perfCntr) throws IgniteCheckedException { + if (cancelled) + throw new HadoopTaskCancelledException("Task cancelled."); + + try (GridHadoopTaskOutput out = createOutputInternal(ctx); + GridHadoopTaskInput in = createInputInternal(ctx)) { + + ctx.input(in); + ctx.output(out); + + perfCntr.onTaskStart(ctx.taskInfo(), U.currentTimeMillis()); + + ctx.run(); + } + } + + /** + * Cancel the executed task. + */ + public void cancel() { + cancelled = true; + + if (ctx != null) + ctx.cancel(); + } + + /** + * @param status Task status. + */ + protected abstract void onTaskFinished(HadoopTaskStatus status); + + /** + * @param ctx Task context. + * @return Task input. + * @throws IgniteCheckedException If failed. + */ + @SuppressWarnings("unchecked") + private GridHadoopTaskInput createInputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException { + switch (ctx.taskInfo().type()) { + case SETUP: + case MAP: + case COMMIT: + case ABORT: + return null; + + case COMBINE: + assert combinerInput != null; + + return combinerInput.input(ctx); + + default: + return createInput(ctx); + } + } + + /** + * @param ctx Task context. + * @return Input. + * @throws IgniteCheckedException If failed. + */ + protected abstract GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) throws IgniteCheckedException; + + /** + * @param ctx Task info. + * @return Output. + * @throws IgniteCheckedException If failed. + */ + protected abstract GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) throws IgniteCheckedException; + + /** + * @param ctx Task info. + * @return Task output. + * @throws IgniteCheckedException If failed. + */ + private GridHadoopTaskOutput createOutputInternal(GridHadoopTaskContext ctx) throws IgniteCheckedException { + switch (ctx.taskInfo().type()) { + case SETUP: + case REDUCE: + case COMMIT: + case ABORT: + return null; + + case MAP: + if (job.info().hasCombiner()) { + assert combinerInput == null; + + combinerInput = get(job.info(), SHUFFLE_COMBINER_NO_SORTING, false) ? + new HadoopHashMultimap(job.info(), mem, get(job.info(), COMBINER_HASHMAP_SIZE, 8 * 1024)): + new HadoopSkipList(job.info(), mem); // TODO replace with red-black tree + + return combinerInput.startAdding(ctx); + } + + default: + return createOutput(ctx); + } + } + + /** + * @return Task info. + */ + public GridHadoopTaskInfo taskInfo() { + return info; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java index 10ad648..f05761e 100644 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/HadoopExternalTaskExecutor.java @@ -64,7 +64,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { private String pathSep; /** Hadoop external communication. */ - private GridHadoopExternalCommunication comm; + private HadoopExternalCommunication comm; /** Starting processes. */ private final ConcurrentMap<UUID, HadoopProcess> runningProcsByProcId = new ConcurrentHashMap8<>(); @@ -90,7 +90,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { initJavaCommand(); - comm = new GridHadoopExternalCommunication( + comm = new HadoopExternalCommunication( ctx.localNodeId(), UUID.randomUUID(), ctx.kernalContext().config().getMarshaller(), @@ -508,7 +508,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { cmd.addAll(startMeta.jvmOptions()); cmd.add("-cp"); cmd.add(buildClasspath(startMeta.classpath())); - cmd.add(GridHadoopExternalProcessStarter.class.getName()); + cmd.add(HadoopExternalProcessStarter.class.getName()); cmd.add("-cpid"); cmd.add(String.valueOf(childProcId)); cmd.add("-ppid"); @@ -635,7 +635,7 @@ public class HadoopExternalTaskExecutor extends HadoopTaskExecutorAdapter { /** * */ - private class MessageListener implements GridHadoopMessageListener { + private class MessageListener implements HadoopMessageListener { /** {@inheritDoc} */ @Override public void onMessageReceived(HadoopProcessDescriptor desc, HadoopMessage msg) { if (!busyLock.tryReadLock()) http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java deleted file mode 100644 index 21552e2..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopChildProcessRunner.java +++ /dev/null @@ -1,440 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child; - -import org.apache.ignite.*; -import org.apache.ignite.internal.*; -import org.apache.ignite.internal.processors.hadoop.*; -import org.apache.ignite.internal.processors.hadoop.message.*; -import org.apache.ignite.internal.processors.hadoop.shuffle.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; -import org.apache.ignite.internal.util.future.*; -import org.apache.ignite.internal.util.lang.*; -import org.apache.ignite.internal.util.offheap.unsafe.*; -import org.apache.ignite.internal.util.typedef.*; -import org.apache.ignite.internal.util.typedef.internal.*; - -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - -import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; - -/** - * Hadoop process base. - */ -@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") -public class GridHadoopChildProcessRunner { - /** Node process descriptor. */ - private HadoopProcessDescriptor nodeDesc; - - /** Message processing executor service. */ - private ExecutorService msgExecSvc; - - /** Task executor service. */ - private HadoopExecutorService execSvc; - - /** */ - protected GridUnsafeMemory mem = new GridUnsafeMemory(0); - - /** External communication. */ - private GridHadoopExternalCommunication comm; - - /** Logger. */ - private IgniteLogger log; - - /** Init guard. */ - private final AtomicBoolean initGuard = new AtomicBoolean(); - - /** Start time. */ - private long startTime; - - /** Init future. */ - private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>(); - - /** Job instance. */ - private GridHadoopJob job; - - /** Number of uncompleted tasks. */ - private final AtomicInteger pendingTasks = new AtomicInteger(); - - /** Shuffle job. */ - private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob; - - /** Concurrent mappers. */ - private int concMappers; - - /** Concurrent reducers. */ - private int concReducers; - - /** - * Starts child process runner. - */ - public void start(GridHadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc, - ExecutorService msgExecSvc, IgniteLogger parentLog) - throws IgniteCheckedException { - this.comm = comm; - this.nodeDesc = nodeDesc; - this.msgExecSvc = msgExecSvc; - - comm.setListener(new MessageListener()); - log = parentLog.getLogger(GridHadoopChildProcessRunner.class); - - startTime = U.currentTimeMillis(); - - // At this point node knows that this process has started. - comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck()); - } - - /** - * Initializes process for task execution. - * - * @param req Initialization request. - */ - private void prepareProcess(HadoopPrepareForJobRequest req) { - if (initGuard.compareAndSet(false, true)) { - try { - if (log.isDebugEnabled()) - log.debug("Initializing external hadoop task: " + req); - - assert job == null; - - job = req.jobInfo().createJob(req.jobId(), log); - - job.initialize(true, nodeDesc.processId()); - - shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, - req.totalReducerCount(), req.localReducers()); - - initializeExecutors(req); - - if (log.isDebugEnabled()) - log.debug("External process initialized [initWaitTime=" + - (U.currentTimeMillis() - startTime) + ']'); - - initFut.onDone(null, null); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to initialize process: " + req, e); - - initFut.onDone(e); - } - } - else - log.warning("Duplicate initialize process request received (will ignore): " + req); - } - - /** - * @param req Task execution request. - */ - private void runTasks(final HadoopTaskExecutionRequest req) { - if (!initFut.isDone() && log.isDebugEnabled()) - log.debug("Will wait for process initialization future completion: " + req); - - initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - // Make sure init was successful. - f.get(); - - boolean set = pendingTasks.compareAndSet(0, req.tasks().size()); - - assert set; - - GridHadoopTaskInfo info = F.first(req.tasks()); - - assert info != null; - - int size = info.type() == MAP ? concMappers : concReducers; - -// execSvc.setCorePoolSize(size); -// execSvc.setMaximumPoolSize(size); - - if (log.isDebugEnabled()) - log.debug("Set executor service size for task type [type=" + info.type() + - ", size=" + size + ']'); - - for (GridHadoopTaskInfo taskInfo : req.tasks()) { - if (log.isDebugEnabled()) - log.debug("Submitted task for external execution: " + taskInfo); - - execSvc.submit(new GridHadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) { - @Override protected void onTaskFinished(HadoopTaskStatus status) { - onTaskFinished0(this, status); - } - - @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) - throws IgniteCheckedException { - return shuffleJob.input(ctx); - } - - @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) - throws IgniteCheckedException { - return shuffleJob.output(ctx); - } - }); - } - } - catch (IgniteCheckedException e) { - for (GridHadoopTaskInfo info : req.tasks()) - notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); - } - } - }); - } - - /** - * Creates executor services. - * - * @param req Init child process request. - */ - private void initializeExecutors(HadoopPrepareForJobRequest req) { - int cpus = Runtime.getRuntime().availableProcessors(); -// -// concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus); -// concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus); - - execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024); - } - - /** - * Updates external process map so that shuffle can proceed with sending messages to reducers. - * - * @param req Update request. - */ - private void updateTasks(final HadoopJobInfoUpdateRequest req) { - initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> gridFut) { - assert initGuard.get(); - - assert req.jobId().equals(job.id()); - - if (req.reducersAddresses() != null) { - if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) { - shuffleJob.startSending("external", - new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() { - @Override public void applyx(HadoopProcessDescriptor dest, - HadoopShuffleMessage msg) throws IgniteCheckedException { - comm.sendMessage(dest, msg); - } - }); - } - } - } - }); - } - - /** - * Stops all executors and running tasks. - */ - private void shutdown() { - if (execSvc != null) - execSvc.shutdown(5000); - - if (msgExecSvc != null) - msgExecSvc.shutdownNow(); - - try { - job.dispose(true); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to dispose job.", e); - } - } - - /** - * Notifies node about task finish. - * - * @param run Finished task runnable. - * @param status Task status. - */ - private void onTaskFinished0(GridHadoopRunnableTask run, HadoopTaskStatus status) { - GridHadoopTaskInfo info = run.taskInfo(); - - int pendingTasks0 = pendingTasks.decrementAndGet(); - - if (log.isDebugEnabled()) - log.debug("Hadoop task execution finished [info=" + info - + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() + - ", pendingTasks=" + pendingTasks0 + - ", err=" + status.failCause() + ']'); - - assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported."; - - boolean flush = pendingTasks0 == 0 && info.type() == MAP; - - notifyTaskFinished(info, status, flush); - } - - /** - * @param taskInfo Finished task info. - * @param status Task status. - */ - private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final HadoopTaskStatus status, - boolean flush) { - - final HadoopTaskState state = status.state(); - final Throwable err = status.failCause(); - - if (!flush) { - try { - if (log.isDebugEnabled()) - log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state + - ", err=" + err + ']'); - - comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status)); - } - catch (IgniteCheckedException e) { - log.error("Failed to send message to parent node (will terminate child process).", e); - - shutdown(); - - terminate(); - } - } - else { - if (log.isDebugEnabled()) - log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" + - taskInfo + ", state=" + state + ", err=" + err + ']'); - - final long start = U.currentTimeMillis(); - - try { - shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - long end = U.currentTimeMillis(); - - if (log.isDebugEnabled()) - log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo + - ", flushTime=" + (end - start) + ']'); - - try { - // Check for errors on shuffle. - f.get(); - - notifyTaskFinished(taskInfo, status, false); - } - catch (IgniteCheckedException e) { - log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + - ", state=" + state + ", err=" + err + ']', e); - - notifyTaskFinished(taskInfo, - new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); - } - } - }); - } - catch (IgniteCheckedException e) { - log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + - ", state=" + state + ", err=" + err + ']', e); - - notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); - } - } - } - - /** - * Checks if message was received from parent node and prints warning if not. - * - * @param desc Sender process ID. - * @param msg Received message. - * @return {@code True} if received from parent node. - */ - private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) { - if (!nodeDesc.processId().equals(desc.processId())) { - log.warning("Received process control request from unknown process (will ignore) [desc=" + desc + - ", msg=" + msg + ']'); - - return false; - } - - return true; - } - - /** - * Stops execution of this process. - */ - private void terminate() { - System.exit(1); - } - - /** - * Message listener. - */ - private class MessageListener implements GridHadoopMessageListener { - /** {@inheritDoc} */ - @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) { - if (msg instanceof HadoopTaskExecutionRequest) { - if (validateNodeMessage(desc, msg)) - runTasks((HadoopTaskExecutionRequest)msg); - } - else if (msg instanceof HadoopJobInfoUpdateRequest) { - if (validateNodeMessage(desc, msg)) - updateTasks((HadoopJobInfoUpdateRequest)msg); - } - else if (msg instanceof HadoopPrepareForJobRequest) { - if (validateNodeMessage(desc, msg)) - prepareProcess((HadoopPrepareForJobRequest)msg); - } - else if (msg instanceof HadoopShuffleMessage) { - if (log.isTraceEnabled()) - log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']'); - - initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { - @Override public void apply(IgniteInternalFuture<?> f) { - try { - HadoopShuffleMessage m = (HadoopShuffleMessage)msg; - - shuffleJob.onShuffleMessage(m); - - comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId())); - } - catch (IgniteCheckedException e) { - U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e); - } - } - }); - } - else if (msg instanceof HadoopShuffleAck) { - if (log.isTraceEnabled()) - log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']'); - - shuffleJob.onShuffleAck((HadoopShuffleAck)msg); - } - else - log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']'); - } - - /** {@inheritDoc} */ - @Override public void onConnectionLost(HadoopProcessDescriptor desc) { - if (log.isDebugEnabled()) - log.debug("Lost connection with remote process: " + desc); - - if (desc == null) - U.warn(log, "Handshake failed."); - else if (desc.processId().equals(nodeDesc.processId())) { - log.warning("Child process lost connection with parent node (will terminate child process)."); - - shutdown(); - - terminate(); - } - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java deleted file mode 100644 index 1216c9a..0000000 --- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/GridHadoopExternalProcessStarter.java +++ /dev/null @@ -1,296 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child; - -import org.apache.ignite.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; -import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; -import org.apache.ignite.internal.util.typedef.internal.*; -import org.apache.ignite.lang.*; -import org.apache.ignite.logger.log4j.*; -import org.apache.ignite.marshaller.optimized.*; - -import java.io.*; -import java.net.*; -import java.util.*; -import java.util.concurrent.*; - -/** - * Hadoop external process base class. - */ -public class GridHadoopExternalProcessStarter { - /** Path to Log4j configuration file. */ - public static final String DFLT_LOG4J_CONFIG = "config/ignite-log4j.xml"; - - /** Arguments. */ - private Args args; - - /** System out. */ - private OutputStream out; - - /** System err. */ - private OutputStream err; - - /** - * @param args Parsed arguments. - */ - public GridHadoopExternalProcessStarter(Args args) { - this.args = args; - } - - /** - * @param cmdArgs Process arguments. - */ - public static void main(String[] cmdArgs) { - try { - Args args = arguments(cmdArgs); - - new GridHadoopExternalProcessStarter(args).run(); - } - catch (Exception e) { - System.err.println("Failed"); - - System.err.println(e.getMessage()); - - e.printStackTrace(System.err); - } - } - - /** - * - * @throws Exception - */ - public void run() throws Exception { - U.setWorkDirectory(args.workDir, U.getIgniteHome()); - - File outputDir = outputDirectory(); - - initializeStreams(outputDir); - - ExecutorService msgExecSvc = Executors.newFixedThreadPool( - Integer.getInteger("MSG_THREAD_POOL_SIZE", Runtime.getRuntime().availableProcessors() * 2)); - - IgniteLogger log = logger(outputDir); - - GridHadoopExternalCommunication comm = new GridHadoopExternalCommunication( - args.nodeId, - args.childProcId, - new OptimizedMarshaller(), - log, - msgExecSvc, - "external" - ); - - comm.start(); - - HadoopProcessDescriptor nodeDesc = new HadoopProcessDescriptor(args.nodeId, args.parentProcId); - nodeDesc.address(args.addr); - nodeDesc.tcpPort(args.tcpPort); - nodeDesc.sharedMemoryPort(args.shmemPort); - - GridHadoopChildProcessRunner runner = new GridHadoopChildProcessRunner(); - - runner.start(comm, nodeDesc, msgExecSvc, log); - - System.err.println("Started"); - System.err.flush(); - - System.setOut(new PrintStream(out)); - System.setErr(new PrintStream(err)); - } - - /** - * @param outputDir Directory for process output. - * @throws Exception - */ - private void initializeStreams(File outputDir) throws Exception { - out = new FileOutputStream(new File(outputDir, args.childProcId + ".out")); - err = new FileOutputStream(new File(outputDir, args.childProcId + ".err")); - } - - /** - * @return Path to output directory. - * @throws IOException If failed. - */ - private File outputDirectory() throws IOException { - File f = new File(args.out); - - if (!f.exists()) { - if (!f.mkdirs()) - throw new IOException("Failed to create output directory: " + args.out); - } - else { - if (f.isFile()) - throw new IOException("Output directory is a file: " + args.out); - } - - return f; - } - - /** - * @param outputDir Directory for process output. - * @return Logger. - */ - private IgniteLogger logger(final File outputDir) { - final URL url = U.resolveIgniteUrl(DFLT_LOG4J_CONFIG); - - Log4JLogger logger; - - try { - logger = url != null ? new Log4JLogger(url) : new Log4JLogger(true); - } - catch (IgniteCheckedException e) { - System.err.println("Failed to create URL-based logger. Will use default one."); - - e.printStackTrace(); - - logger = new Log4JLogger(true); - } - - logger.updateFilePath(new IgniteClosure<String, String>() { - @Override public String apply(String s) { - return new File(outputDir, args.childProcId + ".log").getAbsolutePath(); - } - }); - - return logger; - } - - /** - * @param processArgs Process arguments. - * @return Child process instance. - */ - private static Args arguments(String[] processArgs) throws Exception { - Args args = new Args(); - - for (int i = 0; i < processArgs.length; i++) { - String arg = processArgs[i]; - - switch (arg) { - case "-cpid": { - if (i == processArgs.length - 1) - throw new Exception("Missing process ID for '-cpid' parameter"); - - String procIdStr = processArgs[++i]; - - args.childProcId = UUID.fromString(procIdStr); - - break; - } - - case "-ppid": { - if (i == processArgs.length - 1) - throw new Exception("Missing process ID for '-ppid' parameter"); - - String procIdStr = processArgs[++i]; - - args.parentProcId = UUID.fromString(procIdStr); - - break; - } - - case "-nid": { - if (i == processArgs.length - 1) - throw new Exception("Missing node ID for '-nid' parameter"); - - String nodeIdStr = processArgs[++i]; - - args.nodeId = UUID.fromString(nodeIdStr); - - break; - } - - case "-addr": { - if (i == processArgs.length - 1) - throw new Exception("Missing node address for '-addr' parameter"); - - args.addr = processArgs[++i]; - - break; - } - - case "-tport": { - if (i == processArgs.length - 1) - throw new Exception("Missing tcp port for '-tport' parameter"); - - args.tcpPort = Integer.parseInt(processArgs[++i]); - - break; - } - - case "-sport": { - if (i == processArgs.length - 1) - throw new Exception("Missing shared memory port for '-sport' parameter"); - - args.shmemPort = Integer.parseInt(processArgs[++i]); - - break; - } - - case "-out": { - if (i == processArgs.length - 1) - throw new Exception("Missing output folder name for '-out' parameter"); - - args.out = processArgs[++i]; - - break; - } - - case "-wd": { - if (i == processArgs.length - 1) - throw new Exception("Missing work folder name for '-wd' parameter"); - - args.workDir = processArgs[++i]; - - break; - } - } - } - - return args; - } - - /** - * Execution arguments. - */ - private static class Args { - /** Process ID. */ - private UUID childProcId; - - /** Process ID. */ - private UUID parentProcId; - - /** Process ID. */ - private UUID nodeId; - - /** Node address. */ - private String addr; - - /** TCP port */ - private int tcpPort; - - /** Shmem port. */ - private int shmemPort = -1; - - /** Output folder. */ - private String out; - - /** Work directory. */ - private String workDir; - } -} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1c4b00d4/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java new file mode 100644 index 0000000..6345704 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/taskexecutor/external/child/HadoopChildProcessRunner.java @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.hadoop.taskexecutor.external.child; + +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.processors.hadoop.*; +import org.apache.ignite.internal.processors.hadoop.message.*; +import org.apache.ignite.internal.processors.hadoop.shuffle.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.*; +import org.apache.ignite.internal.processors.hadoop.taskexecutor.external.communication.*; +import org.apache.ignite.internal.util.future.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.offheap.unsafe.*; +import org.apache.ignite.internal.util.typedef.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + +import static org.apache.ignite.internal.processors.hadoop.GridHadoopTaskType.*; + +/** + * Hadoop process base. + */ +@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") +public class HadoopChildProcessRunner { + /** Node process descriptor. */ + private HadoopProcessDescriptor nodeDesc; + + /** Message processing executor service. */ + private ExecutorService msgExecSvc; + + /** Task executor service. */ + private HadoopExecutorService execSvc; + + /** */ + protected GridUnsafeMemory mem = new GridUnsafeMemory(0); + + /** External communication. */ + private HadoopExternalCommunication comm; + + /** Logger. */ + private IgniteLogger log; + + /** Init guard. */ + private final AtomicBoolean initGuard = new AtomicBoolean(); + + /** Start time. */ + private long startTime; + + /** Init future. */ + private final GridFutureAdapterEx<?> initFut = new GridFutureAdapterEx<>(); + + /** Job instance. */ + private GridHadoopJob job; + + /** Number of uncompleted tasks. */ + private final AtomicInteger pendingTasks = new AtomicInteger(); + + /** Shuffle job. */ + private HadoopShuffleJob<HadoopProcessDescriptor> shuffleJob; + + /** Concurrent mappers. */ + private int concMappers; + + /** Concurrent reducers. */ + private int concReducers; + + /** + * Starts child process runner. + */ + public void start(HadoopExternalCommunication comm, HadoopProcessDescriptor nodeDesc, + ExecutorService msgExecSvc, IgniteLogger parentLog) + throws IgniteCheckedException { + this.comm = comm; + this.nodeDesc = nodeDesc; + this.msgExecSvc = msgExecSvc; + + comm.setListener(new MessageListener()); + log = parentLog.getLogger(HadoopChildProcessRunner.class); + + startTime = U.currentTimeMillis(); + + // At this point node knows that this process has started. + comm.sendMessage(this.nodeDesc, new HadoopProcessStartedAck()); + } + + /** + * Initializes process for task execution. + * + * @param req Initialization request. + */ + private void prepareProcess(HadoopPrepareForJobRequest req) { + if (initGuard.compareAndSet(false, true)) { + try { + if (log.isDebugEnabled()) + log.debug("Initializing external hadoop task: " + req); + + assert job == null; + + job = req.jobInfo().createJob(req.jobId(), log); + + job.initialize(true, nodeDesc.processId()); + + shuffleJob = new HadoopShuffleJob<>(comm.localProcessDescriptor(), log, job, mem, + req.totalReducerCount(), req.localReducers()); + + initializeExecutors(req); + + if (log.isDebugEnabled()) + log.debug("External process initialized [initWaitTime=" + + (U.currentTimeMillis() - startTime) + ']'); + + initFut.onDone(null, null); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize process: " + req, e); + + initFut.onDone(e); + } + } + else + log.warning("Duplicate initialize process request received (will ignore): " + req); + } + + /** + * @param req Task execution request. + */ + private void runTasks(final HadoopTaskExecutionRequest req) { + if (!initFut.isDone() && log.isDebugEnabled()) + log.debug("Will wait for process initialization future completion: " + req); + + initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + try { + // Make sure init was successful. + f.get(); + + boolean set = pendingTasks.compareAndSet(0, req.tasks().size()); + + assert set; + + GridHadoopTaskInfo info = F.first(req.tasks()); + + assert info != null; + + int size = info.type() == MAP ? concMappers : concReducers; + +// execSvc.setCorePoolSize(size); +// execSvc.setMaximumPoolSize(size); + + if (log.isDebugEnabled()) + log.debug("Set executor service size for task type [type=" + info.type() + + ", size=" + size + ']'); + + for (GridHadoopTaskInfo taskInfo : req.tasks()) { + if (log.isDebugEnabled()) + log.debug("Submitted task for external execution: " + taskInfo); + + execSvc.submit(new HadoopRunnableTask(log, job, mem, taskInfo, nodeDesc.parentNodeId()) { + @Override protected void onTaskFinished(HadoopTaskStatus status) { + onTaskFinished0(this, status); + } + + @Override protected GridHadoopTaskInput createInput(GridHadoopTaskContext ctx) + throws IgniteCheckedException { + return shuffleJob.input(ctx); + } + + @Override protected GridHadoopTaskOutput createOutput(GridHadoopTaskContext ctx) + throws IgniteCheckedException { + return shuffleJob.output(ctx); + } + }); + } + } + catch (IgniteCheckedException e) { + for (GridHadoopTaskInfo info : req.tasks()) + notifyTaskFinished(info, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); + } + } + }); + } + + /** + * Creates executor services. + * + * @param req Init child process request. + */ + private void initializeExecutors(HadoopPrepareForJobRequest req) { + int cpus = Runtime.getRuntime().availableProcessors(); +// +// concMappers = get(req.jobInfo(), EXTERNAL_CONCURRENT_MAPPERS, cpus); +// concReducers = get(req.jobInfo(), EXTERNAL_CONCURRENT_REDUCERS, cpus); + + execSvc = new HadoopExecutorService(log, "", cpus * 2, 1024); + } + + /** + * Updates external process map so that shuffle can proceed with sending messages to reducers. + * + * @param req Update request. + */ + private void updateTasks(final HadoopJobInfoUpdateRequest req) { + initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> gridFut) { + assert initGuard.get(); + + assert req.jobId().equals(job.id()); + + if (req.reducersAddresses() != null) { + if (shuffleJob.initializeReduceAddresses(req.reducersAddresses())) { + shuffleJob.startSending("external", + new IgniteInClosure2X<HadoopProcessDescriptor, HadoopShuffleMessage>() { + @Override public void applyx(HadoopProcessDescriptor dest, + HadoopShuffleMessage msg) throws IgniteCheckedException { + comm.sendMessage(dest, msg); + } + }); + } + } + } + }); + } + + /** + * Stops all executors and running tasks. + */ + private void shutdown() { + if (execSvc != null) + execSvc.shutdown(5000); + + if (msgExecSvc != null) + msgExecSvc.shutdownNow(); + + try { + job.dispose(true); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to dispose job.", e); + } + } + + /** + * Notifies node about task finish. + * + * @param run Finished task runnable. + * @param status Task status. + */ + private void onTaskFinished0(HadoopRunnableTask run, HadoopTaskStatus status) { + GridHadoopTaskInfo info = run.taskInfo(); + + int pendingTasks0 = pendingTasks.decrementAndGet(); + + if (log.isDebugEnabled()) + log.debug("Hadoop task execution finished [info=" + info + + ", state=" + status.state() + ", waitTime=" + run.waitTime() + ", execTime=" + run.executionTime() + + ", pendingTasks=" + pendingTasks0 + + ", err=" + status.failCause() + ']'); + + assert info.type() == MAP || info.type() == REDUCE : "Only MAP or REDUCE tasks are supported."; + + boolean flush = pendingTasks0 == 0 && info.type() == MAP; + + notifyTaskFinished(info, status, flush); + } + + /** + * @param taskInfo Finished task info. + * @param status Task status. + */ + private void notifyTaskFinished(final GridHadoopTaskInfo taskInfo, final HadoopTaskStatus status, + boolean flush) { + + final HadoopTaskState state = status.state(); + final Throwable err = status.failCause(); + + if (!flush) { + try { + if (log.isDebugEnabled()) + log.debug("Sending notification to parent node [taskInfo=" + taskInfo + ", state=" + state + + ", err=" + err + ']'); + + comm.sendMessage(nodeDesc, new HadoopTaskFinishedMessage(taskInfo, status)); + } + catch (IgniteCheckedException e) { + log.error("Failed to send message to parent node (will terminate child process).", e); + + shutdown(); + + terminate(); + } + } + else { + if (log.isDebugEnabled()) + log.debug("Flushing shuffle messages before sending last task completion notification [taskInfo=" + + taskInfo + ", state=" + state + ", err=" + err + ']'); + + final long start = U.currentTimeMillis(); + + try { + shuffleJob.flush().listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + long end = U.currentTimeMillis(); + + if (log.isDebugEnabled()) + log.debug("Finished flushing shuffle messages [taskInfo=" + taskInfo + + ", flushTime=" + (end - start) + ']'); + + try { + // Check for errors on shuffle. + f.get(); + + notifyTaskFinished(taskInfo, status, false); + } + catch (IgniteCheckedException e) { + log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + + ", state=" + state + ", err=" + err + ']', e); + + notifyTaskFinished(taskInfo, + new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); + } + } + }); + } + catch (IgniteCheckedException e) { + log.error("Failed to flush shuffle messages (will fail the task) [taskInfo=" + taskInfo + + ", state=" + state + ", err=" + err + ']', e); + + notifyTaskFinished(taskInfo, new HadoopTaskStatus(HadoopTaskState.FAILED, e), false); + } + } + } + + /** + * Checks if message was received from parent node and prints warning if not. + * + * @param desc Sender process ID. + * @param msg Received message. + * @return {@code True} if received from parent node. + */ + private boolean validateNodeMessage(HadoopProcessDescriptor desc, HadoopMessage msg) { + if (!nodeDesc.processId().equals(desc.processId())) { + log.warning("Received process control request from unknown process (will ignore) [desc=" + desc + + ", msg=" + msg + ']'); + + return false; + } + + return true; + } + + /** + * Stops execution of this process. + */ + private void terminate() { + System.exit(1); + } + + /** + * Message listener. + */ + private class MessageListener implements HadoopMessageListener { + /** {@inheritDoc} */ + @Override public void onMessageReceived(final HadoopProcessDescriptor desc, final HadoopMessage msg) { + if (msg instanceof HadoopTaskExecutionRequest) { + if (validateNodeMessage(desc, msg)) + runTasks((HadoopTaskExecutionRequest)msg); + } + else if (msg instanceof HadoopJobInfoUpdateRequest) { + if (validateNodeMessage(desc, msg)) + updateTasks((HadoopJobInfoUpdateRequest)msg); + } + else if (msg instanceof HadoopPrepareForJobRequest) { + if (validateNodeMessage(desc, msg)) + prepareProcess((HadoopPrepareForJobRequest)msg); + } + else if (msg instanceof HadoopShuffleMessage) { + if (log.isTraceEnabled()) + log.trace("Received shuffle message [desc=" + desc + ", msg=" + msg + ']'); + + initFut.listenAsync(new CI1<IgniteInternalFuture<?>>() { + @Override public void apply(IgniteInternalFuture<?> f) { + try { + HadoopShuffleMessage m = (HadoopShuffleMessage)msg; + + shuffleJob.onShuffleMessage(m); + + comm.sendMessage(desc, new HadoopShuffleAck(m.id(), m.jobId())); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to process hadoop shuffle message [desc=" + desc + ", msg=" + msg + ']', e); + } + } + }); + } + else if (msg instanceof HadoopShuffleAck) { + if (log.isTraceEnabled()) + log.trace("Received shuffle ack [desc=" + desc + ", msg=" + msg + ']'); + + shuffleJob.onShuffleAck((HadoopShuffleAck)msg); + } + else + log.warning("Unknown message received (will ignore) [desc=" + desc + ", msg=" + msg + ']'); + } + + /** {@inheritDoc} */ + @Override public void onConnectionLost(HadoopProcessDescriptor desc) { + if (log.isDebugEnabled()) + log.debug("Lost connection with remote process: " + desc); + + if (desc == null) + U.warn(log, "Handshake failed."); + else if (desc.processId().equals(nodeDesc.processId())) { + log.warning("Child process lost connection with parent node (will terminate child process)."); + + shutdown(); + + terminate(); + } + } + } +}