futures: api cleanup

Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bc18635a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bc18635a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bc18635a

Branch: refs/heads/sprint-2
Commit: bc18635a9358df5f155eba69f30d74410ae5d3e7
Parents: c1b4695
Author: Yakov Zhdanov <yzhda...@gridgain.com>
Authored: Thu Mar 5 11:02:08 2015 +0300
Committer: Yakov Zhdanov <yzhda...@gridgain.com>
Committed: Thu Mar 5 11:02:08 2015 +0300

----------------------------------------------------------------------
 .../cache/GridCacheMultiTxFuture.java           |  40 +-----
 .../internal/util/future/GridFutureAdapter.java | 122 ++++++++++------
 .../processors/schedule/ScheduleFutureImpl.java | 140 +------------------
 3 files changed, 95 insertions(+), 207 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc18635a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java
index c6829aa..71ba123 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java
@@ -25,7 +25,6 @@ import org.apache.ignite.internal.util.future.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 
-import java.io.*;
 import java.util.*;
 import java.util.concurrent.atomic.*;
 
@@ -33,15 +32,9 @@ import java.util.concurrent.atomic.*;
  * Future which waits for completion of one or more transactions.
  */
 public final class GridCacheMultiTxFuture<K, V> extends 
GridFutureAdapter<Boolean> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Logger reference. */
     private static final AtomicReference<IgniteLogger> logRef = new 
AtomicReference<>();
 
-    /** Transactions to wait for. */
-    private final Set<IgniteInternalTx<K, V>> txs = new GridLeanSet<>();
-
     /** */
     private Set<IgniteInternalTx<K, V>> remainingTxs;
 
