http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java index 0000000,13dc389..8ed6319 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java @@@ -1,0 -1,894 +1,888 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.job; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.compute.*; + import org.apache.ignite.events.*; + import org.apache.ignite.fs.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.marshaller.*; + import org.apache.ignite.internal.managers.deployment.*; + import org.apache.ignite.internal.processors.task.*; + import org.apache.ignite.internal.processors.timeout.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.worker.*; + import org.jetbrains.annotations.*; + + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static org.apache.ignite.events.IgniteEventType.*; + import static org.apache.ignite.internal.GridTopic.*; + import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + + /** + * Job worker. + */ + public class GridJobWorker extends GridWorker implements GridTimeoutObject { + /** Per-thread held flag. */ + private static final ThreadLocal<Boolean> HOLD = new ThreadLocal<Boolean>() { + @Override protected Boolean initialValue() { + return false; + } + }; + + /** Static logger to avoid re-creation. */ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** */ + private final long createTime; + + /** */ + private volatile long startTime; + + /** */ + private volatile long finishTime; + + /** */ + private final GridKernalContext ctx; + + /** */ + private final Object jobTopic; + + /** */ + private final Object taskTopic; + + /** */ + private byte[] jobBytes; + + /** Task originating node. */ + private final ClusterNode taskNode; + + /** Flag set when visor or internal task is running. 
*/ + private final boolean internal; + + /** Logger. */ + private final IgniteLogger log; + + /** Marshaller obtained from the node configuration. */ + private final IgniteMarshaller marsh; + + /** Job session. */ + private final GridJobSessionImpl ses; + + /** Job context. */ + private final GridJobContextImpl jobCtx; + + /** Job event listener. */ + private final GridJobEventListener evtLsnr; + + /** Deployment. */ + private final GridDeployment dep; + + /** Flipped to {@code true} by the first {@code finishJob} call so that the job is finished only once. */ + private final AtomicBoolean finishing = new AtomicBoolean(); + + /** Guard ensuring that master-leave callback is not executed more than once. */ + private final AtomicBoolean masterLeaveGuard = new AtomicBoolean(); + + /** Set by {@link #onTimeout()} once the job has timed out. */ + private volatile boolean timedOut; + + /** Set when the job is cancelled through {@code cancel(true)}, i.e. by the system. */ + private volatile boolean sysCancelled; + + /** Set by {@link #onStopping()} when the grid is stopping. */ + private volatile boolean sysStopping; + + /** Set by {@link #body()} when the worker starts running the job. */ + private volatile boolean isStarted; + + /** Deployed job. */ + private ComputeJob job; + + /** Halted flag (if greater than 0, job is halted). */ + private final AtomicInteger held = new AtomicInteger(); + + /** Hold/unhold listener to notify job processor. */ + private final GridJobHoldListener holdLsnr; + + /** + * Creates a worker for a single job; job and task topics are derived from the job ID and the local node ID. + * + * @param ctx Kernal context. + * @param dep Grid deployment. + * @param createTime Create time. + * @param ses Job session. + * @param jobCtx Job context. + * @param jobBytes Grid job bytes. + * @param job Job. + * @param taskNode Grid task node. + * @param internal Whether or not task was marked with {@link GridInternal}. + * @param evtLsnr Job event listener. + * @param holdLsnr Hold listener. 
+ */ + GridJobWorker( + GridKernalContext ctx, + GridDeployment dep, + long createTime, + GridJobSessionImpl ses, + GridJobContextImpl jobCtx, + byte[] jobBytes, + ComputeJob job, + ClusterNode taskNode, + boolean internal, + GridJobEventListener evtLsnr, + GridJobHoldListener holdLsnr) { + super(ctx.gridName(), "grid-job-worker", ctx.log()); + + /* NOTE: ctx was already dereferenced by the super(...) call above, so the ctx assertion below cannot fail. */ + assert ctx != null; + assert ses != null; + assert jobCtx != null; + assert taskNode != null; + assert evtLsnr != null; + assert dep != null; + assert holdLsnr != null; + + this.ctx = ctx; + this.createTime = createTime; + this.evtLsnr = evtLsnr; + this.dep = dep; + this.ses = ses; + this.jobCtx = jobCtx; + this.jobBytes = jobBytes; + this.taskNode = taskNode; + this.internal = internal; + this.holdLsnr = holdLsnr; + + /* When no job instance is passed, it is unmarshalled from jobBytes later, in initialize(). */ + if (job != null) + this.job = job; + + log = U.logger(ctx, logRef, this); + + marsh = ctx.config().getMarshaller(); + + UUID locNodeId = ctx.discovery().localNode().id(); + + jobTopic = TOPIC_JOB.topic(ses.getJobId(), locNodeId); + taskTopic = TOPIC_TASK.topic(ses.getJobId(), locNodeId); + } + + /** + * Gets deployed job or {@code null} if job could not be deployed. + * + * @return Deployed job. + */ + @Nullable public ComputeJob getJob() { + return job; + } + + /** + * @return Deployment passed to this worker at construction. + */ + public GridDeployment getDeployment() { + return dep; + } + + /** + * Returns {@code True} if job was cancelled by the system. + * + * @return {@code True} if job was cancelled by the system. + */ + boolean isSystemCanceled() { + return sysCancelled; + } + + /** + * @return Create time. + */ + long getCreateTime() { + return createTime; + } + + /** + * @return Unique job ID. + */ + public IgniteUuid getJobId() { + IgniteUuid jobId = ses.getJobId(); + + assert jobId != null; + + return jobId; + } + + /** + * @return Job context. + */ + public ComputeJobContext getJobContext() { + return jobCtx; + } + + /** + * @return Job communication topic. 
+ */ + Object getJobTopic() { + return jobTopic; + } + + /** + * @return Task communication topic. + */ + Object getTaskTopic() { + return taskTopic; + } + + /** + * @return Session. + */ + public GridJobSessionImpl getSession() { + return ses; + } + + /** + * Gets job finishing state. + * + * @return {@code true} if job is being finished after execution + * and {@code false} otherwise. + */ + boolean isFinishing() { + return finishing.get(); + } + + /** + * @return Parent task node ID. + */ + ClusterNode getTaskNode() { + return taskNode; + } + + /** + * @return Job execution time. + */ + long getExecuteTime() { + long startTime0 = startTime; + long finishTime0 = finishTime; + + return startTime0 == 0 ? 0 : finishTime0 == 0 ? + U.currentTimeMillis() - startTime0 : finishTime0 - startTime0; + } + + /** + * @return Time job spent on waiting queue. + */ + long getQueuedTime() { + long startTime0 = startTime; + + return startTime0 == 0 ? U.currentTimeMillis() - createTime : startTime0 - createTime; + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return ses.getEndTime(); + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + IgniteUuid jobId = ses.getJobId(); + + assert jobId != null; + + return jobId; + } + + /** + * @return {@code True} if job is timed out. + */ + boolean isTimedOut() { + return timedOut; + } + + /** + * @return {@code True} if parent task is internal or Visor-related. + */ + public boolean isInternal() { + return internal; + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + if (finishing.get()) + return; + + timedOut = true; + + U.warn(log, "Job has timed out: " + ses); + + cancel(); + + if (!internal && ctx.event().isRecordable(EVT_JOB_TIMEDOUT)) + recordEvent(EVT_JOB_TIMEDOUT, "Job has timed out: " + job); + } + + /** + * Callback for whenever grid is stopping. + */ + public void onStopping() { + sysStopping = true; + } + + /** + * @return {@code True} if job was halted. 
+ */ + public boolean held() { + return held.get() > 0; + } + + /** + * Sets halt flags. + */ + public void hold() { + held.incrementAndGet(); + + HOLD.set(true); + + holdLsnr.onHold(this); + } + + /** + * Initializes job. Handles deployments and event recording. + * + * @param dep Job deployed task. + * @param taskCls Task class. + * @return {@code True} if job was successfully initialized. + */ + boolean initialize(GridDeployment dep, Class<?> taskCls) { + assert dep != null; + + IgniteCheckedException ex = null; + + try { + if (job == null) { + job = marsh.unmarshal(jobBytes, dep.classLoader()); + + // No need to hold reference any more. + jobBytes = null; + } + + // Inject resources. + ctx.resource().inject(dep, taskCls, job, ses, jobCtx); + + if (!internal && ctx.event().isRecordable(EVT_JOB_QUEUED)) + recordEvent(EVT_JOB_QUEUED, "Job got queued for computation."); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to initialize job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); + + ex = e; + } + catch (Throwable e) { + ex = handleThrowable(e); + + assert ex != null; + } + finally { + if (ex != null) + finishJob(null, ex, true); + } + + return ex == null; + } + + /** {@inheritDoc} */ + @Override protected void body() { + assert job != null; + + startTime = U.currentTimeMillis(); + + isStarted = true; + + // Event notification. + evtLsnr.onJobStarted(this); + + if (!internal && ctx.event().isRecordable(EVT_JOB_STARTED)) + recordEvent(EVT_JOB_STARTED, /*no message for success*/null); + + execute0(true); + } + + /** + * Executes the job. + */ + public void execute() { + execute0(false); + } + + /** + * @param skipNtf {@code True} to skip job processor {@code onUnhold()} + * notification (only from {@link #body()}). + */ + private void execute0(boolean skipNtf) { + // Make sure flag is not set for current thread. + HOLD.set(false); + + if (isCancelled()) + // If job was cancelled prior to assigning runner to it? 
+ super.cancel(); + + if (!skipNtf) { + holdLsnr.onUnhold(this); + + int c = held.decrementAndGet(); + + if (c > 0) { + if (log.isDebugEnabled()) + log.debug("Ignoring job execution (job was held several times) [c=" + c + ']'); + + return; + } + } + + boolean sndRes = true; + + Object res = null; + + IgniteCheckedException ex = null; + + try { + ctx.job().currentTaskSession(ses); + + // If job has timed out, then + // avoid computation altogether. + if (isTimedOut()) + sndRes = false; + else { + res = U.wrapThreadLoader(dep.classLoader(), new Callable<Object>() { + @Nullable @Override public Object call() throws IgniteCheckedException { + try { + if (internal && ctx.config().isPeerClassLoadingEnabled()) + ctx.job().internal(true); + + return job.execute(); + } + finally { + if (internal && ctx.config().isPeerClassLoadingEnabled()) + ctx.job().internal(false); + } + } + }); + + if (log.isDebugEnabled()) + log.debug("Job execution has successfully finished [job=" + job + ", res=" + res + ']'); + } + } + catch (IgniteCheckedException e) { + if (sysStopping && e.hasCause(IgniteInterruptedException.class, InterruptedException.class)) { + ex = handleThrowable(e); + + assert ex != null; + } + else { + if (X.hasCause(e, GridInternalException.class) || X.hasCause(e, IgniteFsOutOfSpaceException.class)) { + // Print exception for internal errors only if debug is enabled. + if (log.isDebugEnabled()) + U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); + } + else if (X.hasCause(e, InterruptedException.class)) { + String msg = "Job was cancelled [jobId=" + ses.getJobId() + ", ses=" + ses + ']'; + + if (log.isDebugEnabled()) + U.error(log, msg, e); + else + U.warn(log, msg); + } + else + U.error(log, "Failed to execute job [jobId=" + ses.getJobId() + ", ses=" + ses + ']', e); + + ex = e; + } + } + // Catch Throwable to protect against bad user code except + // InterruptedException if job is being cancelled. 
+ catch (Throwable e) { + ex = handleThrowable(e); + + assert ex != null; + } + finally { + // Finish here only if not held by this thread. + if (!HOLD.get()) + finishJob(res, ex, sndRes); + + ctx.job().currentTaskSession(null); + } + } + + /** + * Handles {@link Throwable} generic exception for task + * deployment and execution: wraps it into a checked exception + * and logs it as an error. + * + * @param e Exception. + * @return Wrapped exception (never {@code null}). + */ + private IgniteCheckedException handleThrowable(Throwable e) { + String msg = null; + + IgniteCheckedException ex = null; + + // Special handling for weird interrupted exception which + // happens due to JDK 1.5 bug. + if (e instanceof InterruptedException && !sysStopping) { + msg = "Failed to execute job due to interrupted exception."; + + // Turn interrupted exception into checked exception. + ex = new IgniteCheckedException(msg, e); + } + // Special 'NoClassDefFoundError' handling if P2P is on. We had many questions + // about this exception and decided to change error message. + else if ((e instanceof NoClassDefFoundError || e instanceof ClassNotFoundException) + && ctx.config().isPeerClassLoadingEnabled()) { + msg = "Failed to execute job due to class or resource loading exception (make sure that task " + + "originating node is still in grid and requested class is in the task class path) [jobId=" + + ses.getJobId() + ", ses=" + ses + ']'; + + ex = new ComputeUserUndeclaredException(msg, e); + } + else if (sysStopping && X.hasCause(e, InterruptedException.class, IgniteInterruptedException.class)) { + msg = "Job got interrupted due to system stop (will attempt failover)."; + + ex = new ComputeExecutionRejectedException(e); + } + + if (msg == null) { + msg = "Failed to execute job due to unexpected runtime exception [jobId=" + ses.getJobId() + + ", ses=" + ses + ']'; + + ex = new ComputeUserUndeclaredException(msg, e); + } + + assert msg != null; + assert ex != null; + + U.error(log, msg, e); + + return ex; + } + + /** {@inheritDoc} */ + @Override public void cancel() { + 
cancel(false); + } + + /** + * @param sys System flag. + */ + public void cancel(boolean sys) { + try { + super.cancel(); + + final ComputeJob job0 = job; + + if (sys) + sysCancelled = true; + + if (job0 != null) { + if (log.isDebugEnabled()) + log.debug("Cancelling job: " + ses); + + U.wrapThreadLoader(dep.classLoader(), new IgniteRunnable() { + @Override public void run() { + job0.cancel(); + } + }); + } + + if (!internal && ctx.event().isRecordable(EVT_JOB_CANCELLED)) + recordEvent(EVT_JOB_CANCELLED, "Job was cancelled: " + job0); + } + // Catch throwable to protect against bad user code. + catch (Throwable e) { + U.error(log, "Failed to cancel job due to undeclared user exception [jobId=" + ses.getJobId() + + ", ses=" + ses + ']', e); + } + } + + /** + * @param evtType Event type. + * @param msg Message. + */ + private void recordEvent(int evtType, @Nullable String msg) { + assert ctx.event().isRecordable(evtType); + assert !internal; + + IgniteJobEvent evt = new IgniteJobEvent(); + + evt.jobId(ses.getJobId()); + evt.message(msg); + evt.node(ctx.discovery().localNode()); + evt.taskName(ses.getTaskName()); + evt.taskClassName(ses.getTaskClassName()); + evt.taskSessionId(ses.getId()); + evt.type(evtType); + evt.taskNode(taskNode); + evt.taskSubjectId(ses.subjectId()); + + ctx.event().record(evt); + } + + /** + * @param res Result. + * @param ex Error. + * @param sndReply If {@code true}, reply will be sent. + */ + void finishJob(@Nullable Object res, @Nullable IgniteCheckedException ex, boolean sndReply) { + // Avoid finishing a job more than once from different threads. + if (!finishing.compareAndSet(false, true)) + return; + + // Do not send reply if job has been cancelled from system. + if (sndReply) + sndReply = !sysCancelled; + + // We should save message ID here since listener callback will reset sequence. + ClusterNode sndNode = ctx.discovery().node(taskNode.id()); + - long msgId = sndNode != null && ses.isFullSupport() ? 
- ctx.io().nextMessageId(taskTopic, sndNode.id()) : -1; - + finishTime = U.currentTimeMillis(); + + Collection<IgniteBiTuple<Integer, String>> evts = null; + + try { + if (ses.isFullSupport()) + evtLsnr.onBeforeJobResponseSent(this); + + // Send response back only if job has not timed out. + if (!isTimedOut()) { + if (sndReply) { + if (sndNode == null) { + onMasterNodeLeft(); + + U.warn(log, "Failed to reply to sender node because it left grid [nodeId=" + taskNode.id() + + ", ses=" + ses + ", jobId=" + ses.getJobId() + ", job=" + job + ']'); + + // Record job reply failure. + if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) + evts = addEvent(evts, EVT_JOB_FAILED, "Job reply failed (task node left grid): " + job); + } + else { + try { + if (ex != null) { + if (isStarted) { + // Job failed. + if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) + evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" + + ex + ", job=" + job + ']'); + } + else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED)) + evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started " + + "[ex=" + ex + ", job=" + job + ']'); + } + else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED)) + evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null); + + boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs(); + + Map<Object, Object> attrs = jobCtx.getAttributes(); + + GridJobExecuteResponse jobRes = new GridJobExecuteResponse( + ctx.localNodeId(), + ses.getId(), + ses.getJobId(), + loc ? null : marsh.marshal(ex), + loc ? ex : null, + loc ? null: marsh.marshal(res), + loc ? res : null, + loc ? null : marsh.marshal(attrs), + loc ? attrs : null, + isCancelled()); + + long timeout = ses.getEndTime() - U.currentTimeMillis(); + + if (timeout <= 0) + // Ignore the actual timeout and send response anyway. + timeout = 1; + + if (ses.isFullSupport()) { + // Send response to designated job topic. 
+ // Always go through communication to preserve order, + // if attributes are enabled. - assert msgId > 0; - + ctx.io().sendOrderedMessage( + sndNode, + taskTopic, - msgId, + jobRes, + internal ? MANAGEMENT_POOL : SYSTEM_POOL, + timeout, + false); + } + else if (ctx.localNodeId().equals(sndNode.id())) + ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes); + else + // Send response to common topic as unordered message. + ctx.io().send(sndNode, TOPIC_TASK, jobRes, internal ? MANAGEMENT_POOL : SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + // Log and invoke the master-leave callback. + if (isDeadNode(taskNode.id())) { + onMasterNodeLeft(); + + // Avoid stack trace for left nodes. + U.warn(log, "Failed to reply to sender node because it left grid " + + "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + + ", ses=" + ses + ", job=" + job + ']'); + } + else + U.error(log, "Error sending reply for job [nodeId=" + sndNode.id() + ", jobId=" + + ses.getJobId() + ", ses=" + ses + ", job=" + job + ']', e); + + if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) + evts = addEvent(evts, EVT_JOB_FAILED, "Failed to send reply for job [nodeId=" + + taskNode.id() + ", job=" + job + ']'); + } + // Catching interrupted exception because + // it gets thrown for some reason. 
+ catch (Exception e) { + String msg = "Failed to send reply for job [nodeId=" + taskNode.id() + ", job=" + job + ']'; + + U.error(log, msg, e); + + if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) + evts = addEvent(evts, EVT_JOB_FAILED, msg); + } + } + } + else { + if (ex != null) { + if (isStarted) { + if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) + evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to exception [ex=" + ex + + ", job=" + job + ']'); + } + else if (!internal && ctx.event().isRecordable(EVT_JOB_REJECTED)) + evts = addEvent(evts, EVT_JOB_REJECTED, "Job has not been started [ex=" + ex + + ", job=" + job + ']'); + } + else if (!internal && ctx.event().isRecordable(EVT_JOB_FINISHED)) + evts = addEvent(evts, EVT_JOB_FINISHED, /*no message for success. */null); + } + } + // Job timed out. + else if (!internal && ctx.event().isRecordable(EVT_JOB_FAILED)) + evts = addEvent(evts, EVT_JOB_FAILED, "Job failed due to timeout: " + job); + } + finally { + if (evts != null) { + for (IgniteBiTuple<Integer, String> t : evts) + recordEvent(t.get1(), t.get2()); + } + + // Listener callback. + evtLsnr.onJobFinished(this); + } + } + + /** + * If the job implements {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware#onMasterNodeLeft} interface then invoke + * {@link org.apache.ignite.compute.ComputeJobMasterLeaveAware#onMasterNodeLeft(org.apache.ignite.compute.ComputeTaskSession)} method. + * + * @return {@code True} if master leave has been handled (either by this call or before). 
+ */ + boolean onMasterNodeLeft() { + if (job instanceof ComputeJobMasterLeaveAware) { + /* The guard lets only the first caller run the callback; later callers still report the leave as handled. */ + if (masterLeaveGuard.compareAndSet(false, true)) { + try { + ((ComputeJobMasterLeaveAware)job).onMasterNodeLeft(ses.session()); + + if (log.isDebugEnabled()) + log.debug("Successfully executed GridComputeJobMasterLeaveAware.onMasterNodeLeft() callback " + + "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']'); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to execute GridComputeJobMasterLeaveAware.onMasterNodeLeft() callback " + + "[nodeId=" + taskNode.id() + ", jobId=" + ses.getJobId() + ", job=" + job + ']', e); + } + } + + return true; + } + + return false; + } + + /** + * Appends an (event type, message) pair to the given collection. + * + * @param evts Collection (created if {@code null}). + * @param evt Event type. + * @param msg Message (optional). + * @return Collection with event added. + */ + Collection<IgniteBiTuple<Integer, String>> addEvent(@Nullable Collection<IgniteBiTuple<Integer, String>> evts, + Integer evt, @Nullable String msg) { + assert ctx.event().isRecordable(evt); + assert !internal; + + if (evts == null) + evts = new ArrayList<>(); + + evts.add(F.t(evt, msg)); + + return evts; + } + + /** + * Checks whether node is alive or dead. + * + * @param uid UID of node to check. + * @return {@code true} if node is dead, {@code false} if node is alive. 
+ */ + private boolean isDeadNode(UUID uid) { + return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid); + } + + /** + * Two workers are equal when they carry the same job ID. + * + * @param obj Object to compare with. + * @return {@code true} if {@code obj} is a {@code GridJobWorker} with the same job ID, + * {@code false} otherwise (including {@code null} and objects of other types). + */ + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + + /* instanceof rejects null too; answering false keeps the equals() contract instead of failing with AssertionError or ClassCastException. */ + if (!(obj instanceof GridJobWorker)) + return false; + + IgniteUuid jobId1 = ses.getJobId(); + IgniteUuid jobId2 = ((GridJobWorker)obj).ses.getJobId(); + + assert jobId1 != null; + assert jobId2 != null; + + return jobId1.equals(jobId2); + } + + /** + * @return Hash code of the job ID, consistent with {@link #equals(Object)}. + */ + @Override public int hashCode() { + IgniteUuid jobId = ses.getJobId(); + + assert jobId != null; + + return jobId.hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(GridJobWorker.class, this); + } + }
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java index 0000000,3e4fd5c..2ecb4b4 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskProcessor.java @@@ -1,0 -1,1295 +1,1288 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.ignite.internal.processors.task; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.compute.*; + import org.apache.ignite.events.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.processors.*; + import org.apache.ignite.internal.util.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.marshaller.*; + import org.apache.ignite.plugin.security.*; + import org.apache.ignite.internal.managers.communication.*; + import org.apache.ignite.internal.managers.deployment.*; + import org.apache.ignite.internal.managers.eventstorage.*; + import org.apache.ignite.internal.util.lang.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.jdk8.backport.*; + import org.jetbrains.annotations.*; + + import java.util.*; + import java.util.concurrent.*; + + import static org.apache.ignite.events.IgniteEventType.*; + import static org.apache.ignite.internal.GridTopic.*; + import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*; + + /** + * This class defines task processor. + */ + public class GridTaskProcessor extends GridProcessorAdapter { + /** Wait for 5 seconds to allow discovery to take effect (best effort). 
*/ + private static final long DISCO_TIMEOUT = 5000; + + /** */ + private static final Map<GridTaskThreadContextKey, Object> EMPTY_ENUM_MAP = + new EnumMap<>(GridTaskThreadContextKey.class); + + /** */ + private final IgniteMarshaller marsh; + + /** */ - private final ConcurrentMap<IgniteUuid, GridTaskWorker<?, ?>> tasks = GridConcurrentFactory.newMap(); ++ public final ConcurrentMap<IgniteUuid, GridTaskWorker<?, ?>> tasks = GridConcurrentFactory.newMap(); + + /** */ + private boolean stopping; + + /** */ + private boolean waiting; + + /** */ + private final GridLocalEventListener discoLsnr; + + /** Total executed tasks. */ + private final LongAdder execTasks = new LongAdder(); + + /** */ + private final ThreadLocal<Map<GridTaskThreadContextKey, Object>> thCtx = + new ThreadLocal<>(); + + /** */ + private final GridSpinReadWriteLock lock = new GridSpinReadWriteLock(); + + /** Internal metadata cache. */ + private GridCache<GridTaskNameHashKey, String> tasksMetaCache; + + /** + * @param ctx Kernal context. + */ + public GridTaskProcessor(GridKernalContext ctx) { + super(ctx); + + marsh = ctx.config().getMarshaller(); + + discoLsnr = new TaskDiscoveryListener(); + + tasksMetaCache = ctx.security().enabled() ? 
ctx.cache().<GridTaskNameHashKey, String>utilityCache() : null; + } + + /** {@inheritDoc} */ + @Override public void start() { + ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT); + + ctx.io().addMessageListener(TOPIC_JOB_SIBLINGS, new JobSiblingsMessageListener()); + ctx.io().addMessageListener(TOPIC_TASK_CANCEL, new TaskCancelMessageListener()); + ctx.io().addMessageListener(TOPIC_TASK, new JobMessageListener(true)); + + if (log.isDebugEnabled()) + log.debug("Started task processor."); + } + + /** {@inheritDoc} */ + @SuppressWarnings("TooBroadScope") + @Override public void onKernalStop(boolean cancel) { + lock.writeLock(); + + try { + stopping = true; + + waiting = !cancel; + } + finally { + lock.writeUnlock(); + } + + int size = tasks.size(); + + if (size > 0) { + if (cancel) + U.warn(log, "Will cancel unfinished tasks due to stopping of the grid [cnt=" + size + "]"); + else + U.warn(log, "Will wait for all job responses from worker nodes before stopping grid."); + + for (GridTaskWorker<?, ?> task : tasks.values()) { + if (!cancel) { + try { + task.getTaskFuture().get(); + } + catch (ComputeTaskCancelledException e) { + U.warn(log, e.getMessage()); + } + catch (IgniteCheckedException e) { + U.error(log, "Task failed: " + task, e); + } + } + else { + for (ClusterNode node : ctx.discovery().nodes(task.getSession().getTopology())) { + if (ctx.localNodeId().equals(node.id())) + ctx.job().masterLeaveLocal(task.getSession().getId()); + } + + task.cancel(); + + Throwable ex = new ComputeTaskCancelledException("Task cancelled due to stopping of the grid: " + + task); + + task.finishTask(null, ex, false); + } + } + + U.join(tasks.values(), log); + } + + // Remove discovery and message listeners. 
+ ctx.event().removeLocalEventListener(discoLsnr); + + ctx.io().removeMessageListener(TOPIC_JOB_SIBLINGS); + ctx.io().removeMessageListener(TOPIC_TASK_CANCEL); + + // Set waiting flag to false to make sure that we do not get + // listener notifications any more. + if (!cancel) { + lock.writeLock(); + + try { + waiting = false; + } + finally { + lock.writeUnlock(); + } + } + + assert tasks.isEmpty(); + + if (log.isDebugEnabled()) + log.debug("Finished executing task processor onKernalStop() callback."); + } + + /** {@inheritDoc} */ + @Override public void stop(boolean cancel) { + if (log.isDebugEnabled()) + log.debug("Stopped task processor."); + } + + /** + * Sets the thread-local context value. + * + * @param key Key. + * @param val Value. + */ + public void setThreadContext(GridTaskThreadContextKey key, Object val) { + assert key != null; + assert val != null; + + Map<GridTaskThreadContextKey, Object> map = thCtx.get(); + + // NOTE: access to 'map' is always single-threaded since it's held + // in a thread local. + if (map == null) + thCtx.set(map = new EnumMap<>(GridTaskThreadContextKey.class)); + + map.put(key, val); + } + + /** + * Sets the thread-local context value, if it is not null. + * + * @param key Key. + * @param val Value. + */ + public void setThreadContextIfNotNull(GridTaskThreadContextKey key, @Nullable Object val) { + if (val != null) + setThreadContext(key, val); + } + + /** + * Gets thread-local context value for a given {@code key}. + * + * @param key Thread-local context key. + * @return Thread-local context value associated with given {@code key} - or {@code null} + * if value with given {@code key} doesn't exist. + */ + @Nullable public <T> T getThreadContext(GridTaskThreadContextKey key) { + assert(key != null); + + Map<GridTaskThreadContextKey, Object> map = thCtx.get(); + + return map == null ? null : (T)map.get(key); + } + + /** + * Gets currently used deployments. + * + * @return Currently used deployments. 
+ */ + public Collection<GridDeployment> getUsedDeployments() { + return F.viewReadOnly(tasks.values(), new C1<GridTaskWorker<?, ?>, GridDeployment>() { + @Override public GridDeployment apply(GridTaskWorker<?, ?> w) { + return w.getDeployment(); + } + }); + } + + /** + * Gets currently used deployments mapped by task class name and, when the task name + * differs from the class name (i.e. it is an alias), by task name as well. + * + * @return Currently used deployments. + */ + public Map<String, GridDeployment> getUsedDeploymentMap() { + Map<String, GridDeployment> deps = new HashMap<>(); + + for (GridTaskWorker<?, ?> w : tasks.values()) { + GridTaskSessionImpl ses = w.getSession(); + + deps.put(ses.getTaskClassName(), w.getDeployment()); + + /* Register the alias only when it is a different key; the equal case is already covered by the put above. */ + if (ses.getTaskName() != null && !ses.getTaskClassName().equals(ses.getTaskName())) + deps.put(ses.getTaskName(), w.getDeployment()); + } + + return deps; + } + + /** + * Starts a task given by its class. + * + * @param taskCls Task class. + * @param arg Optional execution argument. + * @return Task future. + * @param <T> Task argument type. + * @param <R> Task return value type. + * @throws IllegalStateException If the processor is stopping. + */ + public <T, R> ComputeTaskFuture<R> execute(Class<? extends ComputeTask<T, R>> taskCls, @Nullable T arg) { + assert taskCls != null; + + lock.readLock(); + + try { + if (stopping) + throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskCls); + + return startTask(null, taskCls, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, false); + } + finally { + lock.readUnlock(); + } + } + + /** + * Starts a task given by its instance; delegates to the three-argument overload with {@code sys = false}. + * + * @param task Actual task. + * @param arg Optional task argument. + * @return Task future. + * @param <T> Task argument type. + * @param <R> Task return value type. + */ + public <T, R> ComputeTaskFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg) { + return execute(task, arg, false); + } + + /** + * @param task Actual task. + * @param arg Optional task argument. + * @param sys If {@code true}, then system pool will be used. + * @return Task future. + * @param <T> Task argument type. + * @param <R> Task return value type. 
+ */ + public <T, R> ComputeTaskFuture<R> execute(ComputeTask<T, R> task, @Nullable T arg, boolean sys) { + lock.readLock(); + + try { + if (stopping) + throw new IllegalStateException("Failed to execute task due to grid shutdown: " + task); + + return startTask(null, null, task, IgniteUuid.fromUuid(ctx.localNodeId()), arg, sys); + } + finally { + lock.readUnlock(); + } + } + + /** + * Resolves task name by task name hash. + * + * @param taskNameHash Task name hash. + * @return Task name or {@code null} if not found. + */ + public String resolveTaskName(int taskNameHash) { + if (taskNameHash == 0) + return null; + + assert ctx.security().enabled(); + + return tasksMetaCache.peek(new GridTaskNameHashKey(taskNameHash)); + } + + /** + * @param taskName Task name. + * @param arg Optional execution argument. + * @return Task future. + * @param <T> Task argument type. + * @param <R> Task return value type. + */ + public <T, R> ComputeTaskFuture<R> execute(String taskName, @Nullable T arg) { + assert taskName != null; + + lock.readLock(); + + try { + if (stopping) + throw new IllegalStateException("Failed to execute task due to grid shutdown: " + taskName); + + return startTask(taskName, null, null, IgniteUuid.fromUuid(ctx.localNodeId()), arg, false); + } + finally { + lock.readUnlock(); + } + } + + /** + * @param taskName Task name. + * @param taskCls Task class. + * @param task Task. + * @param sesId Task session ID. + * @param arg Optional task argument. + * @param sys If {@code true}, then system pool will be used. + * @return Task future. + */ + @SuppressWarnings("unchecked") + private <T, R> ComputeTaskFuture<R> startTask( + @Nullable String taskName, + @Nullable Class<?> taskCls, + @Nullable ComputeTask<T, R> task, + IgniteUuid sesId, + @Nullable T arg, + boolean sys) { + assert sesId != null; + + String taskClsName; + + if (task != null) + taskClsName = task.getClass().getName(); + else + taskClsName = taskCls != null ? 
taskCls.getName() : taskName; + + ctx.security().authorize(taskClsName, GridSecurityPermission.TASK_EXECUTE, null); + + // Get values from thread-local context. + Map<GridTaskThreadContextKey, Object> map = thCtx.get(); + + if (map == null) + map = EMPTY_ENUM_MAP; + else + // Reset thread-local context. + thCtx.remove(); + + Long timeout = (Long)map.get(TC_TIMEOUT); + + long timeout0 = timeout == null || timeout == 0 ? Long.MAX_VALUE : timeout; + + long startTime = U.currentTimeMillis(); + + long endTime = timeout0 + startTime; + + // Account for overflow. + if (endTime < 0) + endTime = Long.MAX_VALUE; + + IgniteCheckedException deployEx = null; + GridDeployment dep = null; + + // User provided task name. + if (taskName != null) { + assert taskCls == null; + assert task == null; + + try { + dep = ctx.deploy().getDeployment(taskName); + + if (dep == null) + throw new IgniteDeploymentException("Unknown task name or failed to auto-deploy " + + "task (was task (re|un)deployed?): " + taskName); + + taskCls = dep.deployedClass(taskName); + + if (taskCls == null) + throw new IgniteDeploymentException("Unknown task name or failed to auto-deploy " + + "task (was task (re|un)deployed?) [taskName=" + taskName + ", dep=" + dep + ']'); + + if (!ComputeTask.class.isAssignableFrom(taskCls)) + throw new IgniteCheckedException("Failed to auto-deploy task (deployed class is not a task) [taskName=" + + taskName + ", depCls=" + taskCls + ']'); + } + catch (IgniteCheckedException e) { + deployEx = e; + } + } + // Deploy user task class. + else if (taskCls != null) { + assert task == null; + + try { + // Implicit deploy. + dep = ctx.deploy().deploy(taskCls, U.detectClassLoader(taskCls)); + + if (dep == null) + throw new IgniteDeploymentException("Failed to auto-deploy task (was task (re|un)deployed?): " + + taskCls); + + taskName = taskName(dep, taskCls, map); + } + catch (IgniteCheckedException e) { + taskName = taskCls.getName(); + + deployEx = e; + } + } + // Deploy user task. 
+ else if (task != null) { + try { + ClassLoader ldr; + + Class<?> cls; + + if (task instanceof GridPeerDeployAware) { + GridPeerDeployAware depAware = (GridPeerDeployAware)task; + + cls = depAware.deployClass(); + ldr = depAware.classLoader(); + + // Set proper class name to make peer-loading possible. + taskCls = cls; + } + else { + taskCls = task.getClass(); + + assert ComputeTask.class.isAssignableFrom(taskCls); + + cls = task.getClass(); + ldr = U.detectClassLoader(cls); + } + + // Explicit deploy. + dep = ctx.deploy().deploy(cls, ldr); + + if (dep == null) + throw new IgniteDeploymentException("Failed to auto-deploy task (was task (re|un)deployed?): " + cls); + + taskName = taskName(dep, taskCls, map); + } + catch (IgniteCheckedException e) { + taskName = task.getClass().getName(); + + deployEx = e; + } + } + + assert taskName != null; + + if (log.isDebugEnabled()) + log.debug("Task deployment: " + dep); + + boolean fullSup = dep != null && taskCls!= null && + dep.annotation(taskCls, ComputeTaskSessionFullSupport.class) != null; + + Collection<? extends ClusterNode> nodes = (Collection<? extends ClusterNode>)map.get(TC_SUBGRID); + + Collection<UUID> top = nodes != null ? F.nodeIds(nodes) : null; + + UUID subjId = getThreadContext(TC_SUBJ_ID); + + if (subjId == null) + subjId = ctx.localNodeId(); + + // Creates task session with task name and task version. + GridTaskSessionImpl ses = ctx.session().createTaskSession( + sesId, + ctx.config().getNodeId(), + taskName, + dep, + taskCls == null ? 
null : taskCls.getName(), + top, + startTime, + endTime, + Collections.<ComputeJobSibling>emptyList(), + Collections.emptyMap(), + fullSup, + subjId); + + GridTaskFutureImpl<R> fut = new GridTaskFutureImpl<>(ses, ctx); + + IgniteCheckedException securityEx = null; + + if (ctx.security().enabled() && deployEx == null) { + try { + saveTaskMetadata(taskName); + } + catch (IgniteCheckedException e) { + securityEx = e; + } + } + + if (deployEx == null && securityEx == null) { + if (dep == null || !dep.acquire()) + handleException(new IgniteDeploymentException("Task not deployed: " + ses.getTaskName()), fut); + else { + GridTaskWorker<?, ?> taskWorker = new GridTaskWorker<>( + ctx, + arg, + ses, + fut, + taskCls, + task, + dep, + new TaskEventListener(), + map, + subjId); + + if (task != null) { + // Check if someone reuses the same task instance by walking + // through the "tasks" map + for (GridTaskWorker worker : tasks.values()) { + ComputeTask workerTask = worker.getTask(); + + // Check that the same instance of task is being used by comparing references. + if (workerTask != null && task == workerTask) + U.warn(log, "Most likely the same task instance is being executed. " + + "Please avoid executing the same task instances in parallel because " + + "they may have concurrent resources access and conflict each other: " + task); + } + } + + GridTaskWorker<?, ?> taskWorker0 = tasks.putIfAbsent(sesId, taskWorker); + + assert taskWorker0 == null : "Session ID is not unique: " + sesId; + + if (dep.annotation(taskCls, ComputeTaskMapAsync.class) != null) { + try { + // Start task execution in another thread. 
+ if (sys) + ctx.config().getSystemExecutorService().execute(taskWorker); + else + ctx.config().getExecutorService().execute(taskWorker); + } + catch (RejectedExecutionException e) { + tasks.remove(sesId); + + release(dep); + + handleException(new ComputeExecutionRejectedException("Failed to execute task " + + "due to thread pool execution rejection: " + taskName, e), fut); + } + } + else + taskWorker.run(); + } + } + else { + if (deployEx != null) + handleException(deployEx, fut); + else + handleException(securityEx, fut); + } + + return fut; + } + + /** + * @param sesId Task's session id. + * @return A {@link org.apache.ignite.compute.ComputeTaskFuture} instance or {@code null} if no such task found. + */ + @Nullable public <R> ComputeTaskFuture<R> taskFuture(IgniteUuid sesId) { + GridTaskWorker<?, ?> taskWorker = tasks.get(sesId); + + return taskWorker != null ? (ComputeTaskFuture<R>)taskWorker.getTaskFuture() : null; + } + + /** + * @return Active task futures. + */ + @SuppressWarnings("unchecked") + public <R> Map<IgniteUuid, ComputeTaskFuture<R>> taskFutures() { + Map<IgniteUuid, ComputeTaskFuture<R>> res = U.newHashMap(tasks.size()); + + for (GridTaskWorker taskWorker : tasks.values()) { + ComputeTaskFuture<R> fut = taskWorker.getTaskFuture(); + + res.put(fut.getTaskSession().getId(), fut); + } + + return res; + } + + /** + * Gets task name for a task class. It firstly checks + * {@link @GridComputeTaskName} annotation, then thread context + * map. If both are empty, class name is returned. + * + * @param dep Deployment. + * @param cls Class. + * @param map Thread context map. + * @return Task name. + * @throws IgniteCheckedException If {@link @GridComputeTaskName} annotation is found, but has empty value. 
+ */ + private String taskName(GridDeployment dep, Class<?> cls, + Map<GridTaskThreadContextKey, Object> map) throws IgniteCheckedException { + assert dep != null; + assert cls != null; + assert map != null; + + String taskName; + + ComputeTaskName ann = dep.annotation(cls, ComputeTaskName.class); + + if (ann != null) { + taskName = ann.value(); + + if (F.isEmpty(taskName)) + throw new IgniteCheckedException("Task name specified by @GridComputeTaskName annotation" + + " cannot be empty for class: " + cls); + } + else + taskName = map.containsKey(TC_TASK_NAME) ? (String)map.get(TC_TASK_NAME) : cls.getName(); + + return taskName; + } + + /** + * Saves task name metadata to utility cache. + * + * @param taskName Task name. + */ + private void saveTaskMetadata(String taskName) throws IgniteCheckedException { + if (ctx.isDaemon()) + return; + + assert ctx.security().enabled(); + + int nameHash = taskName.hashCode(); + + // 0 is reserved for no task. + if (nameHash == 0) + nameHash = 1; + + GridTaskNameHashKey key = new GridTaskNameHashKey(nameHash); + + String existingName = tasksMetaCache.get(key); + + if (existingName == null) + existingName = tasksMetaCache.putIfAbsent(key, taskName); + + if (existingName != null && !F.eq(existingName, taskName)) + throw new IgniteCheckedException("Task name hash collision for security-enabled node [taskName=" + taskName + + ", existing taskName=" + existingName + ']'); + } + + /** + * @param dep Deployment to release. + */ + private void release(GridDeployment dep) { + assert dep != null; + + dep.release(); + + if (dep.obsolete()) + ctx.resource().onUndeployed(dep); + } + + /** + * @param ex Exception. + * @param fut Task future. + * @param <R> Result type. + */ + private <R> void handleException(Throwable ex, GridTaskFutureImpl<R> fut) { + assert ex != null; + assert fut != null; + + fut.onDone(ex); + } + + /** + * @param ses Task session. + * @param attrs Attributes. + * @throws IgniteCheckedException Thrown in case of any errors. 
+ */ + public void setAttributes(GridTaskSessionImpl ses, Map<?, ?> attrs) throws IgniteCheckedException { + long timeout = ses.getEndTime() - U.currentTimeMillis(); + + if (timeout <= 0) { + U.warn(log, "Task execution timed out (remote session attributes won't be set): " + ses); + + return; + } + + // If setting from task or future. + if (log.isDebugEnabled()) + log.debug("Setting session attribute(s) from task or future: " + ses); + + sendSessionAttributes(attrs, ses); + } + + /** + * This method will make the best attempt to send attributes to all jobs. + * + * @param attrs Deserialized session attributes. + * @param ses Task session. + * @throws IgniteCheckedException If send to any of the jobs failed. + */ + @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter", "BusyWait"}) + private void sendSessionAttributes(Map<?, ?> attrs, GridTaskSessionImpl ses) + throws IgniteCheckedException { + assert attrs != null; + assert ses != null; + + Collection<ComputeJobSibling> siblings = ses.getJobSiblings(); + + GridIoManager commMgr = ctx.io(); + + long timeout = ses.getEndTime() - U.currentTimeMillis(); + + if (timeout <= 0) { + U.warn(log, "Session attributes won't be set due to task timeout: " + attrs); + + return; + } + - Map<UUID, Long> msgIds = new HashMap<>(siblings.size(), 1.0f); ++ Set<UUID> rcvrs = new HashSet<>(); + + UUID locNodeId = ctx.localNodeId(); + + synchronized (ses) { + if (ses.isClosed()) { + if (log.isDebugEnabled()) + log.debug("Setting session attributes on closed session (will ignore): " + ses); + + return; + } + + ses.setInternal(attrs); + + // Do this inside of synchronization block, so every message + // ID will be associated with a certain session state. 
+ for (ComputeJobSibling s : siblings) { + GridJobSiblingImpl sib = (GridJobSiblingImpl)s; + + UUID nodeId = sib.nodeId(); + - if (!nodeId.equals(locNodeId) && !sib.isJobDone() && !msgIds.containsKey(nodeId)) - msgIds.put(nodeId, commMgr.nextMessageId(sib.jobTopic(), nodeId)); ++ if (!nodeId.equals(locNodeId) && !sib.isJobDone() && !rcvrs.contains(nodeId)) ++ rcvrs.add(nodeId); + } + } + + if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) { + IgniteEvent evt = new IgniteTaskEvent( + ctx.discovery().localNode(), + "Changed attributes: " + attrs, + EVT_TASK_SESSION_ATTR_SET, + ses.getId(), + ses.getTaskName(), + ses.getTaskClassName(), + false, + null); + + ctx.event().record(evt); + } + + IgniteCheckedException ex = null; + + // Every job gets an individual message to keep track of ghost requests. + for (ComputeJobSibling s : ses.getJobSiblings()) { + GridJobSiblingImpl sib = (GridJobSiblingImpl)s; + + UUID nodeId = sib.nodeId(); + - Long msgId = msgIds.remove(nodeId); - + // Pair can be null if job is finished. - if (msgId != null) { - assert msgId > 0; - ++ if (rcvrs.remove(nodeId)) { + ClusterNode node = ctx.discovery().node(nodeId); + + // Check that node didn't change (it could happen in case of failover). + if (node != null) { + boolean loc = node.id().equals(ctx.localNodeId()) && !ctx.config().isMarshalLocalJobs(); + + GridTaskSessionRequest req = new GridTaskSessionRequest( + ses.getId(), + null, + loc ? null : marsh.marshal(attrs), + attrs); + + // Make sure to go through IO manager always, since order + // should be preserved here. + try { + commMgr.sendOrderedMessage( + node, + sib.jobTopic(), - msgId, + req, + SYSTEM_POOL, + timeout, + false); + } + catch (IgniteCheckedException e) { + node = ctx.discovery().node(nodeId); + + if (node != null) { + try { + // Since communication on remote node may stop before + // we get discovery notification, we give ourselves the + // best effort to detect it. 
+ Thread.sleep(DISCO_TIMEOUT); + } + catch (InterruptedException ignore) { + U.warn(log, "Got interrupted while sending session attributes."); + } + + node = ctx.discovery().node(nodeId); + } + + String err = "Failed to send session attribute request message to node " + + "(normal case if node left grid) [node=" + node + ", req=" + req + ']'; + + if (node != null) + U.warn(log, err); + else if (log.isDebugEnabled()) + log.debug(err); + + if (ex == null) + ex = e; + } + } + } + } + + if (ex != null) + throw ex; + } + + /** + * @param nodeId Node ID. + * @param msg Execute response message. + */ + public void processJobExecuteResponse(UUID nodeId, GridJobExecuteResponse msg) { + assert nodeId != null; + assert msg != null; + + lock.readLock(); + + try { + if (stopping && !waiting) { + U.warn(log, "Received job execution response while stopping grid (will ignore): " + msg); + + return; + } + + GridTaskWorker<?, ?> task = tasks.get(msg.getSessionId()); + + if (task == null) { + if (log.isDebugEnabled()) + log.debug("Received job execution response for unknown task (was task already reduced?): " + msg); + + return; + } + + if (log.isDebugEnabled()) + log.debug("Received grid job response message [msg=" + msg + ", nodeId=" + nodeId + ']'); + + task.onResponse(msg); + } + finally { + lock.readUnlock(); + } + } + + /** + * @param nodeId Node ID. + * @param msg Task session request. 
+ */ + @SuppressWarnings({"unchecked"}) + private void processTaskSessionRequest(UUID nodeId, GridTaskSessionRequest msg) { + assert nodeId != null; + assert msg != null; + + lock.readLock(); + + try { + if (stopping && !waiting) { + U.warn(log, "Received task session request while stopping grid (will ignore): " + msg); + + return; + } + + GridTaskWorker<?, ?> task = tasks.get(msg.getSessionId()); + + if (task == null) { + if (log.isDebugEnabled()) + log.debug("Received task session request for unknown task (was task already reduced?): " + msg); + + return; + } + + boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs(); + + Map<?, ?> attrs = loc ? msg.getAttributes() : + marsh.<Map<?, ?>>unmarshal(msg.getAttributesBytes(), task.getTask().getClass().getClassLoader()); + + GridTaskSessionImpl ses = task.getSession(); + + sendSessionAttributes(attrs, ses); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to deserialize session request: " + msg, e); + } + finally { + lock.readUnlock(); + } + } + + /** + * Handles user cancellation. + * + * @param sesId Session ID. + */ + public void onCancelled(IgniteUuid sesId) { + assert sesId != null; + + lock.readLock(); + + try { + if (stopping && !waiting) { + U.warn(log, "Attempt to cancel task while stopping grid (will ignore): " + sesId); + + return; + } + + GridTaskWorker<?, ?> task = tasks.get(sesId); + + if (task == null) { + if (log.isDebugEnabled()) + log.debug("Attempt to cancel unknown task (was task already reduced?): " + sesId); + + return; + } + + task.finishTask(null, new ComputeTaskCancelledException("Task was cancelled."), true); + } + finally { + lock.readUnlock(); + } + } + + /** + * @return Number of executed tasks. + */ + public int getTotalExecutedTasks() { + return execTasks.intValue(); + } + + /** + * Resets processor metrics. + */ + public void resetMetrics() { + // Can't use 'reset' method because it is not thread-safe + // according to javadoc. 
+ execTasks.add(-execTasks.sum()); + } + + /** {@inheritDoc} */ + @Override public void printMemoryStats() { + X.println(">>>"); + X.println(">>> Task processor memory stats [grid=" + ctx.gridName() + ']'); + X.println(">>> tasksSize: " + tasks.size()); + } + + /** + * Listener for individual task events. + */ + @SuppressWarnings({"deprecation"}) + private class TaskEventListener implements GridTaskEventListener { + /** */ + private final GridMessageListener msgLsnr = new JobMessageListener(false); + + /** {@inheritDoc} */ + @Override public void onTaskStarted(GridTaskWorker<?, ?> worker) { + // Register for timeout notifications. + if (worker.endTime() < Long.MAX_VALUE) + ctx.timeout().addTimeoutObject(worker); + } + + /** {@inheritDoc} */ + @Override public void onJobSend(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib) { + if (worker.getSession().isFullSupport()) + // Listener is stateless, so same listener can be reused for all jobs. + ctx.io().addMessageListener(sib.taskTopic(), msgLsnr); + } + + /** {@inheritDoc} */ + @Override public void onJobFailover(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib, UUID nodeId) { + GridIoManager ioMgr = ctx.io(); + + // Remove message ID registration and old listener. + if (worker.getSession().isFullSupport()) { - ioMgr.removeMessageId(sib.jobTopic()); + ioMgr.removeMessageListener(sib.taskTopic(), msgLsnr); + + synchronized (worker.getSession()) { + // Reset ID on sibling prior to sending request. + sib.nodeId(nodeId); + } + + // Register new listener on new topic. + ioMgr.addMessageListener(sib.taskTopic(), msgLsnr); + } + else { + // Update node ID only in case attributes are not enabled. + synchronized (worker.getSession()) { + // Reset ID on sibling prior to sending request. + sib.nodeId(nodeId); + } + } + } + + /** {@inheritDoc} */ + @Override public void onJobFinished(GridTaskWorker<?, ?> worker, GridJobSiblingImpl sib) { + // Mark sibling finished for the purpose of setting session attributes. 
+ synchronized (worker.getSession()) { + sib.onJobDone(); + } + } + + /** {@inheritDoc} */ + @Override public void onTaskFinished(GridTaskWorker<?, ?> worker) { + GridTaskSessionImpl ses = worker.getSession(); + + if (ses.isFullSupport()) { + synchronized (worker.getSession()) { + worker.getSession().onClosed(); + } + + ctx.checkpoint().onSessionEnd(ses, false); + + // Delete session altogether. + ctx.session().removeSession(ses.getId()); + } + + boolean rmv = tasks.remove(worker.getTaskSessionId(), worker); + + assert rmv; + + // Unregister from timeout notifications. + if (worker.endTime() < Long.MAX_VALUE) + ctx.timeout().removeTimeoutObject(worker); + + release(worker.getDeployment()); + + if (!worker.isInternal()) + execTasks.increment(); + + // Unregister job message listener from all job topics. + if (ses.isFullSupport()) { + try { + for (ComputeJobSibling sibling : worker.getSession().getJobSiblings()) { + GridJobSiblingImpl s = (GridJobSiblingImpl)sibling; + - ctx.io().removeMessageId(s.jobTopic()); + ctx.io().removeMessageListener(s.taskTopic(), msgLsnr); + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to unregister job communication message listeners and counters.", e); + } + } + } + } + + /** + * Handles job execution responses and session requests. + */ + private class JobMessageListener implements GridMessageListener { + /** */ + private final boolean jobResOnly; + + /** + * @param jobResOnly {@code True} if this listener is allowed to process + * job responses only (for tasks with disabled sessions). 
+ */ + private JobMessageListener(boolean jobResOnly) { + this.jobResOnly = jobResOnly; + } + + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + if (!(msg instanceof GridTaskMessage)) { + U.warn(log, "Received message of unknown type: " + msg); + + return; + } + + if (msg instanceof GridJobExecuteResponse) + processJobExecuteResponse(nodeId, (GridJobExecuteResponse)msg); + else if (jobResOnly) + U.warn(log, "Received message of type other than job response: " + msg); + else if (msg instanceof GridTaskSessionRequest) + processTaskSessionRequest(nodeId, (GridTaskSessionRequest)msg); + else + U.warn(log, "Received message of unknown type: " + msg); + } + } + + /** + * Listener to node discovery events. + */ + private class TaskDiscoveryListener implements GridLocalEventListener { + /** {@inheritDoc} */ + @Override public void onEvent(IgniteEvent evt) { + assert evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT; + + UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id(); + + lock.readLock(); + + try { + for (GridTaskWorker<?, ?> task : tasks.values()) + task.onNodeLeft(nodeId); + } + finally { + lock.readUnlock(); + } + } + } + + /** + * + */ + private class JobSiblingsMessageListener implements GridMessageListener { + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + if (!(msg instanceof GridJobSiblingsRequest)) { + U.warn(log, "Received unexpected message instead of siblings request: " + msg); + + return; + } + + lock.readLock(); + + try { + if (stopping && !waiting) { + U.warn(log, "Received job siblings request while stopping grid (will ignore): " + msg); + + return; + } + + GridJobSiblingsRequest req = (GridJobSiblingsRequest)msg; + + GridTaskWorker<?, ?> worker = tasks.get(req.sessionId()); + + Collection<ComputeJobSibling> siblings; + + if (worker != null) { + try { + siblings = worker.getSession().getJobSiblings(); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed 
to get job siblings [request=" + msg + + ", ses=" + worker.getSession() + ']', e); + + siblings = null; + } + } + else { + if (log.isDebugEnabled()) + log.debug("Received job siblings request for unknown or finished task (will ignore): " + msg); + + siblings = null; + } + + try { + Object topic = req.topic(); + + if (topic == null) { + assert req.topicBytes() != null; + + topic = marsh.unmarshal(req.topicBytes(), null); + } + + boolean loc = ctx.localNodeId().equals(nodeId); + + ctx.io().send(nodeId, topic, + new GridJobSiblingsResponse( + loc ? siblings : null, + loc ? null : marsh.marshal(siblings)), + SYSTEM_POOL); + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to send job sibling response.", e); + } + } + finally { + lock.readUnlock(); + } + } + } + + /** + * Listener for task cancel requests. + */ + private class TaskCancelMessageListener implements GridMessageListener { + /** {@inheritDoc} */ + @Override public void onMessage(UUID nodeId, Object msg) { + assert msg != null; + + if (!(msg instanceof GridTaskCancelRequest)) { + U.warn(log, "Received unexpected message instead of task cancel request: " + msg); + + return; + } + + GridTaskCancelRequest req = (GridTaskCancelRequest)msg; + + lock.readLock(); + + try { + if (stopping && !waiting) { + U.warn(log, "Received task cancel request while stopping grid (will ignore): " + msg); + + return; + } + + GridTaskWorker<?, ?> gridTaskWorker = tasks.get(req.sessionId()); + + if (gridTaskWorker != null) { + try { + gridTaskWorker.getTaskFuture().cancel(); + } + catch (IgniteCheckedException e) { + log.warning("Failed to cancel task: " + gridTaskWorker.getTask(), e); + } + } + } + finally { + lock.readUnlock(); + } + } + } + }