backport from 6.6.5 of gg-10102

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/43659dcf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/43659dcf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/43659dcf

Branch: refs/heads/ignite-478
Commit: 43659dcf670c66fe5ccb0d306a95e689b200ec47
Parents: bc025d9
Author: Denis Magda <dma...@gridgain.com>
Authored: Tue Apr 21 12:57:10 2015 +0300
Committer: Denis Magda <dma...@gridgain.com>
Committed: Tue Apr 21 12:57:10 2015 +0300

----------------------------------------------------------------------
 .../examples/ScalarContinuationExample.scala    |  10 +-
 .../ignite/compute/ComputeJobContinuation.java  |   2 +
 .../ignite/internal/GridJobContextImpl.java     | 100 +++++++++++-----
 .../processors/job/GridJobHoldListener.java     |   6 +-
 .../processors/job/GridJobProcessor.java        |  22 ++--
 .../internal/processors/job/GridJobWorker.java  |  23 ++--
 .../internal/GridContinuousTaskSelfTest.java    | 114 +++++++++++++++++++
 7 files changed, 222 insertions(+), 55 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala
 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala
index 3ef0527..203f0b7 100644
--- 
a/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala
+++ 
b/examples/src/main/scala/org/apache/ignite/scalar/examples/ScalarContinuationExample.scala
@@ -154,14 +154,16 @@ class FibonacciClosure (
                         jobCtx.callcc() // Resume job execution.
                 }
 
+                // Hold (suspend) job execution.
+                // It will be resumed in listener above via 'callcc()' call
+                // once both futures are done.
+                jobCtx.holdcc()
+
                 // Attach the same listener to both futures.
                 fut1.listen(lsnr)
                 fut2.listen(lsnr)
 
-                // Hold (suspend) job execution.
-                // It will be resumed in listener above via 'callcc()' call
-                // once both futures are done.
-                return jobCtx.holdcc()
+                return null
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java
 