@@ -58,46 +51,28 @@ public final class GridCacheMultiTxFuture<K, V> extends 
GridFutureAdapter<Boolea
     }
 
     /**
-     * Empty constructor required for {@link Externalizable}.
-     */
-    public GridCacheMultiTxFuture() {
-        // No-op.
-    }
-
-    /**
-     * @return Transactions to wait for.
-     */
-    public Set<IgniteInternalTx<K, V>> txs() {
-        return txs;
-    }
-
-    /**
-     * @return Remaining transactions.
-     */
-    public Set<IgniteInternalTx<K, V>> remainingTxs() {
-        return remainingTxs;
-    }
-
-    /**
      * @param tx Transaction to add.
      */
     public void addTx(IgniteInternalTx<K, V> tx) {
-        txs.add(tx);
+        if (remainingTxs == null)
+            remainingTxs = new GridConcurrentHashSet<>();
+
+        remainingTxs.add(tx);
     }
 
     /**
      * Initializes this future.
      */
     public void init() {
-        if (F.isEmpty(txs)) {
+        if (remainingTxs == null) {
             remainingTxs = Collections.emptySet();
 
             onDone(true);
         }
         else {
-            remainingTxs = new GridConcurrentHashSet<>(txs);
+            assert !remainingTxs.isEmpty();
 
-            for (final IgniteInternalTx<K, V> tx : txs) {
+            for (final IgniteInternalTx<K, V> tx : remainingTxs) {
                 if (!tx.done()) {
                     tx.finishFuture().listenAsync(new 
CI1<IgniteInternalFuture<IgniteInternalTx>>() {
                         @Override public void 
apply(IgniteInternalFuture<IgniteInternalTx> t) {
@@ -134,7 +109,6 @@ public final class GridCacheMultiTxFuture<K, V> extends 
GridFutureAdapter<Boolea
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridCacheMultiTxFuture.class, this,
-            "txs", F.viewReadOnly(txs, CU.<K, V>tx2xidVersion()),
             "remaining", F.viewReadOnly(remainingTxs, CU.<K, 
V>tx2xidVersion()),
             "super", super.toString()
         );

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc18635a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
----------------------------------------------------------------------
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
index b107c15..b35d18a 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java
@@ -32,9 +32,6 @@ import java.util.concurrent.locks.*;
  * Future adapter.
  */
 public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer 
implements IgniteInternalFuture<R> {
-    /** */
-    private static final long serialVersionUID = 0L;
-
     /** Initial state. */
     private static final int INIT = 0;
 
@@ -44,12 +41,18 @@ public class GridFutureAdapter<R> extends 
AbstractQueuedSynchronizer implements
     /** Done state. */
     private static final int DONE = 2;
 
+    /** */
+    private static final byte ERR = 1;
+
+    /** */
+    private static final byte RES = 2;
+
+    /** */
+    private byte resFlag;
+
     /** Result. */
     @GridToStringInclude
-    private R res;
-
-    /** Error. */
-    private Throwable err;
+    private Object res;
 
     /** Future start time. */
     private final long startTime = U.currentTimeMillis();
@@ -57,11 +60,8 @@ public class GridFutureAdapter<R> extends 
AbstractQueuedSynchronizer implements
     /** Future end time. */
     private volatile long endTime;
 
-    /** Asynchronous listeners. */
-    private Collection<IgniteInClosure<? super IgniteInternalFuture<R>>> lsnrs;
-
     /** */
-    private final Object mux = new Object();
+    private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr;
 
     /** {@inheritDoc} */
     @Override public long startTime() {
@@ -86,14 +86,14 @@ public class GridFutureAdapter<R> extends 
AbstractQueuedSynchronizer implements
      * @return Value of error.
      */
     protected Throwable error() {
-        return err;
+        return (resFlag == RES) ? null : (Throwable)res;
     }
 
     /**
      * @return Value of result.
      */
     protected R result() {
-        return res;
+        return (R)res;
     }
 
     /** {@inheritDoc} */
@@ -105,10 +105,10 @@ public class GridFutureAdapter<R> extends 
AbstractQueuedSynchronizer implements
             if (getState() == CANCELLED)
                 throw new IgniteFutureCancelledCheckedException("Future was 
cancelled: " + this);
 
-            if (err != null)
-                throw U.cast(err);
+            if (resFlag != ERR)
+                throw U.cast((Throwable)res);
 
-            return res;
+            return (R)res;
         }
         catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -152,33 +152,35 @@ public class GridFutureAdapter<R> extends 
AbstractQueuedSynchronizer implements
         if (getState() == CANCELLED)
             throw new IgniteFutureCancelledCheckedException("Future was 
cancelled: " + this);
 
-        if (err != null)
-            throw U.cast(err);
+        if (resFlag != ERR)
+            throw U.cast((Throwable)res);
 
-        return res;
+        return (R)res;
     }
 
     /** {@inheritDoc} */
-    @Override public void listenAsync(@Nullable final IgniteInClosure<? super 
IgniteInternalFuture<R>> lsnr) {
-        if (lsnr != null) {
+    @Override public void listenAsync(@Nullable final IgniteInClosure<? super 
IgniteInternalFuture<R>> lsnr0) {
+        if (lsnr0 != null) {
             boolean done = isDone();
 
             if (!done) {
-                synchronized (mux) {
+                synchronized (this) {
                     done = isDone(); // Double check.
 
                     if (!done) {
-                        if (lsnrs == null)
-                            lsnrs = new ArrayList<>();
-
-                        lsnrs.add(lsnr);
+                        if (lsnr == null)
+                            lsnr = lsnr0;
+                        else if (lsnr instanceof ArrayListener)
+                            ((ArrayListener)lsnr).add(lsnr0);
+                        else {
+                            lsnr = (IgniteInClosure)new 
ArrayListener<IgniteInternalFuture>(lsnr, lsnr0);
+                        }
                     }
                 }
             }
 
-            if (done) {
-                notifyListener(lsnr);
-            }
+            if (done)
+                notifyListener(lsnr0);
         }
     }
 
@@ -191,21 +193,20 @@ public class GridFutureAdapter<R> extends 
AbstractQueuedSynchronizer implements
      * Notifies all registered listeners.
      */
     private void notifyListeners() {
-        final Collection<IgniteInClosure<? super IgniteInternalFuture<R>>> 
lsnrs0;
+        IgniteInClosure<? super IgniteInternalFuture<R>> lsnrs0;
 
-        synchronized (mux) {
-            lsnrs0 = lsnrs;
+        synchronized (this) {
+            lsnrs0 = lsnr;
 
-            if (lsnrs0 == null || lsnrs0.isEmpty())
+            if (lsnrs0 == null)
                 return;
 
-            lsnrs = null;
+            lsnr = null;
         }
 
-        assert !lsnrs0.isEmpty();
+        assert lsnr == null;
 
-        for (IgniteInClosure<? super IgniteInternalFuture<R>> lsnr : lsnrs0)
-            notifyListener(lsnr);
+        notifyListener(lsnr);
     }
 
     /**
@@ -252,7 +253,7 @@ public class GridFutureAdapter<R> extends 
AbstractQueuedSynchronizer implements
      */
     public boolean isFailed() {
         // Must read endTime first.
-        return endTime != 0 && err != null;
+        return endTime != 0 && resFlag == ERR;
     }
 
     /** {@inheritDoc} */
@@ -315,8 +316,14 @@ public class GridFutureAdapter<R> extends 
AbstractQueuedSynchronizer implements
 
         try {
             if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) {
-                this.res = res;
-                this.err = err;
+                if (err != null) {
+                    resFlag = ERR;
+                    this.res = err;
+                }
+                else {
+                    resFlag = RES;
+                    this.res = res;
+                }
 
                 notify = true;
 
@@ -372,6 +379,41 @@ public class GridFutureAdapter<R> extends 
AbstractQueuedSynchronizer implements
     /**
      *
      */
+    private static class ArrayListener<R> implements 
IgniteInClosure<IgniteInternalFuture<R>> {
+        /** */
+        private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr;
+
+        /**
+         * @param lsnrs Listeners.
+         */
+        private ArrayListener(IgniteInClosure... lsnrs) {
+            this.arr = lsnrs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void apply(IgniteInternalFuture<R> fut) {
+            for (int i = 0; i < arr.length; i++)
+                arr[i].apply(fut);
+        }
+
+        /**
+         * @param lsnr Listener.
+         */
+        void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) {
+            arr = Arrays.copyOf(arr, arr.length + 1);
+
+            arr[arr.length - 1] = lsnr;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(ArrayListener.class, this, "arr", 
Arrays.toString(arr));
+        }
+    }
+
+    /**
+     *
+     */
     private static class ChainFuture<R, T> extends GridFutureAdapter<T> {
         /** */
         private static final long serialVersionUID = 0L;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc18635a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
----------------------------------------------------------------------
diff --git 
a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
 
b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
index 7f76873..b677770 100644
--- 
a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
+++ 
b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java
@@ -41,10 +41,7 @@ import static java.util.concurrent.TimeUnit.*;
 /**
  * Implementation of {@link org.apache.ignite.scheduler.SchedulerFuture} 
interface.
  */
-class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable {
-    /** */
-    private static final long serialVersionUID = 0L;
-
+class ScheduleFutureImpl<R> implements SchedulerFuture<R> {
     /** Empty time array. */
     private static final long[] EMPTY_TIMES = new long[] {};
 
@@ -109,12 +106,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, 
Externalizable {
     /** Listener call count. */
     private int lastLsnrExecCnt;
 
-    /** Synchronous notification flag. */
-    private volatile boolean syncNotify = 
IgniteSystemProperties.getBoolean(IGNITE_FUT_SYNC_NOTIFICATION, true);
-
-    /** Concurrent notification flag. */
-    private volatile boolean concurNotify = 
IgniteSystemProperties.getBoolean(IGNITE_FUT_CONCURRENT_NOTIFICATION, false);
-
     /** Mutex. */
     private final Object mux = new Object();
 
@@ -274,26 +265,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, 
Externalizable {
         }
     }
 
-    /** {@inheritDoc} */
-    @Override public boolean concurrentNotify() {
-        return concurNotify;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void concurrentNotify(boolean concurNotify) {
-        this.concurNotify = concurNotify;
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean syncNotify() {
-        return syncNotify;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void syncNotify(boolean syncNotify) {
-        this.syncNotify = syncNotify;
-    }
-
     /**
      * Sets execution task.
      *
@@ -606,7 +577,7 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, 
Externalizable {
             // Avoid race condition in case if listener was added after
             // first execution completed.
             if (notifyLsnr)
-                notifyListener(lsnr, res, err, syncNotify);
+                notifyListener(lsnr, res, err);
         }
     }
 
@@ -648,30 +619,13 @@ class ScheduleFutureImpl<R> implements 
SchedulerFuture<R>, Externalizable {
      * @param lsnr Listener to notify.
      * @param res Last execution result.
      * @param err Last execution error.
-     * @param syncNotify Synchronous notification flag.
      */
-    private void notifyListener(final IgniteInClosure<? super IgniteFuture<R>> 
lsnr, R res, Throwable err,
-        boolean syncNotify) {
+    private void notifyListener(final IgniteInClosure<? super IgniteFuture<R>> 
lsnr, R res, Throwable err) {
         assert lsnr != null;
         assert !Thread.holdsLock(mux);
         assert ctx != null;
 
-        final SchedulerFuture<R> snapshot = snapshot(res, err);
-
-        if (syncNotify)
-            lsnr.apply(snapshot);
-        else {
-            try {
-                ctx.closure().runLocalSafe(new Runnable() {
-                    @Override public void run() {
-                        lsnr.apply(snapshot);
-                    }
-                }, true);
-            }
-            catch (Throwable e) {
-                U.error(log, "Failed to notify listener: " + this, e);
-            }
-        }
+        lsnr.apply(snapshot(res, err));
     }
 
     /**
@@ -687,22 +641,8 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, 
Externalizable {
 
         final SchedulerFuture<R> snapshot = snapshot(res, err);
 
-        if (concurNotify) {
-            for (final IgniteInClosure<? super IgniteFuture<R>> lsnr : tmp)
-                ctx.closure().runLocalSafe(new GPR() {
-                    @Override public void run() {
-                        lsnr.apply(snapshot);
-                    }
-                }, true);
-        }
-        else {
-            ctx.closure().runLocalSafe(new GPR() {
-                @Override public void run() {
-                    for (IgniteInClosure<? super IgniteFuture<R>> lsnr : tmp)
-                        lsnr.apply(snapshot);
-                }
-            }, true);
-        }
+        for (IgniteInClosure<? super IgniteFuture<R>> lsnr : tmp)
+            lsnr.apply(snapshot);
     }
 
     /**
@@ -839,26 +779,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, 
Externalizable {
         }
 
         /** {@inheritDoc} */
-        @Override public boolean concurrentNotify() {
-            return ref.concurrentNotify();
-        }
-
-        /** {@inheritDoc} */
-        @Override public void concurrentNotify(boolean concurNotify) {
-            ref.concurrentNotify(concurNotify);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void syncNotify(boolean syncNotify) {
-            ref.syncNotify(syncNotify);
-        }
-
-        /** {@inheritDoc} */
-        @Override public boolean syncNotify() {
-            return ref.syncNotify();
-        }
-
-        /** {@inheritDoc} */
         @Override public String id() {
             return ref.id();
         }
@@ -965,54 +885,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, 
Externalizable {
     }
 
     /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        boolean cancelled;
-        R lastRes;
-        Throwable lastErr;
-        GridScheduleStatistics stats;
-
-        synchronized (mux) {
-            cancelled = this.cancelled;
-            lastRes = this.lastRes;
-            lastErr = this.lastErr;
-            stats = this.stats;
-        }
-
-        out.writeBoolean(cancelled);
-        out.writeObject(lastRes);
-        out.writeObject(lastErr);
-        out.writeObject(stats);
-
-        out.writeBoolean(syncNotify);
-        out.writeBoolean(concurNotify);
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings({"unchecked"})
-    @Override public void readExternal(ObjectInput in) throws IOException, 
ClassNotFoundException {
-        CountDownLatch latch = new CountDownLatch(0);
-
-        boolean cancelled = in.readBoolean();
-        R lastRes = (R)in.readObject();
-        Throwable lastErr = (Throwable)in.readObject();
-        GridScheduleStatistics stats = (GridScheduleStatistics)in.readObject();
-
-        syncNotify = in.readBoolean();
-        concurNotify = in.readBoolean();
-
-        synchronized (mux) {
-            done = true;
-
-            resLatch = latch;
-
-            this.cancelled = cancelled;
-            this.lastRes = lastRes;
-            this.lastErr = lastErr;
-            this.stats = stats;
-        }
-    }
-
-    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(ScheduleFutureImpl.class, this);
     }

Reply via email to