http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java index 0000000,65a3250..a921346 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/task/GridTaskWorker.java @@@ -1,0 -1,1371 +1,1373 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.task; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.compute.*; + import org.apache.ignite.events.*; + import org.apache.ignite.fs.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.lang.*; + import org.apache.ignite.marshaller.*; + import org.apache.ignite.resources.*; + import org.apache.ignite.internal.managers.deployment.*; + import org.apache.ignite.internal.processors.job.*; + import org.apache.ignite.internal.processors.timeout.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.internal.util.worker.*; + import org.jdk8.backport.*; + import org.jetbrains.annotations.*; + + import java.io.*; + import java.util.*; + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static org.apache.ignite.compute.ComputeJobResultPolicy.*; + import static org.apache.ignite.events.IgniteEventType.*; + import static org.apache.ignite.internal.GridTopic.*; + import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; + import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*; + + /** + * Grid task worker. Handles full task life cycle. + * @param <T> Task argument type. + * @param <R> Task return value type. + */ + class GridTaskWorker<T, R> extends GridWorker implements GridTimeoutObject { + /** Split size threshold. */ + private static final int SPLIT_WARN_THRESHOLD = 1000; + + /** {@code True} for internal tasks. */ + private boolean internal; + + /** */ + private enum State { + /** */ + WAITING, + + /** */ + REDUCING, + + /** */ + REDUCED, + + /** */ + FINISHING + } + + /** Static logger to avoid re-creation. 
*/ + private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); + + /** */ + private final GridKernalContext ctx; + + /** */ + private final IgniteLogger log; + + /** */ + private final IgniteMarshaller marsh; + + /** */ + private final GridTaskSessionImpl ses; + + /** */ + private final GridTaskFutureImpl<R> fut; + + /** */ + private final T arg; + + /** */ + private final GridTaskEventListener evtLsnr; + + /** */ + private Map<IgniteUuid, GridJobResultImpl> jobRes; + + /** */ + private State state = State.WAITING; + + /** */ + private final GridDeployment dep; + + /** Task class. */ + private final Class<?> taskCls; + + /** Optional subgrid. */ + private final Map<GridTaskThreadContextKey, Object> thCtx; + + /** */ + private ComputeTask<T, R> task; + + /** */ + private final Queue<GridJobExecuteResponse> delayedRess = new ConcurrentLinkedDeque8<>(); + + /** */ + private boolean continuous; + + /** */ + private final Object mux = new Object(); + + /** */ + private boolean lockRespProc = true; + + /** */ + private final boolean resCache; + + /** */ + private final boolean noFailover; + + /** */ + private final UUID subjId; + + /** Continuous mapper. */ + private final ComputeTaskContinuousMapper mapper = new ComputeTaskContinuousMapper() { + /** {@inheritDoc} */ + @Override public void send(ComputeJob job, ClusterNode node) throws IgniteCheckedException { + A.notNull(job, "job"); + A.notNull(node, "node"); + + processMappedJobs(Collections.singletonMap(job, node)); + } + + /** {@inheritDoc} */ + @Override public void send(Map<? extends ComputeJob, ClusterNode> mappedJobs) throws IgniteCheckedException { + A.notNull(mappedJobs, "mappedJobs"); + + processMappedJobs(mappedJobs); + } + + /** {@inheritDoc} */ + @Override public void send(ComputeJob job) throws IgniteCheckedException { + A.notNull(job, "job"); + + send(Collections.singleton(job)); + } + + /** {@inheritDoc} */ + @Override public void send(Collection<? extends ComputeJob> jobs) throws IgniteCheckedException { + A.notNull(jobs, "jobs"); + + if (jobs.isEmpty()) + throw new IgniteCheckedException("Empty jobs collection passed to send(...) method."); + + ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, getTaskTopology()); + + for (ComputeJob job : jobs) { + if (job == null) + throw new IgniteCheckedException("Null job passed to send(...) method."); + + processMappedJobs(Collections.singletonMap(job, balancer.getBalancedNode(job, null))); + } + } + }; + + /** + * @param ctx Kernal context. + * @param arg Task argument. + * @param ses Grid task session. + * @param fut Task future. + * @param taskCls Task class. + * @param task Task instance that might be null. + * @param dep Deployed task. + * @param evtLsnr Event listener. + * @param thCtx Thread-local context from task processor. + * @param subjId Subject ID. 
+ */ + GridTaskWorker( + GridKernalContext ctx, + @Nullable T arg, + GridTaskSessionImpl ses, + GridTaskFutureImpl<R> fut, + @Nullable Class<?> taskCls, + @Nullable ComputeTask<T, R> task, + GridDeployment dep, + GridTaskEventListener evtLsnr, + @Nullable Map<GridTaskThreadContextKey, Object> thCtx, + UUID subjId) { + super(ctx.config().getGridName(), "grid-task-worker", ctx.config().getGridLogger()); + + assert ses != null; + assert fut != null; + assert evtLsnr != null; + assert dep != null; + + this.arg = arg; + this.ctx = ctx; + this.fut = fut; + this.ses = ses; + this.taskCls = taskCls; + this.task = task; + this.dep = dep; + this.evtLsnr = evtLsnr; + this.thCtx = thCtx; + this.subjId = subjId; + + log = U.logger(ctx, logRef, this); + + marsh = ctx.config().getMarshaller(); + + resCache = dep.annotation(taskCls, ComputeTaskNoResultCache.class) == null; + + Boolean noFailover = getThreadContext(TC_NO_FAILOVER); + + this.noFailover = noFailover != null ? noFailover : false; + } + + /** + * Gets value from thread-local context. + * + * @param key Thread-local context key. + * @return Thread-local context value, if any. + */ + @SuppressWarnings({"unchecked"}) + @Nullable private <V> V getThreadContext(GridTaskThreadContextKey key) { + return thCtx == null ? null : (V)thCtx.get(key); + } + + /** + * @return Task session ID. + */ + IgniteUuid getTaskSessionId() { + return ses.getId(); + } + + /** + * @return Task session. + */ + GridTaskSessionImpl getSession() { + return ses; + } + + /** + * @return Task future. + */ + GridTaskFutureImpl<R> getTaskFuture() { + return fut; + } + + /** + * Gets property dep. + * + * @return Property dep. + */ + GridDeployment getDeployment() { + return dep; + } + + /** + * @return Grid task. + */ + public ComputeTask<T, R> getTask() { + return task; + } + + /** + * @param task Deployed task. + */ + public void setTask(ComputeTask<T, R> task) { + this.task = task; + } + + /** + * @return {@code True} if task is internal. + */ + public boolean isInternal() { + return internal; + } + + /** {@inheritDoc} */ + @Override public IgniteUuid timeoutId() { + return ses.getId(); + } + + /** {@inheritDoc} */ + @Override public void onTimeout() { + synchronized (mux) { + if (state != State.WAITING) + return; + } + + U.warn(log, "Task has timed out: " + ses); + + recordTaskEvent(EVT_TASK_TIMEDOUT, "Task has timed out."); + + Throwable e = new ComputeTaskTimeoutException("Task timed out (check logs for error messages): " + ses); + + finishTask(null, e); + } + + /** {@inheritDoc} */ + @Override public long endTime() { + return ses.getEndTime(); + } + + /** + * @param taskCls Task class. + * @return Task instance. + * @throws IgniteCheckedException Thrown in case of any instantiation error. + */ + private ComputeTask<T, R> newTask(Class<? extends ComputeTask<T, R>> taskCls) throws IgniteCheckedException { + ComputeTask<T, R> task = dep.newInstance(taskCls); + + if (task == null) + throw new IgniteCheckedException("Failed to instantiate task (is default constructor available?): " + taskCls); + + return task; + } + + /** + * + */ + private void initializeSpis() { + ComputeTaskSpis spis = dep.annotation(taskCls, ComputeTaskSpis.class); + + if (spis != null) { + ses.setLoadBalancingSpi(spis.loadBalancingSpi()); + ses.setFailoverSpi(spis.failoverSpi()); + ses.setCheckpointSpi(spis.checkpointSpi()); + } + } + + /** + * Maps this task's jobs to nodes and sends them out. 
+ */ + @SuppressWarnings({"unchecked"}) + @Override protected void body() { + evtLsnr.onTaskStarted(this); + + try { + // Use either user task or deployed one. + if (task == null) { + assert taskCls != null; + assert ComputeTask.class.isAssignableFrom(taskCls); + + try { + task = newTask((Class<? extends ComputeTask<T, R>>)taskCls); + } + catch (IgniteCheckedException e) { + // If cannot instantiate task, then assign internal flag based + // on information available. + internal = dep.internalTask(null, taskCls); + + recordTaskEvent(EVT_TASK_STARTED, "Task started."); + + throw e; + } + } + + internal = dep.internalTask(task, taskCls); + + recordTaskEvent(EVT_TASK_STARTED, "Task started."); + + initializeSpis(); + + ses.setClassLoader(dep.classLoader()); + + final List<ClusterNode> shuffledNodes = getTaskTopology(); + + // Load balancer. + ComputeLoadBalancer balancer = ctx.loadBalancing().getLoadBalancer(ses, shuffledNodes); + + continuous = ctx.resource().isAnnotationPresent(dep, task, IgniteTaskContinuousMapperResource.class); + + if (log.isDebugEnabled()) + log.debug("Injected task resources [continuous=" + continuous + ']'); + + // Inject resources. + ctx.resource().inject(dep, task, ses, balancer, mapper); + + Map<? extends ComputeJob, ClusterNode> mappedJobs = U.wrapThreadLoader(dep.classLoader(), + new Callable<Map<? extends ComputeJob, ClusterNode>>() { + @Override public Map<? extends ComputeJob, ClusterNode> call() throws IgniteCheckedException { + return task.map(shuffledNodes, arg); + } + }); + + if (log.isDebugEnabled()) + log.debug("Mapped task jobs to nodes [jobCnt=" + (mappedJobs != null ? mappedJobs.size() : 0) + + ", mappedJobs=" + mappedJobs + ", ses=" + ses + ']'); + + if (F.isEmpty(mappedJobs)) { + synchronized (mux) { + // Check if some jobs are sent from continuous mapper. + if (F.isEmpty(jobRes)) + throw new IgniteCheckedException("Task map operation produced no mapped jobs: " + ses); + } + } + else + processMappedJobs(mappedJobs); + + synchronized (mux) { + lockRespProc = false; + } + + processDelayedResponses(); + } + catch (ClusterGroupEmptyException e) { + U.warn(log, "Failed to map task jobs to nodes (topology projection is empty): " + ses); + + finishTask(null, e); + } + catch (IgniteCheckedException e) { + if (!fut.isCancelled()) { + U.error(log, "Failed to map task jobs to nodes: " + ses, e); + + finishTask(null, e); + } + else if (log.isDebugEnabled()) + log.debug("Failed to map task jobs to nodes due to task cancellation: " + ses); + } + // Catch throwable to protect against bad user code. + catch (Throwable e) { + String errMsg = "Failed to map task jobs to nodes due to undeclared user exception" + + " [cause=" + e.getMessage() + ", ses=" + ses + "]"; + + U.error(log, errMsg, e); + + finishTask(null, new ComputeUserUndeclaredException(errMsg, e)); + } + } + + /** + * @param jobs Map of jobs. + * @throws IgniteCheckedException Thrown in case of any error. + */ + private void processMappedJobs(Map<? extends ComputeJob, ClusterNode> jobs) throws IgniteCheckedException { + if (F.isEmpty(jobs)) + return; + + Collection<GridJobResultImpl> jobResList = new ArrayList<>(jobs.size()); + + Collection<ComputeJobSibling> sibs = new ArrayList<>(jobs.size()); + + // Map jobs to nodes for computation. + for (Map.Entry<? 
extends ComputeJob, ClusterNode> mappedJob : jobs.entrySet()) { + ComputeJob job = mappedJob.getKey(); + ClusterNode node = mappedJob.getValue(); + + if (job == null) + throw new IgniteCheckedException("Job can not be null [mappedJob=" + mappedJob + ", ses=" + ses + ']'); + + if (node == null) + throw new IgniteCheckedException("Node can not be null [mappedJob=" + mappedJob + ", ses=" + ses + ']'); + + IgniteUuid jobId = IgniteUuid.fromUuid(node.id()); + + GridJobSiblingImpl sib = new GridJobSiblingImpl(ses.getId(), jobId, node.id(), ctx); + + jobResList.add(new GridJobResultImpl(job, jobId, node, sib)); + + // Do not add siblings if result cache is disabled. + if (resCache) + sibs.add(sib); + + recordJobEvent(EVT_JOB_MAPPED, jobId, node, "Job got mapped."); + } + + synchronized (mux) { + if (state != State.WAITING) + throw new IgniteCheckedException("Task is not in waiting state [state=" + state + ", ses=" + ses + ']'); + + // Do not add siblings if result cache is disabled. + if (resCache) + ses.addJobSiblings(sibs); + + if (jobRes == null) + jobRes = new HashMap<>(); + + // Populate all remote mappedJobs into map, before mappedJobs are sent. + // This is done to avoid race condition when we start + // getting results while still sending out references. + for (GridJobResultImpl res : jobResList) { + if (jobRes.put(res.getJobContext().getJobId(), res) != null) + throw new IgniteCheckedException("Duplicate job ID for remote job found: " + res.getJobContext().getJobId()); + + res.setOccupied(true); + + if (resCache && jobRes.size() > ctx.discovery().size() && jobRes.size() % SPLIT_WARN_THRESHOLD == 0) + LT.warn(log, null, "Number of jobs in task is too large for task: " + ses.getTaskName() + + ". Consider reducing number of jobs or disabling job result cache with " + + "@GridComputeTaskNoResultCache annotation."); + } + } + + // Set mapped flag. + ses.onMapped(); + + // Send out all remote mappedJobs. + for (GridJobResultImpl res : jobResList) { + evtLsnr.onJobSend(this, res.getSibling()); + + try { + sendRequest(res); + } + finally { + // Open job for processing results. + synchronized (mux) { + res.setOccupied(false); + } + } + } + + processDelayedResponses(); + } + + /** + * @return Topology for this task. + * @throws IgniteCheckedException Thrown in case of any error. + */ + private List<ClusterNode> getTaskTopology() throws IgniteCheckedException { + Collection<UUID> top = ses.getTopology(); + + Collection<? extends ClusterNode> subgrid = top != null ? ctx.discovery().nodes(top) : ctx.discovery().allNodes(); + + int size = subgrid.size(); + + if (size == 0) + throw new ClusterGroupEmptyException("Topology projection is empty."); + + List<ClusterNode> shuffledNodes = new ArrayList<>(size); + + for (ClusterNode node : subgrid) + shuffledNodes.add(node); + + if (shuffledNodes.size() > 1) + // Shuffle nodes prior to giving them to user. + Collections.shuffle(shuffledNodes); + + // Load balancer. + return shuffledNodes; + } + + /** + * + */ + private void processDelayedResponses() { + GridJobExecuteResponse res = delayedRess.poll(); + + if (res != null) + onResponse(res); + } + + /** + * @param msg Job execution response. 
+ */ + void onResponse(GridJobExecuteResponse msg) { + assert msg != null; + + if (fut.isDone()) { + if (log.isDebugEnabled()) + log.debug("Ignoring job response since task has finished: " + msg); + + return; + } + + GridJobExecuteResponse res = msg; + + while (res != null) { + GridJobResultImpl jobRes = null; + + // Flag indicating whether occupied flag for + // job response was changed in this method. + boolean selfOccupied = false; + + try { + synchronized (mux) { + // If task is not waiting for responses, + // then there is no point to proceed. + if (state != State.WAITING) { + if (log.isDebugEnabled()) + log.debug("Ignoring response since task is already reducing or finishing [res=" + res + + ", job=" + ses + ", state=" + state + ']'); + + return; + } + + jobRes = this.jobRes.get(res.getJobId()); + + if (jobRes == null) { + if (log.isDebugEnabled()) + U.warn(log, "Received response for unknown child job (was job presumed failed?): " + res); + + return; + } + + // Only process the first response and ignore the following ones. This scenario + // is possible if the node has left topology and a fake failure response + // was created from the discovery listener or when sending the request failed. + if (jobRes.hasResponse()) { + if (log.isDebugEnabled()) + log.debug("Received redundant response for a job (will ignore): " + res); + + return; + } + + if (!jobRes.getNode().id().equals(res.getNodeId())) { + if (log.isDebugEnabled()) + log.debug("Ignoring stale response as job was already resent to other node [res=" + res + + ", jobRes=" + jobRes + ']'); + + // Prevent processing 2 responses for the same job simultaneously. + jobRes.setOccupied(true); + + selfOccupied = true; + + // We cannot return here because there can be more delayed messages in the queue. + continue; + } + + if (jobRes.isOccupied()) { + if (log.isDebugEnabled()) + log.debug("Adding response to delayed queue (job is either being sent or processing " + + "another response): " + res); + + delayedRess.offer(res); + + return; + } + + if (lockRespProc) { + delayedRess.offer(res); + + return; + } + + lockRespProc = true; + + selfOccupied = true; + + // Prevent processing 2 responses for the same job simultaneously. + jobRes.setOccupied(true); + + // We don't keep reference to job if results are not cached. + if (!resCache) + this.jobRes.remove(res.getJobId()); + } + + if (res.getFakeException() != null) + jobRes.onResponse(null, res.getFakeException(), null, false); + else { + ClassLoader clsLdr = dep.classLoader(); + + try { + boolean loc = ctx.localNodeId().equals(res.getNodeId()) && !ctx.config().isMarshalLocalJobs(); + + Object res0 = loc ? res.getJobResult() : marsh.unmarshal(res.getJobResultBytes(), clsLdr); + + IgniteCheckedException ex = loc ? res.getException() : + marsh.<IgniteCheckedException>unmarshal(res.getExceptionBytes(), clsLdr); + + Map<Object, Object> attrs = loc ?
res.getJobAttributes() : + marsh.<Map<Object, Object>>unmarshal(res.getJobAttributesBytes(), clsLdr); + + jobRes.onResponse(res0, ex, attrs, res.isCancelled()); + + if (loc) + ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobAfterSend.class); + } + catch (IgniteCheckedException e) { + U.error(log, "Error deserializing job response: " + res, e); + + finishTask(null, e); + } + } + + List<ComputeJobResult> results; + + if (!resCache) + results = Collections.emptyList(); + else { + synchronized (mux) { + results = getRemoteResults(); + } + } + + ComputeJobResultPolicy plc = result(jobRes, results); + + if (plc == null) { + String errMsg = "Failed to obtain remote job result policy for result from GridComputeTask.result(..) " + + "method that returned null (will fail the whole task): " + jobRes; + + finishTask(null, new IgniteCheckedException(errMsg)); + + return; + } + + synchronized (mux) { + // If task is not waiting for responses, + // then there is no point to proceed. + if (state != State.WAITING) { + if (log.isDebugEnabled()) + log.debug("Ignoring GridComputeTask.result(..) value since task is already reducing or" + + "finishing [res=" + res + ", job=" + ses + ", state=" + state + ']'); + + return; + } + + switch (plc) { + // Start reducing all results received so far. + case REDUCE: { + state = State.REDUCING; + + break; + } + + // Keep waiting if there are more responses to come, + // otherwise, reduce. + case WAIT: { + assert results.size() <= this.jobRes.size(); + + // If there are more results to wait for. + // If result cache is disabled, then we reduce + // when both collections are empty. + if (results.size() == this.jobRes.size()) { + plc = ComputeJobResultPolicy.REDUCE; + + // All results are received, proceed to reduce method. + state = State.REDUCING; + } + + break; + } + + case FAILOVER: { + if (!failover(res, jobRes, getTaskTopology())) + plc = null; + + break; + } + } + } + + // Outside of synchronization. + if (plc != null) { + // Handle failover. + if (plc == FAILOVER) + sendFailoverRequest(jobRes); + else { + evtLsnr.onJobFinished(this, jobRes.getSibling()); + + if (plc == ComputeJobResultPolicy.REDUCE) + reduce(results); + } + } + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to obtain topology [ses=" + ses + ", err=" + e + ']', e); + + finishTask(null, e); + } + finally { + // Open up job for processing responses. + // Only unset occupied flag, if it was + // set in this method. + if (selfOccupied) { + assert jobRes != null; + + synchronized (mux) { + jobRes.setOccupied(false); + + lockRespProc = false; + } + + // Process delayed responses if there are any. + res = delayedRess.poll(); + } + } + } + } + + /** + * @param jobRes Job result. + * @param results Existing job results. + * @return Job result policy. + */ + @SuppressWarnings({"CatchGenericClass"}) + @Nullable private ComputeJobResultPolicy result(final ComputeJobResult jobRes, final List<ComputeJobResult> results) { + assert !Thread.holdsLock(mux); + + return U.wrapThreadLoader(dep.classLoader(), new CO<ComputeJobResultPolicy>() { + @Nullable @Override public ComputeJobResultPolicy apply() { + try { + // Obtain job result policy. 
+ ComputeJobResultPolicy plc = null; + + try { + plc = task.result(jobRes, results); + + if (plc == FAILOVER && noFailover) { + IgniteCheckedException e = jobRes.getException(); + + if (e != null) + throw e; + + plc = WAIT; + } + } + finally { + recordJobEvent(EVT_JOB_RESULTED, jobRes.getJobContext().getJobId(), + jobRes.getNode(), "Job got resulted with: " + plc); + } + + if (log.isDebugEnabled()) + log.debug("Obtained job result policy [policy=" + plc + ", ses=" + ses + ']'); + + return plc; + } + catch (IgniteCheckedException e) { + if (X.hasCause(e, GridInternalException.class) || + X.hasCause(e, IgniteFsOutOfSpaceException.class)) { + // Print internal exceptions only if debug is enabled. + if (log.isDebugEnabled()) + U.error(log, "Failed to obtain remote job result policy for result from " + + "GridComputeTask.result(..) method (will fail the whole task): " + jobRes, e); + } + else if (X.hasCause(e, ComputeJobFailoverException.class)) { + IgniteCheckedException e0 = new IgniteCheckedException(" Job was not failed over because " + + "GridComputeJobResultPolicy.FAILOVER was not returned from " + + "GridTask.result(...) method for job result with GridComputeJobFailoverException.", e); + + finishTask(null, e0); + + return null; + } + else + U.error(log, "Failed to obtain remote job result policy for result from " + + "GridComputeTask.result(..) method (will fail the whole task): " + jobRes, e); + + finishTask(null, e); + + return null; + } + catch (IgniteException e) { + if (X.hasCause(e, GridInternalException.class) || + X.hasCause(e, IgniteFsOutOfSpaceException.class)) { + // Print internal exceptions only if debug is enabled. + if (log.isDebugEnabled()) + U.error(log, "Failed to obtain remote job result policy for result from " + + "GridComputeTask.result(..) method (will fail the whole task): " + jobRes, e); + } + else if (X.hasCause(e, ComputeJobFailoverException.class)) { + IgniteCheckedException e0 = new IgniteCheckedException(" Job was not failed over because " + + "GridComputeJobResultPolicy.FAILOVER was not returned from " + + "GridTask.result(...) method for job result with GridComputeJobFailoverException.", e); + + finishTask(null, e0); + + return null; + } + else + U.error(log, "Failed to obtain remote job result policy for result from" + + "GridComputeTask.result(..) method (will fail the whole task): " + jobRes, e); + + finishTask(null, e); + + return null; + } + catch (Throwable e) { + String errMsg = "Failed to obtain remote job result policy for result from" + + "GridComputeTask.result(..) method due to undeclared user exception " + + "(will fail the whole task): " + jobRes; + + U.error(log, errMsg, e); + + Throwable tmp = new ComputeUserUndeclaredException(errMsg, e); + + // Failed to successfully obtain result policy and + // hence forced to fail the whole deployed task. + finishTask(null, tmp); + + return null; + } + } + }); + } + + /** + * @param results Job results. + */ + private void reduce(final List<ComputeJobResult> results) { + R reduceRes = null; + Throwable userE = null; + + try { + try { + // Reduce results. 
+ reduceRes = U.wrapThreadLoader(dep.classLoader(), new Callable<R>() { + @Nullable @Override public R call() throws IgniteCheckedException { + return task.reduce(results); + } + }); + } + finally { + synchronized (mux) { + assert state == State.REDUCING : "Invalid task state: " + state; + + state = State.REDUCED; + } + } + + if (log.isDebugEnabled()) + log.debug("Reduced job responses [reduceRes=" + reduceRes + ", ses=" + ses + ']'); + + recordTaskEvent(EVT_TASK_REDUCED, "Task reduced."); + } + catch (ClusterTopologyException e) { + U.warn(log, "Failed to reduce job results for task (any nodes from task topology left grid?): " + task); + + userE = e; + } + catch (IgniteCheckedException e) { + U.error(log, "Failed to reduce job results for task: " + task, e); + + userE = e; + } + // Catch Throwable to protect against bad user code. + catch (Throwable e) { + String errMsg = "Failed to reduce job results due to undeclared user exception [task=" + task + + ", err=" + e + ']'; + + U.error(log, errMsg, e); + + userE = new ComputeUserUndeclaredException(errMsg ,e); + } + finally { + finishTask(reduceRes, userE); + } + } + + /** + * @param res Execution response. + * @param jobRes Job result. + * @param top Topology. + * @return {@code True} if fail-over SPI returned a new node. + */ + private boolean failover(GridJobExecuteResponse res, GridJobResultImpl jobRes, Collection<? extends ClusterNode> top) { + assert Thread.holdsLock(mux); + + try { + ctx.resource().invokeAnnotated(dep, jobRes.getJob(), ComputeJobBeforeFailover.class); + + // Map to a new node. + ClusterNode node = ctx.failover().failover(ses, jobRes, new ArrayList<>(top)); + + if (node == null) { + String msg = "Failed to failover a job to another node (failover SPI returned null) [job=" + + jobRes.getJob() + ", node=" + jobRes.getNode() + ']'; + + if (log.isDebugEnabled()) + log.debug(msg); + + Throwable e = new ClusterTopologyException(msg, jobRes.getException()); + + finishTask(null, e); + + return false; + } + + if (log.isDebugEnabled()) + log.debug("Resolved job failover [newNode=" + node + ", oldNode=" + jobRes.getNode() + + ", job=" + jobRes.getJob() + ", resMsg=" + res + ']'); + + jobRes.setNode(node); + jobRes.resetResponse(); + + if (!resCache) { + synchronized (mux) { + // Store result back in map before sending. + this.jobRes.put(res.getJobId(), jobRes); + } + } + + return true; + } + // Catch Throwable to protect against bad user code. + catch (Throwable e) { + String errMsg = "Failed to failover job due to undeclared user exception [job=" + + jobRes.getJob() + ", err=" + e + ']'; + + U.error(log, errMsg, e); + + finishTask(null, new ComputeUserUndeclaredException(errMsg, e)); + + return false; + } + } + + /** + * @param jobRes Job result. + */ + private void sendFailoverRequest(GridJobResultImpl jobRes) { + // Internal failover notification. + evtLsnr.onJobFailover(this, jobRes.getSibling(), jobRes.getNode().id()); + + long timeout = ses.getEndTime() - U.currentTimeMillis(); + + if (timeout > 0) { + recordJobEvent(EVT_JOB_FAILED_OVER, jobRes.getJobContext().getJobId(), + jobRes.getNode(), "Job failed over."); + + // Send new reference to remote nodes for execution. + sendRequest(jobRes); + } + else + // Don't apply 'finishTask(..)' here as it will + // be called from 'onTimeout(..)' callback. + U.warn(log, "Failed to fail-over job due to task timeout: " + jobRes); + } + + /** + * Interrupts child jobs on remote nodes. 
+ */ + private void cancelChildren() { + Collection<GridJobResultImpl> doomed = new LinkedList<>(); + + synchronized (mux) { + // Only interrupt unfinished jobs. + if (jobRes != null) + for (GridJobResultImpl res : jobRes.values()) + if (!res.hasResponse()) + doomed.add(res); + } + + // Send cancellation request to all unfinished children. + for (GridJobResultImpl res : doomed) { + UUID nodeId = res.getNode().id(); + + if (nodeId.equals(ctx.localNodeId())) + // Cancel local jobs. + ctx.job().cancelJob(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true); + else { + try { + ClusterNode node = ctx.discovery().node(nodeId); + + if (node != null) + ctx.io().send(node, + TOPIC_JOB_CANCEL, + new GridJobCancelRequest(ses.getId(), res.getJobContext().getJobId(), /*courtesy*/true), + PUBLIC_POOL); + } + catch (IgniteCheckedException e) { + if (!isDeadNode(nodeId)) + U.error(log, "Failed to send cancel request to node (will ignore) [nodeId=" + + nodeId + ", taskName=" + ses.getTaskName() + + ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']', e); + } + } + } + } + + /** + * @param res Job result. + */ + private void sendRequest(ComputeJobResult res) { + assert res != null; + + GridJobExecuteRequest req = null; + + ClusterNode node = res.getNode(); + + try { + ClusterNode curNode = ctx.discovery().node(node.id()); + + // Check if node exists prior to sending to avoid cases when a discovery + // listener notified about node leaving after topology resolution. Note + // that we make this check because we cannot count on exception being + // thrown in case of send failure. + if (curNode == null) { + U.warn(log, "Failed to send job request because remote node left grid (if fail-over is enabled, " + + "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() + + ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']'); + + ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class); + + GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(), + res.getJobContext().getJobId(), null, null, null, null, null, null, false); + + fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + node)); + + onResponse(fakeRes); + } + else { + long timeout = ses.getEndTime() == Long.MAX_VALUE ? Long.MAX_VALUE : + ses.getEndTime() - U.currentTimeMillis(); + + if (timeout > 0) { + boolean loc = node.id().equals(ctx.discovery().localNode().id()) && + !ctx.config().isMarshalLocalJobs(); + + Map<Object, Object> sesAttrs = ses.isFullSupport() ? ses.getAttributes() : null; + Map<? extends Serializable, ? extends Serializable> jobAttrs = + (Map<? extends Serializable, ? extends Serializable>)res.getJobContext().getAttributes(); + + boolean forceLocDep = internal || !ctx.deploy().enabled(); + + req = new GridJobExecuteRequestV2( + ses.getId(), + res.getJobContext().getJobId(), + ses.getTaskName(), + ses.getUserVersion(), + ses.getTaskClassName(), + loc ? null : marsh.marshal(res.getJob()), + loc ? res.getJob() : null, + ses.getStartTime(), + timeout, + ses.getTopology(), + loc ? null : marsh.marshal(ses.getJobSiblings()), + loc ? ses.getJobSiblings() : null, + loc ? null : marsh.marshal(sesAttrs), + loc ? sesAttrs : null, + loc ? null: marsh.marshal(jobAttrs), + loc ? 
jobAttrs : null, + ses.getCheckpointSpi(), + dep.classLoaderId(), + dep.deployMode(), + continuous, + dep.participants(), + forceLocDep, + ses.isFullSupport(), + internal, + subjId); + + if (loc) + ctx.job().processJobExecuteRequest(ctx.discovery().localNode(), req); + else { + // Send job execution request. + ctx.io().send(node, TOPIC_JOB, req, internal ? MANAGEMENT_POOL : PUBLIC_POOL); + + if (log.isDebugEnabled()) + log.debug("Sent job request [req=" + req + ", node=" + node + ']'); + } + + if (!loc) + ctx.resource().invokeAnnotated(dep, res.getJob(), ComputeJobAfterSend.class); + } + else + U.warn(log, "Job timed out prior to sending job execution request: " + res.getJob()); + } + } + catch (IgniteCheckedException e) { + boolean deadNode = isDeadNode(res.getNode().id()); + + // Avoid stack trace if node has left grid. + if (deadNode) + U.warn(log, "Failed to send job request because remote node left grid (if failover is enabled, " + + "will attempt fail-over to another node) [node=" + node + ", taskName=" + ses.getTaskName() + + ", taskSesId=" + ses.getId() + ", jobSesId=" + res.getJobContext().getJobId() + ']'); + else + U.error(log, "Failed to send job request: " + req, e); + + GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(node.id(), ses.getId(), + res.getJobContext().getJobId(), null, null, null, null, null, null, false); + + if (deadNode) + fakeRes.setFakeException(new ClusterTopologyException("Failed to send job due to node failure: " + + node, e)); + else + fakeRes.setFakeException(e); + + onResponse(fakeRes); + } + } + + /** + * @param nodeId Node ID. + */ + void onNodeLeft(UUID nodeId) { + Collection<GridJobExecuteResponse> resList = null; + + synchronized (mux) { + // First check if job cares about future responses. + if (state != State.WAITING) + return; + + if (jobRes != null) { + for (GridJobResultImpl jr : jobRes.values()) { + if (!jr.hasResponse() && jr.getNode().id().equals(nodeId)) { + if (log.isDebugEnabled()) + log.debug("Creating fake response because node left grid [job=" + jr.getJob() + + ", nodeId=" + nodeId + ']'); + + // Artificial response in case if a job is waiting for a response from + // non-existent node. + GridJobExecuteResponse fakeRes = new GridJobExecuteResponse(nodeId, ses.getId(), + jr.getJobContext().getJobId(), null, null, null, null, null, null, false); + + fakeRes.setFakeException(new ClusterTopologyException("Node has left grid: " + nodeId)); + + if (resList == null) + resList = new ArrayList<>(); + + resList.add(fakeRes); + } + } + } + } + + if (resList == null) + return; + + // Simulate responses without holding synchronization. + for (GridJobExecuteResponse res : resList) { + if (log.isDebugEnabled()) + log.debug("Simulating fake response from left node [res=" + res + ", nodeId=" + nodeId + ']'); + + onResponse(res); + } + } + + /** + * @param evtType Event type. + * @param msg Event message. + */ + private void recordTaskEvent(int evtType, String msg) { + if (!internal && ctx.event().isRecordable(evtType)) { + IgniteEvent evt = new IgniteTaskEvent( + ctx.discovery().localNode(), + msg, + evtType, + ses.getId(), + ses.getTaskName(), + ses.getTaskClassName(), + internal, + subjId); + + ctx.event().record(evt); + } + } + + /** + * @param evtType Event type. + * @param jobId Job ID. + * @param evtNode Event node. + * @param msg Event message. 
+ */ + private void recordJobEvent(int evtType, IgniteUuid jobId, ClusterNode evtNode, String msg) { + if (ctx.event().isRecordable(evtType)) { + IgniteJobEvent evt = new IgniteJobEvent(); + + evt.message(msg); + evt.node(ctx.discovery().localNode()); + evt.taskName(ses.getTaskName()); + evt.taskClassName(ses.getTaskClassName()); + evt.taskSessionId(ses.getId()); + evt.taskNode(evtNode); + evt.jobId(jobId); + evt.type(evtType); + evt.taskSubjectId(ses.subjectId()); + + ctx.event().record(evt); + } + } + + /** + * @return Collection of job results. + */ + private List<ComputeJobResult> getRemoteResults() { + assert Thread.holdsLock(mux); + + List<ComputeJobResult> results = new ArrayList<>(jobRes.size()); + + for (GridJobResultImpl jobResult : jobRes.values()) + if (jobResult.hasResponse()) + results.add(jobResult); + + return results; + } + + /** + * @param res Task result. + * @param e Exception. + */ + void finishTask(@Nullable R res, @Nullable Throwable e) { + finishTask(res, e, true); + } + + /** + * @param res Task result. + * @param e Exception. + * @param cancelChildren Whether to cancel children in case the task becomes cancelled. + */ + void finishTask(@Nullable R res, @Nullable Throwable e, boolean cancelChildren) { + // Avoid finishing a job more than once from + // different threads. + synchronized (mux) { + if (state == State.REDUCING || state == State.FINISHING) + return; + + state = State.FINISHING; + } + + try { + if (e == null) + recordTaskEvent(EVT_TASK_FINISHED, "Task finished."); + else + recordTaskEvent(EVT_TASK_FAILED, "Task failed."); + + // Clean resources prior to finishing future. + evtLsnr.onTaskFinished(this); + + if (cancelChildren) + cancelChildren(); + } + // Once we have marked the task as 'Finishing' we must complete it. + finally { + fut.onDone(res, e); + + ses.onDone(); + } + } + + /** + * Checks whether node is alive or dead. + * + * @param uid UID of node to check. + * @return {@code true} if node is dead, {@code false} if node is alive. + */ + private boolean isDeadNode(UUID uid) { + return ctx.discovery().node(uid) == null || !ctx.discovery().pingNode(uid); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public boolean equals(Object obj) { + if (this == obj) + return true; + + if (obj == null) + return false; + + assert obj instanceof GridTaskWorker; + + return ses.getId().equals(((GridTaskWorker<T, R>)obj).ses.getId()); + } + + /** {@inheritDoc} */ + @Override public int hashCode() { + return ses.getId().hashCode(); + } + + /** {@inheritDoc} */ + @Override public String toString() { - return S.toString(GridTaskWorker.class, this); ++ synchronized (mux) { ++ return S.toString(GridTaskWorker.class, this); ++ } + } + }
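For orientation, the worker above drives the user-facing ComputeTask contract end to end: map(...) assigns jobs to nodes, every GridJobExecuteResponse is passed through ComputeTask.result(...) to pick a ComputeJobResultPolicy (WAIT, REDUCE or FAILOVER), and once the worker enters the REDUCING state it calls reduce(...) and completes the task future via finishTask(...). The sketch below illustrates the task side of that life cycle using the API names that appear elsewhere in this commit (ComputeTaskAdapter, ComputeJob, ComputeJobResult, ComputeJobResultPolicy). The class itself is hypothetical, and ComputeJobAdapter and ComputeJobResult.getData() are assumed helpers not shown in this diff, so treat the exact signatures as illustrative for this transitional snapshot rather than authoritative.

import java.util.*;

import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.compute.*;

/** Hypothetical task used only to illustrate the life cycle handled by GridTaskWorker. */
public class SumTask extends ComputeTaskAdapter<Integer, Integer> {
    /** Phase 1: map one trivial job to every node in the subgrid. */
    @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Integer arg)
        throws IgniteCheckedException {
        Map<ComputeJob, ClusterNode> jobs = new HashMap<>(subgrid.size());

        for (ClusterNode node : subgrid) {
            // Explicit 'final' so the anonymous job can capture the value (Java 7 rules).
            final Integer part = arg;

            jobs.put(new ComputeJobAdapter() {
                @Override public Object execute() {
                    return part; // Trivial per-node work; a real job would do something useful here.
                }
            }, node);
        }

        return jobs;
    }

    /** Phase 2: invoked by GridTaskWorker.onResponse(...) for every job result. */
    @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd)
        throws IgniteCheckedException {
        // FAILOVER makes the worker re-map the job via the failover SPI
        // (see failover(...) and sendFailoverRequest(...) above).
        if (res.getException() != null)
            return ComputeJobResultPolicy.FAILOVER;

        // WAIT keeps collecting; the worker itself switches to REDUCE once all results have arrived.
        return ComputeJobResultPolicy.WAIT;
    }

    /** Phase 3: called once the worker moves to the REDUCING state. */
    @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteCheckedException {
        int sum = 0;

        for (ComputeJobResult res : results)
            sum += res.<Integer>getData();

        return sum;
    }
}

Such a task would be submitted the same way GridSessionLoadSelfTest further down in this commit submits SessionLoadTestTask, e.g. through the executeAsync(...) helper used there.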
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java ---------------------------------------------------------------------- diff --cc modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java index 0000000,2795e03..7bcb9e2 mode 000000,100644..100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/ipc/shmem/GridIpcSharedMemoryNativeLoader.java @@@ -1,0 -1,243 +1,242 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.util.ipc.shmem; + + import org.apache.ignite.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.internal.util.typedef.internal.*; + + import java.io.*; + import java.net.*; + import java.nio.channels.*; + import java.util.*; + + /** + * Shared memory native loader. + */ + @SuppressWarnings("ErrorNotRethrown") + public class GridIpcSharedMemoryNativeLoader { + /** Loaded flag. */ + private static volatile boolean loaded; + + /** Library name base. */ + private static final String LIB_NAME_BASE = "ggshmem"; + + /** Library name. */ + private static final String LIB_NAME = LIB_NAME_BASE + "-" + GridProductImpl.VER; + + /** Lock file path. */ + private static final File LOCK_FILE = new File(System.getProperty("java.io.tmpdir"), "ggshmem.lock"); + + /** + * @return Operating system name to resolve path to library. + */ + private static String os() { + String name = System.getProperty("os.name").toLowerCase().trim(); + + if (name.startsWith("win")) + throw new IllegalStateException("IPC shared memory native loader should not be called on windows."); + + if (name.startsWith("linux")) + return "linux"; + + if (name.startsWith("mac os x")) + return "osx"; + + return name.replaceAll("\\W+", "_"); + } + + /** + * @return Platform. + */ + private static String platform() { + return os() + bitModel(); + } + + /** + * @return Bit model. + */ + private static int bitModel() { + String prop = System.getProperty("sun.arch.data.model"); + + if (prop == null) + prop = System.getProperty("com.ibm.vm.bitmode"); + + if (prop != null) + return Integer.parseInt(prop); + + // We don't know. + return -1; + } + + /** + * @throws IgniteCheckedException If failed. + */ + public static void load() throws IgniteCheckedException { + if (loaded) + return; + + synchronized (GridIpcSharedMemoryNativeLoader.class) { + if (loaded) + return; + + doLoad(); + + loaded = true; + } + } + + /** + * @throws IgniteCheckedException If failed. 
+ */ + private static void doLoad() throws IgniteCheckedException { + assert Thread.holdsLock(GridIpcSharedMemoryNativeLoader.class); + + Collection<Throwable> errs = new LinkedList<>(); + + try { + // Load native library (the library directory should be in java.library.path). + System.loadLibrary(LIB_NAME); + + return; + } + catch (UnsatisfiedLinkError e) { + errs.add(e); + } + + // Obtain lock on file to prevent concurrent extracts. + try (RandomAccessFile randomAccessFile = new RandomAccessFile(LOCK_FILE, "rws"); - FileLock lock = randomAccessFile.getChannel().lock()) { - ++ FileLock ignored = randomAccessFile.getChannel().lock()) { + if (extractAndLoad(errs, platformSpecificResourcePath())) + return; + + if (extractAndLoad(errs, osSpecificResourcePath())) + return; + + if (extractAndLoad(errs, resourcePath())) + return; + + // Failed to find the library. + assert !errs.isEmpty(); + + throw new IgniteCheckedException("Failed to load native IPC library: " + errs); + } + catch (IOException e) { + throw new IgniteCheckedException("Failed to obtain file lock: " + LOCK_FILE, e); + } + } + + /** + * @return OS resource path. + */ + private static String osSpecificResourcePath() { + return "META-INF/native/" + os() + "/" + mapLibraryName(LIB_NAME_BASE); + } + + /** + * @return Platform resource path. + */ + private static String platformSpecificResourcePath() { + return "META-INF/native/" + platform() + "/" + mapLibraryName(LIB_NAME_BASE); + } + + /** + * @return Resource path. + */ + private static String resourcePath() { + return "META-INF/native/" + mapLibraryName(LIB_NAME_BASE); + } + + /** + * @return Maps library name to file name. + */ + private static String mapLibraryName(String name) { + String libName = System.mapLibraryName(name); + + if (U.isMacOs() && libName.endsWith(".jnilib")) + return libName.substring(0, libName.length() - "jnilib".length()) + "dylib"; + + return libName; + } + + /** + * @param errs Errors collection. + * @param rsrcPath Path. + * @return {@code True} if library was found and loaded. + */ + private static boolean extractAndLoad(Collection<Throwable> errs, String rsrcPath) { + ClassLoader clsLdr = U.detectClassLoader(GridIpcSharedMemoryNativeLoader.class); + + URL rsrc = clsLdr.getResource(rsrcPath); + + if (rsrc != null) + return extract(errs, rsrc, new File(System.getProperty("java.io.tmpdir"), mapLibraryName(LIB_NAME))); + else { + errs.add(new IllegalStateException("Failed to find resource with specified class loader " + + "[rsrc=" + rsrcPath + ", clsLdr=" + clsLdr + ']')); + + return false; + } + } + + /** + * @param errs Errors collection. + * @param src Source. + * @param target Target. + * @return {@code True} if resource was found and loaded. + */ + @SuppressWarnings("ResultOfMethodCallIgnored") + private static boolean extract(Collection<Throwable> errs, URL src, File target) { + FileOutputStream os = null; + InputStream is = null; + + try { + if (!target.exists()) { + is = src.openStream(); + + if (is != null) { + os = new FileOutputStream(target); + + int read; + + byte[] buf = new byte[4096]; + + while ((read = is.read(buf)) != -1) + os.write(buf, 0, read); + } + } + + // chmod 775. 
+ if (!U.isWindows()) + Runtime.getRuntime().exec(new String[] {"chmod", "775", target.getCanonicalPath()}).waitFor(); + + System.load(target.getPath()); + + return true; + } + catch (IOException | UnsatisfiedLinkError | InterruptedException e) { + errs.add(e); + } + finally { + U.closeQuiet(os); + U.closeQuiet(is); + } + + return false; + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java index 0000000,55fa051..538682e mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/managers/communication/GridIoManagerSelfTest.java @@@ -1,0 -1,249 +1,249 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.managers.communication; + + import org.apache.commons.collections.*; + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.internal.*; + import org.apache.ignite.marshaller.jdk.*; + import org.apache.ignite.spi.communication.tcp.*; + import org.apache.ignite.internal.managers.discovery.*; + import org.apache.ignite.internal.util.direct.*; + import org.apache.ignite.internal.util.typedef.*; + import org.apache.ignite.testframework.*; + import org.apache.ignite.testframework.junits.*; + import org.apache.ignite.testframework.junits.common.*; + import org.mockito.*; + + import java.io.*; + import java.nio.*; + import java.util.*; + import java.util.concurrent.*; + + import static org.mockito.Matchers.any; + import static org.mockito.Mockito.anyLong; + import static org.mockito.Mockito.argThat; + import static org.mockito.Mockito.eq; + import static org.mockito.Mockito.*; + + /** + * Test for {@link GridIoManager}. + */ + public class GridIoManagerSelfTest extends GridCommonAbstractTest { + /** Grid test context. */ + private GridTestKernalContext ctx = new GridTestKernalContext(log); + + /** Test local node. */ + private GridTestNode locNode = new GridTestNode(UUID.randomUUID()); + + /** Test remote node. */ + private GridTestNode rmtNode = new GridTestNode(UUID.randomUUID()); + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + ctx.config().setCommunicationSpi(new TcpCommunicationSpi()); + ctx.config().setMarshaller(new IgniteJdkMarshaller()); + + // Turn off peer class loading to simplify mocking. 
+ ctx.config().setPeerClassLoadingEnabled(false); + + // Register local and remote nodes in discovery manager. + GridDiscoveryManager mockedDiscoveryMgr = Mockito.mock(GridDiscoveryManager.class); + + when(mockedDiscoveryMgr.localNode()).thenReturn(locNode); + when(mockedDiscoveryMgr.remoteNodes()).thenReturn(F.<ClusterNode>asList(rmtNode)); + + ctx.add(mockedDiscoveryMgr); + } + + /** + * @throws Exception If failed. + */ + public void testSendIfOneOfNodesIsLocalAndTopicIsEnum() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + new GridIoManager(ctx).send(F.asList(locNode, rmtNode), GridTopic.TOPIC_CACHE, new Message(), + GridIoPolicy.P2P_POOL); + + return null; + } + }, AssertionError.class, "Internal GridGain code should never call the method with local node in a node list."); + } + + /** + * @throws Exception If failed. + */ + public void testSendIfOneOfNodesIsLocalAndTopicIsObject() throws Exception { + GridTestUtils.assertThrows(log, new Callable<Object>() { + @Override public Object call() throws Exception { + new GridIoManager(ctx).send(F.asList(locNode, rmtNode), new Object(), new Message(), + GridIoPolicy.P2P_POOL); + + return null; + } + }, AssertionError.class, "Internal GridGain code should never call the method with local node in a node list."); + } + + /** + * @throws Exception If failed. + */ + public void testSendUserMessageThinVersionIfOneOfNodesIsLocal() throws Exception { + Object msg = new Object(); + + GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); + + try { + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg); + } + catch (IgniteCheckedException ignored) { + // No-op. We are using mocks so real sending is impossible. + } + + verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class), + eq(GridIoPolicy.PUBLIC_POOL)); + + Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id())); + + verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER), + any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL)); + } + + /** + * @throws Exception If failed. + */ + public void testSendUserMessageUnorderedThickVersionIfOneOfNodesIsLocal() throws Exception { + Object msg = new Object(); + + GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); + + try { + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_GGFS, false, 123L); + } + catch (IgniteCheckedException ignored) { + // No-op. We are using mocks so real sending is impossible. + } + + verify(ioMgr).send(eq(locNode), eq(GridTopic.TOPIC_COMM_USER), any(GridIoUserMessage.class), + eq(GridIoPolicy.PUBLIC_POOL)); + + Collection<? extends ClusterNode> rmtNodes = F.view(F.asList(rmtNode), F.remoteNodes(locNode.id())); + + verify(ioMgr).send(argThat(new IsEqualCollection(rmtNodes)), eq(GridTopic.TOPIC_COMM_USER), + any(GridIoUserMessage.class), eq(GridIoPolicy.PUBLIC_POOL)); + } + + /** + * @throws Exception If failed. + */ + public void testSendUserMessageOrderedThickVersionIfOneOfNodesIsLocal() throws Exception { + Object msg = new Object(); + + GridIoManager ioMgr = spy(new TestGridIoManager(ctx)); + + try { + ioMgr.sendUserMessage(F.asList(locNode, rmtNode), msg, GridTopic.TOPIC_GGFS, true, 123L); + } + catch (Exception ignored) { + // No-op. We are using mocks so real sending is impossible. 
+ } + + verify(ioMgr).sendOrderedMessage( + argThat(new IsEqualCollection(F.asList(locNode, rmtNode))), - eq(GridTopic.TOPIC_COMM_USER), anyLong(), ++ eq(GridTopic.TOPIC_COMM_USER), + any(GridIoUserMessage.class), + eq(GridIoPolicy.PUBLIC_POOL), + eq(123L), + false); + } + + /** + * Test-purposed extension of {@code GridIoManager} with no-op {@code send(...)} methods. + */ + private static class TestGridIoManager extends GridIoManager { + /** + * @param ctx Grid kernal context. + */ + TestGridIoManager(GridKernalContext ctx) { + super(ctx); + } + + /** {@inheritDoc} */ + @Override public void send(ClusterNode node, GridTopic topic, GridTcpCommunicationMessageAdapter msg, + GridIoPolicy plc) throws IgniteCheckedException { + // No-op. + } + } + + /** + * Mockito argument matcher to compare collections produced by {@code F.view()} methods. + */ + private static class IsEqualCollection extends ArgumentMatcher<Collection<? extends ClusterNode>> { + /** Expected collection. */ + private final Collection<? extends ClusterNode> expCol; + + /** + * Default constructor. + * + * @param expCol Expected collection. + */ + IsEqualCollection(Collection<? extends ClusterNode> expCol) { + this.expCol = expCol; + } + + /** + * Matches a given collection to the specified in constructor expected one + * with Apache {@code CollectionUtils.isEqualCollection()}. + * + * @param colToCheck Collection to be matched against the expected one. + * @return True if collections matches. + */ + @Override public boolean matches(Object colToCheck) { + return CollectionUtils.isEqualCollection(expCol, (Collection)colToCheck); + } + } + + /** */ + private static class Message extends GridTcpCommunicationMessageAdapter implements Serializable { + /** {@inheritDoc} */ + @SuppressWarnings("CloneDoesntCallSuperClone") + @Override public GridTcpCommunicationMessageAdapter clone() { + throw new UnsupportedOperationException(); + } + + /** {@inheritDoc} */ + @Override protected void clone0(GridTcpCommunicationMessageAdapter _msg) { + // No-op. + } + + /** {@inheritDoc} */ + @Override public boolean writeTo(ByteBuffer buf) { + return true; + } + + /** {@inheritDoc} */ + @Override public boolean readFrom(ByteBuffer buf) { + return true; + } + + /** {@inheritDoc} */ + @Override public byte directType() { + return 0; + } + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java index 0000000,7cd698d..a30ef73 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/replicated/GridCacheSyncReplicatedPreloadSelfTest.java @@@ -1,0 -1,166 +1,165 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.internal.processors.cache.distributed.replicated; + + import org.apache.ignite.*; + import org.apache.ignite.cache.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.spi.discovery.tcp.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.*; + import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*; + import org.apache.ignite.testframework.junits.common.*; + import org.jetbrains.annotations.*; + + import java.util.concurrent.*; + import java.util.concurrent.atomic.*; + + import static org.apache.ignite.configuration.IgniteDeploymentMode.*; + import static org.apache.ignite.cache.CacheDistributionMode.*; + import static org.apache.ignite.cache.CacheMode.*; + import static org.apache.ignite.cache.CachePreloadMode.*; + import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*; + + /** + * Multithreaded tests for replicated cache preloader. + */ + public class GridCacheSyncReplicatedPreloadSelfTest extends GridCommonAbstractTest { + /** */ + private TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true); + + /** */ + private static final boolean DISCO_DEBUG_MODE = false; + - + /** + * Constructs test. + */ + public GridCacheSyncReplicatedPreloadSelfTest() { + super(false /* don't start grid. */); + } + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration cfg = super.getConfiguration(gridName); + + TcpDiscoverySpi disco = new TcpDiscoverySpi(); + + disco.setIpFinder(ipFinder); + disco.setDebugMode(DISCO_DEBUG_MODE); + + cfg.setDiscoverySpi(disco); + + CacheConfiguration cacheCfg = defaultCacheConfiguration(); + + cacheCfg.setCacheMode(REPLICATED); + cacheCfg.setDistributionMode(PARTITIONED_ONLY); + cacheCfg.setWriteSynchronizationMode(FULL_SYNC); + + // This property is essential for this test. + cacheCfg.setPreloadMode(SYNC); + + cacheCfg.setPreloadBatchSize(10000); + + cfg.setCacheConfiguration(cacheCfg); + cfg.setDeploymentMode(CONTINUOUS); + + return cfg; + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + super.afterTest(); + + stopAllGrids(); + } + + /** + * @throws Exception If test failed. + */ + @SuppressWarnings({"TooBroadScope"}) + public void testNodeRestart() throws Exception { + int keyCnt = 1000; + int retries = 20; + + Ignite g0 = startGrid(0); + Ignite g1 = startGrid(1); + + for (int i = 0; i < keyCnt; i++) + g0.cache(null).putx(i, i); + + assertEquals(keyCnt, g0.cache(null).size()); + assertEquals(keyCnt, g1.cache(null).size()); + + for (int n = 0; n < retries; n++) { + info("Starting additional grid node..."); + + Ignite g2 = startGrid(2); + + assertEquals(keyCnt, g2.cache(null).size()); + + info("Stopping additional grid node..."); + + stopGrid(2); + } + } + + /** + * @throws Exception If test failed. 
+ */ + @SuppressWarnings({"TooBroadScope"}) + public void testNodeRestartMultithreaded() throws Exception { + final int keyCnt = 1000; + final int retries = 300; + int threadCnt = 5; + + Ignite g0 = startGrid(0); + Ignite g1 = startGrid(1); + + for (int i = 0; i < keyCnt; i++) + g0.cache(null).putx(i, i); + + assertEquals(keyCnt, g0.cache(null).size()); + assertEquals(keyCnt, g1.cache(null).size()); + + final AtomicInteger cnt = new AtomicInteger(); + + multithreaded( + new Callable() { + @Nullable @Override public Object call() throws Exception { + while (true) { + int c = cnt.incrementAndGet(); + + if (c > retries) + break; + + int idx = c + 1; + + info("Starting additional grid node with index: " + idx); + + startGrid(idx); + + info("Stopping additional grid node with index: " + idx); + + stopGrid(idx); + } + + return null; + } + }, + threadCnt); + } + } http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a4d5dc63/modules/core/src/test/java/org/apache/ignite/session/GridSessionLoadSelfTest.java ---------------------------------------------------------------------- diff --cc modules/core/src/test/java/org/apache/ignite/session/GridSessionLoadSelfTest.java index 0000000,e4de992..d148d37 mode 000000,100644..100644 --- a/modules/core/src/test/java/org/apache/ignite/session/GridSessionLoadSelfTest.java +++ b/modules/core/src/test/java/org/apache/ignite/session/GridSessionLoadSelfTest.java @@@ -1,0 -1,264 +1,267 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.ignite.session; + + import org.apache.ignite.*; + import org.apache.ignite.cluster.*; + import org.apache.ignite.compute.*; + import org.apache.ignite.configuration.*; + import org.apache.ignite.resources.*; + import org.apache.ignite.internal.util.typedef.internal.*; + import org.apache.ignite.testframework.*; + import org.apache.ignite.testframework.junits.common.*; + + import java.io.*; + import java.util.*; + import java.util.concurrent.*; + + /** + * Task session load self test. 
+ */ + @GridCommonTest(group = "Task Session") + public class GridSessionLoadSelfTest extends GridCommonAbstractTest { + /** */ + private static final int THREAD_CNT = 40; + + /** */ + private static final int EXEC_CNT = 10; + + /** */ + private boolean locMarsh; + + /** {@inheritDoc} */ + @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception { + IgniteConfiguration c = super.getConfiguration(gridName); + + c.setMarshalLocalJobs(locMarsh); + c.setPeerClassLoadingEnabled(false); + + return c; + } + + /** {@inheritDoc} */ + @Override protected void beforeTest() throws Exception { + startGrids(2); + } + + /** {@inheritDoc} */ + @Override protected void afterTest() throws Exception { + stopAllGrids(); + } + + /** + * @throws Exception If failed. + */ + public void testSessionLoad() throws Exception { + locMarsh = true; + + checkSessionLoad(); + } + + /** + * @throws Exception If failed. + */ + public void testSessionLoadNoLocalMarshalling() throws Exception { + locMarsh = false; + + checkSessionLoad(); + } + + /** + * @throws Exception If failed. + */ + private void checkSessionLoad() throws Exception { - final Ignite ignite = grid(1); ++ final Ignite ignite = grid(0); + + assert ignite != null; + assert ignite.cluster().nodes().size() == 2; + + info("Thread count: " + THREAD_CNT); + + GridTestUtils.runMultiThreaded(new Callable<Object>() { + @Override public Object call() throws Exception { ++ ComputeTaskFuture f = null; ++ + try { + for (int i = 0; i < EXEC_CNT; i++) + assertEquals(Boolean.TRUE, - executeAsync(ignite.compute().withName("task-name"), ++ (f = executeAsync(ignite.compute().withName("task-name"), + SessionLoadTestTask.class, - ignite.cluster().nodes().size() * 2).get(20000)); ++ ignite.cluster().nodes().size() * 2)).get(20000)); + } + catch (Exception e) { - U.error(log, "Test failed.", e); ++ U.error(log, "Task failed: " + ++ (f != null ? f.getTaskSession().getId() : "N/A"), e); + + throw e; + } + finally { + info("Thread finished."); + } + + return null; + } + }, THREAD_CNT, "grid-load-test-thread"); + } + + /** + * + */ + @ComputeTaskSessionFullSupport + private static class SessionLoadTestTask extends ComputeTaskAdapter<Integer, Boolean> { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** */ + private Map<String, Integer> params; + + /** {@inheritDoc} */ + @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, Integer arg) + throws IgniteCheckedException { + assert taskSes != null; + assert arg != null; + assert arg > 1; + + Map<SessionLoadTestJob, ClusterNode> map = new HashMap<>(subgrid.size()); + + Iterator<ClusterNode> iter = subgrid.iterator(); + + Random rnd = new Random(); + + params = new HashMap<>(arg); + + for (int i = 0; i < arg; i++) { + // Recycle iterator.
+ if (!iter.hasNext()) + iter = subgrid.iterator(); + + String paramName = UUID.randomUUID().toString(); + + int paramVal = rnd.nextInt(); + + taskSes.setAttribute(paramName, paramVal); + + map.put(new SessionLoadTestJob(paramName), iter.next()); + + params.put(paramName, paramVal); + + if (log.isDebugEnabled()) + log.debug("Set session attribute [name=" + paramName + ", value=" + paramVal + ']'); + } + + return map; + } + + /** {@inheritDoc} */ + @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteCheckedException { + assert taskSes != null; + assert results != null; + assert params != null; + assert !params.isEmpty(); + assert results.size() == params.size(); + + if (log.isDebugEnabled()) + log.debug("Reducing: " + params); + + Map<String, Integer> receivedParams = new HashMap<>(); + + boolean allAttrReceived = false; + + for (int i = 0; i < 3 && !allAttrReceived; i++) { + allAttrReceived = true; + + for (Map.Entry<String, Integer> entry : params.entrySet()) { + Serializable attr = taskSes.getAttribute(entry.getKey()); + + assert attr != null; + + int newVal = (Integer)attr; + + receivedParams.put(entry.getKey(), newVal); + + // New value is expected to be +1 to argument value. + if (newVal != entry.getValue() + 1) + allAttrReceived = false; + } + + if (!allAttrReceived) + U.sleep(1000); + } + + if (log.isDebugEnabled()) { + for (Map.Entry<String, Integer> entry : receivedParams.entrySet()) + log.debug("Received session attribute value [name=" + entry.getKey() + ", val=" + entry.getValue() + + ", expected=" + (params.get(entry.getKey()) + 1) + ']'); + } + + return allAttrReceived; + } + } + + /** + * + */ + private static class SessionLoadTestJob extends ComputeJobAdapter { + /** */ + @IgniteTaskSessionResource + private ComputeTaskSession taskSes; + + /** */ + @IgniteLoggerResource + private IgniteLogger log; + + /** + * @param arg Argument. + */ + private SessionLoadTestJob(String arg) { + super(arg); + } + + /** {@inheritDoc} */ + @Override public Serializable execute() throws IgniteCheckedException { + assert taskSes != null; + assert argument(0) != null; + + Serializable ser = taskSes.getAttribute(argument(0)); + + assert ser != null; + + int val = (Integer)ser + 1; + + if (log.isDebugEnabled()) + log.debug("Executing session load job: " + val); + + // Generate garbage. + for (int i = 0; i < 10; i++) + taskSes.setAttribute(argument(0), i); + + // Set final value (+1 to original value). + taskSes.setAttribute(argument(0), val); + + if (log.isDebugEnabled()) + log.debug("Set session attribute [name=" + argument(0) + ", value=" + val + ']'); + + return val; + } + } + }