futures: api cleanup
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/bc18635a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/bc18635a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/bc18635a Branch: refs/heads/sprint-2 Commit: bc18635a9358df5f155eba69f30d74410ae5d3e7 Parents: c1b4695 Author: Yakov Zhdanov <yzhda...@gridgain.com> Authored: Thu Mar 5 11:02:08 2015 +0300 Committer: Yakov Zhdanov <yzhda...@gridgain.com> Committed: Thu Mar 5 11:02:08 2015 +0300 ---------------------------------------------------------------------- .../cache/GridCacheMultiTxFuture.java | 40 +----- .../internal/util/future/GridFutureAdapter.java | 122 ++++++++++------ .../processors/schedule/ScheduleFutureImpl.java | 140 +------------------ 3 files changed, 95 insertions(+), 207 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc18635a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java index c6829aa..71ba123 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheMultiTxFuture.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.util.future.*; import org.apache.ignite.internal.util.typedef.*; import org.apache.ignite.internal.util.typedef.internal.*; -import java.io.*; import java.util.*; import java.util.concurrent.atomic.*; @@ -33,15 +32,9 @@ import java.util.concurrent.atomic.*; * Future which waits for completion of one or more transactions. */ public final class GridCacheMultiTxFuture<K, V> extends GridFutureAdapter<Boolean> { - /** */ - private static final long serialVersionUID = 0L; - /** Logger reference. */ private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); - /** Transactions to wait for. */ - private final Set<IgniteInternalTx<K, V>> txs = new GridLeanSet<>(); - /** */ private Set<IgniteInternalTx<K, V>> remainingTxs; @@ -58,46 +51,28 @@ public final class GridCacheMultiTxFuture<K, V> extends GridFutureAdapter<Boolea } /** - * Empty constructor required for {@link Externalizable}. - */ - public GridCacheMultiTxFuture() { - // No-op. - } - - /** - * @return Transactions to wait for. - */ - public Set<IgniteInternalTx<K, V>> txs() { - return txs; - } - - /** - * @return Remaining transactions. - */ - public Set<IgniteInternalTx<K, V>> remainingTxs() { - return remainingTxs; - } - - /** * @param tx Transaction to add. */ public void addTx(IgniteInternalTx<K, V> tx) { - txs.add(tx); + if (remainingTxs == null) + remainingTxs = new GridConcurrentHashSet<>(); + + remainingTxs.add(tx); } /** * Initializes this future. */ public void init() { - if (F.isEmpty(txs)) { + if (remainingTxs == null) { remainingTxs = Collections.emptySet(); onDone(true); } else { - remainingTxs = new GridConcurrentHashSet<>(txs); + assert !remainingTxs.isEmpty(); - for (final IgniteInternalTx<K, V> tx : txs) { + for (final IgniteInternalTx<K, V> tx : remainingTxs) { if (!tx.done()) { tx.finishFuture().listenAsync(new CI1<IgniteInternalFuture<IgniteInternalTx>>() { @Override public void apply(IgniteInternalFuture<IgniteInternalTx> t) { @@ -134,7 +109,6 @@ public final class GridCacheMultiTxFuture<K, V> extends GridFutureAdapter<Boolea /** {@inheritDoc} */ @Override public String toString() { return S.toString(GridCacheMultiTxFuture.class, this, - "txs", F.viewReadOnly(txs, CU.<K, V>tx2xidVersion()), "remaining", F.viewReadOnly(remainingTxs, CU.<K, V>tx2xidVersion()), "super", super.toString() ); http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc18635a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java index b107c15..b35d18a 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/util/future/GridFutureAdapter.java @@ -32,9 +32,6 @@ import java.util.concurrent.locks.*; * Future adapter. */ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements IgniteInternalFuture<R> { - /** */ - private static final long serialVersionUID = 0L; - /** Initial state. */ private static final int INIT = 0; @@ -44,12 +41,18 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements /** Done state. */ private static final int DONE = 2; + /** */ + private static final byte ERR = 1; + + /** */ + private static final byte RES = 2; + + /** */ + private byte resFlag; + /** Result. */ @GridToStringInclude - private R res; - - /** Error. */ - private Throwable err; + private Object res; /** Future start time. */ private final long startTime = U.currentTimeMillis(); @@ -57,11 +60,8 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements /** Future end time. */ private volatile long endTime; - /** Asynchronous listeners. */ - private Collection<IgniteInClosure<? super IgniteInternalFuture<R>>> lsnrs; - /** */ - private final Object mux = new Object(); + private IgniteInClosure<? super IgniteInternalFuture<R>> lsnr; /** {@inheritDoc} */ @Override public long startTime() { @@ -86,14 +86,14 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements * @return Value of error. */ protected Throwable error() { - return err; + return (resFlag == RES) ? null : (Throwable)res; } /** * @return Value of result. */ protected R result() { - return res; + return (R)res; } /** {@inheritDoc} */ @@ -105,10 +105,10 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements if (getState() == CANCELLED) throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); - if (err != null) - throw U.cast(err); + if (resFlag != ERR) + throw U.cast((Throwable)res); - return res; + return (R)res; } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -152,33 +152,35 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements if (getState() == CANCELLED) throw new IgniteFutureCancelledCheckedException("Future was cancelled: " + this); - if (err != null) - throw U.cast(err); + if (resFlag != ERR) + throw U.cast((Throwable)res); - return res; + return (R)res; } /** {@inheritDoc} */ - @Override public void listenAsync(@Nullable final IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) { - if (lsnr != null) { + @Override public void listenAsync(@Nullable final IgniteInClosure<? super IgniteInternalFuture<R>> lsnr0) { + if (lsnr0 != null) { boolean done = isDone(); if (!done) { - synchronized (mux) { + synchronized (this) { done = isDone(); // Double check. if (!done) { - if (lsnrs == null) - lsnrs = new ArrayList<>(); - - lsnrs.add(lsnr); + if (lsnr == null) + lsnr = lsnr0; + else if (lsnr instanceof ArrayListener) + ((ArrayListener)lsnr).add(lsnr0); + else { + lsnr = (IgniteInClosure)new ArrayListener<IgniteInternalFuture>(lsnr, lsnr0); + } } } } - if (done) { - notifyListener(lsnr); - } + if (done) + notifyListener(lsnr0); } } @@ -191,21 +193,20 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements * Notifies all registered listeners. */ private void notifyListeners() { - final Collection<IgniteInClosure<? super IgniteInternalFuture<R>>> lsnrs0; + IgniteInClosure<? super IgniteInternalFuture<R>> lsnrs0; - synchronized (mux) { - lsnrs0 = lsnrs; + synchronized (this) { + lsnrs0 = lsnr; - if (lsnrs0 == null || lsnrs0.isEmpty()) + if (lsnrs0 == null) return; - lsnrs = null; + lsnr = null; } - assert !lsnrs0.isEmpty(); + assert lsnr == null; - for (IgniteInClosure<? super IgniteInternalFuture<R>> lsnr : lsnrs0) - notifyListener(lsnr); + notifyListener(lsnr); } /** @@ -252,7 +253,7 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements */ public boolean isFailed() { // Must read endTime first. - return endTime != 0 && err != null; + return endTime != 0 && resFlag == ERR; } /** {@inheritDoc} */ @@ -315,8 +316,14 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements try { if (compareAndSetState(INIT, cancel ? CANCELLED : DONE)) { - this.res = res; - this.err = err; + if (err != null) { + resFlag = ERR; + this.res = err; + } + else { + resFlag = RES; + this.res = res; + } notify = true; @@ -372,6 +379,41 @@ public class GridFutureAdapter<R> extends AbstractQueuedSynchronizer implements /** * */ + private static class ArrayListener<R> implements IgniteInClosure<IgniteInternalFuture<R>> { + /** */ + private IgniteInClosure<? super IgniteInternalFuture<R>>[] arr; + + /** + * @param lsnrs Listeners. + */ + private ArrayListener(IgniteInClosure... lsnrs) { + this.arr = lsnrs; + } + + /** {@inheritDoc} */ + @Override public void apply(IgniteInternalFuture<R> fut) { + for (int i = 0; i < arr.length; i++) + arr[i].apply(fut); + } + + /** + * @param lsnr Listener. + */ + void add(IgniteInClosure<? super IgniteInternalFuture<R>> lsnr) { + arr = Arrays.copyOf(arr, arr.length + 1); + + arr[arr.length - 1] = lsnr; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return S.toString(ArrayListener.class, this, "arr", Arrays.toString(arr)); + } + } + + /** + * + */ private static class ChainFuture<R, T> extends GridFutureAdapter<T> { /** */ private static final long serialVersionUID = 0L; http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/bc18635a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java ---------------------------------------------------------------------- diff --git a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java index 7f76873..b677770 100644 --- a/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java +++ b/modules/schedule/src/main/java/org/apache/ignite/internal/processors/schedule/ScheduleFutureImpl.java @@ -41,10 +41,7 @@ import static java.util.concurrent.TimeUnit.*; /** * Implementation of {@link org.apache.ignite.scheduler.SchedulerFuture} interface. */ -class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - +class ScheduleFutureImpl<R> implements SchedulerFuture<R> { /** Empty time array. */ private static final long[] EMPTY_TIMES = new long[] {}; @@ -109,12 +106,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable { /** Listener call count. */ private int lastLsnrExecCnt; - /** Synchronous notification flag. */ - private volatile boolean syncNotify = IgniteSystemProperties.getBoolean(IGNITE_FUT_SYNC_NOTIFICATION, true); - - /** Concurrent notification flag. */ - private volatile boolean concurNotify = IgniteSystemProperties.getBoolean(IGNITE_FUT_CONCURRENT_NOTIFICATION, false); - /** Mutex. */ private final Object mux = new Object(); @@ -274,26 +265,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable { } } - /** {@inheritDoc} */ - @Override public boolean concurrentNotify() { - return concurNotify; - } - - /** {@inheritDoc} */ - @Override public void concurrentNotify(boolean concurNotify) { - this.concurNotify = concurNotify; - } - - /** {@inheritDoc} */ - @Override public boolean syncNotify() { - return syncNotify; - } - - /** {@inheritDoc} */ - @Override public void syncNotify(boolean syncNotify) { - this.syncNotify = syncNotify; - } - /** * Sets execution task. * @@ -606,7 +577,7 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable { // Avoid race condition in case if listener was added after // first execution completed. if (notifyLsnr) - notifyListener(lsnr, res, err, syncNotify); + notifyListener(lsnr, res, err); } } @@ -648,30 +619,13 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable { * @param lsnr Listener to notify. * @param res Last execution result. * @param err Last execution error. - * @param syncNotify Synchronous notification flag. */ - private void notifyListener(final IgniteInClosure<? super IgniteFuture<R>> lsnr, R res, Throwable err, - boolean syncNotify) { + private void notifyListener(final IgniteInClosure<? super IgniteFuture<R>> lsnr, R res, Throwable err) { assert lsnr != null; assert !Thread.holdsLock(mux); assert ctx != null; - final SchedulerFuture<R> snapshot = snapshot(res, err); - - if (syncNotify) - lsnr.apply(snapshot); - else { - try { - ctx.closure().runLocalSafe(new Runnable() { - @Override public void run() { - lsnr.apply(snapshot); - } - }, true); - } - catch (Throwable e) { - U.error(log, "Failed to notify listener: " + this, e); - } - } + lsnr.apply(snapshot(res, err)); } /** @@ -687,22 +641,8 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable { final SchedulerFuture<R> snapshot = snapshot(res, err); - if (concurNotify) { - for (final IgniteInClosure<? super IgniteFuture<R>> lsnr : tmp) - ctx.closure().runLocalSafe(new GPR() { - @Override public void run() { - lsnr.apply(snapshot); - } - }, true); - } - else { - ctx.closure().runLocalSafe(new GPR() { - @Override public void run() { - for (IgniteInClosure<? super IgniteFuture<R>> lsnr : tmp) - lsnr.apply(snapshot); - } - }, true); - } + for (IgniteInClosure<? super IgniteFuture<R>> lsnr : tmp) + lsnr.apply(snapshot); } /** @@ -839,26 +779,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable { } /** {@inheritDoc} */ - @Override public boolean concurrentNotify() { - return ref.concurrentNotify(); - } - - /** {@inheritDoc} */ - @Override public void concurrentNotify(boolean concurNotify) { - ref.concurrentNotify(concurNotify); - } - - /** {@inheritDoc} */ - @Override public void syncNotify(boolean syncNotify) { - ref.syncNotify(syncNotify); - } - - /** {@inheritDoc} */ - @Override public boolean syncNotify() { - return ref.syncNotify(); - } - - /** {@inheritDoc} */ @Override public String id() { return ref.id(); } @@ -965,54 +885,6 @@ class ScheduleFutureImpl<R> implements SchedulerFuture<R>, Externalizable { } /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - boolean cancelled; - R lastRes; - Throwable lastErr; - GridScheduleStatistics stats; - - synchronized (mux) { - cancelled = this.cancelled; - lastRes = this.lastRes; - lastErr = this.lastErr; - stats = this.stats; - } - - out.writeBoolean(cancelled); - out.writeObject(lastRes); - out.writeObject(lastErr); - out.writeObject(stats); - - out.writeBoolean(syncNotify); - out.writeBoolean(concurNotify); - } - - /** {@inheritDoc} */ - @SuppressWarnings({"unchecked"}) - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - CountDownLatch latch = new CountDownLatch(0); - - boolean cancelled = in.readBoolean(); - R lastRes = (R)in.readObject(); - Throwable lastErr = (Throwable)in.readObject(); - GridScheduleStatistics stats = (GridScheduleStatistics)in.readObject(); - - syncNotify = in.readBoolean(); - concurNotify = in.readBoolean(); - - synchronized (mux) { - done = true; - - resLatch = latch; - - this.cancelled = cancelled; - this.lastRes = lastRes; - this.lastErr = lastErr; - this.stats = stats; - } - } - - /** {@inheritDoc} */ @Override public String toString() { return S.toString(ScheduleFutureImpl.class, this); }