http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index 0000000,13dc389..8ed6319
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@@ -1,0 -1,894 +1,888 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.job;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.compute.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.fs.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.processors.task.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ 
+ /**
+  * Job worker.
+  */
+ public class GridJobWorker extends GridWorker implements GridTimeoutObject {
+     /** Per-thread held flag. */
+     private static final ThreadLocal<Boolean> HOLD = new 
ThreadLocal<Boolean>() {
+         @Override protected Boolean initialValue() {
+             return false;
+         }
+     };
+ 
+     /** Static logger to avoid re-creation. */
+     private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
+ 
+     /** */
+     private final long createTime;
+ 
+     /** */
+     private volatile long startTime;
+ 
+     /** */
+     private volatile long finishTime;
+ 
+     /** */
+     private final GridKernalContext ctx;
+ 
+     /** */
+     private final Object jobTopic;
+ 
+     /** */
+     private final Object taskTopic;
+ 
+     /** */
+     private byte[] jobBytes;
+ 
+     /** Task originating node. */
+     private final ClusterNode taskNode;
+ 
+     /** Flag set when visor or internal task is running. */
+     private final boolean internal;
+ 
+     /** */
+     private final IgniteLogger log;
+ 
+     /** */
+     private final IgniteMarshaller marsh;
+ 
+     /** */
+     private final GridJobSessionImpl ses;
+ 
+     /** */
+     private final GridJobContextImpl jobCtx;
+ 
+     /** */
+     private final GridJobEventListener evtLsnr;
+ 
+     /** Deployment. */
+     private final GridDeployment dep;
+ 
+     /** */
+     private final AtomicBoolean finishing = new AtomicBoolean();
+ 
+     /** Guard ensuring that master-leave callback is not execute more than 
once. */
+     private final AtomicBoolean masterLeaveGuard = new AtomicBoolean();
+ 
+     /** */
+     private volatile boolean timedOut;
+ 
+     /** */
+     private volatile boolean sysCancelled;
+ 
+     /** */
+     private volatile boolean sysStopping;
+ 
+     /** */
+     private volatile boolean isStarted;
+ 
+     /** Deployed job. */
+     private ComputeJob job;
+ 
+     /** Halted flag (if greater than 0, job is halted). */
+     private final AtomicInteger held = new AtomicInteger();
+ 
+     /** Hold/unhold listener to notify job processor. */
+     private final GridJobHoldListener holdLsnr;
+ 
+     /**
+      * @param ctx Kernal context.
+      * @param dep Grid deployment.
+      * @param createTime Create time.
+      * @param ses Grid task session.
+      * @param jobCtx Job context.
+      * @param jobBytes Grid job bytes.
+      * @param job Job.
+      * @param taskNode Grid task node.
+      * @param internal Whether or not task was marked with {@link 
GridInternal}
+      * @param evtLsnr Job event listener.
+      * @param holdLsnr Hold listener.
+      */
+     GridJobWorker(
+         GridKernalContext ctx,
+         GridDeployment dep,
+         long createTime,
+         GridJobSessionImpl ses,
+         GridJobContextImpl jobCtx,
+         byte[] jobBytes,
+         ComputeJob job,
+         ClusterNode taskNode,
+         boolean internal,
+         GridJobEventListener evtLsnr,
+         GridJobHoldListener holdLsnr) {
+         super(ctx.gridName(), "grid-job-worker", ctx.log());
+ 
+         assert ctx != null;
+         assert ses != null;
+         assert jobCtx != null;
+         assert taskNode != null;
+         assert evtLsnr != null;
+         assert dep != null;
+         assert holdLsnr != null;
+ 
+         this.ctx = ctx;
+         this.createTime = createTime;
+         this.evtLsnr = evtLsnr;
+         this.dep = dep;
+         this.ses = ses;
+         this.jobCtx = jobCtx;
+         this.jobBytes = jobBytes;
+         this.taskNode = taskNode;
+         this.internal = internal;
+         this.holdLsnr = holdLsnr;
+ 
+         if (job != null)
+             this.job = job;
+ 
+         log = U.logger(ctx, logRef, this);
+ 
+         marsh = ctx.config().getMarshaller();
+ 
+         UUID locNodeId = ctx.discovery().localNode().id();
+ 
+         jobTopic = TOPIC_JOB.topic(ses.getJobId(), locNodeId);
+         taskTopic = TOPIC_TASK.topic(ses.getJobId(), locNodeId);
+     }
+ 
+     /**
+      * Gets deployed job or {@code null} of job could not be deployed.
+      *
+      * @return Deployed job.
+      */
+     @Nullable public ComputeJob getJob() {
+         return job;
+     }
+ 
+     /**
+      * @return Deployed task.
+      */
+     public GridDeployment getDeployment() {
+         return dep;
+     }
+ 
+     /**
+      * Returns {@code True} if job was cancelled by the system.
+      *
+      * @return {@code True} if job was cancelled by the system.
+      */
+     boolean isSystemCanceled() {
+         return sysCancelled;
+     }
+ 
+     /**
+      * @return Create time.
+      */
+     long getCreateTime() {
+         return createTime;
+     }
+ 
+     /**
+      * @return Unique job ID.
+      */
+     public IgniteUuid getJobId() {
+         IgniteUuid jobId = ses.getJobId();
+ 
+         assert jobId != null;
+ 
+         return jobId;
+     }
+ 
+     /**
+      * @return Job context.
+      */
+     public ComputeJobContext getJobContext() {
+         return jobCtx;
+     }
+ 
+     /**
+      * @return Job communication topic.
+      */
+     Object getJobTopic() {
+         return jobTopic;
+     }
+ 
+     /**
+      * @return Task communication topic.
+      */
+     Object getTaskTopic() {
+         return taskTopic;
+     }
+ 
+     /**
+      * @return Session.
+      */
+     public GridJobSessionImpl getSession() {
+         return ses;
+     }
+ 
+     /**
+      * Gets job finishing state.
+      *
+      * @return {@code true} if job is being finished after execution
+      *      and {@code false} otherwise.
+      */
+     boolean isFinishing() {
+         return finishing.get();
+     }
+ 
+     /**
+      * @return Parent task node ID.
+      */
+     ClusterNode getTaskNode() {
+         return taskNode;
+     }
+ 
+     /**
+      * @return Job execution time.
+      */
+     long getExecuteTime() {
+         long startTime0 = startTime;
+         long finishTime0 = finishTime;
+ 
+         return startTime0 == 0 ? 0 : finishTime0 == 0 ?
+             U.currentTimeMillis() - startTime0 : finishTime0 - startTime0;
+     }
+ 
+     /**
+      * @return Time job spent on waiting queue.
+      */
+     long getQueuedTime() {
+         long startTime0 = startTime;
+ 
+         return startTime0 == 0 ? U.currentTimeMillis() - createTime : 
startTime0 - createTime;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public long endTime() {
+         return ses.getEndTime();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public IgniteUuid timeoutId() {
+         IgniteUuid jobId = ses.getJobId();
+ 
+         assert jobId != null;
+ 
+         return jobId;
+     }
+ 
+     /**
+      * @return {@code True} if job is timed out.
+      */
+     boolean isTimedOut() {
+         return timedOut;
+     }
+ 
+     /**
+      * @return {@code True} if parent task is internal or Visor-related.
+      */
+     public boolean isInternal() {
+         return internal;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onTimeout() {
+         if (finishing.get())
+             return;
+ 
+         timedOut = true;
+ 
+         U.warn(log, "Job has timed out: " + ses);
+ 
+         cancel();
+ 
+         if (!internal && ctx.event().isRecordable(EVT_JOB_TIMEDOUT))
+             recordEvent(EVT_JOB_TIMEDOUT, "Job has timed out: " + job);
+     }
+ 
+     /**
+      * Callback for whenever grid is stopping.
+      */
+     public void onStopping() {
+         sysStopping = true;
+     }
+ 
+     /**
+      * @return {@code True} if job was halted.
+      */
+     public boolean held() {
+         return held.get() > 0;
+     }
+ 
+     /**
+      * Sets halt flags.
+      */
+     public void hold() {
+         held.incrementAndGet();
+ 
+         HOLD.set(true);
+ 
+         holdLsnr.onHold(this);
+     }
+ 
+     /**
+      * Initializes job. Handles deployments and event recording.
+      *
+      * @param dep Job deployed task.
+      * @param taskCls Task class.
+      * @return {@code True} if job was successfully initialized.
+      */
+     boolean initialize(GridDeployment dep, Class<?> taskCls) {
+         assert dep != null;
+ 
+         IgniteCheckedException ex = null;
+ 
+         try {
+             if (job == null) {
+                 job = marsh.unmarshal(jobBytes, dep.classLoader());
+ 
+                 // No need to hold reference any more.
+                 jobBytes = null;
+             }
+ 
+             // Inject resources.
+             ctx.resource().inject(dep, taskCls, job, ses, jobCtx);
+ 
+             if (!internal && ctx.event().isRecordable(EVT_JOB_QUEUED))
+                 recordEvent(EVT_JOB_QUEUED, "Job got queued for 
computation.");
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to initialize job [jobId=" + ses.getJobId() 
+ ", ses=" + ses + ']', e);
+ 
+             ex = e;
+         }
+         catch (Throwable e) {
+             ex = handleThrowable(e);
+ 
+             assert ex != null;
+         }
+         finally {
+             if (ex != null)
+                 finishJob(null, ex, true);
+         }
+ 
+         return ex == null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void body() {
+         assert job != null;
+ 
+         startTime = U.currentTimeMillis();
+ 
+         isStarted = true;
+ 
+         // Event notification.
+         evtLsnr.onJobStarted(this);
+ 
+         if (!internal && ctx.event().isRecordable(EVT_JOB_STARTED))
+             recordEvent(EVT_JOB_STARTED, /*no message for success*/null);
+ 
+         execute0(true);
+     }
+ 
+     /**
+      * Executes the job.
+      */
+     public void execute() {
+         execute0(false);
+     }
+ 
+     /**
+      * @param skipNtf {@code True} to skip job processor {@code onUnhold()}
+      *      notification (only from {@link #body()}).
+      */
+     private void execute0(boolean skipNtf) {
+         // Make sure flag is not set for current thread.
+         HOLD.set(false);
+ 
+         if (isCancelled())
+             // If job was cancelled prior to assigning runner to it?
+             super.cancel();
+ 
+         if (!skipNtf) {
+             holdLsnr.onUnhold(this);
+ 
+             int c = held.decrementAndGet();
+ 
+             if (c > 0) {
+                 if (log.isDebugEnabled())
+                     log.debug("Ignoring job execution (job was held several 
times) [c=" + c + ']');
+ 
+                 return;
+             }
+         }
+ 
+         boolean sndRes = true;
+ 
+         Object res = null;
+ 
+         IgniteCheckedException ex = null;
+ 
+         try {
+             ctx.job().currentTaskSession(ses);
+ 
+             // If job has timed out, then
+             // avoid computation altogether.
+             if (isTimedOut())
+                 sndRes = false;
+             else {
+                 res = U.wrapThreadLoader(dep.classLoader(), new 
Callable<Object>() {
+                     @Nullable @Override public Object call() throws 
IgniteCheckedException {
+                         try {
+                             if (internal && 
ctx.config().isPeerClassLoadingEnabled())
+                                 ctx.job().internal(true);
+ 
+                             return job.execute();
+                         }
+                         finally {
+                             if (internal && 
ctx.config().isPeerClassLoadingEnabled())
+                                 ctx.job().internal(false);
+                         }
+                     }
+                 });
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Job execution has successfully finished [job=" 
+ job + ", res=" + res + ']');
+             }
+         }
+         catch (IgniteCheckedException e) {
+             if (sysStopping && e.hasCause(IgniteInterruptedException.class, 
InterruptedException.class)) {
+                 ex = handleThrowable(e);
+ 
+                 assert ex != null;
+             }
+             else {
+                 if (X.hasCause(e, GridInternalException.class) || 
X.hasCause(e, IgniteFsOutOfSpaceException.class)) {
+                     // Print exception for internal errors only if debug is 
enabled.
+                     if (log.isDebugEnabled())
+                         U.error(log, "Failed to execute job [jobId=" + 
ses.getJobId() + ", ses=" + ses + ']', e);
+                 }
+                 else if (X.hasCause(e, InterruptedException.class)) {
+                     String msg = "Job was cancelled [jobId=" + ses.getJobId() 
+ ", ses=" + ses + ']';
+ 
+                     if (log.isDebugEnabled())
+                         U.error(log, msg, e);
+                     else
+                         U.warn(log, msg);
+                 }
+                 else
+                     U.error(log, "Failed to execute job [jobId=" + 
ses.getJobId() + ", ses=" + ses + ']', e);
+ 
+                 ex = e;
+             }
+         }
+         // Catch Throwable to protect against bad user code except
+         // InterruptedException if job is being cancelled.
+         catch (Throwable e) {
+             ex = handleThrowable(e);
+ 
+             assert ex != null;
+         }
+         finally {
+             // Finish here only if not held by this thread.
+             if (!HOLD.get())
+                 finishJob(res, ex, sndRes);
+ 
+             ctx.job().currentTaskSession(null);
+         }
+     }
+ 
+     /**
+      * Handles {@link Throwable} generic exception for task
+      * deployment and execution.
+      *
+      * @param e Exception.
+      * @return Wrapped exception.
+      */
+     private IgniteCheckedException handleThrowable(Throwable e) {
+         String msg = null;
+ 
+         IgniteCheckedException ex = null;
+ 
+         // Special handling for weird interrupted exception which
+         // happens due to JDk 1.5 bug.
+         if (e instanceof InterruptedException && !sysStopping) {
+             msg = "Failed to execute job due to interrupted exception.";
+ 
+             // Turn interrupted exception into checked exception.
+             ex = new IgniteCheckedException(msg, e);
+         }
+         // Special 'NoClassDefFoundError' handling if P2P is on. We had many 
questions
+         // about this exception and decided to change error message.
+         else if ((e instanceof NoClassDefFoundError || e instanceof 
ClassNotFoundException)
+             && ctx.config().isPeerClassLoadingEnabled()) {
+             msg = "Failed to execute job due to class or resource loading 
exception (make sure that task " +
+                 "originating node is still in grid and requested class is in 
the task class path) [jobId=" +
+                 ses.getJobId() + ", ses=" + ses + ']';
+ 
+             ex = new ComputeUserUndeclaredException(msg, e);
+         }
+         else if (sysStopping && X.hasCause(e, InterruptedException.class, 
IgniteInterruptedException.class)) {
+             msg = "Job got interrupted due to system stop (will attempt 
failover).";
+ 
+             ex = new ComputeExecutionRejectedException(e);
+         }
+ 
+         if (msg == null) {
+             msg = "Failed to execute job due to unexpected runtime exception 
[jobId=" + ses.getJobId() +
+                 ", ses=" + ses + ']';
+ 
+             ex = new ComputeUserUndeclaredException(msg, e);
+         }
+ 
+         assert msg != null;
+         assert ex != null;
+ 
+         U.error(log, msg, e);
+ 
+         return ex;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void cancel() {
+         cancel(false);
+     }
+ 
+     /**
+      * @param sys System flag.
+      */
+     public void cancel(boolean sys) {
+         try {
+             super.cancel();
+ 
+             final ComputeJob job0 = job;
+ 
+             if (sys)
+                 sysCancelled = true;
+ 
+             if (job0 != null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Cancelling job: " + ses);
+ 
+                 U.wrapThreadLoader(dep.classLoader(), new IgniteRunnable() {
+                     @Override public void run() {
+                         job0.cancel();
+                     }
+                 });
+             }
+ 
+             if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED))
+                 recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0);
+         }
+         // Catch throwable to protect against bad user code.
+         catch (Throwable e) {
+             U.error(log, "Failed to cancel job due to undeclared user 
exception [jobId=" + ses.getJobId() +
+                 ", ses=" + ses + ']', e);
+         }
+     }
+ 
+     /**
+      * @param evtType Event type.
+      * @param msg Message.
+      */
+     private void recordEvent(int evtType, @Nullable String msg) {
+         assert ctx.event().isRecordable(evtType);
+         assert !internal;
+ 
+         IgniteJobEvent evt = new IgniteJobEvent();
+ 
+         evt.jobId(ses.getJobId());
+         evt.message(msg);
+         evt.node(ctx.discovery().localNode());
+         evt.taskName(ses.getTaskName());
+         evt.taskClassName(ses.getTaskClassName());
+         evt.taskSessionId(ses.getId());
+         evt.type(evtType);
+         evt.taskNode(taskNode);
+         evt.taskSubjectId(ses.subjectId());
+ 
+         ctx.event().record(evt);
+     }
+ 
+     /**
+      * @param res Result.
+      * @param ex Error.
+      * @param sndReply If {@code true}, reply will be sent.
+      */
+     void finishJob(@Nullable Object res, @Nullable IgniteCheckedException ex, 
boolean sndReply) {
+         // Avoid finishing a job more than once from different threads.
+         if (!finishing.compareAndSet(false, true))
+             return;
+ 
+         // Do not send reply if job has been cancelled from system.
+         if (sndReply)
+             sndReply = !sysCancelled;
+ 
+         // We should save message ID here since listener callback will reset 
sequence.
+         ClusterNode sndNode = ctx.discovery().node(taskNode.id());
+ 
 -        long msgId = sndNode != null && ses.isFullSupport() ?
 -            ctx.io().nextMessageId(taskTopic, sndNode.id()) : -1;
 -
+         finishTime = U.currentTimeMillis();
+ 
+         Collection<IgniteBiTuple<Integer, String>> evts = null;
+ 
+         try {
+             if (ses.isFullSupport())
+                 evtLsnr.onBeforeJobResponseSent(this);
+ 
+             // Send response back only if job has not timed out.
+             if (!isTimedOut()) {
+                 if (sndReply) {
+                     if (sndNode == null) {
+                         onMasterNodeLeft();
+ 
+                         U.warn(log, "Failed to reply to sender node because 
it left grid [nodeId=" + taskNode.id() +
+                             ", ses=" + ses + ", jobId=" + ses.getJobId() + ", 
job=" + job + ']');
+ 
+                         // Record job reply failure.
+                         if (!internal && 
ctx.event().isRecordable(EVT_JOB_FAILED))
+                             evts = addEvent(evts, EVT_JOB_FAILED, "Job reply 
failed (task node left grid): " + job);
+                     }
+                     else {
+                         try {
+                             if (ex != null) {
+                                 if (isStarted) {
+                                     // Job failed.
+                                     if (!internal && 
ctx.event().isRecordable(EVT_JOB_FAILED))
+                                         evts = addEvent(evts, EVT_JOB_FAILED, 
"Job failed due to exception [ex=" +
+                                             ex + ", job=" + job + ']');
+                                 }
+                                 else if (!internal && 
ctx.event().isRecordable(EVT_JOB_REJECTED))
+                                     evts = addEvent(evts, EVT_JOB_REJECTED, 
"Job has not been started " +
+                                         "[ex=" + ex + ", job=" + job + ']');
+                             }
+                             else if (!internal && 
ctx.event().isRecordable(EVT_JOB_FINISHED))
+                                 evts = addEvent(evts, EVT_JOB_FINISHED, /*no 
message for success. */null);
+ 
+                             boolean loc = 
ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();
+ 
+                             Map<Object, Object> attrs = 
jobCtx.getAttributes();
+ 
+                             GridJobExecuteResponse jobRes = new 
GridJobExecuteResponse(
+                                 ctx.localNodeId(),
+                                 ses.getId(),
+                                 ses.getJobId(),
+                                 loc ? null : marsh.marshal(ex),
+                                 loc ? ex : null,
+                                 loc ? null: marsh.marshal(res),
+                                 loc ? res : null,
+                                 loc ? null : marsh.marshal(attrs),
+                                 loc ? attrs : null,
+                                 isCancelled());
+ 
+                             long timeout = ses.getEndTime() - 
U.currentTimeMillis();
+ 
+                             if (timeout <= 0)
+                                 // Ignore the actual timeout and send 
response anyway.
+                                 timeout = 1;
+ 
+                             if (ses.isFullSupport()) {
+                                 // Send response to designated job topic.
+                                 // Always go through communication to 
preserve order,
+                                 // if attributes are enabled.
 -                                assert msgId > 0;
 -
+                                 ctx.io().sendOrderedMessage(
+                                     sndNode,
+                                     taskTopic,
 -                                    msgId,
+                                     jobRes,
+                                     internal ? MANAGEMENT_POOL : SYSTEM_POOL,
+                                     timeout,
+                                     false);
+                             }
+                             else if (ctx.localNodeId().equals(sndNode.id()))
+                                 
ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
+                             else
+                                 // Send response to common topic as unordered 
message.
+                                 ctx.io().send(sndNode, TOPIC_TASK, jobRes, 
internal ? MANAGEMENT_POOL : SYSTEM_POOL);
+                         }
+                         catch (IgniteCheckedException e) {
+                             // Log and invoke the master-leave callback.
+                             if (isDeadNode(taskNode.id())) {
+                                 onMasterNodeLeft();
+ 
+                                 // Avoid stack trace for left nodes.
+                                 U.warn(log, "Failed to reply to sender node 
because it left grid " +
+                                     "[nodeId=" + taskNode.id() + ", jobId=" + 
ses.getJobId() +
+                                     ", ses=" + ses + ", job=" + job + ']');
+                             }
+                             else
+                                 U.error(log, "Error sending reply for job 
[nodeId=" + sndNode.id() + ", jobId=" +
+                                     ses.getJobId() + ", ses=" + ses + ", 
job=" + job + ']', e);
+ 
+                             if (!internal && 
ctx.event().isRecordable(EVT_JOB_FAILED))
+                                 evts = addEvent(evts, EVT_JOB_FAILED, "Failed 
to send reply for job [nodeId=" +
+                                     taskNode.id() + ", job=" + job + ']');
+                         }
+                         // Catching interrupted exception because
+                         // it gets thrown for some reason.
+                         catch (Exception e) {
+                             String msg = "Failed to send reply for job 
[nodeId=" + taskNode.id() + ", job=" + job + ']';
+ 
+                             U.error(log, msg, e);
+ 
+                             if (!internal && 
ctx.event().isRecordable(EVT_JOB_FAILED))
+                                 evts = addEvent(evts, EVT_JOB_FAILED, msg);
+                         }
+                     }
+                 }
+                 else {
+                     if (ex != null) {
+                         if (isStarted) {
+                             if (!internal && 
ctx.event().isRecordable(EVT_JOB_FAILED))
+                                 evts = addEvent(evts, EVT_JOB_FAILED, "Job 
failed due to exception [ex=" + ex +
+                                     ", job=" + job + ']');
+                         }
+                         else if (!internal && 
ctx.event().isRecordable(EVT_JOB_REJECTED))
+                             evts = addEvent(evts, EVT_JOB_REJECTED, "Job has 
not been started [ex=" + ex +
+                                 ", job=" + job + ']');
+                     }
+                     else if (!internal && 
ctx.event().isRecordable(EVT_JOB_FINISHED))
+                         evts = addEvent(evts, EVT_JOB_FINISHED, /*no message 
for success. */null);
+                 }
+             }
+             // Job timed out.
+             else if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED))
+                 evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to 
timeout: " + job);
+         }
+         finally {
+             if (evts != null) {
+                 for (IgniteBiTuple<Integer, String> t : evts)
+                     recordEvent(t.get1(), t.get2());
+             }
+ 
+             // Listener callback.
+             evtLsnr.onJobFinished(this);
+         }
+     }
+ 
+     /**
+      * If the job implements {@link 
org.apache.ignite.compute.ComputeJobMasterLeaveAware#onMasterNodeLeft} 
interface then invoke
+      * {@link 
org.apache.ignite.compute.ComputeJobMasterLeaveAware#onMasterNodeLeft(org.apache.ignite.compute.ComputeTaskSession)}
 method.
+      *
+      * @return {@code True} if master leave has been handled (either by this 
call or before).
+      */
+     boolean onMasterNodeLeft() {
+         if (job instanceof ComputeJobMasterLeaveAware) {
+             if (masterLeaveGuard.compareAndSet(false, true)) {
+                 try {
+                     
((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses.session());
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Successfully executed 
GridComputeJobMasterLeaveAware.onMasterNodeLeft() callback " +
+                             "[nodeId=" + taskNode.id() + ", jobId=" + 
ses.getJobId() + ", job=" + job + ']');
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to execute 
GridComputeJobMasterLeaveAware.onMasterNodeLeft() callback " +
+                         "[nodeId=" + taskNode.id() + ", jobId=" + 
ses.getJobId() + ", job=" + job + ']', e);
+                 }
+             }
+ 
+             return true;
+         }
+ 
+         return false;
+     }
+ 
+     /**
+      * @param evts Collection (created if {@code null}).
+      * @param evt Event.
+      * @param msg Message (optional).
+      * @return Collection with event added.
+      */
+     Collection<IgniteBiTuple<Integer, String>> addEvent(@Nullable 
Collection<IgniteBiTuple<Integer, String>> evts,
+         Integer evt, @Nullable String msg) {
+         assert ctx.event().isRecordable(evt);
+         assert !internal;
+ 
+         if (evts == null)
+             evts = new ArrayList<>();
+ 
+         evts.add(F.t(evt, msg));
+ 
+         return evts;
+     }
+ 
+     /**
+      * Checks whether node is alive or dead.
+      *
+      * @param uid UID of node to check.
+      * @return {@code true} if node is dead, {@code false} is node is alive.
+      */
+     private boolean isDeadNode(UUID uid) {
+         return ctx.discovery().node(uid) == null || 
!ctx.discovery().pingNode(uid);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public boolean equals(Object obj) {
+         if (this == obj)
+             return true;
+ 
+         if (obj == null)
+             return false;
+ 
+         assert obj instanceof GridJobWorker;
+ 
+         IgniteUuid jobId1 = ses.getJobId();
+         IgniteUuid jobId2 = ((GridJobWorker)obj).ses.getJobId();
+ 
+         assert jobId1 != null;
+         assert jobId2 != null;
+ 
+         return jobId1.equals(jobId2);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int hashCode() {
+         IgniteUuid jobId = ses.getJobId();
+ 
+         assert jobId != null;
+ 
+         return jobId.hashCode();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public String toString() {
+         return S.toString(GridJobWorker.class, this);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
index 0000000,3e4fd5c..2ecb4b4
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java
@@@ -1,0 -1,1295 +1,1288 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.task;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.compute.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.plugin.security.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.util.lang.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.util.*;
+ import java.util.concurrent.*;
+ 
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
+ 
+ /**
+  * This class defines task processor.
+  */
+ public class GridTaskProcessor extends GridProcessorAdapter {
+     /** Wait for 5 seconds to allow discovery to take effect (best effort). */
+     private static final long DISCO_TIMEOUT = 5000;
+ 
+     /** */
+     private static final Map<GridTaskThreadContextKey, Object> EMPTY_ENUM_MAP 
=
+         new EnumMap<>(GridTaskThreadContextKey.class);
+ 
+     /** */
+     private final IgniteMarshaller marsh;
+ 
+     /** */
 -    private final ConcurrentMap<IgniteUuid, GridTaskWorker<?, ?>> tasks = 
GridConcurrentFactory.newMap();
++    public final ConcurrentMap<IgniteUuid, GridTaskWorker<?, ?>> tasks = 
GridConcurrentFactory.newMap();
+ 
+     /** */
+     private boolean stopping;
+ 
+     /** */
+     private boolean waiting;
+ 
+     /** */
+     private final GridLocalEventListener discoLsnr;
+ 
+     /** Total executed tasks. */
+     private final LongAdder execTasks = new LongAdder();
+ 
+     /** */
+     private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx =
+         new ThreadLocal<>();
+ 
+     /** */
+     private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock();
+ 
+     /** Internal metadata cache. */
+     private GridCache<GridTaskNameHashKey, String> tasksMetaCache;
+ 
+     /**
+      * @param ctx Kernal context.
+      */
+     public GridTaskProcessor(GridKernalContext ctx) {
+         super(ctx);
+ 
+         marsh = ctx.config().getMarshaller();
+ 
+         discoLsnr = new TaskDiscoveryListener();
+ 
+         tasksMetaCache = ctx.security().enabled() ? 
ctx.cache().<GridTaskNameHashKey, String>utilityCache() : null;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void start() {
+         ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, 
EVT_NODE_LEFT);
+ 
+         ctx.io().addMessageListener(TOPIC_JOB_SIBLINGS, new 
JobSiblingsMessageListener());
+         ctx.io().addMessageListener(TOPIC_TASK_CANCEL, new 
TaskCancelMessageListener());
+         ctx.io().addMessageListener(TOPIC_TASK, new JobMessageListener(true));
+ 
+         if (log.isDebugEnabled())
+             log.debug("Started task processor.");
+     }
+ 
+     /** {@inheritDoc} */
+     @SuppressWarnings("TooBroadScope")
+     @Override public void onKernalStop(boolean cancel) {
+         lock.writeLock();
+ 
+         try {
+             stopping = true;
+ 
+             waiting = !cancel;
+         }
+         finally {
+             lock.writeUnlock();
+         }
+ 
+         int size = tasks.size();
+ 
+         if (size > 0) {
+             if (cancel)
+                 U.warn(log, "Will cancel unfinished tasks due to stopping of 
the grid [cnt=" + size + "]");
+             else
+                 U.warn(log, "Will wait for all job responses from worker 
nodes before stopping grid.");
+ 
+             for (GridTaskWorker<?, ?> task : tasks.values()) {
+                 if (!cancel) {
+                     try {
+                         task.getTaskFuture().get();
+                     }
+                     catch (ComputeTaskCancelledException e) {
+                         U.warn(log, e.getMessage());
+                     }
+                     catch (IgniteCheckedException e) {
+                         U.error(log, "Task failed: " + task, e);
+                     }
+                 }
+                 else {
+                     for (ClusterNode node : 
ctx.discovery().nodes(task.getSession().getTopology())) {
+                         if (ctx.localNodeId().equals(node.id()))
+                             
ctx.job().masterLeaveLocal(task.getSession().getId());
+                     }
+ 
+                     task.cancel();
+ 
+                     Throwable ex = new ComputeTaskCancelledException("Task 
cancelled due to stopping of the grid: " +
+                         task);
+ 
+                     task.finishTask(null, ex, false);
+                 }
+             }
+ 
+             U.join(tasks.values(), log);
+         }
+ 
+         // Remove discovery and message listeners.
+         ctx.event().removeLocalEventListener(discoLsnr);
+ 
+         ctx.io().removeMessageListener(TOPIC_JOB_SIBLINGS);
+         ctx.io().removeMessageListener(TOPIC_TASK_CANCEL);
+ 
+         // Set waiting flag to false to make sure that we do not get
+         // listener notifications any more.
+         if (!cancel) {
+             lock.writeLock();
+ 
+             try {
+                 waiting = false;
+             }
+             finally {
+                 lock.writeUnlock();
+             }
+         }
+ 
+         assert tasks.isEmpty();
+ 
+         if (log.isDebugEnabled())
+             log.debug("Finished executing task processor onKernalStop() 
callback.");
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void stop(boolean cancel) {
+         if (log.isDebugEnabled())
+             log.debug("Stopped task processor.");
+     }
+ 
+     /**
+      * Sets the thread-local context value.
+      *
+      * @param key Key.
+      * @param val Value.
+      */
+     public void setThreadContext(GridTaskThreadContextKey key, Object val) {
+         assert key != null;
+         assert val != null;
+ 
+         Map<GridTaskThreadContextKey, Object> map = thCtx.get();
+ 
+         // NOTE: access to 'map' is always single-threaded since it's held
+         // in a thread local.
+         if (map == null)
+             thCtx.set(map = new EnumMap<>(GridTaskThreadContextKey.class));
+ 
+         map.put(key, val);
+     }
+ 
+     /**
+      * Sets the thread-local context value, if it is not null.
+      *
+      * @param key Key.
+      * @param val Value.
+      */
+     public void setThreadContextIfNotNull(GridTaskThreadContextKey key, 
@Nullable Object val) {
+         if (val != null)
+             setThreadContext(key, val);
+     }
+ 
+     /**
+      * Gets thread-local context value for a given {@code key}.
+      *
+      * @param key Thread-local context key.
+      * @return Thread-local context value associated with given {@code key} - 
or {@code null}
+      *      if value with given {@code key} doesn't exist.
+      */
+     @Nullable public <T> T getThreadContext(GridTaskThreadContextKey key) {
+         assert(key != null);
+ 
+         Map<GridTaskThreadContextKey, Object> map = thCtx.get();
+ 
+         return map == null ? null : (T)map.get(key);
+     }
+ 
+     /**
+      * Gets currently used deployments.
+      *
+      * @return Currently used deployments.
+      */
+     public Collection<GridDeployment> getUsedDeployments() {
+         return F.viewReadOnly(tasks.values(), new C1<GridTaskWorker<?, ?>, 
GridDeployment>() {
+             @Override public GridDeployment apply(GridTaskWorker<?, ?> w) {
+                 return w.getDeployment();
+             }
+         });
+     }
+ 
+     /**
+      * Gets currently used deployments mapped by task name or aliases.
+      *
+      * @return Currently used deployments.
+      */
+     public Map<String, GridDeployment> getUsedDeploymentMap() {
+         Map<String, GridDeployment> deps = new HashMap<>();
+ 
+         for (GridTaskWorker w : tasks.values()) {
+             GridTaskSessionImpl ses = w.getSession();
+ 
+             deps.put(ses.getTaskClassName(), w.getDeployment());
+ 
+             if (ses.getTaskName() != null && 
ses.getTaskClassName().equals(ses.getTaskName()))
+                 deps.put(ses.getTaskName(), w.getDeployment());
+         }
+ 
+         return deps;
+     }
+ 
+     /**
+      * @param taskCls Task class.
+      * @param arg Optional execution argument.
+      * @return Task future.
+      * @param <T> Task argument type.
+      * @param <R> Task return value type.
+      */
+     public <T, R> ComputeTaskFuture<R> execute(Class<? extends ComputeTask<T, 
R>> taskCls, @Nullable T arg) {
+         assert taskCls != null;
+ 
+         lock.readLock();
+ 
+         try {
+             if (stopping)
+                 throw new IllegalStateException("Failed to execute task due 
to grid shutdown: " + taskCls);
+ 
+             return startTask(null, taskCls, null, 
IgniteUuid.fromUuid(ctx.localNodeId()), arg, false);
+         }
+         finally {
+             lock.readUnlock();
+         }
+     }
+ 
+     /**
+      * @param task Actual task.
+      * @param arg Optional task argument.
+      * @return Task future.
+      * @param <T> Task argument type.
+      * @param <R> Task return value type.
+      */
+     public <T, R> ComputeTaskFuture<R> execute(ComputeTask<T, R> task, 
@Nullable T arg) {
+         return execute(task, arg, false);
+     }
+ 
+     /**
+      * @param task Actual task.
+      * @param arg Optional task argument.
+      * @param sys If {@code true}, then system pool will be used.
+      * @return Task future.
+      * @param <T> Task argument type.
+      * @param <R> Task return value type.
+      */
+     public <T, R> ComputeTaskFuture<R> execute(ComputeTask<T, R> task, 
@Nullable T arg, boolean sys) {
+         lock.readLock();
+ 
+         try {
+             if (stopping)
+                 throw new IllegalStateException("Failed to execute task due 
to grid shutdown: " + task);
+ 
+             return startTask(null, null, task, 
IgniteUuid.fromUuid(ctx.localNodeId()), arg, sys);
+         }
+         finally {
+             lock.readUnlock();
+         }
+     }
+ 
+     /**
+      * Resolves task name by task name hash.
+      *
+      * @param taskNameHash Task name hash.
+      * @return Task name or {@code null} if not found.
+      */
+     public String resolveTaskName(int taskNameHash) {
+         if (taskNameHash == 0)
+             return null;
+ 
+         assert ctx.security().enabled();
+ 
+         return tasksMetaCache.peek(new GridTaskNameHashKey(taskNameHash));
+     }
+ 
+     /**
+      * @param taskName Task name.
+      * @param arg Optional execution argument.
+      * @return Task future.
+      * @param <T> Task argument type.
+      * @param <R> Task return value type.
+      */
+     public <T, R> ComputeTaskFuture<R> execute(String taskName, @Nullable T 
arg) {
+         assert taskName != null;
+ 
+         lock.readLock();
+ 
+         try {
+             if (stopping)
+                 throw new IllegalStateException("Failed to execute task due 
to grid shutdown: " + taskName);
+ 
+             return startTask(taskName, null, null, 
IgniteUuid.fromUuid(ctx.localNodeId()), arg, false);
+         }
+         finally {
+             lock.readUnlock();
+         }
+     }
+ 
+     /**
+      * @param taskName Task name.
+      * @param taskCls Task class.
+      * @param task Task.
+      * @param sesId Task session ID.
+      * @param arg Optional task argument.
+      * @param sys If {@code true}, then system pool will be used.
+      * @return Task future.
+      */
+     @SuppressWarnings("unchecked")
+     private <T, R> ComputeTaskFuture<R> startTask(
+         @Nullable String taskName,
+         @Nullable Class<?> taskCls,
+         @Nullable ComputeTask<T, R> task,
+         IgniteUuid sesId,
+         @Nullable T arg,
+         boolean sys) {
+         assert sesId != null;
+ 
+         String taskClsName;
+ 
+         if (task != null)
+             taskClsName = task.getClass().getName();
+         else
+             taskClsName = taskCls != null ? taskCls.getName() : taskName;
+ 
+         ctx.security().authorize(taskClsName, 
GridSecurityPermission.TASK_EXECUTE, null);
+ 
+         // Get values from thread-local context.
+         Map<GridTaskThreadContextKey, Object> map = thCtx.get();
+ 
+         if (map == null)
+             map = EMPTY_ENUM_MAP;
+         else
+             // Reset thread-local context.
+             thCtx.remove();
+ 
+         Long timeout = (Long)map.get(TC_TIMEOUT);
+ 
+         long timeout0 = timeout == null || timeout == 0 ? Long.MAX_VALUE : 
timeout;
+ 
+         long startTime = U.currentTimeMillis();
+ 
+         long endTime = timeout0 + startTime;
+ 
+         // Account for overflow.
+         if (endTime < 0)
+             endTime = Long.MAX_VALUE;
+ 
+         IgniteCheckedException deployEx = null;
+         GridDeployment dep = null;
+ 
+         // User provided task name.
+         if (taskName != null) {
+             assert taskCls == null;
+             assert task == null;
+ 
+             try {
+                 dep = ctx.deploy().getDeployment(taskName);
+ 
+                 if (dep == null)
+                     throw new IgniteDeploymentException("Unknown task name or 
failed to auto-deploy " +
+                         "task (was task (re|un)deployed?): " + taskName);
+ 
+                 taskCls = dep.deployedClass(taskName);
+ 
+                 if (taskCls == null)
+                     throw new IgniteDeploymentException("Unknown task name or 
failed to auto-deploy " +
+                         "task (was task (re|un)deployed?) [taskName=" + 
taskName + ", dep=" + dep + ']');
+ 
+                 if (!ComputeTask.class.isAssignableFrom(taskCls))
+                     throw new IgniteCheckedException("Failed to auto-deploy 
task (deployed class is not a task) [taskName=" +
+                         taskName + ", depCls=" + taskCls + ']');
+             }
+             catch (IgniteCheckedException e) {
+                 deployEx = e;
+             }
+         }
+         // Deploy user task class.
+         else if (taskCls != null) {
+             assert task == null;
+ 
+             try {
+                 // Implicit deploy.
+                 dep = ctx.deploy().deploy(taskCls, 
U.detectClassLoader(taskCls));
+ 
+                 if (dep == null)
+                     throw new IgniteDeploymentException("Failed to 
auto-deploy task (was task (re|un)deployed?): " +
+                         taskCls);
+ 
+                 taskName = taskName(dep, taskCls, map);
+             }
+             catch (IgniteCheckedException e) {
+                 taskName = taskCls.getName();
+ 
+                 deployEx = e;
+             }
+         }
+         // Deploy user task.
+         else if (task != null) {
+             try {
+                 ClassLoader ldr;
+ 
+                 Class<?> cls;
+ 
+                 if (task instanceof GridPeerDeployAware) {
+                     GridPeerDeployAware depAware = (GridPeerDeployAware)task;
+ 
+                     cls = depAware.deployClass();
+                     ldr = depAware.classLoader();
+ 
+                     // Set proper class name to make peer-loading possible.
+                     taskCls = cls;
+                 }
+                 else {
+                     taskCls = task.getClass();
+ 
+                     assert ComputeTask.class.isAssignableFrom(taskCls);
+ 
+                     cls = task.getClass();
+                     ldr = U.detectClassLoader(cls);
+                 }
+ 
+                 // Explicit deploy.
+                 dep = ctx.deploy().deploy(cls, ldr);
+ 
+                 if (dep == null)
+                     throw new IgniteDeploymentException("Failed to 
auto-deploy task (was task (re|un)deployed?): " + cls);
+ 
+                 taskName = taskName(dep, taskCls, map);
+             }
+             catch (IgniteCheckedException e) {
+                 taskName = task.getClass().getName();
+ 
+                 deployEx = e;
+             }
+         }
+ 
+         assert taskName != null;
+ 
+         if (log.isDebugEnabled())
+             log.debug("Task deployment: " + dep);
+ 
+         boolean fullSup = dep != null && taskCls!= null &&
+             dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != 
null;
+ 
+         Collection<? extends ClusterNode> nodes = (Collection<? extends 
ClusterNode>)map.get(TC_SUBGRID);
+ 
+         Collection<UUID> top = nodes != null ? F.nodeIds(nodes) : null;
+ 
+         UUID subjId = getThreadContext(TC_SUBJ_ID);
+ 
+         if (subjId == null)
+             subjId = ctx.localNodeId();
+ 
+         // Creates task session with task name and task version.
+         GridTaskSessionImpl ses = ctx.session().createTaskSession(
+             sesId,
+             ctx.config().getNodeId(),
+             taskName,
+             dep,
+             taskCls == null ? null : taskCls.getName(),
+             top,
+             startTime,
+             endTime,
+             Collections.<ComputeJobSibling>emptyList(),
+             Collections.emptyMap(),
+             fullSup,
+             subjId);
+ 
+         GridTaskFutureImpl<R> fut = new GridTaskFutureImpl<>(ses, ctx);
+ 
+         IgniteCheckedException securityEx = null;
+ 
+         if (ctx.security().enabled() && deployEx == null) {
+             try {
+                 saveTaskMetadata(taskName);
+             }
+             catch (IgniteCheckedException e) {
+                 securityEx = e;
+             }
+         }
+ 
+         if (deployEx == null && securityEx == null) {
+             if (dep == null || !dep.acquire())
+                 handleException(new IgniteDeploymentException("Task not 
deployed: " + ses.getTaskName()), fut);
+             else {
+                 GridTaskWorker<?, ?> taskWorker = new GridTaskWorker<>(
+                     ctx,
+                     arg,
+                     ses,
+                     fut,
+                     taskCls,
+                     task,
+                     dep,
+                     new TaskEventListener(),
+                     map,
+                     subjId);
+ 
+                 if (task != null) {
+                     // Check if someone reuses the same task instance by 
walking
+                     // through the "tasks" map
+                     for (GridTaskWorker worker : tasks.values()) {
+                         ComputeTask workerTask = worker.getTask();
+ 
+                         // Check that the same instance of task is being used 
by comparing references.
+                         if (workerTask != null && task == workerTask)
+                             U.warn(log, "Most likely the same task instance 
is being executed. " +
+                                 "Please avoid executing the same task 
instances in parallel because " +
+                                 "they may have concurrent resources access 
and conflict each other: " + task);
+                     }
+                 }
+ 
+                 GridTaskWorker<?, ?> taskWorker0 = tasks.putIfAbsent(sesId, 
taskWorker);
+ 
+                 assert taskWorker0 == null : "Session ID is not unique: " + 
sesId;
+ 
+                 if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != 
null) {
+                     try {
+                         // Start task execution in another thread.
+                         if (sys)
+                             
ctx.config().getSystemExecutorService().execute(taskWorker);
+                         else
+                             
ctx.config().getExecutorService().execute(taskWorker);
+                     }
+                     catch (RejectedExecutionException e) {
+                         tasks.remove(sesId);
+ 
+                         release(dep);
+ 
+                         handleException(new 
ComputeExecutionRejectedException("Failed to execute task " +
+                             "due to thread pool execution rejection: " + 
taskName, e), fut);
+                     }
+                 }
+                 else
+                     taskWorker.run();
+             }
+         }
+         else {
+             if (deployEx != null)
+                 handleException(deployEx, fut);
+             else
+                 handleException(securityEx, fut);
+         }
+ 
+         return fut;
+     }
+ 
+     /**
+      * @param sesId Task's session id.
+      * @return A {@link org.apache.ignite.compute.ComputeTaskFuture} instance 
or {@code null} if no such task found.
+      */
+     @Nullable public <R> ComputeTaskFuture<R> taskFuture(IgniteUuid sesId) {
+         GridTaskWorker<?, ?> taskWorker = tasks.get(sesId);
+ 
+         return taskWorker != null ? 
(ComputeTaskFuture<R>)taskWorker.getTaskFuture() : null;
+     }
+ 
+     /**
+      * @return Active task futures.
+      */
+     @SuppressWarnings("unchecked")
+     public <R> Map<IgniteUuid, ComputeTaskFuture<R>> taskFutures() {
+         Map<IgniteUuid, ComputeTaskFuture<R>> res = 
U.newHashMap(tasks.size());
+ 
+         for (GridTaskWorker taskWorker : tasks.values()) {
+             ComputeTaskFuture<R> fut = taskWorker.getTaskFuture();
+ 
+             res.put(fut.getTaskSession().getId(), fut);
+         }
+ 
+         return res;
+     }
+ 
+     /**
+      * Gets task name for a task class. It firstly checks
+      * {@link @GridComputeTaskName} annotation, then thread context
+      * map. If both are empty, class name is returned.
+      *
+      * @param dep Deployment.
+      * @param cls Class.
+      * @param map Thread context map.
+      * @return Task name.
+      * @throws IgniteCheckedException If {@link @GridComputeTaskName} 
annotation is found, but has empty value.
+      */
+     private String taskName(GridDeployment dep, Class<?> cls,
+         Map<GridTaskThreadContextKey, Object> map) throws 
IgniteCheckedException {
+         assert dep != null;
+         assert cls != null;
+         assert map != null;
+ 
+         String taskName;
+ 
+         ComputeTaskName ann = dep.annotation(cls, ComputeTaskName.class);
+ 
+         if (ann != null) {
+             taskName = ann.value();
+ 
+             if (F.isEmpty(taskName))
+                 throw new IgniteCheckedException("Task name specified by 
@GridComputeTaskName annotation" +
+                     " cannot be empty for class: " + cls);
+         }
+         else
+             taskName = map.containsKey(TC_TASK_NAME) ? 
(String)map.get(TC_TASK_NAME) : cls.getName();
+ 
+         return taskName;
+     }
+ 
+     /**
+      * Saves task name metadata to utility cache.
+      *
+      * @param taskName Task name.
+      */
+     private void saveTaskMetadata(String taskName) throws 
IgniteCheckedException {
+         if (ctx.isDaemon())
+             return;
+ 
+         assert ctx.security().enabled();
+ 
+         int nameHash = taskName.hashCode();
+ 
+         // 0 is reserved for no task.
+         if (nameHash == 0)
+             nameHash = 1;
+ 
+         GridTaskNameHashKey key = new GridTaskNameHashKey(nameHash);
+ 
+         String existingName = tasksMetaCache.get(key);
+ 
+         if (existingName == null)
+             existingName = tasksMetaCache.putIfAbsent(key, taskName);
+ 
+         if (existingName != null && !F.eq(existingName, taskName))
+             throw new IgniteCheckedException("Task name hash collision for 
security-enabled node [taskName=" + taskName +
+                 ", existing taskName=" + existingName + ']');
+     }
+ 
+     /**
+      * @param dep Deployment to release.
+      */
+     private void release(GridDeployment dep) {
+         assert dep != null;
+ 
+         dep.release();
+ 
+         if (dep.obsolete())
+             ctx.resource().onUndeployed(dep);
+     }
+ 
+     /**
+      * @param ex Exception.
+      * @param fut Task future.
+      * @param <R> Result type.
+      */
+     private <R> void handleException(Throwable ex, GridTaskFutureImpl<R> fut) 
{
+         assert ex != null;
+         assert fut != null;
+ 
+         fut.onDone(ex);
+     }
+ 
+     /**
+      * @param ses Task session.
+      * @param attrs Attributes.
+      * @throws IgniteCheckedException Thrown in case of any errors.
+      */
+     public void setAttributes(GridTaskSessionImpl ses, Map<?, ?> attrs) 
throws IgniteCheckedException {
+         long timeout = ses.getEndTime() - U.currentTimeMillis();
+ 
+         if (timeout <= 0) {
+             U.warn(log, "Task execution timed out (remote session attributes 
won't be set): " + ses);
+ 
+             return;
+         }
+ 
+         // If setting from task or future.
+         if (log.isDebugEnabled())
+             log.debug("Setting session attribute(s) from task or future: " + 
ses);
+ 
+         sendSessionAttributes(attrs, ses);
+     }
+ 
+     /**
+      * This method will make the best attempt to send attributes to all jobs.
+      *
+      * @param attrs Deserialized session attributes.
+      * @param ses Task session.
+      * @throws IgniteCheckedException If send to any of the jobs failed.
+      */
+     @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", 
"BusyWait"})
+     private void sendSessionAttributes(Map<?, ?> attrs, GridTaskSessionImpl 
ses)
+         throws IgniteCheckedException {
+         assert attrs != null;
+         assert ses != null;
+ 
+         Collection<ComputeJobSibling> siblings = ses.getJobSiblings();
+ 
+         GridIoManager commMgr = ctx.io();
+ 
+         long timeout = ses.getEndTime() - U.currentTimeMillis();
+ 
+         if (timeout <= 0) {
+             U.warn(log, "Session attributes won't be set due to task timeout: 
" + attrs);
+ 
+             return;
+         }
+ 
 -        Map<UUID, Long> msgIds = new HashMap<>(siblings.size(), 1.0f);
++        Set<UUID> rcvrs = new HashSet<>();
+ 
+         UUID locNodeId = ctx.localNodeId();
+ 
+         synchronized (ses) {
+             if (ses.isClosed()) {
+                 if (log.isDebugEnabled())
+                     log.debug("Setting session attributes on closed session 
(will ignore): " + ses);
+ 
+                 return;
+             }
+ 
+             ses.setInternal(attrs);
+ 
+             // Do this inside of synchronization block, so every message
+             // ID will be associated with a certain session state.
+             for (ComputeJobSibling s : siblings) {
+                 GridJobSiblingImpl sib = (GridJobSiblingImpl)s;
+ 
+                 UUID nodeId = sib.nodeId();
+ 
 -                if (!nodeId.equals(locNodeId) && !sib.isJobDone() && 
!msgIds.containsKey(nodeId))
 -                    msgIds.put(nodeId, commMgr.nextMessageId(sib.jobTopic(), 
nodeId));
++                if (!nodeId.equals(locNodeId) && !sib.isJobDone() && 
!rcvrs.contains(nodeId))
++                    rcvrs.add(nodeId);
+             }
+         }
+ 
+         if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
+             IgniteEvent evt = new IgniteTaskEvent(
+                 ctx.discovery().localNode(),
+                 "Changed attributes: " + attrs,
+                 EVT_TASK_SESSION_ATTR_SET,
+                 ses.getId(),
+                 ses.getTaskName(),
+                 ses.getTaskClassName(),
+                 false,
+                 null);
+ 
+             ctx.event().record(evt);
+         }
+ 
+         IgniteCheckedException ex = null;
+ 
+         // Every job gets an individual message to keep track of ghost 
requests.
+         for (ComputeJobSibling s : ses.getJobSiblings()) {
+             GridJobSiblingImpl sib = (GridJobSiblingImpl)s;
+ 
+             UUID nodeId = sib.nodeId();
+ 
 -            Long msgId = msgIds.remove(nodeId);
 -
+             // Pair can be null if job is finished.
 -            if (msgId != null) {
 -                assert msgId > 0;
 -
++            if (rcvrs.remove(nodeId)) {
+                 ClusterNode node = ctx.discovery().node(nodeId);
+ 
+                 // Check that node didn't change (it could happen in case of 
failover).
+                 if (node != null) {
+                     boolean loc = node.id().equals(ctx.localNodeId()) && 
!ctx.config().isMarshalLocalJobs();
+ 
+                     GridTaskSessionRequest req = new GridTaskSessionRequest(
+                         ses.getId(),
+                         null,
+                         loc ? null : marsh.marshal(attrs),
+                         attrs);
+ 
+                     // Make sure to go through IO manager always, since order
+                     // should be preserved here.
+                     try {
+                         commMgr.sendOrderedMessage(
+                             node,
+                             sib.jobTopic(),
 -                            msgId,
+                             req,
+                             SYSTEM_POOL,
+                             timeout,
+                             false);
+                     }
+                     catch (IgniteCheckedException e) {
+                         node = ctx.discovery().node(nodeId);
+ 
+                         if (node != null) {
+                             try {
+                                 // Since communication on remote node may 
stop before
+                                 // we get discovery notification, we give 
ourselves the
+                                 // best effort to detect it.
+                                 Thread.sleep(DISCO_TIMEOUT);
+                             }
+                             catch (InterruptedException ignore) {
+                                 U.warn(log, "Got interrupted while sending 
session attributes.");
+                             }
+ 
+                             node = ctx.discovery().node(nodeId);
+                         }
+ 
+                         String err = "Failed to send session attribute 
request message to node " +
+                             "(normal case if node left grid) [node=" + node + 
", req=" + req + ']';
+ 
+                         if (node != null)
+                             U.warn(log, err);
+                         else if (log.isDebugEnabled())
+                             log.debug(err);
+ 
+                         if (ex == null)
+                             ex = e;
+                     }
+                 }
+             }
+         }
+ 
+         if (ex != null)
+             throw ex;
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @param msg Execute response message.
+      */
+     public void processJobExecuteResponse(UUID nodeId, GridJobExecuteResponse 
msg) {
+         assert nodeId != null;
+         assert msg != null;
+ 
+         lock.readLock();
+ 
+         try {
+             if (stopping && !waiting) {
+                 U.warn(log, "Received job execution response while stopping 
grid (will ignore): " + msg);
+ 
+                 return;
+             }
+ 
+             GridTaskWorker<?, ?> task = tasks.get(msg.getSessionId());
+ 
+             if (task == null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Received job execution response for unknown 
task (was task already reduced?): " + msg);
+ 
+                 return;
+             }
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Received grid job response message [msg=" + msg + 
", nodeId=" + nodeId + ']');
+ 
+             task.onResponse(msg);
+         }
+         finally {
+             lock.readUnlock();
+         }
+     }
+ 
+     /**
+      * @param nodeId Node ID.
+      * @param msg Task session request.
+      */
+     @SuppressWarnings({"unchecked"})
+     private void processTaskSessionRequest(UUID nodeId, 
GridTaskSessionRequest msg) {
+         assert nodeId != null;
+         assert msg != null;
+ 
+         lock.readLock();
+ 
+         try {
+             if (stopping && !waiting) {
+                 U.warn(log, "Received task session request while stopping 
grid (will ignore): " + msg);
+ 
+                 return;
+             }
+ 
+             GridTaskWorker<?, ?> task = tasks.get(msg.getSessionId());
+ 
+             if (task == null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Received task session request for unknown task 
(was task already reduced?): " + msg);
+ 
+                 return;
+             }
+ 
+             boolean loc = ctx.localNodeId().equals(nodeId) && 
!ctx.config().isMarshalLocalJobs();
+ 
+             Map<?, ?> attrs = loc ? msg.getAttributes() :
+                 marsh.<Map<?, ?>>unmarshal(msg.getAttributesBytes(), 
task.getTask().getClass().getClassLoader());
+ 
+             GridTaskSessionImpl ses = task.getSession();
+ 
+             sendSessionAttributes(attrs, ses);
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to deserialize session request: " + msg, e);
+         }
+         finally {
+             lock.readUnlock();
+         }
+     }
+ 
+     /**
+      * Handles user cancellation.
+      *
+      * @param sesId Session ID.
+      */
+     public void onCancelled(IgniteUuid sesId) {
+         assert sesId != null;
+ 
+         lock.readLock();
+ 
+         try {
+             if (stopping && !waiting) {
+                 U.warn(log, "Attempt to cancel task while stopping grid (will 
ignore): " + sesId);
+ 
+                 return;
+             }
+ 
+             GridTaskWorker<?, ?> task = tasks.get(sesId);
+ 
+             if (task == null) {
+                 if (log.isDebugEnabled())
+                     log.debug("Attempt to cancel unknown task (was task 
already reduced?): " + sesId);
+ 
+                 return;
+             }
+ 
+             task.finishTask(null, new ComputeTaskCancelledException("Task was 
cancelled."), true);
+         }
+         finally {
+             lock.readUnlock();
+         }
+     }
+ 
+     /**
+      * @return Number of executed tasks.
+      */
+     public int getTotalExecutedTasks() {
+         return execTasks.intValue();
+     }
+ 
+     /**
+      * Resets processor metrics.
+      */
+     public void resetMetrics() {
+         // Can't use 'reset' method because it is not thread-safe
+         // according to javadoc.
+         execTasks.add(-execTasks.sum());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void printMemoryStats() {
+         X.println(">>>");
+         X.println(">>> Task processor memory stats [grid=" + ctx.gridName() + 
']');
+         X.println(">>>  tasksSize: " + tasks.size());
+     }
+ 
+     /**
+      * Listener for individual task events.
+      */
+     @SuppressWarnings({"deprecation"})
+     private class TaskEventListener implements GridTaskEventListener {
+         /** */
+         private final GridMessageListener msgLsnr = new 
JobMessageListener(false);
+ 
+         /** {@inheritDoc} */
+         @Override public void onTaskStarted(GridTaskWorker<?, ?> worker) {
+             // Register for timeout notifications.
+             if (worker.endTime() < Long.MAX_VALUE)
+                 ctx.timeout().addTimeoutObject(worker);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onJobSend(GridTaskWorker<?, ?> worker, 
GridJobSiblingImpl sib) {
+             if (worker.getSession().isFullSupport())
+                 // Listener is stateless, so same listener can be reused for 
all jobs.
+                 ctx.io().addMessageListener(sib.taskTopic(), msgLsnr);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onJobFailover(GridTaskWorker<?, ?> worker, 
GridJobSiblingImpl sib, UUID nodeId) {
+             GridIoManager ioMgr = ctx.io();
+ 
+             // Remove message ID registration and old listener.
+             if (worker.getSession().isFullSupport()) {
 -                ioMgr.removeMessageId(sib.jobTopic());
+                 ioMgr.removeMessageListener(sib.taskTopic(), msgLsnr);
+ 
+                 synchronized (worker.getSession()) {
+                     // Reset ID on sibling prior to sending request.
+                     sib.nodeId(nodeId);
+                 }
+ 
+                 // Register new listener on new topic.
+                 ioMgr.addMessageListener(sib.taskTopic(), msgLsnr);
+             }
+             else {
+                 // Update node ID only in case attributes are not enabled.
+                 synchronized (worker.getSession()) {
+                     // Reset ID on sibling prior to sending request.
+                     sib.nodeId(nodeId);
+                 }
+             }
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onJobFinished(GridTaskWorker<?, ?> worker, 
GridJobSiblingImpl sib) {
+             // Mark sibling finished for the purpose of setting session 
attributes.
+             synchronized (worker.getSession()) {
+                 sib.onJobDone();
+             }
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onTaskFinished(GridTaskWorker<?, ?> worker) {
+             GridTaskSessionImpl ses = worker.getSession();
+ 
+             if (ses.isFullSupport()) {
+                 synchronized (worker.getSession()) {
+                     worker.getSession().onClosed();
+                 }
+ 
+                 ctx.checkpoint().onSessionEnd(ses, false);
+ 
+                 // Delete session altogether.
+                 ctx.session().removeSession(ses.getId());
+             }
+ 
+             boolean rmv = tasks.remove(worker.getTaskSessionId(), worker);
+ 
+             assert rmv;
+ 
+             // Unregister from timeout notifications.
+             if (worker.endTime() < Long.MAX_VALUE)
+                 ctx.timeout().removeTimeoutObject(worker);
+ 
+             release(worker.getDeployment());
+ 
+             if (!worker.isInternal())
+                 execTasks.increment();
+ 
+             // Unregister job message listener from all job topics.
+             if (ses.isFullSupport()) {
+                 try {
+                     for (ComputeJobSibling sibling : 
worker.getSession().getJobSiblings()) {
+                         GridJobSiblingImpl s = (GridJobSiblingImpl)sibling;
+ 
 -                        ctx.io().removeMessageId(s.jobTopic());
+                         ctx.io().removeMessageListener(s.taskTopic(), 
msgLsnr);
+                     }
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to unregister job communication 
message listeners and counters.", e);
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * Handles job execution responses and session requests.
+      */
+     private class JobMessageListener implements GridMessageListener {
+         /** */
+         private final boolean jobResOnly;
+ 
+         /**
+          * @param jobResOnly {@code True} if this listener is allowed to 
process
+          *      job responses only (for tasks with disabled sessions).
+          */
+         private JobMessageListener(boolean jobResOnly) {
+             this.jobResOnly = jobResOnly;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void onMessage(UUID nodeId, Object msg) {
+             if (!(msg instanceof GridTaskMessage)) {
+                 U.warn(log, "Received message of unknown type: " + msg);
+ 
+                 return;
+             }
+ 
+             if (msg instanceof GridJobExecuteResponse)
+                 processJobExecuteResponse(nodeId, 
(GridJobExecuteResponse)msg);
+             else if (jobResOnly)
+                 U.warn(log, "Received message of type other than job 
response: " + msg);
+             else if (msg instanceof GridTaskSessionRequest)
+                 processTaskSessionRequest(nodeId, 
(GridTaskSessionRequest)msg);
+             else
+                 U.warn(log, "Received message of unknown type: " + msg);
+         }
+     }
+ 
+     /**
+      * Listener to node discovery events.
+      */
+     private class TaskDiscoveryListener implements GridLocalEventListener {
+         /** {@inheritDoc} */
+         @Override public void onEvent(IgniteEvent evt) {
+             assert evt.type() == EVT_NODE_FAILED || evt.type() == 
EVT_NODE_LEFT;
+ 
+             UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();
+ 
+             lock.readLock();
+ 
+             try {
+                 for (GridTaskWorker<?, ?> task : tasks.values())
+                     task.onNodeLeft(nodeId);
+             }
+             finally {
+                 lock.readUnlock();
+             }
+         }
+     }
+ 
+     /**
+      *
+      */
+     private class JobSiblingsMessageListener implements GridMessageListener {
+         /** {@inheritDoc} */
+         @Override public void onMessage(UUID nodeId, Object msg) {
+             if (!(msg instanceof GridJobSiblingsRequest)) {
+                 U.warn(log, "Received unexpected message instead of siblings 
request: " + msg);
+ 
+                 return;
+             }
+ 
+             lock.readLock();
+ 
+             try {
+                 if (stopping && !waiting) {
+                     U.warn(log, "Received job siblings request while stopping 
grid (will ignore): " + msg);
+ 
+                     return;
+                 }
+ 
+                 GridJobSiblingsRequest req = (GridJobSiblingsRequest)msg;
+ 
+                 GridTaskWorker<?, ?> worker = tasks.get(req.sessionId());
+ 
+                 Collection<ComputeJobSibling> siblings;
+ 
+                 if (worker != null) {
+                     try {
+                         siblings = worker.getSession().getJobSiblings();
+                     }
+                     catch (IgniteCheckedException e) {
+                         U.error(log, "Failed to get job siblings [request=" + 
msg +
+                             ", ses=" + worker.getSession() + ']', e);
+ 
+                         siblings = null;
+                     }
+                 }
+                 else {
+                     if (log.isDebugEnabled())
+                         log.debug("Received job siblings request for unknown 
or finished task (will ignore): " + msg);
+ 
+                     siblings = null;
+                 }
+ 
+                 try {
+                     Object topic = req.topic();
+ 
+                     if (topic == null) {
+                         assert req.topicBytes() != null;
+ 
+                         topic = marsh.unmarshal(req.topicBytes(), null);
+                     }
+ 
+                     boolean loc = ctx.localNodeId().equals(nodeId);
+ 
+                     ctx.io().send(nodeId, topic,
+                         new GridJobSiblingsResponse(
+                             loc ? siblings : null,
+                             loc ? null : marsh.marshal(siblings)),
+                         SYSTEM_POOL);
+                 }
+                 catch (IgniteCheckedException e) {
+                     U.error(log, "Failed to send job sibling response.", e);
+                 }
+             }
+             finally {
+                 lock.readUnlock();
+             }
+         }
+     }
+ 
+     /**
+      * Listener for task cancel requests.
+      */
+     private class TaskCancelMessageListener implements GridMessageListener {
+         /** {@inheritDoc} */
+         @Override public void onMessage(UUID nodeId, Object msg) {
+             assert msg != null;
+ 
+             if (!(msg instanceof GridTaskCancelRequest)) {
+                 U.warn(log, "Received unexpected message instead of task 
cancel request: " + msg);
+ 
+                 return;
+             }
+ 
+             GridTaskCancelRequest req = (GridTaskCancelRequest)msg;
+ 
+             lock.readLock();
+ 
+             try {
+                 if (stopping && !waiting) {
+                     U.warn(log, "Received task cancel request while stopping 
grid (will ignore): " + msg);
+ 
+                     return;
+                 }
+ 
+                 GridTaskWorker<?, ?> gridTaskWorker = 
tasks.get(req.sessionId());
+ 
+                 if (gridTaskWorker != null) {
+                     try {
+                         gridTaskWorker.getTaskFuture().cancel();
+                     }
+                     catch (IgniteCheckedException e) {
+                         log.warning("Failed to cancel task: " + 
gridTaskWorker.getTask(), e);
+                     }
+                 }
+             }
+             finally {
+                 lock.readUnlock();
+             }
+         }
+     }
+ }

Reply via email to