http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
index 0000000,65a3250..a921346
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java
@@@ -1,0 -1,1371 +1,1373 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.task;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.compute.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.fs.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.resources.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.processors.job.*;
+ import org.apache.ignite.internal.processors.timeout.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.internal.util.worker.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.compute.ComputeJobResultPolicy.*;
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static 
org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ import static 
org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
+ 
+ /**
+  * Grid task worker. Handles full task life cycle.
+  * @param <T> Task argument type.
+  * @param <R> Task return value type.
+  */
+ class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject {
+     /**
+      * Split size threshold. While result caching is on, a warning is logged each time the number
+      * of registered jobs exceeds {@code ctx.discovery().size()} and is a multiple of this value.
+      */
+     private static final int SPLIT_WARN_THRESHOLD = 1000;
+ 
+     /** {@code True} for internal tasks; assigned in {@code body()} from {@code dep.internalTask(..)}. */
+     private boolean internal;
+ 
+     /** Task life cycle states; the {@code state} field is read and changed under {@code mux}. */
+     private enum State {
+         /** Initial state: jobs may be mapped and job responses are processed only in this state. */
+         WAITING,
+ 
+         /** Set by {@code onResponse(..)} once the result policy calls for reducing. */
+         REDUCING,
+ 
+         /** Set by {@code reduce(..)} after {@code ComputeTask.reduce(..)} returned or threw. */
+         REDUCED,
+ 
+         /** Not assigned in the visible code; presumably set while the task is being finished. */
+         FINISHING
+     }
+ 
+     /** Static logger to avoid re-creation. */
+     private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
+ 
+     /** Kernal context. */
+     private final GridKernalContext ctx;
+ 
+     /** Logger obtained through {@code U.logger(ctx, logRef, this)}. */
+     private final IgniteLogger log;
+ 
+     /** Marshaller from the grid configuration; used to unmarshal job responses. */
+     private final IgniteMarshaller marsh;
+ 
+     /** Task session. */
+     private final GridTaskSessionImpl ses;
+ 
+     /** Task future. */
+     private final GridTaskFutureImpl<R> fut;
+ 
+     /** Task argument handed to {@code ComputeTask.map(..)}; may be {@code null}. */
+     private final T arg;
+ 
+     /** Task event listener. */
+     private final GridTaskEventListener evtLsnr;
+ 
+     /** Job results keyed by job ID; created lazily in {@code processMappedJobs(..)}. */
+     private Map<IgniteUuid, GridJobResultImpl> jobRes;
+ 
+     /** Current task state; checked and changed while holding {@code mux}. */
+     private State state = State.WAITING;
+ 
+     /** Task deployment. */
+     private final GridDeployment dep;
+ 
+     /** Task class. */
+     private final Class<?> taskCls;
+ 
+     /** Thread-local context from task processor; may be {@code null}. */
+     private final Map<GridTaskThreadContextKey, Object> thCtx;
+ 
+     /** Task instance; when {@code null}, {@code body()} instantiates it from {@code taskCls}. */
+     private ComputeTask<T, R> task;
+ 
+     /** Responses that {@code onResponse(..)} postponed for later processing. */
+     private final Queue<GridJobExecuteResponse> delayedRess = new 
ConcurrentLinkedDeque8<>();
+ 
+     /** Result of the {@code IgniteTaskContinuousMapperResource} annotation check made in {@code body()}. */
+     private boolean continuous;
+ 
+     /** Mutex guarding task state, job results and the response processing lock. */
+     private final Object mux = new Object();
+ 
+     /** While {@code true}, {@code onResponse(..)} queues responses instead of processing them. */
+     private boolean lockRespProc = true;
+ 
+     /** {@code True} unless the task class is annotated with {@code ComputeTaskNoResultCache}. */
+     private final boolean resCache;
+ 
+     /** {@code True} if {@code TC_NO_FAILOVER} was set to {@code true} in the thread-local context. */
+     private final boolean noFailover;
+ 
+     /** Subject ID. */
+     private final UUID subjId;
+ 
+     /**
+      * Continuous mapper. It is passed to resource injection in {@code body()}; every
+      * {@code send(..)} variant validates its arguments and forwards the jobs to
+      * {@code processMappedJobs(..)}.
+      */
+     private final ComputeTaskContinuousMapper mapper = new 
ComputeTaskContinuousMapper() {
+         /** {@inheritDoc} Sends the job to the explicitly given node. */
+         @Override public void send(ComputeJob job, ClusterNode node) throws 
IgniteCheckedException {
+             A.notNull(job, "job");
+             A.notNull(node, "node");
+ 
+             processMappedJobs(Collections.singletonMap(job, node));
+         }
+ 
+         /** {@inheritDoc} Sends all jobs of the map to the nodes they are mapped to. */
+         @Override public void send(Map<? extends ComputeJob, ClusterNode> 
mappedJobs) throws IgniteCheckedException {
+             A.notNull(mappedJobs, "mappedJobs");
+ 
+             processMappedJobs(mappedJobs);
+         }
+ 
+         /** {@inheritDoc} Delegates to the collection variant with a single-element collection. */
+         @Override public void send(ComputeJob job) throws 
IgniteCheckedException {
+             A.notNull(job, "job");
+ 
+             send(Collections.singleton(job));
+         }
+ 
+         /** {@inheritDoc} The target node of each job is chosen by the load balancer. */
+         @Override public void send(Collection<? extends ComputeJob> jobs) 
throws IgniteCheckedException {
+             A.notNull(jobs, "jobs");
+ 
+             if (jobs.isEmpty())
+                 throw new IgniteCheckedException("Empty jobs collection 
passed to send(...) method.");
+ 
+             ComputeLoadBalancer balancer = 
ctx.loadBalancing().getLoadBalancer(ses, getTaskTopology());
+ 
+             for (ComputeJob job : jobs) {
+                 if (job == null)
+                     throw new IgniteCheckedException("Null job passed to 
send(...) method.");
+ 
+                 processMappedJobs(Collections.singletonMap(job, 
balancer.getBalancedNode(job, null)));
+             }
+         }
+     };
+ 
+     /**
+      * Creates a worker for a single task execution. Besides storing the arguments, it resolves
+      * the logger and marshaller from the context, enables result caching unless the task class
+      * carries {@code ComputeTaskNoResultCache}, and reads the {@code TC_NO_FAILOVER} flag from
+      * the thread-local context (treated as {@code false} when absent).
+      *
+      * @param ctx Kernal context.
+      * @param arg Task argument.
+      * @param ses Grid task session.
+      * @param fut Task future.
+      * @param taskCls Task class.
+      * @param task Task instance that might be null.
+      * @param dep Deployed task.
+      * @param evtLsnr Event listener.
+      * @param thCtx Thread-local context from task processor.
+      * @param subjId Subject ID.
+      */
+     GridTaskWorker(
+         GridKernalContext ctx,
+         @Nullable T arg,
+         GridTaskSessionImpl ses,
+         GridTaskFutureImpl<R> fut,
+         @Nullable Class<?> taskCls,
+         @Nullable ComputeTask<T, R> task,
+         GridDeployment dep,
+         GridTaskEventListener evtLsnr,
+         @Nullable Map<GridTaskThreadContextKey, Object> thCtx,
+         UUID subjId) {
+         super(ctx.config().getGridName(), "grid-task-worker", 
ctx.config().getGridLogger());
+ 
+         assert ses != null;
+         assert fut != null;
+         assert evtLsnr != null;
+         assert dep != null;
+ 
+         this.arg = arg;
+         this.ctx = ctx;
+         this.fut = fut;
+         this.ses = ses;
+         this.taskCls = taskCls;
+         this.task = task;
+         this.dep = dep;
+         this.evtLsnr = evtLsnr;
+         this.thCtx = thCtx;
+         this.subjId = subjId;
+ 
+         log = U.logger(ctx, logRef, this);
+ 
+         marsh = ctx.config().getMarshaller();
+ 
+         resCache = dep.annotation(taskCls, ComputeTaskNoResultCache.class) == 
null;
+ 
+         Boolean noFailover = getThreadContext(TC_NO_FAILOVER);
+ 
+         this.noFailover = noFailover != null ? noFailover : false;
+     }
+ 
+     /**
+      * Looks up a value in the thread-local context that was handed to this worker.
+      *
+      * @param key Thread-local context key.
+      * @return Value stored under the key, or {@code null} if there is no context or no such entry.
+      */
+     @SuppressWarnings({"unchecked"})
+     @Nullable private <V> V getThreadContext(GridTaskThreadContextKey key) {
+         if (thCtx == null)
+             return null;
+ 
+         return (V)thCtx.get(key);
+     }
+ 
+     /**
+      * @return Task session ID, i.e. the ID of the session this worker was created with.
+      */
+     IgniteUuid getTaskSessionId() {
+         return ses.getId();
+     }
+ 
+     /**
+      * @return Task session this worker was created with.
+      */
+     GridTaskSessionImpl getSession() {
+         return ses;
+     }
+ 
+     /**
+      * @return Task future this worker was created with.
+      */
+     GridTaskFutureImpl<R> getTaskFuture() {
+         return fut;
+     }
+ 
+     /**
+      * Gets the deployment of the task.
+      *
+      * @return Task deployment passed to the constructor.
+      */
+     GridDeployment getDeployment() {
+         return dep;
+     }
+ 
+     /**
+      * @return Grid task; may be {@code null} if no instance was supplied and none was created yet.
+      */
+     public ComputeTask<T, R> getTask() {
+         return task;
+     }
+ 
+     /**
+      * @param task Deployed task that replaces the currently held task instance.
+      */
+     public void setTask(ComputeTask<T, R> task) {
+         this.task = task;
+     }
+ 
+     /**
+      * @return {@code True} if task is internal; the flag is assigned when {@code body()} runs.
+      */
+     public boolean isInternal() {
+         return internal;
+     }
+ 
+     /** {@inheritDoc} The task session ID doubles as the timeout ID. */
+     @Override public IgniteUuid timeoutId() {
+         return ses.getId();
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public void onTimeout() {
+         synchronized (mux) {
+             if (state != State.WAITING)
+                 return;
+         }
+ 
+         U.warn(log, "Task has timed out: " + ses);
+ 
+         recordTaskEvent(EVT_TASK_TIMEDOUT, "Task has timed out.");
+ 
+         Throwable e = new ComputeTaskTimeoutException("Task timed out (check 
logs for error messages): " + ses);
+ 
+         finishTask(null, e);
+     }
+ 
+     /** {@inheritDoc} Taken from the task session's end time. */
+     @Override public long endTime() {
+         return ses.getEndTime();
+     }
+ 
+     /**
+      * Instantiates the task through the deployment.
+      *
+      * @param taskCls Task class.
+      * @return Newly created task instance, never {@code null}.
+      * @throws IgniteCheckedException If the deployment could not create an instance.
+      */
+     private ComputeTask<T, R> newTask(Class<? extends ComputeTask<T, R>> taskCls)
+         throws IgniteCheckedException {
+         ComputeTask<T, R> inst = dep.newInstance(taskCls);
+ 
+         if (inst != null)
+             return inst;
+ 
+         throw new IgniteCheckedException(
+             "Failed to instantiate task (is default constructor available?): " + taskCls);
+     }
+ 
+     /**
+      * Copies the SPI settings of the task's {@code ComputeTaskSpis} annotation, when the task
+      * class has one, into the task session.
+      */
+     private void initializeSpis() {
+         ComputeTaskSpis ann = dep.annotation(taskCls, ComputeTaskSpis.class);
+ 
+         // Nothing to configure without the annotation.
+         if (ann == null)
+             return;
+ 
+         ses.setLoadBalancingSpi(ann.loadBalancingSpi());
+         ses.setFailoverSpi(ann.failoverSpi());
+         ses.setCheckpointSpi(ann.checkpointSpi());
+     }
+ 
+     /**
+      * Maps this task's jobs to nodes and sends them out.
+      * <p>
+      * Steps: notify the listener, instantiate the task if none was supplied, record the start
+      * event, apply SPI settings, inject resources, call {@code ComputeTask.map(..)} with the
+      * shuffled topology, register and send the mapped jobs, then unlock response processing and
+      * handle a delayed response. Any failure finishes the task with the error, except a checked
+      * failure after the future was cancelled, which is only logged at debug level.
+      */
+     @SuppressWarnings({"unchecked"})
+     @Override protected void body() {
+         evtLsnr.onTaskStarted(this);
+ 
+         try {
+             // Use either user task or deployed one.
+             if (task == null) {
+                 assert taskCls != null;
+                 assert ComputeTask.class.isAssignableFrom(taskCls);
+ 
+                 try {
+                     task = newTask((Class<? extends ComputeTask<T, 
R>>)taskCls);
+                 }
+                 catch (IgniteCheckedException e) {
+                     // If cannot instantiate task, then assign internal flag 
based
+                     // on information available.
+                     internal = dep.internalTask(null, taskCls);
+ 
+                     recordTaskEvent(EVT_TASK_STARTED, "Task started.");
+ 
+                     throw e;
+                 }
+             }
+ 
+             internal = dep.internalTask(task, taskCls);
+ 
+             recordTaskEvent(EVT_TASK_STARTED, "Task started.");
+ 
+             initializeSpis();
+ 
+             ses.setClassLoader(dep.classLoader());
+ 
+             final List<ClusterNode> shuffledNodes = getTaskTopology();
+ 
+             // Load balancer.
+             ComputeLoadBalancer balancer = 
ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes);
+ 
+             continuous = ctx.resource().isAnnotationPresent(dep, task, 
IgniteTaskContinuousMapperResource.class);
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Injected task resources [continuous=" + continuous 
+ ']');
+ 
+             // Inject resources.
+             ctx.resource().inject(dep, task, ses, balancer, mapper);
+ 
+             Map<? extends ComputeJob, ClusterNode> mappedJobs = 
U.wrapThreadLoader(dep.classLoader(),
+                 new Callable<Map<? extends ComputeJob, ClusterNode>>() {
+                     @Override public Map<? extends ComputeJob, ClusterNode> 
call() throws IgniteCheckedException {
+                         return task.map(shuffledNodes, arg);
+                     }
+                 });
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Mapped task jobs to nodes [jobCnt=" + (mappedJobs 
!= null ? mappedJobs.size() : 0) +
+                     ", mappedJobs=" + mappedJobs + ", ses=" + ses + ']');
+ 
+             if (F.isEmpty(mappedJobs)) {
+                 synchronized (mux) {
+                     // Check if some jobs are sent from continuous mapper.
+                     if (F.isEmpty(jobRes))
+                         throw new IgniteCheckedException("Task map operation 
produced no mapped jobs: " + ses);
+                 }
+             }
+             else
+                 processMappedJobs(mappedJobs);
+ 
+             synchronized (mux) {
+                 lockRespProc = false; // Mapping is over: onResponse(..) may process responses now.
+             }
+ 
+             processDelayedResponses();
+         }
+         catch (ClusterGroupEmptyException e) {
+             U.warn(log, "Failed to map task jobs to nodes (topology 
projection is empty): " + ses);
+ 
+             finishTask(null, e);
+         }
+         catch (IgniteCheckedException e) {
+             if (!fut.isCancelled()) {
+                 U.error(log, "Failed to map task jobs to nodes: " + ses, e);
+ 
+                 finishTask(null, e);
+             }
+             else if (log.isDebugEnabled())
+                 log.debug("Failed to map task jobs to nodes due to task 
cancellation: " + ses);
+         }
+         // Catch throwable to protect against bad user code.
+         catch (Throwable e) {
+             String errMsg = "Failed to map task jobs to nodes due to 
undeclared user exception" +
+                 " [cause=" + e.getMessage() + ", ses=" + ses + "]";
+ 
+             U.error(log, errMsg, e);
+ 
+             finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
+         }
+     }
+ 
+     /**
+      * Registers the given job-to-node mappings and sends the jobs out.
+      * <p>
+      * For every entry a job ID, a sibling and a job result holder are created and a mapped event
+      * is recorded. Under {@code mux} all holders are put into {@code jobRes} and marked occupied
+      * before anything is sent, so that responses arriving early are delayed rather than processed.
+      * Each holder is released right after its request was sent, whether or not sending succeeded.
+      * Finally a delayed response, if any, is processed. Does nothing for an empty map.
+      *
+      * @param jobs Map of jobs.
+      * @throws IgniteCheckedException If a job or node is {@code null}, the task is not in the
+      *      waiting state, a job ID is already registered, or sending fails.
+      */
+     private void processMappedJobs(Map<? extends ComputeJob, ClusterNode> 
jobs) throws IgniteCheckedException {
+         if (F.isEmpty(jobs))
+             return;
+ 
+         Collection<GridJobResultImpl> jobResList = new 
ArrayList<>(jobs.size());
+ 
+         Collection<ComputeJobSibling> sibs = new ArrayList<>(jobs.size());
+ 
+         // Map jobs to nodes for computation.
+         for (Map.Entry<? extends ComputeJob, ClusterNode> mappedJob : 
jobs.entrySet()) {
+             ComputeJob job = mappedJob.getKey();
+             ClusterNode node = mappedJob.getValue();
+ 
+             if (job == null)
+                 throw new IgniteCheckedException("Job can not be null 
[mappedJob=" + mappedJob + ", ses=" + ses + ']');
+ 
+             if (node == null)
+                 throw new IgniteCheckedException("Node can not be null 
[mappedJob=" + mappedJob + ", ses=" + ses + ']');
+ 
+             IgniteUuid jobId = IgniteUuid.fromUuid(node.id());
+ 
+             GridJobSiblingImpl sib = new GridJobSiblingImpl(ses.getId(), 
jobId, node.id(), ctx);
+ 
+             jobResList.add(new GridJobResultImpl(job, jobId, node, sib));
+ 
+             // Do not add siblings if result cache is disabled.
+             if (resCache)
+                 sibs.add(sib);
+ 
+             recordJobEvent(EVT_JOB_MAPPED, jobId, node, "Job got mapped.");
+         }
+ 
+         synchronized (mux) {
+             if (state != State.WAITING)
+                 throw new IgniteCheckedException("Task is not in waiting 
state [state=" + state + ", ses=" + ses + ']');
+ 
+             // Do not add siblings if result cache is disabled.
+             if (resCache)
+                 ses.addJobSiblings(sibs);
+ 
+             if (jobRes == null)
+                 jobRes = new HashMap<>();
+ 
+             // Populate all remote mappedJobs into map, before mappedJobs are 
sent.
+             // This is done to avoid race condition when we start
+             // getting results while still sending out references.
+             for (GridJobResultImpl res : jobResList) {
+                 if (jobRes.put(res.getJobContext().getJobId(), res) != null)
+                     throw new IgniteCheckedException("Duplicate job ID for 
remote job found: " + res.getJobContext().getJobId());
+ 
+                 res.setOccupied(true);
+ 
+                 if (resCache && jobRes.size() > ctx.discovery().size() && 
jobRes.size() % SPLIT_WARN_THRESHOLD == 0)
+                     LT.warn(log, null, "Number of jobs in task is too large 
for task: " + ses.getTaskName() +
+                         ". Consider reducing number of jobs or disabling job 
result cache with " +
+                         "@GridComputeTaskNoResultCache annotation.");
+             }
+         }
+ 
+         // Set mapped flag.
+         ses.onMapped();
+ 
+         // Send out all remote mappedJobs.
+         for (GridJobResultImpl res : jobResList) {
+             evtLsnr.onJobSend(this, res.getSibling());
+ 
+             try {
+                 sendRequest(res);
+             }
+             finally {
+                 // Open job for processing results.
+                 synchronized (mux) {
+                     res.setOccupied(false);
+                 }
+             }
+         }
+ 
+         processDelayedResponses();
+     }
+ 
+     /**
+      * Resolves the nodes this task may use and returns them as a fresh list in random order.
+      *
+      * @return Topology for this task.
+      * @throws IgniteCheckedException If the resolved topology contains no nodes.
+      */
+     private List<ClusterNode> getTaskTopology() throws IgniteCheckedException {
+         Collection<UUID> topIds = ses.getTopology();
+ 
+         Collection<? extends ClusterNode> nodes;
+ 
+         // A session without explicit topology runs on all discovered nodes.
+         if (topIds == null)
+             nodes = ctx.discovery().allNodes();
+         else
+             nodes = ctx.discovery().nodes(topIds);
+ 
+         int cnt = nodes.size();
+ 
+         if (cnt == 0)
+             throw new ClusterGroupEmptyException("Topology projection is empty.");
+ 
+         List<ClusterNode> res = new ArrayList<>(cnt);
+ 
+         res.addAll(nodes);
+ 
+         // Shuffle nodes prior to giving them to user (pointless for a single node).
+         if (res.size() > 1)
+             Collections.shuffle(res);
+ 
+         return res;
+     }
+ 
+     /**
+      * Takes one response off the delayed queue, if there is any, and passes it to
+      * {@code onResponse(..)}.
+      */
+     private void processDelayedResponses() {
+         GridJobExecuteResponse delayed = delayedRess.poll();
+ 
+         if (delayed == null)
+             return;
+ 
+         onResponse(delayed);
+     }
+ 
+     /**
+      * @param msg Job execution response.
+      */
+     void onResponse(GridJobExecuteResponse msg) {
+         assert msg != null;
+ 
+         if (fut.isDone()) {
+             if (log.isDebugEnabled())
+                 log.debug("Ignoring job response since task has finished: " + 
msg);
+ 
+             return;
+         }
+ 
+         GridJobExecuteResponse res = msg;
+ 
+         while (res != null) {
+             GridJobResultImpl jobRes = null;
+ 
+             // Flag indicating whether occupied flag for
+             // job response was changed in this method apply.
+             boolean selfOccupied = false;
+ 
+             try {
+                 synchronized (mux) {
+                     // If task is not waiting for responses,
+                     // then there is no point to proceed.
+                     if (state != State.WAITING) {
+                         if (log.isDebugEnabled())
+                             log.debug("Ignoring response since task is 
already reducing or finishing [res=" + res +
+                                 ", job=" + ses + ", state=" + state + ']');
+ 
+                         return;
+                     }
+ 
+                     jobRes = this.jobRes.get(res.getJobId());
+ 
+                     if (jobRes == null) {
+                         if (log.isDebugEnabled())
+                             U.warn(log, "Received response for unknown child 
job (was job presumed failed?): " + res);
+ 
+                         return;
+                     }
+ 
+                     // Only process 1st response and ignore following ones. 
This scenario
+                     // is possible if node has left topology and and fake 
failure response
+                     // was created from discovery listener and when sending 
request failed.
+                     if (jobRes.hasResponse()) {
+                         if (log.isDebugEnabled())
+                             log.debug("Received redundant response for a job 
(will ignore): " + res);
+ 
+                         return;
+                     }
+ 
+                     if (!jobRes.getNode().id().equals(res.getNodeId())) {
+                         if (log.isDebugEnabled())
+                             log.debug("Ignoring stale response as job was 
already resent to other node [res=" + res +
+                                 ", jobRes=" + jobRes + ']');
+ 
+                         // Prevent processing 2 responses for the same job 
simultaneously.
+                         jobRes.setOccupied(true);
+ 
+                         selfOccupied = true;
+ 
+                         // We can not return here because there can be more 
delayed messages in the queue.
+                         continue;
+                     }
+ 
+                     if (jobRes.isOccupied()) {
+                         if (log.isDebugEnabled())
+                             log.debug("Adding response to delayed queue (job 
is either being sent or processing " +
+                                 "another response): " + res);
+ 
+                         delayedRess.offer(res);
+ 
+                         return;
+                     }
+ 
+                     if (lockRespProc) {
+                         delayedRess.offer(res);
+ 
+                         return;
+                     }
+ 
+                     lockRespProc = true;
+ 
+                     selfOccupied = true;
+ 
+                     // Prevent processing 2 responses for the same job 
simultaneously.
+                     jobRes.setOccupied(true);
+ 
+                     // We don't keep reference to job if results are not 
cached.
+                     if (!resCache)
+                         this.jobRes.remove(res.getJobId());
+                 }
+ 
+                 if (res.getFakeException() != null)
+                     jobRes.onResponse(null, res.getFakeException(), null, 
false);
+                 else {
+                     ClassLoader clsLdr = dep.classLoader();
+ 
+                     try {
+                         boolean loc = 
ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs();
+ 
+                         Object res0 = loc ? res.getJobResult() : 
marsh.unmarshal(res.getJobResultBytes(), clsLdr);
+ 
+                         IgniteCheckedException ex = loc ? res.getException() :
+                             
marsh.<IgniteCheckedException>unmarshal(res.getExceptionBytes(), clsLdr);
+ 
+                         Map<Object, Object> attrs = loc ? 
res.getJobAttributes() :
+                             marsh.<Map<Object, 
Object>>unmarshal(res.getJobAttributesBytes(), clsLdr);
+ 
+                         jobRes.onResponse(res0, ex, attrs, res.isCancelled());
+ 
+                         if (loc)
+                             ctx.resource().invokeAnnotated(dep, 
jobRes.getJob(), ComputeJobAfterSend.class);
+                     }
+                     catch (IgniteCheckedException e) {
+                         U.error(log, "Error deserializing job response: " + 
res, e);
+ 
+                         finishTask(null, e);
+                     }
+                 }
+ 
+                 List<ComputeJobResult> results;
+ 
+                 if (!resCache)
+                     results = Collections.emptyList();
+                 else {
+                     synchronized (mux) {
+                         results = getRemoteResults();
+                     }
+                 }
+ 
+                 ComputeJobResultPolicy plc = result(jobRes, results);
+ 
+                 if (plc == null) {
+                     String errMsg = "Failed to obtain remote job result 
policy for result from GridComputeTask.result(..) " +
+                         "method that returned null (will fail the whole 
task): " + jobRes;
+ 
+                     finishTask(null, new IgniteCheckedException(errMsg));
+ 
+                     return;
+                 }
+ 
+                 synchronized (mux) {
+                     // If task is not waiting for responses,
+                     // then there is no point to proceed.
+                     if (state != State.WAITING) {
+                         if (log.isDebugEnabled())
+                             log.debug("Ignoring GridComputeTask.result(..) 
value since task is already reducing or" +
+                                 "finishing [res=" + res + ", job=" + ses + ", 
state=" + state + ']');
+ 
+                         return;
+                     }
+ 
+                     switch (plc) {
+                         // Start reducing all results received so far.
+                         case REDUCE: {
+                             state = State.REDUCING;
+ 
+                             break;
+                         }
+ 
+                         // Keep waiting if there are more responses to come,
+                         // otherwise, reduce.
+                         case WAIT: {
+                             assert results.size() <= this.jobRes.size();
+ 
+                             // If there are more results to wait for.
+                             // If result cache is disabled, then we reduce
+                             // when both collections are empty.
+                             if (results.size() == this.jobRes.size()) {
+                                 plc = ComputeJobResultPolicy.REDUCE;
+ 
+                                 // All results are received, proceed to 
reduce method.
+                                 state = State.REDUCING;
+                             }
+ 
+                             break;
+                         }
+ 
+                         case FAILOVER: {
+                             if (!failover(res, jobRes, getTaskTopology()))
+                                 plc = null;
+ 
+                             break;
+                         }
+                     }
+                 }
+ 
+                 // Outside of synchronization.
+                 if (plc != null) {
+                     // Handle failover.
+                     if (plc == FAILOVER)
+                         sendFailoverRequest(jobRes);
+                     else {
+                         evtLsnr.onJobFinished(this, jobRes.getSibling());
+ 
+                         if (plc == ComputeJobResultPolicy.REDUCE)
+                             reduce(results);
+                     }
+                 }
+             }
+             catch (IgniteCheckedException e) {
+                 U.error(log, "Failed to obtain topology [ses=" + ses + ", 
err=" + e + ']', e);
+ 
+                 finishTask(null, e);
+             }
+             finally {
+                 // Open up job for processing responses.
+                 // Only unset occupied flag, if it was
+                 // set in this method.
+                 if (selfOccupied) {
+                     assert jobRes != null;
+ 
+                     synchronized (mux) {
+                         jobRes.setOccupied(false);
+ 
+                         lockRespProc = false;
+                     }
+ 
+                     // Process delayed responses if there are any.
+                     res = delayedRess.poll();
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * Asks the task for the policy to apply to a job result. The call to
+      * {@code ComputeTask.result(..)} runs with the deployment class loader as thread context
+      * loader and must not be made while holding {@code mux}.
+      * <p>
+      * If the task returns {@code FAILOVER} while failover is disabled for this task, the job's
+      * exception is rethrown when there is one, otherwise the policy is downgraded to {@code WAIT}.
+      * Any exception finishes the whole task with an error and yields {@code null}. A resulted
+      * event is recorded in every case.
+      *
+      * @param jobRes Job result.
+      * @param results Existing job results.
+      * @return Job result policy, or {@code null} if the task has been failed.
+      */
+     @SuppressWarnings({"CatchGenericClass"})
+     @Nullable private ComputeJobResultPolicy result(final ComputeJobResult 
jobRes, final List<ComputeJobResult> results) {
+         assert !Thread.holdsLock(mux);
+ 
+         return U.wrapThreadLoader(dep.classLoader(), new 
CO<ComputeJobResultPolicy>() {
+             @Nullable @Override public ComputeJobResultPolicy apply() {
+                 try {
+                     // Obtain job result policy.
+                     ComputeJobResultPolicy plc = null;
+ 
+                     try {
+                         plc = task.result(jobRes, results);
+ 
+                         if (plc == FAILOVER && noFailover) {
+                             IgniteCheckedException e = jobRes.getException();
+ 
+                             if (e != null)
+                                 throw e;
+ 
+                             plc = WAIT;
+                         }
+                     }
+                     finally {
+                         recordJobEvent(EVT_JOB_RESULTED, 
jobRes.getJobContext().getJobId(),
+                             jobRes.getNode(), "Job got resulted with: " + 
plc);
+                     }
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Obtained job result policy [policy=" + plc 
+ ", ses=" + ses + ']');
+ 
+                     return plc;
+                 }
+                 catch (IgniteCheckedException e) {
+                     if (X.hasCause(e, GridInternalException.class) ||
+                         X.hasCause(e, IgniteFsOutOfSpaceException.class)) {
+                         // Print internal exceptions only if debug is enabled.
+                         if (log.isDebugEnabled())
+                             U.error(log, "Failed to obtain remote job result 
policy for result from " +
+                                 "GridComputeTask.result(..) method (will fail 
the whole task): " + jobRes, e);
+                     }
+                     else if (X.hasCause(e, 
ComputeJobFailoverException.class)) {
+                         IgniteCheckedException e0 = new 
IgniteCheckedException(" Job was not failed over because " +
+                             "GridComputeJobResultPolicy.FAILOVER was not 
returned from " +
+                             "GridTask.result(...) method for job result with 
GridComputeJobFailoverException.", e);
+ 
+                         finishTask(null, e0);
+ 
+                         return null;
+                     }
+                     else
+                         U.error(log, "Failed to obtain remote job result 
policy for result from " +
+                             "GridComputeTask.result(..) method (will fail the 
whole task): " + jobRes, e);
+ 
+                     finishTask(null, e);
+ 
+                     return null;
+                 }
+                 catch (IgniteException e) {
+                     if (X.hasCause(e, GridInternalException.class) ||
+                         X.hasCause(e, IgniteFsOutOfSpaceException.class)) {
+                         // Print internal exceptions only if debug is enabled.
+                         if (log.isDebugEnabled())
+                             U.error(log, "Failed to obtain remote job result 
policy for result from " +
+                                 "GridComputeTask.result(..) method (will fail 
the whole task): " + jobRes, e);
+                     }
+                     else if (X.hasCause(e, 
ComputeJobFailoverException.class)) {
+                         IgniteCheckedException e0 = new 
IgniteCheckedException(" Job was not failed over because " +
+                             "GridComputeJobResultPolicy.FAILOVER was not 
returned from " +
+                             "GridTask.result(...) method for job result with 
GridComputeJobFailoverException.", e);
+ 
+                         finishTask(null, e0);
+ 
+                         return null;
+                     }
+                     else
+                         U.error(log, "Failed to obtain remote job result 
policy for result from " +
+                             "GridComputeTask.result(..) method (will fail the 
whole task): " + jobRes, e);
+ 
+                     finishTask(null, e);
+ 
+                     return null;
+                 }
+                 catch (Throwable e) {
+                     String errMsg = "Failed to obtain remote job result 
policy for result from " +
+                         "GridComputeTask.result(..) method due to undeclared 
user exception " +
+                         "(will fail the whole task): " + jobRes;
+ 
+                     U.error(log, errMsg, e);
+ 
+                     Throwable tmp = new 
ComputeUserUndeclaredException(errMsg, e);
+ 
+                     // Failed to successfully obtain result policy and
+                     // hence forced to fail the whole deployed task.
+                     finishTask(null, tmp);
+ 
+                     return null;
+                 }
+             }
+         });
+     }
+ 
+     /**
+      * Calls {@code ComputeTask.reduce(..)} with the deployment class loader as thread context
+      * loader and finishes the task with either the reduced value or the failure.
+      * <p>
+      * The state is switched from {@code REDUCING} to {@code REDUCED} whether or not reduce
+      * succeeded, and {@code finishTask(..)} is always invoked at the end. Exceptions that are
+      * not checked Ignite exceptions are wrapped into {@code ComputeUserUndeclaredException}.
+      *
+      * @param results Job results.
+      */
+     private void reduce(final List<ComputeJobResult> results) {
+         R reduceRes = null;
+         Throwable userE = null;
+ 
+         try {
+             try {
+                 // Reduce results.
+                 reduceRes = U.wrapThreadLoader(dep.classLoader(), new 
Callable<R>() {
+                     @Nullable @Override public R call() throws 
IgniteCheckedException {
+                         return task.reduce(results);
+                     }
+                 });
+             }
+             finally {
+                 synchronized (mux) {
+                     assert state == State.REDUCING : "Invalid task state: " + 
state;
+ 
+                     state = State.REDUCED;
+                 }
+             }
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Reduced job responses [reduceRes=" + reduceRes + 
", ses=" + ses + ']');
+ 
+             recordTaskEvent(EVT_TASK_REDUCED, "Task reduced.");
+         }
+         catch (ClusterTopologyException e) {
+             U.warn(log, "Failed to reduce job results for task (any nodes 
from task topology left grid?): " + task);
+ 
+             userE = e;
+         }
+         catch (IgniteCheckedException e) {
+             U.error(log, "Failed to reduce job results for task: " + task, e);
+ 
+             userE = e;
+         }
+         // Catch Throwable to protect against bad user code.
+         catch (Throwable e) {
+             String errMsg = "Failed to reduce job results due to undeclared 
user exception [task=" + task +
+                 ", err=" + e + ']';
+ 
+             U.error(log, errMsg, e);
+ 
+             userE = new ComputeUserUndeclaredException(errMsg ,e);
+         }
+         finally {
+             finishTask(reduceRes, userE);
+         }
+     }
+ 
+     /**
+      * Asks the failover manager for a new node for a job and, if one is
+      * returned, re-targets the job result at that node.
+      * <p>
+      * Must be called while holding {@code mux} (asserted). Methods annotated with
+      * {@code ComputeJobBeforeFailover} are invoked on the job first. If the
+      * failover manager returns {@code null}, the task is finished with a
+      * {@code ClusterTopologyException} whose cause is the job's own exception.
+      * Otherwise the new node is set on the job result, its response is reset
+      * and, when {@code resCache} is {@code false}, the result is put back into
+      * the job result map under the response's job ID. Any throwable raised along
+      * the way finishes the task with a {@code ComputeUserUndeclaredException}.
+      *
+      * @param res Execution response.
+      * @param jobRes Job result.
+      * @param top Topology.
+      * @return {@code True} if fail-over SPI returned a new node.
+      */
+     private boolean failover(GridJobExecuteResponse res, GridJobResultImpl 
jobRes, Collection<? extends ClusterNode> top) {
+         assert Thread.holdsLock(mux);
+ 
+         try {
+             ctx.resource().invokeAnnotated(dep, jobRes.getJob(), 
ComputeJobBeforeFailover.class);
+ 
+             // Map to a new node.
+             ClusterNode node = ctx.failover().failover(ses, jobRes, new 
ArrayList<>(top));
+ 
+             if (node == null) {
+                 String msg = "Failed to failover a job to another node 
(failover SPI returned null) [job=" +
+                     jobRes.getJob() + ", node=" + jobRes.getNode() + ']';
+ 
+                 if (log.isDebugEnabled())
+                     log.debug(msg);
+ 
+                 Throwable e = new ClusterTopologyException(msg, 
jobRes.getException());
+ 
+                 finishTask(null, e);
+ 
+                 return false;
+             }
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Resolved job failover [newNode=" + node + ", 
oldNode=" + jobRes.getNode() +
+                     ", job=" + jobRes.getJob() + ", resMsg=" + res + ']');
+ 
+             jobRes.setNode(node);
+             jobRes.resetResponse();
+ 
+             if (!resCache) {
+                 synchronized (mux) {
+                     // Store result back in map before sending.
+                     this.jobRes.put(res.getJobId(), jobRes);
+                 }
+             }
+ 
+             return true;
+         }
+         // Catch Throwable to protect against bad user code.
+         catch (Throwable e) {
+             String errMsg = "Failed to failover job due to undeclared user 
exception [job=" +
+                 jobRes.getJob() + ", err=" + e + ']';
+ 
+             U.error(log, errMsg, e);
+ 
+             finishTask(null, new ComputeUserUndeclaredException(errMsg, e));
+ 
+             return false;
+         }
+     }
+ 
+     /**
+      * Notifies the task event listener about a job failover and, provided the
+      * session end time has not passed yet, records the failover event and
+      * re-sends the job to the node currently assigned to it.
+      *
+      * @param jobRes Job result whose job is being failed over.
+      */
+     private void sendFailoverRequest(GridJobResultImpl jobRes) {
+         // Internal failover notification.
+         evtLsnr.onJobFailover(this, jobRes.getSibling(), jobRes.getNode().id());
+ 
+         long remaining = ses.getEndTime() - U.currentTimeMillis();
+ 
+         if (remaining <= 0) {
+             // 'finishTask(..)' is deliberately not applied here as it will
+             // be called from 'onTimeout(..)' callback.
+             U.warn(log, "Failed to fail-over job due to task timeout: " + jobRes);
+ 
+             return;
+         }
+ 
+         recordJobEvent(EVT_JOB_FAILED_OVER, jobRes.getJobContext().getJobId(), jobRes.getNode(),
+             "Job failed over.");
+ 
+         // Send new reference to remote nodes for execution.
+         sendRequest(jobRes);
+     }
+ 
+     /**
+      * Requests cancellation of every child job that has not produced a response yet.
+      * <p>
+      * Candidates are collected under {@code mux}; the cancel calls are issued
+      * after the lock is released. Jobs assigned to the local node are cancelled
+      * through the job processor, all others receive a {@code GridJobCancelRequest}
+      * over {@code TOPIC_JOB_CANCEL}. A failed send is logged unless the target
+      * node is dead, and is otherwise ignored.
+      */
+     private void cancelChildren() {
+         Collection<GridJobResultImpl> pending = new LinkedList<>();
+ 
+         synchronized (mux) {
+             if (jobRes != null) {
+                 for (GridJobResultImpl jr : jobRes.values()) {
+                     // Only unfinished jobs need to be interrupted.
+                     if (!jr.hasResponse())
+                         pending.add(jr);
+                 }
+             }
+         }
+ 
+         for (GridJobResultImpl jr : pending) {
+             UUID nodeId = jr.getNode().id();
+ 
+             if (nodeId.equals(ctx.localNodeId())) {
+                 // Cancel local jobs.
+                 ctx.job().cancelJob(ses.getId(), jr.getJobContext().getJobId(), /*courtesy*/true);
+ 
+                 continue;
+             }
+ 
+             try {
+                 ClusterNode node = ctx.discovery().node(nodeId);
+ 
+                 if (node == null)
+                     continue;
+ 
+                 ctx.io().send(node,
+                     TOPIC_JOB_CANCEL,
+                     new GridJobCancelRequest(ses.getId(), jr.getJobContext().getJobId(), /*courtesy*/true),
+                     PUBLIC_POOL);
+             }
+             catch (IgniteCheckedException e) {
+                 if (isDeadNode(nodeId))
+                     continue;
+ 
+                 U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" +
+                     nodeId + ", taskName=" + ses.getTaskName() +
+                     ", taskSesId=" + ses.getId() + ", jobSesId=" + jr.getJobContext().getJobId() + ']', e);
+             }
+         }
+     }
+ 
+     /**
+      * Dispatches a job to the node assigned in its job result, either over the
+      * wire or directly on the local node.
+      * <p>
+      * If discovery no longer knows the node, a warning is logged, methods
+      * annotated with {@code ComputeJobAfterSend} are invoked and a fake response
+      * carrying a {@code ClusterTopologyException} is fed into
+      * {@code onResponse(..)}.
+      * <p>
+      * Otherwise the remaining timeout is computed (an end time of
+      * {@code Long.MAX_VALUE} is passed through unchanged). If it is not positive,
+      * only a warning is logged. If it is positive, a
+      * {@code GridJobExecuteRequestV2} is built. When the target is the local node
+      * and {@code isMarshalLocalJobs()} is {@code false}, the job, siblings and
+      * attributes are passed as objects and the request is handed to the job
+      * processor. In every other case they are marshalled, the request is sent
+      * over {@code TOPIC_JOB} ({@code MANAGEMENT_POOL} for internal tasks,
+      * {@code PUBLIC_POOL} otherwise) and {@code ComputeJobAfterSend} methods are
+      * invoked.
+      * <p>
+      * An {@code IgniteCheckedException} is converted into a fake response passed
+      * to {@code onResponse(..)}: a {@code ClusterTopologyException} when
+      * {@code isDeadNode(..)} reports the node dead, the original exception
+      * otherwise.
+      *
+      * @param res Job result.
+      */
+     private void sendRequest(ComputeJobResult res) {
+         assert res != null;
+ 
+         GridJobExecuteRequest req = null;
+ 
+         ClusterNode node = res.getNode();
+ 
+         try {
+             ClusterNode curNode = ctx.discovery().node(node.id());
+ 
+             // Check if node exists prior to sending to avoid cases when a 
discovery
+             // listener notified about node leaving after topology 
resolution. Note
+             // that we make this check because we cannot count on exception 
being
+             // thrown in case of send failure.
+             if (curNode == null) {
+                 U.warn(log, "Failed to send job request because remote node 
left grid (if fail-over is enabled, " +
+                     "will attempt fail-over to another node) [node=" + node + 
", taskName=" + ses.getTaskName() +
+                     ", taskSesId=" + ses.getId() + ", jobSesId=" + 
res.getJobContext().getJobId() + ']');
+ 
+                 ctx.resource().invokeAnnotated(dep, res.getJob(), 
ComputeJobAfterSend.class);
+ 
+                 GridJobExecuteResponse fakeRes = new 
GridJobExecuteResponse(node.id(), ses.getId(),
+                     res.getJobContext().getJobId(), null, null, null, null, 
null, null, false);
+ 
+                 fakeRes.setFakeException(new ClusterTopologyException("Failed 
to send job due to node failure: " + node));
+ 
+                 onResponse(fakeRes);
+             }
+             else {
+                 long timeout = ses.getEndTime() == Long.MAX_VALUE ? 
Long.MAX_VALUE :
+                     ses.getEndTime() - U.currentTimeMillis();
+ 
+                 if (timeout > 0) {
+                     boolean loc = 
node.id().equals(ctx.discovery().localNode().id()) &&
+                         !ctx.config().isMarshalLocalJobs();
+ 
+                     Map<Object, Object> sesAttrs = ses.isFullSupport() ? 
ses.getAttributes() : null;
+                     Map<? extends Serializable, ? extends Serializable> 
jobAttrs =
+                         (Map<? extends Serializable, ? extends 
Serializable>)res.getJobContext().getAttributes();
+ 
+                     boolean forceLocDep = internal || !ctx.deploy().enabled();
+ 
+                     req = new GridJobExecuteRequestV2(
+                             ses.getId(),
+                             res.getJobContext().getJobId(),
+                             ses.getTaskName(),
+                             ses.getUserVersion(),
+                             ses.getTaskClassName(),
+                             loc ? null : marsh.marshal(res.getJob()),
+                             loc ? res.getJob() : null,
+                             ses.getStartTime(),
+                             timeout,
+                             ses.getTopology(),
+                             loc ? null : marsh.marshal(ses.getJobSiblings()),
+                             loc ? ses.getJobSiblings() : null,
+                             loc ? null : marsh.marshal(sesAttrs),
+                             loc ? sesAttrs : null,
+                             loc ? null: marsh.marshal(jobAttrs),
+                             loc ? jobAttrs : null,
+                             ses.getCheckpointSpi(),
+                             dep.classLoaderId(),
+                             dep.deployMode(),
+                             continuous,
+                             dep.participants(),
+                             forceLocDep,
+                             ses.isFullSupport(),
+                             internal,
+                             subjId);
+ 
+                     if (loc)
+                         
ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req);
+                     else {
+                         // Send job execution request.
+                         ctx.io().send(node, TOPIC_JOB, req, internal ? 
MANAGEMENT_POOL : PUBLIC_POOL);
+ 
+                         if (log.isDebugEnabled())
+                             log.debug("Sent job request [req=" + req + ", 
node=" + node + ']');
+                     }
+ 
+                     if (!loc)
+                         ctx.resource().invokeAnnotated(dep, res.getJob(), 
ComputeJobAfterSend.class);
+                 }
+                 else
+                     U.warn(log, "Job timed out prior to sending job execution 
request: " + res.getJob());
+             }
+         }
+         catch (IgniteCheckedException e) {
+             boolean deadNode = isDeadNode(res.getNode().id());
+ 
+             // Avoid stack trace if node has left grid.
+             if (deadNode)
+                 U.warn(log, "Failed to send job request because remote node 
left grid (if failover is enabled, " +
+                     "will attempt fail-over to another node) [node=" + node + 
", taskName=" + ses.getTaskName() +
+                     ", taskSesId=" + ses.getId() + ", jobSesId=" + 
res.getJobContext().getJobId() + ']');
+             else
+                 U.error(log, "Failed to send job request: " + req, e);
+ 
+             GridJobExecuteResponse fakeRes = new 
GridJobExecuteResponse(node.id(), ses.getId(),
+                 res.getJobContext().getJobId(), null, null, null, null, null, 
null, false);
+ 
+             if (deadNode)
+                 fakeRes.setFakeException(new ClusterTopologyException("Failed 
to send job due to node failure: " +
+                     node, e));
+             else
+                 fakeRes.setFakeException(e);
+ 
+             onResponse(fakeRes);
+         }
+     }
+ 
+     /**
+      * Handles departure of a node. While the task is in {@code WAITING} state,
+      * every job that has no response yet and is assigned to that node receives
+      * a simulated response carrying a {@code ClusterTopologyException}. The
+      * simulated responses are built under {@code mux} and delivered to
+      * {@code onResponse(..)} after the lock is released.
+      *
+      * @param nodeId ID of the node that left.
+      */
+     void onNodeLeft(UUID nodeId) {
+         Collection<GridJobExecuteResponse> fakes = null;
+ 
+         synchronized (mux) {
+             // Nothing to do unless the task still cares about future responses.
+             if (state != State.WAITING)
+                 return;
+ 
+             if (jobRes != null) {
+                 for (GridJobResultImpl jr : jobRes.values()) {
+                     boolean orphaned = !jr.hasResponse() && jr.getNode().id().equals(nodeId);
+ 
+                     if (!orphaned)
+                         continue;
+ 
+                     if (log.isDebugEnabled())
+                         log.debug("Creating fake response because node left grid [job=" + jr.getJob() +
+                             ", nodeId=" + nodeId + ']');
+ 
+                     // Artificial response for a job that is waiting for a response
+                     // from a non-existent node.
+                     GridJobExecuteResponse fake = new GridJobExecuteResponse(nodeId, ses.getId(),
+                         jr.getJobContext().getJobId(), null, null, null, null, null, null, false);
+ 
+                     fake.setFakeException(new ClusterTopologyException("Node has left grid: " + nodeId));
+ 
+                     if (fakes == null)
+                         fakes = new ArrayList<>();
+ 
+                     fakes.add(fake);
+                 }
+             }
+         }
+ 
+         if (fakes != null) {
+             // Simulate responses without holding synchronization.
+             for (GridJobExecuteResponse fake : fakes) {
+                 if (log.isDebugEnabled())
+                     log.debug("Simulating fake response from left node [res=" + fake + ", nodeId=" + nodeId + ']');
+ 
+                 onResponse(fake);
+             }
+         }
+     }
+ 
+     /**
+      * Records a task event of the given type. Nothing is recorded for internal
+      * tasks or when the event type is not recordable.
+      *
+      * @param evtType Event type.
+      * @param msg Event message.
+      */
+     private void recordTaskEvent(int evtType, String msg) {
+         if (internal || !ctx.event().isRecordable(evtType))
+             return;
+ 
+         IgniteEvent taskEvt = new IgniteTaskEvent(
+             ctx.discovery().localNode(),
+             msg,
+             evtType,
+             ses.getId(),
+             ses.getTaskName(),
+             ses.getTaskClassName(),
+             internal,
+             subjId);
+ 
+         ctx.event().record(taskEvt);
+     }
+ 
+     /**
+      * Records a job event of the given type if that type is recordable. The
+      * event is populated from the task session, the local node and the
+      * arguments.
+      *
+      * @param evtType Event type.
+      * @param jobId Job ID.
+      * @param evtNode Event node.
+      * @param msg Event message.
+      */
+     private void recordJobEvent(int evtType, IgniteUuid jobId, ClusterNode evtNode, String msg) {
+         if (!ctx.event().isRecordable(evtType))
+             return;
+ 
+         IgniteJobEvent jobEvt = new IgniteJobEvent();
+ 
+         jobEvt.message(msg);
+         jobEvt.node(ctx.discovery().localNode());
+         jobEvt.taskName(ses.getTaskName());
+         jobEvt.taskClassName(ses.getTaskClassName());
+         jobEvt.taskSessionId(ses.getId());
+         jobEvt.taskNode(evtNode);
+         jobEvt.jobId(jobId);
+         jobEvt.type(evtType);
+         jobEvt.taskSubjectId(ses.subjectId());
+ 
+         ctx.event().record(jobEvt);
+     }
+ 
+     /**
+      * Collects the job results that already have a response. Must be called
+      * while holding {@code mux} (asserted).
+      *
+      * @return Collection of job results.
+      */
+     private List<ComputeJobResult> getRemoteResults() {
+         assert Thread.holdsLock(mux);
+ 
+         List<ComputeJobResult> received = new ArrayList<>(jobRes.size());
+ 
+         for (GridJobResultImpl jr : jobRes.values()) {
+             if (!jr.hasResponse())
+                 continue;
+ 
+             received.add(jr);
+         }
+ 
+         return received;
+     }
+ 
+     /**
+      * Finishes the task with the given result or failure, cancelling child
+      * jobs as part of the completion.
+      *
+      * @param res Task result.
+      * @param e Exception.
+      */
+     void finishTask(@Nullable R res, @Nullable Throwable e) {
+         finishTask(res, e, /*cancelChildren*/true);
+     }
+ 
+     /**
+      * Finishes the task once. If {@code state} is already {@code REDUCING} or
+      * {@code FINISHING} the call returns immediately; otherwise the state is
+      * switched to {@code FINISHING} under {@code mux}, the finish or failure
+      * event is recorded, the listener is notified and children are optionally
+      * cancelled. The future and the session are completed in a {@code finally}
+      * block, so they are completed even if one of those steps throws.
+      *
+      * @param res Task result.
+      * @param e Exception.
+      * @param cancelChildren Whether to cancel child jobs while finishing.
+      */
+     void finishTask(@Nullable R res, @Nullable Throwable e, boolean cancelChildren) {
+         // Avoid finishing a job more than once from different threads.
+         synchronized (mux) {
+             boolean handledElsewhere = state == State.REDUCING || state == State.FINISHING;
+ 
+             if (handledElsewhere)
+                 return;
+ 
+             state = State.FINISHING;
+         }
+ 
+         try {
+             boolean failed = e != null;
+ 
+             recordTaskEvent(failed ? EVT_TASK_FAILED : EVT_TASK_FINISHED,
+                 failed ? "Task failed." : "Task finished.");
+ 
+             // Clean resources prior to finishing future.
+             evtLsnr.onTaskFinished(this);
+ 
+             if (cancelChildren)
+                 cancelChildren();
+         }
+         finally {
+             // Once the task is marked as 'Finishing' it must be completed.
+             fut.onDone(res, e);
+ 
+             ses.onDone();
+         }
+     }
+ 
+     /**
+      * Tells whether a node should be treated as dead: it is either unknown to
+      * discovery or does not answer a ping.
+      *
+      * @param uid UID of node to check.
+      * @return {@code true} if node is dead, {@code false} if node is alive.
+      */
+     private boolean isDeadNode(UUID uid) {
+         if (ctx.discovery().node(uid) == null)
+             return true;
+ 
+         return !ctx.discovery().pingNode(uid);
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Two workers are equal when their task session IDs are equal. An argument
+      * that is {@code null} or not a {@code GridTaskWorker} is never equal; the
+      * check is a real type test rather than an {@code assert}, so the result
+      * does not depend on whether assertions are enabled.
+      */
+     @Override public boolean equals(Object obj) {
+         if (this == obj)
+             return true;
+ 
+         // 'instanceof' is false for null, so this also covers the null case.
+         if (!(obj instanceof GridTaskWorker))
+             return false;
+ 
+         return ses.getId().equals(((GridTaskWorker<?, ?>)obj).ses.getId());
+     }
+ 
+     /** {@inheritDoc} */
+     @Override public int hashCode() {
+         return ses.getId().hashCode();
+     }
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * The string is produced by {@code S.toString(GridTaskWorker.class, this)}
+      * while holding {@code mux}.
+      */
+     @Override public String toString() {
 -        return S.toString(GridTaskWorker.class, this);
++        synchronized (mux) {
++            return S.toString(GridTaskWorker.class, this);
++        }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java
index 0000000,2795e03..7bcb9e2
mode 000000,100644..100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java
@@@ -1,0 -1,243 +1,242 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.util.ipc.shmem;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ 
+ import java.io.*;
+ import java.net.*;
+ import java.nio.channels.*;
+ import java.util.*;
+ 
+ /**
+  * Shared memory native loader.
+  */
+ @SuppressWarnings("ErrorNotRethrown")
+ public class GridIpcSharedMemoryNativeLoader {
+     /** Loaded flag; set to {@code true} by {@code load()} after {@code doLoad()} returns normally. */
+     private static volatile boolean loaded;
+ 
+     /** Library name base; used to build resource file names inside {@code META-INF/native}. */
+     private static final String LIB_NAME_BASE = "ggshmem";
+ 
+     /** Library name: the base name followed by {@code "-"} and the product version. */
+     private static final String LIB_NAME = LIB_NAME_BASE + "-" + 
GridProductImpl.VER;
+ 
+     /** Lock file path; the file lives in the {@code java.io.tmpdir} directory. */
+     private static final File LOCK_FILE = new 
File(System.getProperty("java.io.tmpdir"), "ggshmem.lock");
+ 
+     /**
+      * Normalizes the {@code os.name} system property into a directory-friendly
+      * token: {@code "linux"}, {@code "osx"}, or the lower-cased name with every
+      * run of non-word characters replaced by {@code "_"}.
+      * <p>
+      * Lower-casing uses {@code Locale.ROOT} so that the result does not depend
+      * on the default locale (for example, a Turkish locale maps {@code 'I'} to a
+      * dotless {@code 'ı'}, which would then be replaced by {@code "_"}).
+      *
+      * @return Operating system name to resolve path to library.
+      * @throws IllegalStateException If called on Windows.
+      */
+     private static String os() {
+         String name = System.getProperty("os.name").toLowerCase(Locale.ROOT).trim();
+ 
+         if (name.startsWith("win"))
+             throw new IllegalStateException("IPC shared memory native loader should not be called on windows.");
+ 
+         if (name.startsWith("linux"))
+             return "linux";
+ 
+         if (name.startsWith("mac os x"))
+             return "osx";
+ 
+         return name.replaceAll("\\W+", "_");
+     }
+ 
+     /**
+      * Builds the platform token by appending the bit model to the OS name.
+      *
+      * @return Platform.
+      */
+     private static String platform() {
+         String osName = os();
+         int bits = bitModel();
+ 
+         return osName + bits;
+     }
+ 
+     /**
+      * Resolves the JVM bit model from the {@code sun.arch.data.model} system
+      * property, falling back to {@code com.ibm.vm.bitmode} when the former is
+      * not set.
+      *
+      * @return Bit model, or {@code -1} if neither property is set or the value
+      *      is not an integer (for example {@code "unknown"}).
+      */
+     private static int bitModel() {
+         String prop = System.getProperty("sun.arch.data.model");
+ 
+         if (prop == null)
+             prop = System.getProperty("com.ibm.vm.bitmode");
+ 
+         if (prop != null) {
+             try {
+                 return Integer.parseInt(prop.trim());
+             }
+             catch (NumberFormatException ignored) {
+                 // Non-numeric value: treat it the same as a missing property.
+             }
+         }
+ 
+         // We don't know.
+         return -1;
+     }
+ 
+     /**
+      * Loads the native library unless the {@code loaded} flag is already set.
+      * The flag is checked once without the class lock and again while holding
+      * it, and is set only after {@code doLoad()} returns normally.
+      *
+      * @throws IgniteCheckedException If failed.
+      */
+     public static void load() throws IgniteCheckedException {
+         if (!loaded) {
+             synchronized (GridIpcSharedMemoryNativeLoader.class) {
+                 if (!loaded) {
+                     doLoad();
+ 
+                     loaded = true;
+                 }
+             }
+         }
+     }
+ 
+     /**
+      * Loads the native library, trying {@code System.loadLibrary(..)} first and
+      * falling back to extracting it from class path resources.
+      * <p>
+      * Must be called while holding the class lock (asserted). For the fallback
+      * a lock on {@code LOCK_FILE} is taken, then the platform-specific,
+      * OS-specific and generic resource paths are tried in that order; the first
+      * one that extracts and loads wins. Errors from all attempts are collected
+      * and included in the exception message if every attempt fails.
+      *
+      * @throws IgniteCheckedException If the library could not be loaded from any
+      *      location or the file lock could not be obtained.
+      */
+     private static void doLoad() throws IgniteCheckedException {
+         assert Thread.holdsLock(GridIpcSharedMemoryNativeLoader.class);
+ 
+         Collection<Throwable> errs = new LinkedList<>();
+ 
+         try {
+             // Load native library (the library directory should be in 
java.library.path).
+             System.loadLibrary(LIB_NAME);
+ 
+             return;
+         }
+         catch (UnsatisfiedLinkError e) {
+             errs.add(e);
+         }
+ 
+         // Obtain lock on file to prevent concurrent extracts.
+         try (RandomAccessFile randomAccessFile = new 
RandomAccessFile(LOCK_FILE, "rws");
 -             FileLock lock = randomAccessFile.getChannel().lock()) {
 -
++             FileLock ignored = randomAccessFile.getChannel().lock()) {
+             if (extractAndLoad(errs, platformSpecificResourcePath()))
+                 return;
+ 
+             if (extractAndLoad(errs, osSpecificResourcePath()))
+                 return;
+ 
+             if (extractAndLoad(errs, resourcePath()))
+                 return;
+ 
+             // Failed to find the library.
+             assert !errs.isEmpty();
+ 
+             throw new IgniteCheckedException("Failed to load native IPC 
library: " + errs);
+         }
+         catch (IOException e) {
+             throw new IgniteCheckedException("Failed to obtain file lock: " + 
LOCK_FILE, e);
+         }
+     }
+ 
+     /**
+      * Builds the resource path of the library inside the OS-named directory
+      * under {@code META-INF/native}.
+      *
+      * @return OS resource path.
+      */
+     private static String osSpecificResourcePath() {
+         String osDir = os();
+         String fileName = mapLibraryName(LIB_NAME_BASE);
+ 
+         return "META-INF/native/" + osDir + "/" + fileName;
+     }
+ 
+     /**
+      * Builds the resource path of the library inside the platform-named
+      * directory (OS name plus bit model) under {@code META-INF/native}.
+      *
+      * @return Platform resource path.
+      */
+     private static String platformSpecificResourcePath() {
+         String platformDir = platform();
+         String fileName = mapLibraryName(LIB_NAME_BASE);
+ 
+         return "META-INF/native/" + platformDir + "/" + fileName;
+     }
+ 
+     /**
+      * Builds the resource path of the library directly under
+      * {@code META-INF/native}.
+      *
+      * @return Resource path.
+      */
+     private static String resourcePath() {
+         String fileName = mapLibraryName(LIB_NAME_BASE);
+ 
+         return "META-INF/native/" + fileName;
+     }
+ 
+     /**
+      * Maps a library name to a file name via {@code System.mapLibraryName(..)}.
+      * On Mac OS a {@code .jnilib} extension is replaced with {@code .dylib}.
+      *
+      * @param name Library name.
+      * @return File name of the library.
+      */
+     private static String mapLibraryName(String name) {
+         String mapped = System.mapLibraryName(name);
+ 
+         boolean jnilibOnMac = U.isMacOs() && mapped.endsWith(".jnilib");
+ 
+         if (!jnilibOnMac)
+             return mapped;
+ 
+         String stem = mapped.substring(0, mapped.length() - "jnilib".length());
+ 
+         return stem + "dylib";
+     }
+ 
+     /**
+      * Looks the resource up with the class loader detected for this class and,
+      * if it is found, extracts it into the {@code java.io.tmpdir} directory and
+      * loads it. A missing resource is reported through {@code errs}.
+      *
+      * @param errs Errors collection.
+      * @param rsrcPath Path.
+      * @return {@code True} if library was found and loaded.
+      */
+     private static boolean extractAndLoad(Collection<Throwable> errs, String rsrcPath) {
+         ClassLoader clsLdr = U.detectClassLoader(GridIpcSharedMemoryNativeLoader.class);
+ 
+         URL rsrc = clsLdr.getResource(rsrcPath);
+ 
+         if (rsrc == null) {
+             errs.add(new IllegalStateException("Failed to find resource with specified class loader " +
+                 "[rsrc=" + rsrcPath + ", clsLdr=" + clsLdr + ']'));
+ 
+             return false;
+         }
+ 
+         File target = new File(System.getProperty("java.io.tmpdir"), mapLibraryName(LIB_NAME));
+ 
+         return extract(errs, rsrc, target);
+     }
+ 
+     /**
+      * Copies the library resource to the target file (unless the file already
+      * exists), makes it executable on non-Windows systems and loads it.
+      * <p>
+      * The copy streams are closed before the file is loaded. If the copy fails,
+      * the partially written target is deleted so that a later call does not
+      * mistake it for a complete library. If the thread is interrupted while
+      * waiting for {@code chmod}, the interrupt status is restored before the
+      * failure is reported.
+      *
+      * @param errs Errors collection; receives the failure when {@code false} is returned.
+      * @param src Source.
+      * @param target Target.
+      * @return {@code True} if resource was found and loaded.
+      */
+     @SuppressWarnings("ResultOfMethodCallIgnored")
+     private static boolean extract(Collection<Throwable> errs, URL src, File target) {
+         try {
+             if (!target.exists()) {
+                 try (InputStream is = src.openStream()) {
+                     if (is != null) {
+                         try (FileOutputStream os = new FileOutputStream(target)) {
+                             byte[] buf = new byte[4096];
+ 
+                             int read;
+ 
+                             while ((read = is.read(buf)) != -1)
+                                 os.write(buf, 0, read);
+                         }
+                     }
+                 }
+                 catch (IOException e) {
+                     // Do not leave a truncated library behind.
+                     target.delete();
+ 
+                     throw e;
+                 }
+             }
+ 
+             // chmod 775.
+             if (!U.isWindows())
+                 Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor();
+ 
+             System.load(target.getPath());
+ 
+             return true;
+         }
+         catch (InterruptedException e) {
+             // Preserve the interrupt for the caller.
+             Thread.currentThread().interrupt();
+ 
+             errs.add(e);
+         }
+         catch (IOException | UnsatisfiedLinkError e) {
+             errs.add(e);
+         }
+ 
+         return false;
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
index 0000000,55fa051..538682e
mode 000000,100644..100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java
@@@ -1,0 -1,249 +1,249 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.managers.communication;
+ 
+ import org.apache.commons.collections.*;
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.marshaller.jdk.*;
+ import org.apache.ignite.spi.communication.tcp.*;
+ import org.apache.ignite.internal.managers.discovery.*;
+ import org.apache.ignite.internal.util.direct.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.testframework.*;
+ import org.apache.ignite.testframework.junits.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ import org.mockito.*;
+ 
+ import java.io.*;
+ import java.nio.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ 
+ import static org.mockito.Matchers.any;
+ import static org.mockito.Mockito.anyLong;
+ import static org.mockito.Mockito.argThat;
+ import static org.mockito.Mockito.eq;
+ import static org.mockito.Mockito.*;
+ 
+ /**
+  * Test for {@link GridIoManager}.
+  */
+ public class GridIoManagerSelfTest extends GridCommonAbstractTest {
+     /** Grid test context; configured and given a mocked discovery manager in {@code beforeTest()}. */
+     private GridTestKernalContext ctx = new GridTestKernalContext(log);
+ 
+     /** Test local node; returned by the mocked discovery manager as the local node. */
+     private GridTestNode locNode = new GridTestNode(UUID.randomUUID());
+ 
+     /** Test remote node; returned by the mocked discovery manager as the only remote node. */
+     private GridTestNode rmtNode = new GridTestNode(UUID.randomUUID());
+ 
+     /**
+      * {@inheritDoc}
+      * <p>
+      * Configures the test context with a TCP communication SPI and a JDK
+      * marshaller, disables peer class loading and registers a mocked discovery
+      * manager that knows the local and the remote test node.
+      */
+     @Override protected void beforeTest() throws Exception {
+         ctx.config().setCommunicationSpi(new TcpCommunicationSpi());
+         ctx.config().setMarshaller(new IgniteJdkMarshaller());
+ 
+         // Turn off peer class loading to simplify mocking.
+         ctx.config().setPeerClassLoadingEnabled(false);
+ 
+         // Register local and remote nodes in discovery manager.
+         GridDiscoveryManager discoMock = Mockito.mock(GridDiscoveryManager.class);
+ 
+         when(discoMock.localNode()).thenReturn(locNode);
+         when(discoMock.remoteNodes()).thenReturn(F.<ClusterNode>asList(rmtNode));
+ 
+         ctx.add(discoMock);
+     }
+ 
+     /**
+      * Checks that sending to a node list that contains the local node, with an
+      * enum topic, fails with an {@code AssertionError}.
+      *
+      * @throws Exception If failed.
+      */
+     public void testSendIfOneOfNodesIsLocalAndTopicIsEnum() throws Exception {
+         Callable<Object> sendToLocAndRmt = new Callable<Object>() {
+             @Override public Object call() throws Exception {
+                 GridIoManager ioMgr = new GridIoManager(ctx);
+ 
+                 ioMgr.send(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new Message(),
+                     GridIoPolicy.P2P_POOL);
+ 
+                 return null;
+             }
+         };
+ 
+         GridTestUtils.assertThrows(log, sendToLocAndRmt, AssertionError.class,
+             "Internal GridGain code should never call the method with local node in a node list.");
+     }
+ 
+     /**
+      * Checks that sending to a node list that contains the local node, with an
+      * arbitrary object as topic, fails with an {@code AssertionError}.
+      *
+      * @throws Exception If failed.
+      */
+     public void testSendIfOneOfNodesIsLocalAndTopicIsObject() throws Exception {
+         Callable<Object> sendToLocAndRmt = new Callable<Object>() {
+             @Override public Object call() throws Exception {
+                 GridIoManager ioMgr = new GridIoManager(ctx);
+ 
+                 ioMgr.send(F.asList(locNode, rmtNode), new Object(), new Message(),
+                     GridIoPolicy.P2P_POOL);
+ 
+                 return null;
+             }
+         };
+ 
+         GridTestUtils.assertThrows(log, sendToLocAndRmt, AssertionError.class,
+             "Internal GridGain code should never call the method with local node in a node list.");
+     }
+ 
+     /**
+      * Checks that the two-argument {@code sendUserMessage(..)} splits a node
+      * list containing the local node into one send to the local node and one
+      * send to the collection of remote nodes.
+      *
+      * @throws Exception If failed.
+      */
+     public void testSendUserMessageThinVersionIfOneOfNodesIsLocal() throws Exception {
+         Object userMsg = new Object();
+ 
+         GridIoManager spiedMgr = spy(new TestGridIoManager(ctx));
+ 
+         try {
+             spiedMgr.sendUserMessage(F.asList(locNode, rmtNode), userMsg);
+         }
+         catch (IgniteCheckedException ignored) {
+             // No-op. We are using mocks so real sending is impossible.
+         }
+ 
+         // Local node is addressed individually.
+         verify(spiedMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+             eq(GridIoPolicy.PUBLIC_POOL));
+ 
+         Collection<? extends ClusterNode> expRmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
+ 
+         // Remote nodes are addressed as a collection.
+         verify(spiedMgr).send(argThat(new IsEqualCollection(expRmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+             any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
+     }
+ 
+     /**
+      * Checks that the five-argument {@code sendUserMessage(..)} in unordered
+      * mode splits a node list containing the local node into one send to the
+      * local node and one send to the collection of remote nodes.
+      *
+      * @throws Exception If failed.
+      */
+     public void testSendUserMessageUnorderedThickVersionIfOneOfNodesIsLocal() throws Exception {
+         Object userMsg = new Object();
+ 
+         GridIoManager spiedMgr = spy(new TestGridIoManager(ctx));
+ 
+         try {
+             spiedMgr.sendUserMessage(F.asList(locNode, rmtNode), userMsg, GridTopic.TOPIC_GGFS, false, 123L);
+         }
+         catch (IgniteCheckedException ignored) {
+             // No-op. We are using mocks so real sending is impossible.
+         }
+ 
+         // Local node is addressed individually.
+         verify(spiedMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class),
+             eq(GridIoPolicy.PUBLIC_POOL));
+ 
+         Collection<? extends ClusterNode> expRmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id()));
+ 
+         // Remote nodes are addressed as a collection.
+         verify(spiedMgr).send(argThat(new IsEqualCollection(expRmtNodes)), eq(GridTopic.TOPIC_COMM_USER),
+             any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL));
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testSendUserMessageOrderedThickVersionIfOneOfNodesIsLocal() 
throws Exception {
+         Object msg = new Object();
+ 
+         GridIoManager ioMgr = spy(new TestGridIoManager(ctx));
+ 
+         try {
+             ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, 
GridTopic.TOPIC_GGFS, true, 123L);
+         }
+         catch (Exception ignored) {
+             // No-op. We are using mocks so real sending is impossible.
+         }
+ 
+         verify(ioMgr).sendOrderedMessage(
+             argThat(new IsEqualCollection(F.asList(locNode, rmtNode))),
 -            eq(GridTopic.TOPIC_COMM_USER), anyLong(),
++            eq(GridTopic.TOPIC_COMM_USER),
+             any(GridIoUserMessage.class),
+             eq(GridIoPolicy.PUBLIC_POOL),
+             eq(123L),
+             eq(false)); // Mockito requires every argument to be a matcher once any matcher is used.
+     }
+ 
+     /**
+      * Test-purposed extension of {@code GridIoManager} with no-op {@code 
send(...)} methods.
+      */
+     private static class TestGridIoManager extends GridIoManager {
+         /**
+          * @param ctx Grid kernal context.
+          */
+         TestGridIoManager(GridKernalContext ctx) {
+             super(ctx);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public void send(ClusterNode node, GridTopic topic, 
GridTcpCommunicationMessageAdapter msg,
+             GridIoPolicy plc) throws IgniteCheckedException {
+             // No-op.
+         }
+     }
+ 
+     /**
+      * Mockito argument matcher to compare collections produced by {@code 
F.view()} methods.
+      */
+     private static class IsEqualCollection extends 
ArgumentMatcher<Collection<? extends ClusterNode>> {
+         /** Expected collection. */
+         private final Collection<? extends ClusterNode> expCol;
+ 
+         /**
+          * Default constructor.
+          *
+          * @param expCol Expected collection.
+          */
+         IsEqualCollection(Collection<? extends ClusterNode> expCol) {
+             this.expCol = expCol;
+         }
+ 
+         /**
+          * Matches a given collection to the specified in constructor 
expected one
+          * with Apache {@code CollectionUtils.isEqualCollection()}.
+          *
+          * @param colToCheck Collection to be matched against the expected 
one.
+          * @return True if collections matches.
+          */
+         @Override public boolean matches(Object colToCheck) {
+             return CollectionUtils.isEqualCollection(expCol, 
(Collection)colToCheck);
+         }
+     }
+ 
+     /** */
+     private static class Message extends GridTcpCommunicationMessageAdapter 
implements Serializable {
+         /** {@inheritDoc} */
+         @SuppressWarnings("CloneDoesntCallSuperClone")
+         @Override public GridTcpCommunicationMessageAdapter clone() {
+             throw new UnsupportedOperationException();
+         }
+ 
+         /** {@inheritDoc} */
+         @Override protected void clone0(GridTcpCommunicationMessageAdapter 
_msg) {
+             // No-op.
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean writeTo(ByteBuffer buf) {
+             return true;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public boolean readFrom(ByteBuffer buf) {
+             return true;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public byte directType() {
+             return 0;
+         }
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
index 0000000,7cd698d..a30ef73
mode 000000,100644..100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java
@@@ -1,0 -1,166 +1,165 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.internal.processors.cache.distributed.replicated;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cache.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.spi.discovery.tcp.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+ import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ import org.jetbrains.annotations.*;
+ 
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ 
+ import static org.apache.ignite.configuration.IgniteDeploymentMode.*;
+ import static org.apache.ignite.cache.CacheDistributionMode.*;
+ import static org.apache.ignite.cache.CacheMode.*;
+ import static org.apache.ignite.cache.CachePreloadMode.*;
+ import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
+ 
+ /**
+  * Multithreaded tests for replicated cache preloader.
+  */
+ public class GridCacheSyncReplicatedPreloadSelfTest extends 
GridCommonAbstractTest {
+     /** */
+     private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+ 
+     /** */
+     private static final boolean DISCO_DEBUG_MODE = false;
+ 
 -
+     /**
+      * Constructs test.
+      */
+     public GridCacheSyncReplicatedPreloadSelfTest() {
+         super(false /* don't start grid. */);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+         IgniteConfiguration cfg = super.getConfiguration(gridName);
+ 
+         TcpDiscoverySpi disco = new TcpDiscoverySpi();
+ 
+         disco.setIpFinder(ipFinder);
+         disco.setDebugMode(DISCO_DEBUG_MODE);
+ 
+         cfg.setDiscoverySpi(disco);
+ 
+         CacheConfiguration cacheCfg = defaultCacheConfiguration();
+ 
+         cacheCfg.setCacheMode(REPLICATED);
+         cacheCfg.setDistributionMode(PARTITIONED_ONLY);
+         cacheCfg.setWriteSynchronizationMode(FULL_SYNC);
+ 
+         // This property is essential for this test.
+         cacheCfg.setPreloadMode(SYNC);
+ 
+         cacheCfg.setPreloadBatchSize(10000);
+ 
+         cfg.setCacheConfiguration(cacheCfg);
+         cfg.setDeploymentMode(CONTINUOUS);
+ 
+         return cfg;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void afterTest() throws Exception {
+         super.afterTest();
+ 
+         stopAllGrids();
+     }
+ 
+     /**
+      * @throws Exception If test failed.
+      */
+     @SuppressWarnings({"TooBroadScope"})
+     public void testNodeRestart() throws Exception {
+         int keyCnt = 1000;
+         int retries = 20;
+ 
+         Ignite g0 = startGrid(0);
+         Ignite g1 = startGrid(1);
+ 
+         for (int i = 0; i < keyCnt; i++)
+             g0.cache(null).putx(i, i);
+ 
+         assertEquals(keyCnt, g0.cache(null).size());
+         assertEquals(keyCnt, g1.cache(null).size());
+ 
+         for (int n = 0; n < retries; n++) {
+             info("Starting additional grid node...");
+ 
+             Ignite g2 = startGrid(2);
+ 
+             assertEquals(keyCnt, g2.cache(null).size());
+ 
+             info("Stopping additional grid node...");
+ 
+             stopGrid(2);
+         }
+     }
+ 
+     /**
+      * @throws Exception If test failed.
+      */
+     @SuppressWarnings({"TooBroadScope"})
+     public void testNodeRestartMultithreaded() throws Exception {
+         final int keyCnt = 1000;
+         final int retries = 300;
+         int threadCnt = 5;
+ 
+         Ignite g0 = startGrid(0);
+         Ignite g1 = startGrid(1);
+ 
+         for (int i = 0; i < keyCnt; i++)
+             g0.cache(null).putx(i, i);
+ 
+         assertEquals(keyCnt, g0.cache(null).size());
+         assertEquals(keyCnt, g1.cache(null).size());
+ 
+         final AtomicInteger cnt = new AtomicInteger();
+ 
+         multithreaded(
+             new Callable() {
+                 @Nullable @Override public Object call() throws Exception {
+                     while (true) {
+                         int c = cnt.incrementAndGet();
+ 
+                         if (c > retries)
+                             break;
+ 
+                         int idx = c + 1;
+ 
+                         info("Starting additional grid node with index: " + 
idx);
+ 
+                         startGrid(idx);
+ 
+                         info("Stopping additional grid node with index: " + 
idx);
+ 
+                         stopGrid(idx);
+                     }
+ 
+                     return null;
+                 }
+             },
+             threadCnt);
+     }
+ }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/test/java/org/apache/ignite/session/GridSessionLoadSelfTest.java
----------------------------------------------------------------------
diff --cc 
modules/core/src/test/java/org/apache/ignite/session/GridSessionLoadSelfTest.java
index 0000000,e4de992..d148d37
mode 000000,100644..100644
--- 
a/modules/core/src/test/java/org/apache/ignite/session/GridSessionLoadSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/session/GridSessionLoadSelfTest.java
@@@ -1,0 -1,264 +1,267 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *      http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.ignite.session;
+ 
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.compute.*;
+ import org.apache.ignite.configuration.*;
+ import org.apache.ignite.resources.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.apache.ignite.testframework.*;
+ import org.apache.ignite.testframework.junits.common.*;
+ 
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ 
+ /**
+  * Task session load self test.
+  */
+ @GridCommonTest(group = "Task Session")
+ public class GridSessionLoadSelfTest extends GridCommonAbstractTest {
+     /** */
+     private static final int THREAD_CNT = 40;
+ 
+     /** */
+     private static final int EXEC_CNT = 10;
+ 
+     /** */
+     private boolean locMarsh;
+ 
+     /** {@inheritDoc} */
+     @Override protected IgniteConfiguration getConfiguration(String gridName) 
throws Exception {
+         IgniteConfiguration c = super.getConfiguration(gridName);
+ 
+         c.setMarshalLocalJobs(locMarsh);
+         c.setPeerClassLoadingEnabled(false);
+ 
+         return c;
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void beforeTest() throws Exception {
+         startGrids(2);
+     }
+ 
+     /** {@inheritDoc} */
+     @Override protected void afterTest() throws Exception {
+         stopAllGrids();
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testSessionLoad() throws Exception {
+         locMarsh = true;
+ 
+         checkSessionLoad();
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     public void testSessionLoadNoLocalMarshalling() throws Exception {
+         locMarsh = false;
+ 
+         checkSessionLoad();
+     }
+ 
+     /**
+      * @throws Exception If failed.
+      */
+     private void checkSessionLoad() throws Exception {
 -        final Ignite ignite = grid(1);
++        final Ignite ignite = grid(0);
+ 
+         assert ignite != null;
+         assert ignite.cluster().nodes().size() == 2;
+ 
+         info("Thread count: " + THREAD_CNT);
+ 
+         GridTestUtils.runMultiThreaded(new Callable<Object>() {
+             @Override public Object call() throws Exception {
++                ComputeTaskFuture f = null;
++
+                 try {
+                     for (int i = 0; i < EXEC_CNT; i++)
+                         assertEquals(Boolean.TRUE,
 -                            
executeAsync(ignite.compute().withName("task-name"),
++                            (f = 
executeAsync(ignite.compute().withName("task-name"),
+                                 SessionLoadTestTask.class,
 -                                ignite.cluster().nodes().size() * 
2).get(20000));
++                                ignite.cluster().nodes().size() * 
2)).get(20000));
+                 }
+                 catch (Exception e) {
 -                    U.error(log, "Test failed.", e);
++                    // The ternary is parenthesized because '+' binds tighter than '!=' and '?:'.
++                    U.error(log, "Task failed: " + (f != null ? f.getTaskSession().getId() : "N/A"), e);
+ 
+                     throw e;
+                 }
+                 finally {
+                     info("Thread finished.");
+                 }
+ 
+                 return null;
+             }
+         }, THREAD_CNT, "grid-load-test-thread");
+     }
+ 
+     /**
+      *
+      */
+     @ComputeTaskSessionFullSupport
+     private static class SessionLoadTestTask extends 
ComputeTaskAdapter<Integer, Boolean> {
+         /** */
+         @IgniteTaskSessionResource
+         private ComputeTaskSession taskSes;
+ 
+         /** */
+         @IgniteLoggerResource
+         private IgniteLogger log;
+ 
+         /** */
+         private Map<String, Integer> params;
+ 
+         /** {@inheritDoc} */
+         @Override public Map<? extends ComputeJob, ClusterNode> 
map(List<ClusterNode> subgrid, Integer arg)
+             throws IgniteCheckedException {
+             assert taskSes != null;
+             assert arg != null;
+             assert arg > 1;
+ 
+             Map<SessionLoadTestJob, ClusterNode> map = new 
HashMap<>(subgrid.size());
+ 
+             Iterator<ClusterNode> iter = subgrid.iterator();
+ 
+             Random rnd = new Random();
+ 
+             params = new HashMap<>(arg);
+ 
+             for (int i = 0; i < arg; i++) {
+                 // Recycle iterator.
+                 if (!iter.hasNext())
+                     iter = subgrid.iterator();
+ 
+                 String paramName = UUID.randomUUID().toString();
+ 
+                 int paramVal = rnd.nextInt();
+ 
+                 taskSes.setAttribute(paramName, paramVal);
+ 
+                 map.put(new SessionLoadTestJob(paramName), iter.next());
+ 
+                 params.put(paramName, paramVal);
+ 
+                 if (log.isDebugEnabled())
+                     log.debug("Set session attribute [name=" + paramName + ", 
value=" + paramVal + ']');
+             }
+ 
+             return map;
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Boolean reduce(List<ComputeJobResult> results) 
throws IgniteCheckedException {
+             assert taskSes != null;
+             assert results != null;
+             assert params != null;
+             assert !params.isEmpty();
+             assert results.size() == params.size();
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Reducing: " + params);
+ 
+             Map<String, Integer> receivedParams = new HashMap<>();
+ 
+             boolean allAttrReceived = false;
+ 
+             for (int i = 0; i < 3 && !allAttrReceived; i++) {
+                 allAttrReceived = true;
+ 
+                 for (Map.Entry<String, Integer> entry : params.entrySet()) {
+                     Serializable attr = taskSes.getAttribute(entry.getKey());
+ 
+                     assert attr != null;
+ 
+                     int newVal = (Integer)attr;
+ 
+                     receivedParams.put(entry.getKey(), newVal);
+ 
+                     // New value is expected to be +1 to argument value.
+                     if (newVal != entry.getValue() + 1)
+                         allAttrReceived = false;
+                 }
+ 
+                 if (!allAttrReceived)
+                     U.sleep(1000);
+             }
+ 
+             if (log.isDebugEnabled()) {
+                 for (Map.Entry<String, Integer> entry : 
receivedParams.entrySet())
+                     log.debug("Received session attribute value [name=" + 
entry.getKey() + ", val=" + entry.getValue()
+                         + ", expected=" + (params.get(entry.getKey()) + 1) + 
']');
+             }
+ 
+             return allAttrReceived;
+         }
+     }
+ 
+     /**
+      *
+      */
+     private static class SessionLoadTestJob extends ComputeJobAdapter {
+         /** */
+         @IgniteTaskSessionResource
+         private ComputeTaskSession taskSes;
+ 
+         /** */
+         @IgniteLoggerResource
+         private IgniteLogger log;
+ 
+         /**
+          * @param arg Argument.
+          */
+         private SessionLoadTestJob(String arg) {
+             super(arg);
+         }
+ 
+         /** {@inheritDoc} */
+         @Override public Serializable execute() throws IgniteCheckedException 
{
+             assert taskSes != null;
+             assert argument(0) != null;
+ 
+             Serializable ser = taskSes.getAttribute(argument(0));
+ 
+             assert ser != null;
+ 
+             int val = (Integer)ser + 1;
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Executing session load job: " + val);
+ 
+             // Generate garbage.
+             for (int i = 0; i < 10; i++)
+                 taskSes.setAttribute(argument(0), i);
+ 
+             // Set final value (+1 to original value).
+             taskSes.setAttribute(argument(0), val);
+ 
+             if (log.isDebugEnabled())
+                 log.debug("Set session attribute [name=" + argument(0) + ", 
value=" + val + ']');
+ 
+             return val;
+         }
+     }
+ }

Reply via email to