b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java
index 2014f10..cd91e2c 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/compute/ComputeJobContinuation.java
@@ -49,6 +49,7 @@ public interface ComputeJobContinuation {
      * language. Basically, the job is held to be continued later, hence the 
name of the method.
      *
      * @return Always returns {@code null} for convenience to be used in code 
with return statement.
+     * @throws IllegalStateException If job has been already held before.
      */
     @Nullable public <T> T holdcc();
 
@@ -71,6 +72,7 @@ public interface ComputeJobContinuation {
      *
      * @param timeout Timeout in milliseconds after which job will be 
automatically resumed.
      * @return Always returns {@code null} for convenience to be used in code 
with return statement.
+     * @throws IllegalStateException If job has been already held before.
      */
     @Nullable public <T> T holdcc(long timeout);
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java 
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
index 81f75a4..a1e0135 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/GridJobContextImpl.java
@@ -53,6 +53,9 @@ public class GridJobContextImpl implements ComputeJobContext, 
Externalizable {
     /** Job worker. */
     private GridJobWorker job;
 
+    /** Current timeout object. */
+    private volatile GridTimeoutObject timeoutObj;
+
     /** Attributes mux. Do not use this as object is exposed to user. */
     private final Object mux = new Object();
 
@@ -167,46 +170,62 @@ public class GridJobContextImpl implements 
ComputeJobContext, Externalizable {
                 job = ctx.job().activeJob(jobId);
 
             // Completed?
-            if (job != null) {
-                job.hold();
+            if (job != null && !job.isDone()) {
+                if (!job.hold())
+                    throw new IllegalStateException("Job has already been held 
[ctx=" + this + ']');
 
-                if (timeout > 0 && !job.isDone()) {
-                    final long endTime = U.currentTimeMillis() + timeout;
+                assert timeoutObj == null;
 
-                    // Overflow.
-                    if (endTime > 0) {
-                        ctx.timeout().addTimeoutObject(new GridTimeoutObject() 
{
-                            private final IgniteUuid id = 
IgniteUuid.randomUuid();
+                if (timeout <= 0)
+                    return null;
 
-                            @Override public IgniteUuid timeoutId() {
-                                return id;
-                            }
+                final long endTime = U.currentTimeMillis() + timeout;
 
-                            @Override public long endTime() {
-                                return endTime;
-                            }
+                // Overflow.
+                if (endTime > 0) {
+                    timeoutObj = new GridTimeoutObject() {
+                        private final IgniteUuid id = IgniteUuid.randomUuid();;
 
-                            @Override public void onTimeout() {
-                                try {
-                                    ExecutorService execSvc = job.isInternal() 
?
-                                        ctx.getManagementExecutorService() : 
ctx.getExecutorService();
+                        @Override public IgniteUuid timeoutId() {
+                            return id;
+                        }
 
-                                    assert execSvc != null;
+                        @Override public long endTime() {
+                            return endTime;
+                        }
 
-                                    execSvc.submit(new Runnable() {
-                                        @Override public void run() {
-                                            callcc();
-                                        }
-                                    });
-                                }
-                                catch (RejectedExecutionException e) {
-                                    U.error(log(), "Failed to execute job 
(will execute synchronously).", e);
+                        @Override public void onTimeout() {
+                            try {
+                                synchronized (mux) {
+                                    GridTimeoutObject timeoutObj0 = timeoutObj;
+
+                                    if (timeoutObj0 == null || 
timeoutObj0.timeoutId() != id)
+                                        // The timer was canceled by explicit 
callcc() call.
+                                        return;
 
-                                    callcc();
+                                    timeoutObj = null;
                                 }
+
+                                ExecutorService execSvc = job.isInternal() ?
+                                    ctx.getManagementExecutorService() : 
ctx.getExecutorService();
+
+                                assert execSvc != null;
+
+                                execSvc.submit(new Runnable() {
+                                    @Override public void run() {
+                                        callcc0();
+                                    }
+                                });
                             }
-                        });
-                    }
+                            catch (RejectedExecutionException e) {
+                                U.error(log(), "Failed to execute job (will 
execute synchronously).", e);
+
+                                callcc0();
+                            }
+                        }
+                    };
+
+                    ctx.timeout().addTimeoutObject(timeoutObj);
                 }
             }
         }
@@ -216,13 +235,32 @@ public class GridJobContextImpl implements 
ComputeJobContext, Externalizable {
 
     /** {@inheritDoc} */
     @Override public void callcc() {
+        synchronized (mux) {
+            GridTimeoutObject timeoutObj0 = timeoutObj;
+
+            if (timeoutObj0 != null) {
+                if (ctx != null)
+                    ctx.timeout().removeTimeoutObject(timeoutObj0);
+
+                timeoutObj = null;
+            }
+        }
+
+        callcc0();
+    }
+
+    /**
+     * Unholds job.
+     */
+    private void callcc0() {
         if (ctx != null) {
             if (job == null)
                 job = ctx.job().activeJob(jobId);
 
-            if (job != null)
+            if (job != null) {
                 // Execute in the same thread.
                 job.execute();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java
index 04d9042..3e1dfee 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobHoldListener.java
@@ -26,11 +26,13 @@ import java.util.*;
 interface GridJobHoldListener extends EventListener {
     /**
      * @param worker Held job worker.
+     * @return {@code True} if worker has been held.
      */
-    public void onHold(GridJobWorker worker);
+    public boolean onHeld(GridJobWorker worker);
 
     /**
      * @param worker Unheld job worker.
+     * @return {@code True} if worker has been unheld.
      */
-    public void onUnhold(GridJobWorker worker);
+    public boolean onUnheld(GridJobWorker worker);
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
index 3344028..a13e170 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobProcessor.java
@@ -1635,25 +1635,33 @@ public class GridJobProcessor extends 
GridProcessorAdapter {
      */
     private class JobHoldListener implements GridJobHoldListener {
         /** {@inheritDoc} */
-        @Override public void onHold(GridJobWorker worker) {
+        @Override public boolean onHeld(GridJobWorker worker) {
             if (log.isDebugEnabled())
-                log.debug("Received onHold() callback [worker=" + worker + 
']');
+                log.debug("Received onHeld() callback [worker=" + worker + 
']');
+
+            boolean res = false;
 
             if (activeJobs.containsKey(worker.getJobId())) {
-                heldJobs.add(worker.getJobId());
+                res = heldJobs.add(worker.getJobId());
 
-                if (!activeJobs.containsKey(worker.getJobId()))
+                if (!activeJobs.containsKey(worker.getJobId())) {
                     heldJobs.remove(worker.getJobId());
+
+                    // Job has been completed and therefore cannot be held.
+                    res = false;
+                }
             }
+
+            return res;
         }
 
         /** {@inheritDoc} */
-        @Override public void onUnhold(GridJobWorker worker) {
+        @Override public boolean onUnheld(GridJobWorker worker) {
             if (log.isDebugEnabled())
-                log.debug("Received onUnhold() callback [worker=" + worker + 
", active=" + activeJobs +
+                log.debug("Received onUnheld() callback [worker=" + worker + 
", active=" + activeJobs +
                     ", held=" + heldJobs + ']');
 
-            heldJobs.remove(worker.getJobId());
+            return heldJobs.remove(worker.getJobId());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
index fdfc5c8..b691180 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/job/GridJobWorker.java
@@ -354,12 +354,15 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
     /**
      * Sets halt flags.
      */
-    public void hold() {
-        held.incrementAndGet();
-
+    public boolean hold() {
         HOLD.set(true);
 
-        holdLsnr.onHold(this);
+        boolean res;
+
+        if (res = holdLsnr.onHeld(this))
+            held.incrementAndGet();
+
+        return res;
     }
 
     /**
@@ -431,7 +434,7 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
     }
 
     /**
-     * @param skipNtf {@code True} to skip job processor {@code onUnhold()}
+     * @param skipNtf {@code True} to skip job processor {@code onUnheld()}
      *      notification (only from {@link #body()}).
      */
     private void execute0(boolean skipNtf) {
@@ -443,13 +446,11 @@ public class GridJobWorker extends GridWorker implements 
GridTimeoutObject {
             super.cancel();
 
         if (!skipNtf) {
-            holdLsnr.onUnhold(this);
-
-            int c = held.decrementAndGet();
-
-            if (c > 0) {
+            if (holdLsnr.onUnheld(this))
+                held.decrementAndGet();
+            else {
                 if (log.isDebugEnabled())
-                    log.debug("Ignoring job execution (job was held several 
times) [c=" + c + ']');
+                    log.debug("Ignoring job execution (job was not held).");
 
                 return;
             }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/43659dcf/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
 
b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
index 7c6bacc..d22dcec 100644
--- 
a/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
+++ 
b/modules/core/src/test/java/org/apache/ignite/internal/GridContinuousTaskSelfTest.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal;
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
 import org.apache.ignite.compute.*;
+import org.apache.ignite.lang.IgniteClosure;
 import org.apache.ignite.resources.*;
 import org.apache.ignite.testframework.*;
 import org.apache.ignite.testframework.junits.common.*;
@@ -134,6 +135,119 @@ public class GridContinuousTaskSelfTest extends 
GridCommonAbstractTest {
         }
     }
 
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testClearTimeouts() throws Exception {
+        int holdccTimeout = 4000;
+
+        try {
+            Ignite grid = startGrid(0);
+
+            TestClearTimeoutsClosure closure = new TestClearTimeoutsClosure();
+
+            grid.compute().apply(closure, holdccTimeout);
+
+            Thread.sleep(holdccTimeout * 2);
+
+            assert closure.counter == 2;
+        }
+        finally {
+            stopGrid(0);
+        }
+    }
+
+    /**
+     * @throws Exception If test failed.
+     */
+    public void testMultipleHoldccCalls() throws Exception {
+        try {
+            Ignite grid = startGrid(0);
+
+            assertTrue(grid.compute().apply(new 
TestMultipleHoldccCallsClosure(), (Object)null));
+        }
+        finally {
+            stopGrid(0);
+        }
+    }
+
+    /** */
+    @SuppressWarnings({"PublicInnerClass"})
+    public static class TestMultipleHoldccCallsClosure implements 
IgniteClosure<Object, Boolean> {
+        /** */
+        private int counter;
+
+        /** */
+        private volatile boolean success;
+
+        /** Auto-inject job context. */
+        @JobContextResource
+        private ComputeJobContext jobCtx;
+
+        /** */
+        @LoggerResource
+        private IgniteLogger log;
+
+        @Override public Boolean apply(Object param) {
+            counter++;
+
+            if (counter == 2)
+                return success;
+
+            jobCtx.holdcc(4000);
+
+            try {
+                jobCtx.holdcc();
+            }
+            catch (IllegalStateException e) {
+                success = true;
+                log.info("Second holdcc() threw IllegalStateException as 
expected.");
+            }
+            finally {
+                new Timer().schedule(new TimerTask() {
+                    @Override public void run() {
+                        jobCtx.callcc();
+                    }
+                }, 1000);
+            }
+
+            return false;
+        }
+    }
+
+    /** */
+    @SuppressWarnings({"PublicInnerClass"})
+    public static class TestClearTimeoutsClosure implements 
IgniteClosure<Integer, Object> {
+        /** */
+        private int counter;
+
+        /** Auto-inject job context. */
+        @JobContextResource
+        private ComputeJobContext jobCtx;
+
+        @Override public Object apply(Integer holdccTimeout) {
+            assert holdccTimeout >= 2000;
+
+            counter++;
+
+            if (counter == 1) {
+                new Timer().schedule(new TimerTask() {
+                    @Override public void run() {
+                        jobCtx.callcc();
+                    }
+                }, 1000);
+
+                jobCtx.holdcc(holdccTimeout);
+            }
+
+            if (counter == 2)
+                // Job returned from the suspended state.
+                return null;
+
+            return null;
+        }
+    }
+
     /** */
     @SuppressWarnings({"PublicInnerClass"})
     public static class TestJobsChainTask implements ComputeTask<Boolean, 
Integer> {

Reply via email to