http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 0000000,477e742..93026e8
mode 000000,100644..100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@@ -1,0 -1,1868 +1,1859 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.ignite.internal.processors.job;
+
+ import org.apache.ignite.*;
+ import org.apache.ignite.cluster.*;
+ import org.apache.ignite.compute.*;
+ import org.apache.ignite.events.*;
+ import org.apache.ignite.internal.*;
+ import org.apache.ignite.internal.processors.*;
+ import org.apache.ignite.internal.util.*;
+ import org.apache.ignite.lang.*;
+ import org.apache.ignite.marshaller.*;
+ import org.apache.ignite.internal.managers.collision.*;
+ import org.apache.ignite.internal.managers.communication.*;
+ import org.apache.ignite.internal.managers.deployment.*;
+ import org.apache.ignite.internal.managers.eventstorage.*;
+ import org.apache.ignite.internal.processors.jobmetrics.*;
+ import org.apache.ignite.internal.util.typedef.*;
+ import org.apache.ignite.internal.util.typedef.internal.*;
+ import org.jdk8.backport.*;
+ import org.jetbrains.annotations.*;
+
+ import java.io.*;
+ import java.util.*;
+ import java.util.concurrent.*;
+ import java.util.concurrent.atomic.*;
+ import java.util.concurrent.locks.*;
+
+ import static java.util.concurrent.TimeUnit.*;
+ import static org.apache.ignite.IgniteSystemProperties.*;
+ import static org.apache.ignite.events.IgniteEventType.*;
+ import static org.apache.ignite.internal.GridTopic.*;
+ import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*;
+ import static org.jdk8.backport.ConcurrentLinkedHashMap.QueuePolicy.*;
+
+ /**
+ * Responsible for all grid job execution and communication.
+ */
+ public class GridJobProcessor extends GridProcessorAdapter {
+ /** */
+ private static final int FINISHED_JOBS_COUNT = Integer.getInteger(GG_JOBS_HISTORY_SIZE, 10240);
+
+ /** */
+ private final IgniteMarshaller marsh;
+
+ /** */
+ private final boolean jobAlwaysActivate;
+
+ /** */
+ private final ConcurrentMap<IgniteUuid, GridJobWorker> activeJobs;
+
+ /** */
+ private final ConcurrentMap<IgniteUuid, GridJobWorker> passiveJobs;
+
+ /** */
+ private final ConcurrentMap<IgniteUuid, GridJobWorker> cancelledJobs =
+ new ConcurrentHashMap8<>();
+
+ /** */
+ private final Collection<IgniteUuid> heldJobs = new GridConcurrentHashSet<>();
+
+ /** If value is {@code true}, job was cancelled from future. */
+ private final GridBoundedConcurrentLinkedHashMap<IgniteUuid, Boolean> cancelReqs =
+ new GridBoundedConcurrentLinkedHashMap<>(FINISHED_JOBS_COUNT,
+ FINISHED_JOBS_COUNT < 128 ? FINISHED_JOBS_COUNT : 128,
+ 0.75f, 16);
+
+ /** */
+ private final GridBoundedConcurrentLinkedHashSet<IgniteUuid> finishedJobs =
+ new GridBoundedConcurrentLinkedHashSet<>(FINISHED_JOBS_COUNT,
+ FINISHED_JOBS_COUNT < 128 ? FINISHED_JOBS_COUNT : 128,
+ 0.75f, 256, PER_SEGMENT_Q);
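+
+ // Note: cancelReqs and finishedJobs are bounded, so cancellation and
+ // completion markers for long-finished jobs are eventually evicted
+ // rather than accumulating indefinitely.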
+
+ /** */
+ private final GridJobEventListener evtLsnr;
+
+ /** */
+ private final GridMessageListener cancelLsnr;
+
+ /** */
+ private final GridMessageListener jobExecLsnr;
+
+ /** */
+ private final GridLocalEventListener discoLsnr;
+
+ /** Needed for statistics. */
+ private final LongAdder canceledJobsCnt = new LongAdder();
+
+ /** Needed for statistics. */
+ private final LongAdder finishedJobsCnt = new LongAdder();
+
+ /** Needed for statistics. */
+ private final LongAdder startedJobsCnt = new LongAdder();
+
+ /** Needed for statistics. */
+ private final LongAdder rejectedJobsCnt = new LongAdder();
+
+ /** Total job execution time (unaccounted for in metrics). */
+ private final LongAdder finishedJobsTime = new LongAdder();
+
+ /** Maximum job execution time for finished jobs. */
+ private final GridAtomicLong maxFinishedJobsTime = new GridAtomicLong();
+
+ /** */
+ private final AtomicLong metricsLastUpdateTstamp = new AtomicLong();
+
+ /** */
+ private boolean stopping;
+
+ /** */
+ private boolean cancelOnStop;
+
+ /** */
+ private final long metricsUpdateFreq;
+
+ /** */
+ private final GridSpinReadWriteLock rwLock = new GridSpinReadWriteLock();
+
+ /** Topic ID generator. */
+ private final AtomicLong topicIdGen = new AtomicLong();
+
+ /** */
+ private final GridJobHoldListener holdLsnr = new JobHoldListener();
+
+ /** Guards against recursive collision handling on the same thread. */
+ private final ThreadLocal<Boolean> handlingCollision = new ThreadLocal<Boolean>() {
+ @Override protected Boolean initialValue() {
+ return false;
+ }
+ };
+
+ /** Internal task flag. */
+ private final GridThreadLocal<Boolean> internal = new GridThreadLocal<Boolean>() {
+ @Override protected Boolean initialValue() {
+ return false;
+ }
+ };
+
+ /** Current session. */
+ private final GridThreadLocal<ComputeTaskSession> currentSess = new GridThreadLocal<>();
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public GridJobProcessor(GridKernalContext ctx) {
+ super(ctx);
+
+ marsh = ctx.config().getMarshaller();
+
+ // Collision manager is already started and is fully functional.
+ jobAlwaysActivate = !ctx.collision().enabled();
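+ // With collisions disabled there is no passive queue: jobs are
+ // activated immediately on arrival and passiveJobs stays null.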
+
+ metricsUpdateFreq = ctx.config().getMetricsUpdateFrequency();
+
+ activeJobs = jobAlwaysActivate ? new ConcurrentHashMap8<IgniteUuid, GridJobWorker>() :
+ new JobsMap(1024, 0.75f, 256);
+
+ passiveJobs = jobAlwaysActivate ? null : new JobsMap(1024, 0.75f, 256);
+
+ evtLsnr = new JobEventListener();
+ cancelLsnr = new JobCancelListener();
+ jobExecLsnr = new JobExecutionListener();
+ discoLsnr = new JobDiscoveryListener();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ if (metricsUpdateFreq < -1)
+ throw new IgniteCheckedException("Invalid value for
'metricsUpdateFrequency' configuration property " +
+ "(should be greater than or equals to -1): " +
metricsUpdateFreq);
+
+ if (metricsUpdateFreq == -1)
+ U.warn(log, "Job metrics are disabled (use with caution).");
+
+ if (!jobAlwaysActivate)
+ ctx.collision().setCollisionExternalListener(new CollisionExternalListener());
+
+ GridIoManager ioMgr = ctx.io();
+
+ ioMgr.addMessageListener(TOPIC_JOB_CANCEL, cancelLsnr);
+ ioMgr.addMessageListener(TOPIC_JOB, jobExecLsnr);
+
+ ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT, EVT_NODE_METRICS_UPDATED);
+
+ if (log.isDebugEnabled())
+ log.debug("Job processor started.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ // Clear collections.
+ activeJobs.clear();
+ cancelledJobs.clear();
+ cancelReqs.clear();
+
+ if (log.isDebugEnabled())
+ log.debug("Job processor stopped.");
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ // Stop receiving new requests and sending responses.
+ GridIoManager commMgr = ctx.io();
+
+ commMgr.removeMessageListener(TOPIC_JOB, jobExecLsnr);
+ commMgr.removeMessageListener(TOPIC_JOB_CANCEL, cancelLsnr);
+
+ if (!jobAlwaysActivate)
+ // Ignore external collision events.
+ ctx.collision().unsetCollisionExternalListener();
+
+ rwLock.writeLock();
+
+ try {
+ stopping = true;
+
+ cancelOnStop = cancel;
+ }
+ finally {
+ rwLock.writeUnlock();
+ }
+
+ // Rejected jobs.
+ if (!jobAlwaysActivate) {
+ for (GridJobWorker job : passiveJobs.values())
+ if (passiveJobs.remove(job.getJobId(), job))
+ rejectJob(job, false);
+ }
+
+ // Cancel only if we force grid to stop
+ if (cancel) {
+ for (GridJobWorker job : activeJobs.values()) {
+ job.onStopping();
+
+ cancelJob(job, false);
+ }
+ }
+
+ U.join(activeJobs.values(), log);
+ U.join(cancelledJobs.values(), log);
+
+ // Ignore topology changes.
+ ctx.event().removeLocalEventListener(discoLsnr);
+
+ if (log.isDebugEnabled())
+ log.debug("Finished executing job processor onKernalStop()
callback.");
+ }
+
+ /**
+ * Gets active job.
+ *
+ * @param jobId Job ID.
+ * @return Active job.
+ */
+ @Nullable public GridJobWorker activeJob(IgniteUuid jobId) {
+ assert jobId != null;
+
+ return activeJobs.get(jobId);
+ }
+
+ /**
+ * @return {@code True} if running internal task.
+ */
+ public boolean internal() {
+ return internal.get();
+ }
+
+ /**
+ * Sets internal task flag.
+ *
+ * @param internal {@code True} if running internal task.
+ */
+ void internal(boolean internal) {
+ this.internal.set(internal);
+ }
+
+ /**
+ * @param job Rejected job.
+ * @param sndReply {@code True} to send reply.
+ */
+ private void rejectJob(GridJobWorker job, boolean sndReply) {
+ IgniteCheckedException e = new ComputeExecutionRejectedException("Job
was cancelled before execution [taskSesId=" +
+ job.getSession().getId() + ", jobId=" + job.getJobId() + ", job="
+ job.getJob() + ']');
+
+ job.finishJob(null, e, sndReply);
+ }
+
+ /**
+ * @param job Canceled job.
+ * @param sysCancel {@code True} if job has been cancelled from system and no response needed.
+ */
+ private void cancelJob(GridJobWorker job, boolean sysCancel) {
+ boolean isCancelled = job.isCancelled();
+
+ // We don't increment number of cancelled jobs if it
+ // was already cancelled.
+ if (!job.isInternal() && !isCancelled)
+ canceledJobsCnt.increment();
+
+ job.cancel(sysCancel);
+ }
+
+ /**
+ * @param dep Deployment to release.
+ */
+ private void release(GridDeployment dep) {
+ dep.release();
+
+ if (dep.obsolete())
+ ctx.resource().onUndeployed(dep);
+ }
+
+ /**
+ * @param ses Session.
+ * @param attrs Attributes.
+ * @throws IgniteCheckedException If failed.
+ */
+ public void setAttributes(GridJobSessionImpl ses, Map<?, ?> attrs) throws IgniteCheckedException {
+ assert ses.isFullSupport();
+
+ long timeout = ses.getEndTime() - U.currentTimeMillis();
+
+ if (timeout <= 0) {
+ U.warn(log, "Task execution timed out (remote session attributes
won't be set): " + ses);
+
+ return;
+ }
+
+ if (log.isDebugEnabled())
+ log.debug("Setting session attribute(s) from job: " + ses);
+
+ ClusterNode taskNode = ctx.discovery().node(ses.getTaskNodeId());
+
+ if (taskNode == null)
+ throw new IgniteCheckedException("Node that originated task
execution has left grid: " +
+ ses.getTaskNodeId());
+
+ boolean loc = ctx.localNodeId().equals(taskNode.id()) && !ctx.config().isMarshalLocalJobs();
+
+ GridTaskSessionRequest req = new GridTaskSessionRequest(ses.getId(), ses.getJobId(),
+ loc ? null : marsh.marshal(attrs), attrs);
+
+ Object topic = TOPIC_TASK.topic(ses.getJobId(), ctx.discovery().localNode().id());
+
+ // Always go through communication to preserve order.
+ ctx.io().sendOrderedMessage(
+ taskNode,
+ topic, // Job topic.
- ctx.io().nextMessageId(topic, taskNode.id()),
+ req,
+ SYSTEM_POOL,
+ timeout,
+ false);
+ }
+
+ /**
+ * @param ses Session.
+ * @return Siblings.
+ * @throws IgniteCheckedException If failed.
+ */
+ public Collection<ComputeJobSibling> requestJobSiblings(final ComputeTaskSession ses) throws IgniteCheckedException {
+ assert ses != null;
+
+ final UUID taskNodeId = ses.getTaskNodeId();
+
+ ClusterNode taskNode = ctx.discovery().node(taskNodeId);
+
+ if (taskNode == null)
+ throw new IgniteCheckedException("Node that originated task
execution has left grid: " + taskNodeId);
+
+ // Tuple: error message-response.
+ final IgniteBiTuple<String, GridJobSiblingsResponse> t = F.t2();
+
+ final Lock lock = new ReentrantLock();
+ final Condition cond = lock.newCondition();
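+
+ // The listener below fills the tuple at most once (under the lock) and
+ // signals the condition; the requesting thread awaits that signal with
+ // a network timeout further down.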
+
+ GridMessageListener msgLsnr = new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ String err = null;
+ GridJobSiblingsResponse res = null;
+
+ if (!(msg instanceof GridJobSiblingsResponse))
+ err = "Received unexpected message: " + msg;
+ else if (!nodeId.equals(taskNodeId))
+ err = "Received job siblings response from unexpected node [taskNodeId=" + taskNodeId +
+ ", nodeId=" + nodeId + ']';
+ else {
+ // Sender and message type are fine.
+ res = (GridJobSiblingsResponse)msg;
+
+ if (res.jobSiblings() == null) {
+ try {
+ res.unmarshalSiblings(marsh);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to unmarshal job siblings.", e);
+
+ err = e.getMessage();
+ }
+ }
+ }
+
+ lock.lock();
+
+ try {
+ if (t.isEmpty()) {
+ t.set(err, res);
+
+ cond.signalAll();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ };
+
+ GridLocalEventListener discoLsnr = new GridLocalEventListener() {
+ @Override public void onEvent(IgniteEvent evt) {
+ assert evt instanceof IgniteDiscoveryEvent &&
+ (evt.type() == EVT_NODE_FAILED || evt.type() == EVT_NODE_LEFT) : "Unexpected event: " + evt;
+
+ IgniteDiscoveryEvent discoEvt = (IgniteDiscoveryEvent)evt;
+
+ if (taskNodeId.equals(discoEvt.eventNode().id())) {
+ lock.lock();
+
+ try {
+ if (t.isEmpty()) {
+ t.set("Node that originated task execution has
left grid: " + taskNodeId, null);
+
+ cond.signalAll();
+ }
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ }
+ };
+
+ boolean loc = ctx.localNodeId().equals(taskNodeId);
+
+ // 1. Create unique topic name.
+ Object topic = TOPIC_JOB_SIBLINGS.topic(ses.getId(), topicIdGen.getAndIncrement());
+
+ try {
+ // 2. Register listener.
+ ctx.io().addMessageListener(topic, msgLsnr);
+
+ // 3. Send message.
+ ctx.io().send(taskNode, TOPIC_JOB_SIBLINGS,
+ new GridJobSiblingsRequest(ses.getId(),
+ loc ? topic : null,
+ loc ? null : marsh.marshal(topic)),
+ SYSTEM_POOL);
+
+ // 4. Listen to discovery events.
+ ctx.event().addLocalEventListener(discoLsnr, EVT_NODE_FAILED, EVT_NODE_LEFT);
+
+ // 5. Check whether node has left before disco listener has been installed.
+ taskNode = ctx.discovery().node(taskNodeId);
+
+ if (taskNode == null)
+ throw new IgniteCheckedException("Node that originated task
execution has left grid: " + taskNodeId);
+
+ // 6. Wait for result.
+ lock.lock();
+
+ try {
+ long netTimeout = ctx.config().getNetworkTimeout();
+
+ if (t.isEmpty())
+ cond.await(netTimeout, MILLISECONDS);
+
+ if (t.isEmpty())
+ throw new IgniteCheckedException("Timed out waiting for
job siblings (consider increasing" +
+ "'networkTimeout' configuration property) [ses=" +
ses + ", netTimeout=" + netTimeout + ']');
+
+ // Error is set?
+ if (t.get1() != null)
+ throw new IgniteCheckedException(t.get1());
+ else
+ // Return result
+ return t.get2().jobSiblings();
+ }
+ catch (InterruptedException e) {
+ throw new IgniteCheckedException("Interrupted while waiting
for job siblings response: " + ses, e);
+ }
+ finally {
+ lock.unlock();
+ }
+ }
+ finally {
+ ctx.io().removeMessageListener(topic, msgLsnr);
+ ctx.event().removeLocalEventListener(discoLsnr);
+ }
+ }
+
+ /**
+ * Notify processor that master leave aware handler must be invoked on all jobs with the given session ID.
+ *
+ * @param sesId Session ID.
+ */
+ public void masterLeaveLocal(IgniteUuid sesId) {
+ assert sesId != null;
+
+ for (GridJobWorker job : activeJobs.values())
+ if (job.getSession().getId().equals(sesId))
+ job.onMasterNodeLeft();
+ }
+
+ /**
+ * @param sesId Session ID.
+ * @param jobId Job ID.
+ * @param sys System flag.
+ */
+ public void cancelJob(@Nullable final IgniteUuid sesId, @Nullable final IgniteUuid jobId, final boolean sys) {
+ assert sesId != null || jobId != null;
+
+ rwLock.readLock();
+
+ try {
+ if (stopping && cancelOnStop) {
+ if (log.isDebugEnabled())
+ log.debug("Received job cancellation request while
stopping grid with cancellation " +
+ "(will ignore) [sesId=" + sesId + ", jobId=" + jobId
+ ", sys=" + sys + ']');
+
+ return;
+ }
+
+ // Put either job ID or session ID (they are unique).
+ cancelReqs.putIfAbsent(jobId != null ? jobId : sesId, sys);
+
+ IgnitePredicate<GridJobWorker> idsMatch = new P1<GridJobWorker>() {
+ @Override public boolean apply(GridJobWorker e) {
+ return sesId != null ?
+ jobId != null ?
+ e.getSession().getId().equals(sesId) && e.getJobId().equals(jobId) :
+ e.getSession().getId().equals(sesId) :
+ e.getJobId().equals(jobId);
+ }
+ };
+
+ // If we don't have a job ID, we have to iterate over all jobs.
+ if (jobId == null) {
+ if (!jobAlwaysActivate) {
+ // If job gets removed from passive jobs it never gets activated.
+ F.forEach(passiveJobs.values(), new CI1<GridJobWorker>() {
+ @Override public void apply(GridJobWorker job) {
+ cancelPassiveJob(job);
+ }
+ }, idsMatch);
+ }
+
+ F.forEach(activeJobs.values(), new CI1<GridJobWorker>() {
+ @Override public void apply(GridJobWorker job) {
+ cancelActiveJob(job, sys);
+ }
+ }, idsMatch);
+ }
+ else {
+ if (!jobAlwaysActivate) {
+ GridJobWorker passiveJob = passiveJobs.get(jobId);
+
+ if (passiveJob != null && idsMatch.apply(passiveJob) && cancelPassiveJob(passiveJob))
+ return;
+ }
+
+ GridJobWorker activeJob = activeJobs.get(jobId);
+
+ if (activeJob != null && idsMatch.apply(activeJob))
+ cancelActiveJob(activeJob, sys);
+ }
+ }
+ finally {
+ rwLock.readUnlock();
+ }
+ }
+
+ /**
+ * Tries to cancel passive job. No-op if job is not in 'passive' state.
+ *
+ * @param job Job to cancel.
+ * @return {@code True} if succeeded.
+ */
+ private boolean cancelPassiveJob(GridJobWorker job) {
+ assert !jobAlwaysActivate;
+
+ if (passiveJobs.remove(job.getJobId(), job)) {
+ if (log.isDebugEnabled())
+ log.debug("Job has been cancelled before activation: " + job);
+
+ canceledJobsCnt.increment();
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Tries to cancel active job. No-op if job is not in 'active' state.
+ *
+ * @param job Job to cancel.
+ * @param sys Flag indicating whether this is a system cancel.
+ */
+ private void cancelActiveJob(GridJobWorker job, boolean sys) {
+ if (activeJobs.remove(job.getJobId(), job)) {
+ cancelledJobs.put(job.getJobId(), job);
+
+ if (finishedJobs.contains(job.getJobId()))
+ // Job has finished concurrently.
+ cancelledJobs.remove(job.getJobId(), job);
+ else
+ // No reply, since it is not cancel from collision.
+ cancelJob(job, sys);
+ }
+ }
+
+ /**
+ * Handles collisions.
+ * <p>
+ * In most cases this method should be called from the main read lock
+ * to avoid job activation after node stop has started.
+ */
+ private void handleCollisions() {
+ assert !jobAlwaysActivate;
+
+ if (handlingCollision.get()) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping recursive collision handling.");
+
+ return;
+ }
+
+ handlingCollision.set(Boolean.TRUE);
+
+ try {
+ if (log.isDebugEnabled())
+ log.debug("Before handling collisions.");
+
+ // Invoke collision SPI.
+ ctx.collision().onCollision(
+ // Passive jobs view.
+ new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ @NotNull @Override public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
+ final Iterator<GridJobWorker> iter = passiveJobs.values().iterator();
+
+ return new Iterator<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ @Override public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override public org.apache.ignite.spi.collision.CollisionJobContext next() {
+ return new CollisionJobContext(iter.next(), true);
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override public int size() {
+ return passiveJobs.size();
+ }
+ },
+
+ // Active jobs view.
+ new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ @NotNull @Override public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
+ final Iterator<GridJobWorker> iter = activeJobs.values().iterator();
+
+ return new Iterator<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ private GridJobWorker w;
+
+ {
+ advance();
+ }
+
+ /**
+ *
+ */
+ void advance() {
+ assert w == null;
+
+ while(iter.hasNext()) {
+ GridJobWorker w0 = iter.next();
+
+ assert !w0.isInternal();
+
+ if (!w0.held()) {
+ w = w0;
+
+ break;
+ }
+ }
+ }
+
+ @Override public boolean hasNext() {
+ return w != null;
+ }
+
+ @Override public org.apache.ignite.spi.collision.CollisionJobContext next() {
+ if (w == null)
+ throw new NoSuchElementException();
+
+ org.apache.ignite.spi.collision.CollisionJobContext ret = new CollisionJobContext(w, false);
+
+ w = null;
+
+ advance();
+
+ return ret;
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override public int size() {
+ int ret = activeJobs.size() - heldJobs.size();
+
+ return ret > 0 ? ret : 0;
+ }
+ },
+
+ // Held jobs view.
+ new AbstractCollection<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ @NotNull @Override public Iterator<org.apache.ignite.spi.collision.CollisionJobContext> iterator() {
+ final Iterator<GridJobWorker> iter = activeJobs.values().iterator();
+
+ return new Iterator<org.apache.ignite.spi.collision.CollisionJobContext>() {
+ private GridJobWorker w;
+
+ {
+ advance();
+ }
+
+ /**
+ *
+ */
+ void advance() {
+ assert w == null;
+
+ while(iter.hasNext()) {
+ GridJobWorker w0 = iter.next();
+
+ assert !w0.isInternal();
+
+ if (w0.held()) {
+ w = w0;
+
+ break;
+ }
+ }
+ }
+
+ @Override public boolean hasNext() {
+ return w != null;
+ }
+
+ @Override public org.apache.ignite.spi.collision.CollisionJobContext next() {
+ if (w == null)
+ throw new NoSuchElementException();
+
+ org.apache.ignite.spi.collision.CollisionJobContext ret = new CollisionJobContext(w, false);
+
+ w = null;
+
+ advance();
+
+ return ret;
+ }
+
+ @Override public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ @Override public int size() {
+ return heldJobs.size();
+ }
+ });
+
+ if (metricsUpdateFreq > -1L)
+ updateJobMetrics();
+ }
+ finally {
+ handlingCollision.set(Boolean.FALSE);
+ }
+ }
+
+ /**
+ *
+ */
+ private void updateJobMetrics() {
+ assert metricsUpdateFreq > -1L;
+
+ if (metricsUpdateFreq == 0L)
+ updateJobMetrics0();
+ else {
+ long now = U.currentTimeMillis();
+ long lastUpdate = metricsLastUpdateTstamp.get();
+
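+ // The CAS ensures that at most one thread per update window actually
+ // publishes a metrics snapshot.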
+ if (now - lastUpdate > metricsUpdateFreq && metricsLastUpdateTstamp.compareAndSet(lastUpdate, now))
+ updateJobMetrics0();
+ }
+ }
+
+ /**
+ *
+ */
+ private void updateJobMetrics0() {
+ assert metricsUpdateFreq > -1L;
+
+ GridJobMetricsSnapshot m = new GridJobMetricsSnapshot();
+
+ m.setRejectJobs((int)rejectedJobsCnt.sumThenReset());
+ m.setStartedJobs((int)startedJobsCnt.sumThenReset());
+
+ // Iterate over active jobs to determine max execution time.
+ int cnt = 0;
+
+ for (GridJobWorker jobWorker : activeJobs.values()) {
+ assert !jobWorker.isInternal();
+
+ cnt++;
+
+ if (!jobWorker.held()) {
+ long execTime = jobWorker.getExecuteTime();
+
+ if (execTime > m.getMaximumExecutionTime())
+ m.setMaximumExecutionTime(execTime);
+ }
+ }
+
+ m.setActiveJobs(cnt);
+
+ cnt = 0;
+
+ // Do this only if collision SPI is used. Otherwise, 0 is correct value
+ // for passive jobs count and max wait time.
+ if (!jobAlwaysActivate) {
+ // Iterate over passive jobs to determine max queued time.
+ for (GridJobWorker jobWorker : passiveJobs.values()) {
+ // We don't expect that there are any passive internal jobs.
+ assert !jobWorker.isInternal();
+
+ cnt++;
+
+ long queuedTime = jobWorker.getQueuedTime();
+
+ if (queuedTime > m.getMaximumWaitTime())
+ m.setMaximumWaitTime(queuedTime);
+
+ m.setWaitTime(m.getWaitTime() + jobWorker.getQueuedTime());
+ }
+
+ m.setPassiveJobs(cnt);
+ }
+
+ m.setFinishedJobs((int)finishedJobsCnt.sumThenReset());
+ m.setExecutionTime(finishedJobsTime.sumThenReset());
+ m.setCancelJobs((int)canceledJobsCnt.sumThenReset());
+
+ long maxFinishedTime = maxFinishedJobsTime.getAndSet(0);
+
+ if (maxFinishedTime > m.getMaximumExecutionTime())
+ m.setMaximumExecutionTime(maxFinishedTime);
+
+ // CPU load.
+ m.setCpuLoad(ctx.discovery().metrics().getCurrentCpuLoad());
+
+ ctx.jobMetric().addSnapshot(m);
+ }
+
+ /**
+ * @param node Node.
+ * @param req Request.
+ */
+ @SuppressWarnings("TooBroadScope")
+ public void processJobExecuteRequest(ClusterNode node, final GridJobExecuteRequest req) {
+ if (log.isDebugEnabled())
+ log.debug("Received job request message [req=" + req + ",
nodeId=" + node.id() + ']');
+
+ GridJobWorker job = null;
+
+ rwLock.readLock();
+
+ try {
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Received job execution request while stopping
this node (will ignore): " + req);
+
+ return;
+ }
+
+ long endTime = req.getCreateTime() + req.getTimeout();
+
+ // Account for overflow.
+ if (endTime < 0)
+ endTime = Long.MAX_VALUE;
+
+ GridDeployment tmpDep = req.isForceLocalDeployment() ?
+ ctx.deploy().getLocalDeployment(req.getTaskClassName()) :
+ ctx.deploy().getGlobalDeployment(
+ req.getDeploymentMode(),
+ req.getTaskName(),
+ req.getTaskClassName(),
+ req.getUserVersion(),
+ node.id(),
+ req.getClassLoaderId(),
+ req.getLoaderParticipants(),
+ null);
+
+ if (tmpDep == null) {
+ if (log.isDebugEnabled())
+ log.debug("Checking local tasks...");
+
+ // Check local tasks.
+ for (Map.Entry<String, GridDeployment> d : ctx.task().getUsedDeploymentMap().entrySet()) {
+ if (d.getValue().classLoaderId().equals(req.getClassLoaderId())) {
+ assert d.getValue().local();
+
+ tmpDep = d.getValue();
+
+ break;
+ }
+ }
+ }
+
+ final GridDeployment dep = tmpDep;
+
+ if (log.isDebugEnabled())
+ log.debug("Deployment: " + dep);
+
+ boolean releaseDep = true;
+
+ try {
+ if (dep != null && dep.acquire()) {
+ GridJobSessionImpl jobSes;
+ GridJobContextImpl jobCtx;
+
+ try {
+ List<ComputeJobSibling> siblings = null;
+
+ if (!req.isDynamicSiblings()) {
+ Collection<ComputeJobSibling> siblings0 = req.getSiblings();
+
+ if (siblings0 == null) {
+ assert req.getSiblingsBytes() != null;
+
+ siblings0 = marsh.unmarshal(req.getSiblingsBytes(), null);
+ }
+
+ siblings = new ArrayList<>(siblings0);
+ }
+
+ Map<Object, Object> sesAttrs = null;
+
+ if (req.isSessionFullSupport()) {
+ sesAttrs = req.getSessionAttributes();
+
+ if (sesAttrs == null)
+ sesAttrs = marsh.unmarshal(req.getSessionAttributesBytes(), dep.classLoader());
+ }
+
+ // Note that we unmarshal session/job attributes here with proper class loader.
+ GridTaskSessionImpl taskSes = ctx.session().createTaskSession(
+ req.getSessionId(),
+ node.id(),
+ req.getTaskName(),
+ dep,
+ req.getTaskClassName(),
+ req.topology(),
+ req.getStartTaskTime(),
+ endTime,
+ siblings,
+ sesAttrs,
+ req.isSessionFullSupport(),
+ req.getSubjectId());
+
+ taskSes.setCheckpointSpi(req.getCheckpointSpi());
+ taskSes.setClassLoader(dep.classLoader());
+
+ jobSes = new GridJobSessionImpl(ctx, taskSes, req.getJobId());
+
+ Map<? extends Serializable, ? extends Serializable> jobAttrs = req.getJobAttributes();
+
+ if (jobAttrs == null)
+ jobAttrs = marsh.unmarshal(req.getJobAttributesBytes(), dep.classLoader());
+
+ jobCtx = new GridJobContextImpl(ctx, req.getJobId(), jobAttrs);
+ }
+ catch (IgniteCheckedException e) {
+ IgniteCheckedException ex = new IgniteCheckedException("Failed to deserialize task attributes [taskName=" +
+ req.getTaskName() + ", taskClsName=" + req.getTaskClassName() + ", codeVer=" +
+ req.getUserVersion() + ", taskClsLdr=" + dep.classLoader() + ']', e);
+
+ U.error(log, ex.getMessage(), e);
+
+ handleException(node, req, ex, endTime);
+
+ return;
+ }
+
+ job = new GridJobWorker(
+ ctx,
+ dep,
+ req.getCreateTime(),
+ jobSes,
+ jobCtx,
+ req.getJobBytes(),
+ req.getJob(),
+ node,
+ req.isInternal(),
+ evtLsnr,
+ holdLsnr);
+
+ jobCtx.job(job);
+
+ // If exception occurs on job initialization, deployment is released in job listener.
+ releaseDep = false;
+
+ if (job.initialize(dep, dep.deployedClass(req.getTaskClassName()))) {
+ // Internal jobs will always be executed synchronously.
+ if (job.isInternal()) {
+ // This is an internal job and can be executed inside busy lock
+ // since job is expected to be short.
+ // This is essential for proper stop without races.
+ job.run();
+
+ // No execution outside lock.
+ job = null;
+ }
+ else if (jobAlwaysActivate) {
+ if (onBeforeActivateJob(job)) {
+ if (ctx.localNodeId().equals(node.id())) {
+ // Always execute in another thread for local node.
+ executeAsync(job);
+
+ // No sync execution.
+ job = null;
+ }
+ else if (metricsUpdateFreq > -1L)
+ // Job will be executed synchronously.
+ startedJobsCnt.increment();
+ }
+ else
+ // Job has been cancelled.
+ // Set to null, to avoid sync execution.
+ job = null;
+ }
+ else {
+ GridJobWorker old = passiveJobs.putIfAbsent(job.getJobId(), job);
+
+ if (old == null)
+ handleCollisions();
+ else
+ U.error(log, "Received computation request
with duplicate job ID (could be " +
+ "network malfunction, source node may
hang if task timeout was not set) " +
+ "[srcNode=" + node.id() +
+ ", jobId=" + req.getJobId() + ", sesId="
+ req.getSessionId() +
+ ", locNodeId=" + ctx.localNodeId() + ']');
+
+ // No sync execution.
+ job = null;
+ }
+ }
+ else
+ // Job was not initialized, no execution.
+ job = null;
+ }
+ else {
+ // Deployment is null.
+ IgniteCheckedException ex = new IgniteDeploymentException("Task was not deployed or was redeployed since " +
+ "task execution [taskName=" + req.getTaskName() + ", taskClsName=" + req.getTaskClassName() +
+ ", codeVer=" + req.getUserVersion() + ", clsLdrId=" + req.getClassLoaderId() +
+ ", seqNum=" + req.getClassLoaderId().localId() + ", depMode=" + req.getDeploymentMode() +
+ ", dep=" + dep + ']');
+
+ U.error(log, ex.getMessage(), ex);
+
+ handleException(node, req, ex, endTime);
+ }
+ }
+ finally {
+ if (dep != null && releaseDep)
+ release(dep);
+ }
+ }
+ finally {
+ rwLock.readUnlock();
+ }
+
+ if (job != null)
+ job.run();
+ }
+
+ /**
+ * Callback from job worker to set current task session for execution.
+ *
+ * @param ses Session.
+ */
+ public void currentTaskSession(ComputeTaskSession ses) {
+ currentSess.set(ses);
+ }
+
+ /**
+ * Gets hash of task name executed by current thread.
+ *
+ * @return Task name hash or {@code 0} if security is disabled.
+ */
+ public int currentTaskNameHash() {
+ String name = currentTaskName();
+
+ return name == null ? 0 : name.hashCode();
+ }
+
+ /**
+ * Gets name of the task executed by current thread.
+ *
+ * @return Task name or {@code null} if security is disabled.
+ */
+ public String currentTaskName() {
+ if (!ctx.security().enabled())
+ return null;
+
+ ComputeTaskSession ses = currentSess.get();
+
+ if (ses == null)
+ return null;
+
+ return ses.getTaskName();
+ }
+
+ /**
+ * @param jobWorker Worker.
+ * @return {@code True} if job has not been cancelled and should be activated.
+ */
+ private boolean onBeforeActivateJob(GridJobWorker jobWorker) {
+ assert jobWorker != null;
+
+ activeJobs.put(jobWorker.getJobId(), jobWorker);
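+
+ // The job is published to the active map before cancel requests are
+ // checked, so a concurrent cancel either finds it in the map or is
+ // detected by the check below.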
+
+ // Check if job has been concurrently cancelled.
+ Boolean sysCancelled = cancelReqs.get(jobWorker.getSession().getId());
+
+ if (sysCancelled == null)
+ sysCancelled = cancelReqs.get(jobWorker.getJobId());
+
+ if (sysCancelled != null) {
+ // Job has been concurrently cancelled.
+ // Remove from active jobs.
+ activeJobs.remove(jobWorker.getJobId(), jobWorker);
+
+ // Even if job has been removed from another thread, we need to reject it
+ // here since job has never been executed.
+ IgniteCheckedException e2 = new ComputeExecutionRejectedException(
+ "Job was cancelled before execution [jobSes=" + jobWorker.getSession() + ", job=" + jobWorker.getJob() + ']');
+
+ jobWorker.finishJob(null, e2, !sysCancelled);
+
+ return false;
+ }
+
+ // Job has not been cancelled and should be activated.
+ // However, we need to check if master is alive before the job gets
+ // its runner thread, for proper master leave handling.
+ if (ctx.discovery().node(jobWorker.getTaskNode().id()) == null &&
+ activeJobs.remove(jobWorker.getJobId(), jobWorker)) {
+ // Add to cancelled jobs.
+ cancelledJobs.put(jobWorker.getJobId(), jobWorker);
+
+ if (!jobWorker.onMasterNodeLeft()) {
+ U.warn(log, "Job is being cancelled because master task node
left grid " +
+ "(as there is no one waiting for results, job will not be
failed over): " +
+ jobWorker.getJobId());
+
+ cancelJob(jobWorker, true);
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * @param jobWorker Job worker.
+ * @return {@code True} if job has been submitted to pool.
+ */
+ private boolean executeAsync(GridJobWorker jobWorker) {
+ try {
+ ctx.config().getExecutorService().execute(jobWorker);
+
+ if (metricsUpdateFreq > -1L)
+ startedJobsCnt.increment();
+
+ return true;
+ }
+ catch (RejectedExecutionException e) {
+ // Remove from active jobs.
+ activeJobs.remove(jobWorker.getJobId(), jobWorker);
+
+ // Even if job was removed from another thread, we need to reject it
+ // here since job has never been executed.
+ IgniteCheckedException e2 = new ComputeExecutionRejectedException("Job has been rejected " +
+ "[jobSes=" + jobWorker.getSession() + ", job=" + jobWorker.getJob() + ']', e);
+
+ if (metricsUpdateFreq > -1L)
+ rejectedJobsCnt.increment();
+
+ jobWorker.finishJob(null, e2, true);
+ }
+
+ return false;
+ }
+
+ /**
+ * Handles errors that happened prior to job creation.
+ *
+ * @param node Sender node.
+ * @param req Job execution request.
+ * @param ex Exception that happened.
+ * @param endTime Job end time.
+ */
+ private void handleException(ClusterNode node, GridJobExecuteRequest req, IgniteCheckedException ex, long endTime) {
+ UUID locNodeId = ctx.localNodeId();
+
+ ClusterNode sndNode = ctx.discovery().node(node.id());
+
+ if (sndNode == null) {
+ U.warn(log, "Failed to reply to sender node because it left grid
[nodeId=" + node.id() +
+ ", jobId=" + req.getJobId() + ']');
+
+ if (ctx.event().isRecordable(EVT_JOB_FAILED)) {
+ IgniteJobEvent evt = new IgniteJobEvent();
+
+ evt.jobId(req.getJobId());
+ evt.message("Job reply failed (original task node left grid):
" + req.getJobId());
+ evt.node(ctx.discovery().localNode());
+ evt.taskName(req.getTaskName());
+ evt.taskClassName(req.getTaskClassName());
+ evt.taskSessionId(req.getSessionId());
+ evt.type(EVT_JOB_FAILED);
+ evt.taskNode(node);
+ evt.taskSubjectId(req.getSubjectId());
+
+ // Record job reply failure.
+ ctx.event().record(evt);
+ }
+
+ return;
+ }
+
+ try {
+ boolean loc = ctx.localNodeId().equals(sndNode.id()) && !ctx.config().isMarshalLocalJobs();
+
+ GridJobExecuteResponse jobRes = new GridJobExecuteResponse(
+ locNodeId,
+ req.getSessionId(),
+ req.getJobId(),
+ loc ? null : marsh.marshal(ex),
+ ex,
+ loc ? null : marsh.marshal(null),
+ null,
+ loc ? null : marsh.marshal(null),
+ null,
+ false);
+
+ if (req.isSessionFullSupport()) {
+ // Send response to designated job topic.
+ // Always go through communication to preserve order,
+ // if attributes are enabled.
+ // Job response topic.
+ Object topic = TOPIC_TASK.topic(req.getJobId(), locNodeId);
+
+ long timeout = endTime - U.currentTimeMillis();
+
+ if (timeout <= 0)
+ // Ignore the actual timeout and send response anyway.
+ timeout = 1;
+
+ // Send response to designated job topic.
+ // Always go through communication to preserve order.
- long msgId = ctx.io().nextMessageId(topic, sndNode.id());
-
- ctx.io().removeMessageId(topic);
-
+ ctx.io().sendOrderedMessage(
+ sndNode,
+ topic,
- msgId,
+ jobRes,
+ req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL,
+ timeout,
+ false);
+ }
+ else if (ctx.localNodeId().equals(sndNode.id()))
+ ctx.task().processJobExecuteResponse(ctx.localNodeId(), jobRes);
+ else
+ // Send response to common topic as unordered message.
+ ctx.io().send(sndNode, TOPIC_TASK, jobRes, req.isInternal() ? MANAGEMENT_POOL : SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ // The only option here is to log, as we must assume that resending will fail too.
+ if (isDeadNode(node.id()))
+ // Avoid stack trace for left nodes.
+ U.error(log, "Failed to reply to sender node because it left
grid [nodeId=" + node.id() +
+ ", jobId=" + req.getJobId() + ']');
+ else {
+ assert sndNode != null;
+
+ U.error(log, "Error sending reply for job [nodeId=" +
sndNode.id() + ", jobId=" +
+ req.getJobId() + ']', e);
+ }
+
+ if (ctx.event().isRecordable(EVT_JOB_FAILED)) {
+ IgniteJobEvent evt = new IgniteJobEvent();
+
+ evt.jobId(req.getJobId());
+ evt.message("Failed to send reply for job: " +
req.getJobId());
+ evt.node(ctx.discovery().localNode());
+ evt.taskName(req.getTaskName());
+ evt.taskClassName(req.getTaskClassName());
+ evt.taskSessionId(req.getSessionId());
+ evt.type(EVT_JOB_FAILED);
+ evt.taskNode(node);
+ evt.taskSubjectId(req.getSubjectId());
+
+ // Record job reply failure.
+ ctx.event().record(evt);
+ }
+ }
+ }
+
+ /**
+ * @param nodeId Node ID.
+ * @param req Request.
+ */
+ @SuppressWarnings({"SynchronizationOnLocalVariableOrMethodParameter",
"RedundantCast"})
+ private void processTaskSessionRequest(UUID nodeId,
GridTaskSessionRequest req) {
+ rwLock.readLock();
+
+ try {
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Received job session request while stopping
grid (will ignore): " + req);
+
+ return;
+ }
+
+ GridTaskSessionImpl ses = ctx.session().getSession(req.getSessionId());
+
+ if (ses == null) {
+ if (log.isDebugEnabled())
+ log.debug("Received job session request for non-existing
session: " + req);
+
+ return;
+ }
+
+ boolean loc = ctx.localNodeId().equals(nodeId) && !ctx.config().isMarshalLocalJobs();
+
+ Map<?, ?> attrs = loc ? req.getAttributes() :
+ (Map<?, ?>)marsh.unmarshal(req.getAttributesBytes(), ses.getClassLoader());
+
+ if (ctx.event().isRecordable(EVT_TASK_SESSION_ATTR_SET)) {
+ IgniteEvent evt = new IgniteTaskEvent(
+ ctx.discovery().localNode(),
+ "Changed attributes: " + attrs,
+ EVT_TASK_SESSION_ATTR_SET,
+ ses.getId(),
+ ses.getTaskName(),
+ ses.getTaskClassName(),
+ false,
+ null);
+
+ ctx.event().record(evt);
+ }
+
+ synchronized (ses) {
+ ses.setInternal(attrs);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to deserialize session attributes.", e);
+ }
+ finally {
+ rwLock.readUnlock();
+ }
+ }
+
+ /**
+ * Checks whether node is alive or dead.
+ *
+ * @param uid UID of node to check.
+ * @return {@code true} if node is dead, {@code false} if node is alive.
+ */
+ private boolean isDeadNode(UUID uid) {
+ return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ X.println(">>>");
+ X.println(">>> Job processor memory stats [grid=" + ctx.gridName() +
']');
+ X.println(">>> activeJobsSize: " + activeJobs.size());
+ X.println(">>> passiveJobsSize: " + (jobAlwaysActivate ? "n/a" :
passiveJobs.size()));
+ X.println(">>> cancelledJobsSize: " + cancelledJobs.size());
+ X.println(">>> cancelReqsSize: " + cancelReqs.sizex());
+ X.println(">>> finishedJobsSize: " + finishedJobs.sizex());
+ }
+
+ /**
+ *
+ */
+ private class CollisionJobContext extends GridCollisionJobContextAdapter {
+ /** */
+ private final boolean passive;
+
+ /**
+ * @param jobWorker Job Worker.
+ * @param passive {@code True} if job is on waiting list at creation time.
+ */
+ CollisionJobContext(GridJobWorker jobWorker, boolean passive) {
+ super(jobWorker);
+
+ assert !jobWorker.isInternal();
+ assert !jobAlwaysActivate;
+
+ this.passive = passive;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean activate() {
+ GridJobWorker jobWorker = getJobWorker();
+
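+ // Short-circuit chain: only the thread that removes the job from the
+ // passive map proceeds to activate and submit it, so a job is
+ // activated at most once.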
+ return passiveJobs.remove(jobWorker.getJobId(), jobWorker) &&
+ onBeforeActivateJob(jobWorker) &&
+ executeAsync(jobWorker);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean cancel() {
+ GridJobWorker jobWorker = getJobWorker();
+
+ cancelReqs.putIfAbsent(jobWorker.getJobId(), false);
+
+ boolean ret = false;
+
+ if (passive) {
+ // If waiting job is being rejected.
+ if (passiveJobs.remove(jobWorker.getJobId(), jobWorker)) {
+ rejectJob(jobWorker, true);
+
+ if (metricsUpdateFreq > -1L)
+ rejectedJobsCnt.increment();
+
+ ret = true;
+ }
+ }
+ // If active job being cancelled.
+ else if (activeJobs.remove(jobWorker.getJobId(), jobWorker)) {
+ cancelledJobs.put(jobWorker.getJobId(), jobWorker);
+
+ if (finishedJobs.contains(jobWorker.getJobId()))
+ // Job has finished concurrently.
+ cancelledJobs.remove(jobWorker.getJobId(), jobWorker);
+ else
+ // We apply cancel as many times as the user cancels the job.
+ cancelJob(jobWorker, false);
+
+ ret = true;
+ }
+
+ return ret;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(CollisionJobContext.class, this);
+ }
+ }
+
+ /**
+ *
+ */
+ private class CollisionExternalListener implements org.apache.ignite.spi.collision.CollisionExternalListener {
+ /** {@inheritDoc} */
+ @Override public void onExternalCollision() {
+ assert !jobAlwaysActivate;
+
+ if (log.isDebugEnabled())
+ log.debug("Received external collision event.");
+
+ rwLock.readLock();
+
+ try {
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Received external collision notification
while stopping grid (will ignore).");
+
+ return;
+ }
+
+ handleCollisions();
+ }
+ finally {
+ rwLock.readUnlock();
+ }
+ }
+ }
+
+ /**
+ * Handles job state changes.
+ */
+ private class JobEventListener implements GridJobEventListener {
+ /** */
+ private final GridMessageListener sesLsnr = new JobSessionListener();
+
+ /** {@inheritDoc} */
+ @Override public void onJobStarted(GridJobWorker worker) {
+ if (log.isDebugEnabled())
+ log.debug("Received onJobStarted() callback: " + worker);
+
+ if (metricsUpdateFreq > -1L)
+ updateJobMetrics();
+
+ // Register for timeout notifications.
+ if (worker.endTime() < Long.MAX_VALUE)
+ ctx.timeout().addTimeoutObject(worker);
+
+ if (worker.getSession().isFullSupport())
+ // Register session request listener for this job.
+ ctx.io().addMessageListener(worker.getJobTopic(), sesLsnr);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onBeforeJobResponseSent(GridJobWorker worker) {
+ if (log.isDebugEnabled())
+ log.debug("Received onBeforeJobResponseSent() callback: " +
worker);
+
+ assert jobAlwaysActivate ||
!passiveJobs.containsKey(worker.getJobId());
+
+ if (worker.getSession().isFullSupport()) {
+ // Unregister session request listener for this job.
+ ctx.io().removeMessageListener(worker.getJobTopic());
-
- // Unregister message IDs used for sending.
- ctx.io().removeMessageId(worker.getTaskTopic());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onJobFinished(GridJobWorker worker) {
+ if (log.isDebugEnabled())
+ log.debug("Received onJobFinished() callback: " + worker);
+
+ GridJobSessionImpl ses = worker.getSession();
+
+ // If last job for the task on this node.
+ if (ses.isFullSupport() && ctx.session().removeSession(ses.getId())) {
+ ses.onClosed();
+
+ // Unregister checkpoints.
+ ctx.checkpoint().onSessionEnd(ses, true);
+ }
+
+ // Unregister from timeout notifications.
+ if (worker.endTime() < Long.MAX_VALUE)
+ ctx.timeout().removeTimeoutObject(worker);
+
+ release(worker.getDeployment());
+
+ finishedJobs.add(worker.getJobId());
+
+ if (!worker.isInternal()) {
+ // Increment job execution counter. This counter gets
+ // reset once this job is accounted for in metrics.
+ finishedJobsCnt.increment();
+
+ // Increment job execution time. This counter gets
+ // reset once this job is accounted for in metrics.
+ long execTime = worker.getExecuteTime();
+
+ finishedJobsTime.add(execTime);
+
+ maxFinishedJobsTime.setIfGreater(execTime);
+
+ if (jobAlwaysActivate) {
+ if (metricsUpdateFreq > -1L)
+ updateJobMetrics();
+ }
+ else {
+ rwLock.readLock();
+
+ try {
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Skipping collision handling on job
finish (node is stopping).");
+
+ return;
+ }
+
+ handleCollisions();
+ }
+ finally {
+ rwLock.readUnlock();
+ }
+ }
+
+ if (!activeJobs.remove(worker.getJobId(), worker))
+ cancelledJobs.remove(worker.getJobId(), worker);
+
+ heldJobs.remove(worker.getJobId());
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private class JobHoldListener implements GridJobHoldListener {
+ /** {@inheritDoc} */
+ @Override public void onHold(GridJobWorker worker) {
+ if (log.isDebugEnabled())
+ log.debug("Received onHold() callback [worker=" + worker +
']');
+
+ if (activeJobs.containsKey(worker.getJobId())) {
+ heldJobs.add(worker.getJobId());
+
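+ // Re-check membership: if the job finished concurrently and was removed
+ // from the active map, take it back out of the held set to avoid leaks.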
+ if (!activeJobs.containsKey(worker.getJobId()))
+ heldJobs.remove(worker.getJobId());
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUnhold(GridJobWorker worker) {
+ if (log.isDebugEnabled())
+ log.debug("Received onUnhold() callback [worker=" + worker +
", active=" + activeJobs +
+ ", held=" + heldJobs + ']');
+
+ heldJobs.remove(worker.getJobId());
+ }
+ }
+
+ /**
+ *
+ */
+ private class JobSessionListener implements GridMessageListener {
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert nodeId != null;
+ assert msg != null;
+
+ if (log.isDebugEnabled())
+ log.debug("Received session attribute request message [msg="
+ msg + ", nodeId=" + nodeId + ']');
+
+ processTaskSessionRequest(nodeId, (GridTaskSessionRequest)msg);
+ }
+ }
+
+ /**
+ * Handles task and job cancellations.
+ */
+ private class JobCancelListener implements GridMessageListener {
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert nodeId != null;
+ assert msg != null;
+
+ GridJobCancelRequest cancelMsg = (GridJobCancelRequest)msg;
+
+ if (log.isDebugEnabled())
+ log.debug("Received job cancel request [cancelMsg=" +
cancelMsg + ", nodeId=" + nodeId + ']');
+
+ cancelJob(cancelMsg.sessionId(), cancelMsg.jobId(),
cancelMsg.system());
+ }
+ }
+
+ /**
+ * Handles job execution requests.
+ */
+ private class JobExecutionListener implements GridMessageListener {
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert nodeId != null;
+ assert msg != null;
+
+ ClusterNode node = ctx.discovery().node(nodeId);
+
+ if (!ctx.discovery().alive(nodeId)) {
+ U.warn(log, "Received job request message from unknown node
(ignoring) " +
+ "[msg=" + msg + ", nodeId=" + nodeId + ']');
+
+ return;
+ }
+
+ assert node != null;
+
+ processJobExecuteRequest(node, (GridJobExecuteRequest)msg);
+ }
+ }
+
+ /**
+ * Listener to node discovery events.
+ */
+ private class JobDiscoveryListener implements GridLocalEventListener {
+ /**
+ * Counter used to determine whether all nodes updated metrics or not.
+ * This counter is reset every time collisions are handled.
+ */
+ private int metricsUpdateCntr;
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("fallthrough")
+ @Override public void onEvent(IgniteEvent evt) {
+ assert evt instanceof IgniteDiscoveryEvent;
+
+ boolean handleCollisions = false;
+
+ UUID nodeId = ((IgniteDiscoveryEvent)evt).eventNode().id();
+
+ // We should always process discovery events (even on stop,
+ // since we wait for jobs to complete if processor is stopped
+ // without cancellation).
+ switch (evt.type()) {
+ case EVT_NODE_LEFT:
+ case EVT_NODE_FAILED:
+ if (!jobAlwaysActivate) {
+ for (GridJobWorker job : passiveJobs.values()) {
+ if (job.getTaskNode().id().equals(nodeId)) {
+ if (passiveJobs.remove(job.getJobId(), job))
+ U.warn(log, "Task node left grid (job
will not be activated) " +
+ "[nodeId=" + nodeId + ", jobSes=" +
job.getSession() + ", job=" + job + ']');
+ }
+ }
+ }
+
+ for (GridJobWorker job : activeJobs.values()) {
+ if (job.getTaskNode().id().equals(nodeId) && !job.isFinishing() &&
+ activeJobs.remove(job.getJobId(), job)) {
+ // Add to cancelled jobs.
+ cancelledJobs.put(job.getJobId(), job);
+
+ if (finishedJobs.contains(job.getJobId()))
+ // Job has finished concurrently.
+ cancelledJobs.remove(job.getJobId(), job);
+ else if (!job.onMasterNodeLeft()) {
+ U.warn(log, "Job is being cancelled because
master task node left grid " +
+ "(as there is no one waiting for results,
job will not be failed over): " +
+ job.getJobId());
+
+ cancelJob(job, true);
+ }
+ }
+ }
+
+ handleCollisions = true;
+
+ break;
+
+ case EVT_NODE_METRICS_UPDATED:
+ // Check for less-than-equal rather than just equal
+ // to guard against topology changes.
+ if (ctx.discovery().allNodes().size() <= ++metricsUpdateCntr) {
+ metricsUpdateCntr = 0;
+
+ handleCollisions = true;
+ }
+
+ break;
+
+ default:
+ assert false;
+ }
+
+ if (handleCollisions) {
+ rwLock.readLock();
+
+ try {
+ if (stopping) {
+ if (log.isDebugEnabled())
+ log.debug("Skipped collision handling on
discovery event (node is stopping): " + evt);
+
+ return;
+ }
+
+ if (!jobAlwaysActivate)
+ handleCollisions();
+ else if (metricsUpdateFreq > -1L)
+ updateJobMetrics();
+ }
+ finally {
+ rwLock.readUnlock();
+ }
+ }
+ }
+ }
+
+ /**
+ *
+ */
+ private class JobsMap extends ConcurrentLinkedHashMap<IgniteUuid, GridJobWorker> {
+ /**
+ * @param initCap Initial capacity.
+ * @param loadFactor Load factor.
+ * @param concurLvl Concurrency level.
+ */
+ private JobsMap(int initCap, float loadFactor, int concurLvl) {
+ super(initCap, loadFactor, concurLvl);
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridJobWorker put(IgniteUuid key, GridJobWorker val) {
+ assert !val.isInternal();
+
+ GridJobWorker old = super.put(key, val);
+
+ if (old != null)
+ U.warn(log, "Jobs map already contains mapping for key [key="
+ key + ", val=" + val +
+ ", old=" + old + ']');
+
+ return old;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridJobWorker putIfAbsent(IgniteUuid key, GridJobWorker val) {
+ assert !val.isInternal();
+
+ GridJobWorker old = super.putIfAbsent(key, val);
+
+ if (old != null)
+ U.warn(log, "Jobs map already contains mapping for key [key="
+ key + ", val=" + val +
+ ", old=" + old + ']');
+
+ return old;
+ }
+
+ /**
+ * @return Constant-time {@code size()}.
+ */
+ @Override public int size() {
+ return sizex();
+ }
+ }
+ }